[Kafka]Consumer Offset Manager

管理和理解Consumer offset我覺得是入門kafka的首要基礎,不然可能會遇到一些小問題,

如Consumer為什麼一直pull重複資料?Consumer如何從某個offset開始pull資料?

Consumer為什麼不pull資料?..等

目前在kafka cluster架構下,我都一律使用consumer group,

consumer group是kafka所提供的一種容錯、自動平衡、擴充和平行處理的消費機制,

所有consumer加入都使用相同group id,實務上建議number of partition on topic=a multiple of broker,

因為這樣才能達到真正的balance。

管理consumer group中所有的member,kafka透過GroupCoordinator instance來處理,

其中Offset manager(每個broker都有自己的)管理所有offset,所以該information我覺得滿重要的,

相當然我的consumer ap都會帶出該資訊,方便我釐清狀態或debug。

新版(>0.8.2)的kafka不在把offset存在zookeeper,因為zookeeper沒辦法承受大量的讀寫(尤其寫)request,

為了解決這問題,kafka自行建立內部topic(__consumer_offsets)來存放consumer group的offset,

你可以透過下列公式來算出groupcoordinator(那個partition)

consumers_offsets partition# = Math.abs(groupId.hashCode() % groupMetadataTopicPartitionCount)

P.S: groupMetadataTopicPartitionCount=50(default)

Kafka選擇讓每個consumer group管理自己的offset,這樣做可以避開broker變成狀態性(影響擴充彈性)、

握手機制來確認消費成功與否,最重要可以省下複雜的資料結構和資源消耗成本,

所以當初在設定consumer.config,我當然開啟enable.auto.commit = true和auto.commit.interval.ms=5000(類似checkpoint),

且kafka會定期保存每個consumer group的offset。

新版的kafka保證以下三件事

  1. producer發送message至相同topic,將保證message順序。
  2. consumer將依照partition上的message順序,依序讀取。
  3. topic上設定replicate N,保證最多允許N-1 server crash且不丟失任何資料(offsets.commit.required.acks=-1)。

知道三大保證後,在來看看Offset 如何commit?

當producer送一條message(序列化成byte[])至broker後,這時broker會讓partition中的leader來處理,

並且確認最少follower都sync完成,這時會通知offset manager紀錄該message的offset,因為設定ack=-1,

所以broker也會確保replicated 的數量已經追上leader,來保證容錯性和無資料遺失。

啟動Consumer訂閱該topic,就會隨機對一個broker發出offsetfetch請求,

由於請求中會包含多個partition的offset訊息,所以接收請求得broker會通知其他broker,

讓該partition的leader來(從broker的leader cache)response對應的offset。

之前我透過 bin/kafka-console-consumer.sh 來ReConsumer,由於這功能我覺得滿實用,

所以我打算包裝kafkacat這強悍的KafkaCLI,並自己簡單寫一個GUI小工具,方便debug或救火使用。

1. Query Metadata

2.Query offset(s) by timestamp(s)

3.Read the last N messages from topic

4.Read message at special timestamp from topic

 

參考

Kafka Consumer Offset Management

[Kafka]ReConsume

[Kafka] consumer offset checker

kafkacat

Manual offsets in Kafka Consumers Example

https://eventador.io/example_kafkacat.html