kafka預設保存7天(168 hour)的log在disk,處理message過程中可能會出現異常或非預期錯誤(如網路中斷、disk 問題),
這時有可能造成我們資料遺失或不一致,這篇來看看如何重新consume這些資料。
kafka要重新consume某個topic資料,須指定partition和 offset即可,下面簡單示範
目前該consumer group沒有任何Lag。
查看該topic的offset 最大值 (-2 最小值)
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 172.17.0.3:9092 -topic topic --time -1
Partition 0 :最大33
透過kafka-console-consumer.sh 指定從offset 30 開始再次消費(假設我們從30後就沒有consume資料)
bin/kafka-console-consumer.sh --bootstrap-server 172.17.0.3:9092 --topic topic --consumer.config config/consumer.properties --offset 30 --partition 0
參考