Google PubSub resent messages aren#39;t being processed(Google PubSub重新发送的消息未被处理)
本文介绍了Google PubSub重新发送的消息未被处理的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
我使用了Google PubSub文档中的订阅者示例 我所做的唯一修改是注释掉了对消息的确认。订阅者不再向队列中添加消息,而应根据Google云控制台中设置的间隔重新发送消息。
为什么会发生这种情况,还是我遗漏了什么?
public class SubscriberExample {
use the default project id
private static final String PROJECT_ID = ServiceOptions.getDefaultProjectId();
private static final BlockingQueue<PubsubMessage> messages = new LinkedBlockingDeque<>();
static class MessageReceiverExample implements MessageReceiver {
@Override
public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
messages.offer(message);
//consumer.ack();
}
}
/** Receive messages over a subscription. */
public static void main(String[] args) throws Exception {
// set subscriber id, eg. my-sub
String subscriptionId = args[0];
ProjectSubscriptionName subscriptionName = ProjectSubscriptionName.of(
PROJECT_ID, subscriptionId);
Subscriber subscriber = null;
try {
// create a subscriber bound to the asynchronous message receiver
subscriber = Subscriber.newBuilder(subscriptionName, new MessageReceiverExample()).build();
subscriber.startAsync().awaitRunning();
// Continue to listen to messages
while (true) {
PubsubMessage message = messages.take();
System.out.println("Message Id: " + message.getMessageId());
System.out.println("Data: " + message.getData().toStringUtf8());
}
} finally {
if (subscriber != null) {
subscriber.stopAsync();
}
}
}
}
JAVA
当您不确认消息时,推荐答案客户端库将对该消息调用modifyAckDeadline,直到maxAckExtensionPeriod传递。默认情况下,此值为一小时。因此,如果您不对消息进行确认/取消确认或更改此值,则消息很可能在一小时内不会重新传递。如果要更改最大确认扩展期限,请在构建器上进行设置:
subscriber = Subscriber.newBuilder(subscriptionName, new MessageReceiverExample())
.setMaxAckExtensionPeriod(Duration.ofSeconds(60))
.build();
还值得注意的是,如果不对消息进行ACK或NACK,则flow control可能会阻止更多消息的传递。默认情况下,Java客户端库允许最多1000条消息未完成,即等待ACK或NACK或等待最大ACK扩展期限过去。
这篇关于Google PubSub重新发送的消息未被处理的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持编程学习网!
沃梦达教程
本文标题为:Google PubSub重新发送的消息未被处理
基础教程推荐
猜你喜欢
- 降序排序:Java Map 2022-01-01
- FirebaseListAdapter 不推送聊天应用程序的单个项目 - Firebase-Ui 3.1 2022-01-01
- 设置 bean 时出现 Nullpointerexception 2022-01-01
- 减少 JVM 暂停时间 >1 秒使用 UseConcMarkSweepGC 2022-01-01
- Java:带有char数组的println给出乱码 2022-01-01
- Java Keytool 导入证书后出错,"keytool error: java.io.FileNotFoundException &拒绝访问" 2022-01-01
- 无法使用修饰符“public final"访问 java.util.Ha 2022-01-01
- 在 Libgdx 中处理屏幕的正确方法 2022-01-01
- 如何使用 Java 创建 X509 证书? 2022-01-01
- “未找到匹配项"使用 matcher 的 group 方法时 2022-01-01