My subscriber looks like this:
import json

from google.cloud import pubsub_v1
from google.cloud.pubsub_v1.types import DeadLetterPolicy

# Create the client first; the path helpers below are called on it.
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(PROJECT, SUBSCRIPTION)
topic_path = subscriber.topic_path(PROJECT, TOPIC)

dead_letter_policy = DeadLetterPolicy(
    dead_letter_topic='dead_letter_topic',
    max_delivery_attempts=5,
)
subscriber.create_subscription(subscription_path, topic_path, dead_letter_policy=dead_letter_policy)

def callback(message):
    print("Received message: {}".format(message))
    print('Attempted:', message.delivery_attempt, 'times')
    data = message.data.decode('utf-8')
    data_d = json.loads(data)
    # Nack one specific file so it gets redelivered; ack everything else.
    if data_d["name"] == "some_file.json":
        message.nack()
    else:
        message.ack()
The received message looks like this:
Received message: Message {
  data: b'{\n "kind": "storage#object",\n "id": "...'
  attributes: {
    "bucketId": "sample_bucket",
    ...
  }
}
Attempted: 12 times
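Clearly it has been attempted more than 5 times, so why can I still pull this message from the Pub/Sub subscription? Here is the subscription info: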
ackDeadlineSeconds: 10
deadLetterPolicy:
  deadLetterTopic: projects/someproject/topics/dead_letter
  maxDeliveryAttempts: 5
expirationPolicy:
  ttl: 2678400s
messageRetentionDuration: 604800s
name: projects/someproject/subscriptions/new_sub
pushConfig: {}
topic: projects/someproject/topics/pubsub_sample
Posted on 2020-02-07 18:00:52
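This typically happens when you have not granted Pub/Sub permission to publish to your dead-letter topic or to subscribe to your subscription. Make sure you have run the following: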
PUBSUB_SERVICE_ACCOUNT="service-${PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com"
gcloud pubsub topics add-iam-policy-binding <dead letter topic> \
    --member="serviceAccount:${PUBSUB_SERVICE_ACCOUNT}" \
    --role='roles/pubsub.publisher'
gcloud pubsub subscriptions add-iam-policy-binding <subscription with dead letter queue> \
    --member="serviceAccount:${PUBSUB_SERVICE_ACCOUNT}" \
    --role='roles/pubsub.subscriber'
If writing to the dead-letter topic fails, Cloud Pub/Sub will continue to deliver the message to your subscriber.
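To confirm whether the two bindings are actually in place, one option is to dump the IAM policy of the dead-letter topic and of the subscription as JSON (for example with `gcloud pubsub topics get-iam-policy ... --format=json` and `gcloud pubsub subscriptions get-iam-policy ... --format=json`) and inspect the result. The following is only a minimal sketch of that check, using the standard library alone. The helper names `pubsub_service_account` and `has_binding`, the project number `123456789`, and the sample policy are all hypothetical and made up for illustration. The sketch only looks for the exact role on the resource-level policy, so it would not notice access granted through a broader role or at the project level.

```python
import json

# Roles named in the answer: publisher on the dead-letter topic,
# subscriber on the subscription that has the dead-letter policy.
PUBLISHER_ROLE = "roles/pubsub.publisher"
SUBSCRIBER_ROLE = "roles/pubsub.subscriber"


def pubsub_service_account(project_number):
    """Build the IAM member string for a project's Pub/Sub service account."""
    return "serviceAccount:service-{}@gcp-sa-pubsub.iam.gserviceaccount.com".format(
        project_number
    )


def has_binding(policy, role, member):
    """Return True if the policy dict grants exactly `role` to `member`."""
    for binding in policy.get("bindings", []):
        if binding.get("role") == role and member in binding.get("members", []):
            return True
    return False


# Hypothetical policy, shaped like the JSON printed by get-iam-policy.
example_policy = json.loads("""
{
  "bindings": [
    {
      "role": "roles/pubsub.publisher",
      "members": [
        "serviceAccount:service-123456789@gcp-sa-pubsub.iam.gserviceaccount.com"
      ]
    }
  ],
  "etag": "...",
  "version": 1
}
""")

member = pubsub_service_account("123456789")  # hypothetical project number
print("publisher granted:", has_binding(example_policy, PUBLISHER_ROLE, member))
print("subscriber granted:", has_binding(example_policy, SUBSCRIBER_ROLE, member))
```

If either check comes back false for the corresponding resource, that matches the situation described above: forwarding to the dead-letter topic fails and the message keeps being redelivered past `maxDeliveryAttempts`.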
https://stackoverflow.com/questions/60122902