当前位置:首页 » Apache技术知识

apache kafka系列之kafka.common.ConsumerRebalanceFailedException异常解决办法

2015-06-18 18:00 本站整理 浏览(286)


apache kafka系列之kafka.common.ConsumerRebalanceFailedException异常解决办法

http://m.blog.csdn.net/blog/lizhitao/25301387


pache kafka中国社区QQ群:162272557

kafka.common.ConsumerRebalanceFailedException :log-push-record-consumer-group_mobile-pushremind02.lf.xxx.com-1399456594831-99f15e63 can't rebalance after 3 retries
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(Unknown
Source)
at kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(Unknown Source)
at kafka.consumer.ZookeeperConsumerConnector.consume(Unknown Source)
at kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreams(Unknown Source)
at com.xxx.mafka.client.consumer.DefaultConsumerProcessor.getKafkaStreams(DefaultConsumerProcessor.java:149)
at com.xxx.mafka.client.consumer.DefaultConsumerProcessor.recvMessage(DefaultConsumerProcessor.java:63)
at com.xxx.service.mobile.push.kafka.MafkaPushRecordConsumer.main(MafkaPushRecordConsumer.java:22)
at com.xxx.service.mobile.push.Bootstrap.main(Bootstrap.java:34)


出现以上问题原因分析:

同一个消费者组(consumer group)有多个consumer先后启动,就是一个消费者组内有多个consumer同时负载消费多个partition数据.
解决办法:
1.配置zk问题(kafka的consumer配置)
zookeeper.session.timeout.ms=5000
zookeeper.connection.timeout.ms=10000
zookeeper.sync.time.ms=2000
在使用高级API过程中,一般出现这个问题是zookeeper.sync.time.ms时间间隔配置过短,不排除有其他原因引起,但笔者遇到一般是这个原因。
给大家解释一下原因:一个消费者组中(consumer数量<partitions数量)每当有consumer发送变化,会触发负载均衡。第一件事就是释放当consumer资源,无则免之,调用ConsumerFetcherThread关闭并释放当前kafka
broker所有连接,释放当前消费的partitons,实际就是删除临时节点(/xxx/consumer/owners/topic-xxx/partitions[0-n]),所有同一个consumer group内所有consumer通过计算获取本consumer要消费的partitions,然后本consumer注册相应临时节点卡位,代表我拥有该partition的消费所有权,其他consumer不能使用。
如果大家理解上面解释,下面就更容易了,当consumer调用Rebalance时,它是按照时间间隔和最大次数采取失败重试原则,每当获取partitions失败后会重试获取。举个例子,假如某个公司有个会议,B部门在某个时间段预订该会议室,但是时间到了去会议室看时,发现A部门还在使用。这时B部门只有等待了,每隔一段时间去询问一下。如果时间过于频繁,则会议室一直会处于占用状态,如果时间间隔设置长点,可能去个2次,A部门就让出来了。
同理,当新consumer加入重新触发rebalance时,已有(old)的consumer会重新计算并释放占用partitions,但是会消耗一定处理时间,此时新(new)consumer去抢占该partitions很有可能就会失败。我们假设设置足够old
consumer释放资源的时间,就不会出现这个问题。
zookeeper.sync.time.ms时间设置过短就会导致old consumer还没有来得及释放资源,new consumer重试失败多次到达阀值就退出了。
zookeeper.sync.time.ms设置时间阀值,要考虑网络环境,服务器性能等因素在内综合衡量。
kafka zk节点存储,请参考:kafka在zookeeper中存储结构