
Spring Kafka MessageListener and max.poll.records

Published: 2022-05-20 17:54:53

I am using Spring Kafka 2.7.8 to consume messages from Kafka. The consumer listener is as follows:

@KafkaListener(topics = "topicName",
        groupId = "groupId",
        containerFactory = "kafkaListenerFactory")
public void onMessage(ConsumerRecord<String, String> record) {

}

The onMessage method above receives one message at a time.

Does this mean the Spring library sets max.poll.records to 1, or does it poll 500 records at a time (the default) and then hand them to the method one by one?
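To make the question concrete, here is a hedged sketch (not from the post itself) of how the relevant consumer properties could be set explicitly when building the container factory; the property keys are the standard Kafka client keys, while the class name and values are illustrative assumptions:

```java
import java.util.HashMap;
import java.util.Map;

public class ConsumerProps {
    // Sketch: the listener container calls poll(), which may return up to
    // max.poll.records records (Kafka default: 500), and then invokes a
    // record-type @KafkaListener method once per record in that batch.
    public static Map<String, Object> consumerProps() {
        Map<String, Object> props = new HashMap<>();
        props.put("max.poll.records", 500);        // max records per poll()
        props.put("max.poll.interval.ms", 300000); // max allowed gap between polls
        return props;
    }

    public static void main(String[] args) {
        System.out.println(consumerProps().get("max.poll.records"));
    }
}
```

These properties would typically be passed into a DefaultKafkaConsumerFactory when constructing the kafkaListenerFactory referenced above.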

The reason for this question is that we frequently see the following errors in prod at the same time. All four errors were received across multiple consumers within a single minute. I am trying to understand whether this is caused by intermittent connectivity problems with the Kafka brokers or by load. Please advise.

Seek to current after exception; nested exception is org.apache.kafka.clients.consumer.CommitFailedException: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.

seek to current after exception; nested exception is org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before successfully committing offsets {topic-9=OffsetAndMetadata{offset=2729058, leaderEpoch=null, metadata=''}}

[Consumer clientId=consumer-groupName-5, groupId=consumer] Offset commit failed on partition topic-33 at offset 2729191: The coordinator is not aware of this member.

Seek to current after exception; nested exception is org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records
