A First Try at Kafka - Implementing the Producer-Consumer Pattern

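In past work projects I mostly used RabbitMQ for message queuing, including several of its exchange types (Direct, Topic, Fanout). Since those covered most of my use cases, I never felt a need to adopt Kafka; after all, every additional service is one more thing to maintain.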

I had long wanted to find time to play with Kafka, so I spent a weekend learning how to set it up with Docker and practicing with Confluent.Kafka in code. This post is my study notes.

Kafka

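Apache Kafka is a distributed streaming platform designed for managing high-throughput, low-latency data streams. It was originally developed at LinkedIn, later open-sourced, and is now maintained by the Apache Software Foundation. Kafka is commonly used as a messaging system or an event streaming platform, and it is well suited to processing large volumes of real-time data streams (Real-Time Data Streaming).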

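Real-Time Data Streaming is a way of processing and transmitting data. Data flows continuously from its sources to consumers with low or near-zero latency and is processed and analyzed as it arrives. It suits scenarios that demand fast reactions, such as real-time monitoring, financial trading and event-driven applications.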

Below are some of Kafka's core concepts and features:

1. Core Concepts

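  • Producer: sends data to Kafka by publishing messages to one or more topics.
  • Consumer: reads data from Kafka. Consumers are organized into one or more consumer groups.
  • Topic: the unit used to categorize messages. Producers send messages to a specific topic, and consumers read messages from it.
  • Partition: each topic can be split into multiple partitions. Each partition stores its data in an ordered, immutable (append-only) log. Partitions allow horizontal scaling and greater parallelism.
  • Broker: a server node in the Kafka cluster. Each broker can host multiple topics and partitions.
  • Consumer Group: the consumers in a group share the messages of a topic. Each message is consumed by only one member of the group (more precisely, each partition is assigned to a single member), which balances the load within the group.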
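The consumer group rule can be made concrete with a minimal in-memory sketch. This is not Kafka client code: it spreads a topic's partitions over the members of one group in round-robin order. AssignPartitions is a hypothetical helper. Real clients use configurable assignment strategies (such as range, roundrobin or cooperative-sticky), so only the invariant carries over: inside a group each partition has exactly one owner, and members beyond the partition count receive nothing.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper: round-robin assignment of partition ids to the members of one group.
// It illustrates the invariant only; it is not Kafka's actual assignor.
static Dictionary<string, List<int>> AssignPartitions(int partitionCount, IReadOnlyList<string> members)
{
    var assignment = members.ToDictionary(m => m, _ => new List<int>());
    for (var partition = 0; partition < partitionCount; partition++)
    {
        // Partition p goes to member p mod N, so no partition is shared inside the group.
        var owner = members[partition % members.Count];
        assignment[owner].Add(partition);
    }
    return assignment;
}

// A topic with 3 partitions and a group of 2 members:
// consumer-a gets partitions 0 and 2, consumer-b gets partition 1.
var twoMembers = AssignPartitions(3, new[] { "consumer-a", "consumer-b" });
foreach (var (member, partitions) in twoMembers)
{
    Console.WriteLine($"{member}: [{string.Join(", ", partitions)}]");
}

// The same topic with 4 members: one member receives no partition and stays idle.
var fourMembers = AssignPartitions(3, new[] { "c1", "c2", "c3", "c4" });
Console.WriteLine($"Idle members: {fourMembers.Count(kv => kv.Value.Count == 0)}");
```

A second consumer group would run its own assignment over the same partitions. That is why every group receives every message.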

2. Kafka Features

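  • High throughput and low latency: Kafka can handle large volumes of real-time data streams and suits high-frequency, high-volume data.
  • Durable storage: Kafka persists messages to disk, and a log retention policy controls how long the data is kept.
  • Distribution and scalability: partitioning lets Kafka scale out horizontally, and together with replication it provides high availability and data redundancy.
  • Reliability: Kafka keeps data safe through multiple replicas (replication). If the broker leading a partition fails, one of its replicas can take over (failover).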
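Persistence is also what separates Kafka from a classic queue. Reading a record does not delete it, and each consumer group only tracks how far it has read (its offset). The sketch below models a single partition as an in-memory list plus a hypothetical per-group offset table. It leaves out retention, replication and real commits, and a group with no offset simply starts at 0.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// In-memory stand-in for one partition: records are only ever appended, never removed on read.
var log = new List<string>();
// Hypothetical bookkeeping: the next offset each consumer group will read.
var committed = new Dictionary<string, int>();

// Append a record and return its offset (its position in the log).
int Append(string value)
{
    log.Add(value);
    return log.Count - 1;
}

// Read everything the group has not seen yet, then advance ("commit") its offset.
List<string> Poll(string groupId)
{
    var from = committed.TryGetValue(groupId, out var next) ? next : 0;
    var batch = log.Skip(from).ToList();
    committed[groupId] = log.Count;
    return batch;
}

Append("order-created");
Append("order-paid");
var billing = Poll("billing-group");     // both records
var audit = Poll("audit-group");         // the same two records: groups do not take messages from each other
Append("order-shipped");
var billingNext = Poll("billing-group"); // only the new record

// Replay: rewind a group's offset and read the retained records again.
committed["audit-group"] = 0;
var auditReplay = Poll("audit-group");   // all three records

Console.WriteLine($"billing={billing.Count}, audit={audit.Count}, billingNext={billingNext.Count}, replay={auditReplay.Count}");
```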

3. Typical Use Cases

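  • Message broker: Kafka can serve as an efficient message queue, with producers and consumers exchanging messages through topics.
  • Event stream processing: in event-driven architectures, Kafka can record an application's state changes, such as transactions, logs and activity streams.
  • Data pipelines: Kafka often forms the core of a data pipeline, collecting data from different sources and delivering it to various targets (such as a Data Lake or analytics systems).
  • Real-time monitoring and analytics: Kafka can power real-time monitoring, such as stream analytics, alerting and notifications.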

Summary

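Kafka is a powerful and flexible streaming platform for scenarios that need high throughput and low latency. Its distributed design and reliability have led to wide adoption across many industries. It makes real-time data pipelines easier to build, scale and manage, and it is an important component of modern data infrastructure.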

Related Links

 

Kafka vs RabbitMQ

This is a frequently debated topic, but I am only a Kafka beginner, so I will just share a few links below.

 

Setting Up Kafka with Docker

I set it up with docker-compose, using the following Docker images:

  • zookeeper
  • kafka
  • cmak
  • kafka-ui
  • redpanda-console

The last three are Kafka management tools.

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - "2181:2181"
    volumes:
      - ./zookeeper/data:/var/lib/zookeeper/data
      - ./zookeeper/log:/var/lib/zookeeper/log

  kafka:
    image: confluentinc/cp-kafka:7.5.0
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENERS: INTERNAL://:29092,EXTERNAL://:9092
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:29092,EXTERNAL://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    volumes:
      - ./kafka/data:/var/lib/kafka/data
      - ./kafka/logs:/var/log/kafka

  cmak:
    image: hlebalbau/kafka-manager:latest
    container_name: cmak
    depends_on:
      - zookeeper
      - kafka
    ports:
      - "9100:9000"
    environment:
      ZK_HOSTS: zookeeper:2181
      APPLICATION_SECRET: letmein
      KAFKA_MANAGER_AUTH_ENABLED: "true"
      KAFKA_MANAGER_USERNAME: admin
      KAFKA_MANAGER_PASSWORD: 1q2w3e4r5t_
    volumes:
      - ./cmak/config:/kafka-manager/conf

  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    container_name: kafka-ui
    ports:
      - "8080:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: "kafka:29092"
      KAFKA_CLUSTERS_0_ZOOKEEPER: "zookeeper:2181"
    depends_on:
      - kafka
      - zookeeper
    volumes:
      - ./kafka-ui/config:/kafka-ui/config
      - ./kafka-ui/logs:/var/log/kafka-ui

  redpanda-console:
    image: docker.redpanda.com/redpandadata/console:latest
    container_name: redpanda-console
    ports:
      - "8082:8080"
    environment:
      CONSOLE_SERVER_NAME: "Redpanda Console"
      KAFKA_BROKERS: "kafka:29092"
      CONNECT_ENABLED: "false"
      SCHEMAREGISTRY_ENABLED: "false"
    depends_on:
      - kafka

cmak-docker

kafka-ui

Redpanda Console – A UI for Data Streaming

Start the services with docker-compose up -d, and once startup finishes, check them with docker-compose ps.

 

.NET 8 - Implementing the Producer-Consumer Pattern in an ASP.NET Web API

The project uses the Confluent.Kafka NuGet package to connect to Kafka and to implement the producer and the consumer.

confluent-kafka-dotnet (Confluent.Kafka)

Since this is a simple sample, I kept the number of NuGet packages to a minimum.

appsettings.json Settings

The producer and consumer settings are kept in separate sections.

Each section maps to its own class: KafkaProducerOptions and KafkaConsumerOptions.

KafkaProducerOptions.cs

namespace KafkaWebApi.Models;

/// <summary>
/// class KafkaProducerOptions
/// </summary>
public class KafkaProducerOptions
{
    /// <summary>
    /// SectionName - KafkaProducerSettings
    /// </summary>
    public static string SectionName => "KafkaProducerSettings";
    
    /// <summary>
    /// BootstrapServers
    /// </summary>
    public string BootstrapServers { get; set; }

    /// <summary>
    /// Topic
    /// </summary>
    public string Topic { get; set; }

    /// <summary>
    /// GroupId
    /// </summary>
    public string GroupId { get; set; }
}

KafkaConsumerOptions.cs

namespace KafkaWebApi.Models;

/// <summary>
/// class KafkaConsumerOptions
/// </summary>
public class KafkaConsumerOptions
{
    /// <summary>
    /// SectionName - KafkaConsumerSettings
    /// </summary>
    public static string SectionName => "KafkaConsumerSettings";
    
    /// <summary>
    /// BootstrapServers
    /// </summary>
    public string BootstrapServers { get; set; }

    /// <summary>
    /// Topic
    /// </summary>
    public string Topic { get; set; }

    /// <summary>
    /// GroupId
    /// </summary>
    public string GroupId { get; set; }
}

Producer Side

Next come the producer's interface and class: IKafkaProducerService and KafkaProducerService.

IKafkaProducerService.cs

namespace KafkaWebApi.Services;

/// <summary>
/// interface IKafkaProducerService
/// </summary>
public interface IKafkaProducerService
{
    /// <summary>
    /// Sends a message
    /// </summary>
    /// <param name="message">The message</param>
    /// <param name="cancellationToken">The cancellationToken</param>
    Task SendMessageAsync(string message, CancellationToken cancellationToken);
}

KafkaProducerService.cs

using Confluent.Kafka;
using KafkaWebApi.Models;
using Microsoft.Extensions.Options;

namespace KafkaWebApi.Services;

/// <summary>
/// class KafkaProducerService
/// </summary>
/// <remarks>
/// Message sending service
/// </remarks>
public class KafkaProducerService : IKafkaProducerService
{
    private readonly ILogger<KafkaProducerService> _logger;

    private readonly TimeProvider _timeProvider;

    private readonly KafkaProducerOptions _kafkaProducerSettings;

    private readonly ProducerConfig _producerConfig;

    /// <summary>
    /// Initializes a new instance of the <see cref="KafkaProducerService"/> class
    /// </summary>
    /// <param name="logger">The logger</param>
    /// <param name="timeProvider">The timeProvider</param>
    /// <param name="kafkaProducerSettings">The kafkaProducerSettings</param>
    /// <param name="producerConfig">The producerConfig</param>
    public KafkaProducerService(ILogger<KafkaProducerService> logger,
                                TimeProvider timeProvider,
                                IOptions<KafkaProducerOptions> kafkaProducerSettings,
                                ProducerConfig producerConfig)
    {
        this._logger = logger;
        this._timeProvider = timeProvider;
        this._kafkaProducerSettings = kafkaProducerSettings.Value;
        this._producerConfig = producerConfig;
    }

    private string TopicName => this._kafkaProducerSettings.Topic;

    /// <summary>
    /// Sends a message
    /// </summary>
    /// <param name="message">The message</param>
    /// <param name="cancellationToken">The cancellationToken</param>
    public async Task SendMessageAsync(string message, CancellationToken cancellationToken)
    {
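        // Building a producer per call keeps the sample simple; a long-lived, shared producer instance is generally recommended
        using var producer = new ProducerBuilder<string, string>(this._producerConfig).Build();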
        try
        {
            var publishMessage = new Message<string, string>
            {
                Key = Guid.NewGuid().ToString(),
                Value = message
            };

            var result = await producer.ProduceAsync(this.TopicName, publishMessage, cancellationToken);
            this._logger.LogInformation("{CurrentDateTime} Message sent to Topic: {TopicName}, Offset: {Offset}, TopicPartitionOffset: {TopicPartitionOffset}",
                                        $"{this._timeProvider.GetLocalNow().DateTime:HH:mm:ss.fff}",
                                        $"{this.TopicName}",
                                        $"{result.Offset}",
                                        $"{result.TopicPartitionOffset}");
        }
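        // The producer is ProducerBuilder<string, string>, so the exception type must use the same key/value types
        catch (ProduceException<string, string> ex)
        {
            this._logger.LogError(ex, "{CurrentDateTime} Send failed: {ErrorReason}",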
                                  $"{this._timeProvider.GetLocalNow().DateTime:HH:mm:ss}",
                                  $"{ex.Error.Reason}");
        }
    }
}
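SendMessageAsync uses Guid.NewGuid().ToString() as the message key, so every message gets a fresh key. Kafka guarantees ordering only within a partition, and the default partitioners send records with the same key to the same partition (as long as the partition count does not change). The sketch below uses FNV-1a as a stand-in hash. Confluent.Kafka (through librdkafka) and the Java client use their own hash functions, so actual partition numbers will differ. The sketch only illustrates the consequence: a stable business key keeps related messages on one partition, while random GUID keys can scatter them.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

// Stand-in for a key partitioner: FNV-1a over the UTF-8 bytes of the key.
// string.GetHashCode() is avoided because .NET randomizes it per process.
static int PartitionFor(string key, int partitionCount)
{
    uint hash = 2166136261u;
    foreach (var b in Encoding.UTF8.GetBytes(key))
    {
        hash ^= b;
        hash *= 16777619u;
    }
    return (int)(hash % (uint)partitionCount);
}

const int partitions = 6;
var orderEvents = new[] { "created", "paid", "shipped" };

// Stable key: every event for order-42 maps to the same partition, so their relative order is kept.
var stablePartitions = orderEvents.Select(_ => PartitionFor("order-42", partitions)).Distinct().ToList();

// Random GUID keys (as in SendMessageAsync): events of one order may land on different partitions.
var guidPartitions = orderEvents.Select(_ => PartitionFor(Guid.NewGuid().ToString(), partitions)).ToList();

Console.WriteLine($"stable key -> partitions used: {stablePartitions.Count}");
Console.WriteLine($"GUID keys  -> partitions: {string.Join(", ", guidPartitions)}");
```

If per-entity ordering matters, a common choice is a business key (for example an order id) instead of a GUID. If ordering does not matter, the GUID key simply spreads the load.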

The SendMessageAsync action in KafkaController sends messages through KafkaProducerService.

using KafkaWebApi.Services;
using Microsoft.AspNetCore.Mvc;

namespace KafkaWebApi.Controllers;

/// <summary>
/// class KafkaController
/// </summary>
[ApiController]
[Route("api/[controller]")]
public class KafkaController : ControllerBase
{
    private readonly IKafkaProducerService _producerService;

    /// <summary>
    /// Initializes a new instance of the <see cref="KafkaController"/> class
    /// </summary>
    /// <param name="producerService">The producer service</param>
    public KafkaController(IKafkaProducerService producerService)
    {
        this._producerService = producerService;
    }

    /// <summary>
    /// Sends a message
    /// </summary>
    /// <param name="message">The message</param>
    /// <param name="cancellationToken">The cancellationToken</param>
    /// <returns>A task containing the action result</returns>
    [HttpPost("send")]
    public async Task<IActionResult> SendMessageAsync([FromBody] string message, CancellationToken cancellationToken)
    {
        await this._producerService.SendMessageAsync(message, cancellationToken);
        return this.Ok("Message sent to Kafka.");
    }
}

Consumer Side

The consumer is implemented as a BackgroundService: KafkaConsumerBackgroundService.

KafkaConsumerBackgroundService.cs

using Confluent.Kafka;
using KafkaWebApi.Models;
using Microsoft.Extensions.Options;

namespace KafkaWebApi.BackgroundServices;

/// <summary>
/// class KafkaConsumerBackgroundService
/// </summary>
/// <remarks>
/// Message consumer
/// </remarks>
public class KafkaConsumerBackgroundService : BackgroundService
{
    private readonly ILogger<KafkaConsumerBackgroundService> _logger;

    private readonly TimeProvider _timeProvider;

    private readonly IServiceScopeFactory _serviceScopeFactory;

    /// <summary>
    /// Initializes a new instance of the <see cref="KafkaConsumerBackgroundService"/> class
    /// </summary>
    /// <param name="logger">The logger</param>
    /// <param name="timeProvider">The timeProvider</param>
    /// <param name="serviceScopeFactory">The serviceScopeFactory</param>
    public KafkaConsumerBackgroundService(ILogger<KafkaConsumerBackgroundService> logger,
                                          TimeProvider timeProvider,
                                          IServiceScopeFactory serviceScopeFactory)
    {
        this._logger = logger;
        this._timeProvider = timeProvider;
        this._serviceScopeFactory = serviceScopeFactory;
    }

    /// <summary>
    /// Runs the consume loop until the stopping token is cancelled
    /// </summary>
    /// <param name="stoppingToken">The stopping token</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        this._logger.LogInformation(
            "{DateTimeNow} {TypeName} is starting.",
            $"{this._timeProvider.GetLocalNow().DateTime:yyyy-MM-dd HH:mm:ss}",
            this.GetType().Name);

        using var scope = this._serviceScopeFactory.CreateScope();
        var kafkaConsumerSettings = scope.ServiceProvider.GetRequiredService<IOptions<KafkaConsumerOptions>>().Value;
        var consumerConfig = scope.ServiceProvider.GetRequiredService<ConsumerConfig>();

        using var consumer = new ConsumerBuilder<string, string>(consumerConfig).Build();
        
        consumer.Subscribe(kafkaConsumerSettings.Topic);

        try
        {
            while (!stoppingToken.IsCancellationRequested)
            {
                await Task.Run(() => this.ProcessKafkaMessage(consumer, stoppingToken), stoppingToken);
                await Task.Delay(10, stoppingToken); // Short delay to avoid a tight polling loop
            }
        }
        finally
        {
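            // Only call Close() when shutdown was not requested: on cancellation, ProcessKafkaMessage has already closed the consumer
            if (!stoppingToken.IsCancellationRequested)
            {
                consumer.Close();   // Leave the consumer group cleanly before the consumer is disposed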
            }
        }
    }

    /// <summary>
    /// Logs the shutdown and stops the service
    /// </summary>
    /// <param name="cancellationToken">The cancellation token</param>
    public override async Task StopAsync(CancellationToken cancellationToken)
    {
        this._logger.LogInformation(
            "{DateTimeNow} {TypeName} is stopping.",
            $"{this._timeProvider.GetLocalNow().DateTime:yyyy-MM-dd HH:mm:ss}",
            this.GetType().Name);

        await base.StopAsync(cancellationToken);
    }

    /// <summary>
    /// Processes the kafka message
    /// </summary>
    /// <param name="consumer">The consumer</param>
    /// <param name="stoppingToken">The stoppingToken</param>
    private void ProcessKafkaMessage(IConsumer<string, string> consumer, CancellationToken stoppingToken)
    {
        try
        {
            var consumeResult = consumer.Consume(stoppingToken);

            if (consumeResult is null)
            {
                return;
            }

            this._logger.LogInformation("{CurrentDateTime} Message received, message-key: {MessageId}, message-value: {MessageValue}",
                                        $"{this._timeProvider.GetLocalNow().DateTime:HH:mm:ss.fff}",
                                        consumeResult.Message.Key,
                                        consumeResult.Message.Value);
        }
        catch (OperationCanceledException)
        {
            consumer.Close();
        }
        catch (Exception ex)
        {
            this._logger.LogError(ex, "{CurrentDateTime} Error occurred: {ExceptionMessage}",
                                  $"{this._timeProvider.GetLocalNow().DateTime:HH:mm:ss}",
                                  $"{ex.Message}");
        }
    }
}
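ExecuteAsync above is a producer-consumer loop. It blocks on Consume(stoppingToken) until a message arrives or shutdown cancels the token. System.Threading.Channels can show the same shape, as an in-process analogue only (no broker, no persistence, no partitions). The channel, the message strings and the shutdown token source are illustrative assumptions.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// In-process stand-in for a topic: a bounded channel between one producer and one consumer.
var channel = Channel.CreateBounded<string>(capacity: 100);
var received = new List<string>();
using var shutdown = new CancellationTokenSource();

// Consumer loop, analogous to ExecuteAsync: wait for the next message until the stream ends or is cancelled.
var consumer = Task.Run(async () =>
{
    try
    {
        // ReadAllAsync yields messages until the writer completes or the token is cancelled.
        await foreach (var message in channel.Reader.ReadAllAsync(shutdown.Token))
        {
            received.Add(message);
        }
    }
    catch (OperationCanceledException)
    {
        // Cancellation ends the loop, like stoppingToken ending ExecuteAsync.
    }
});

// Producer side, analogous to SendMessageAsync.
for (var i = 1; i <= 3; i++)
{
    await channel.Writer.WriteAsync($"message-{i}");
}

// Completing the writer lets the consumer drain the buffered messages and exit.
channel.Writer.Complete();
await consumer;

Console.WriteLine(string.Join(", ", received));
```

Here the loop ends because the writer completes. Cancelling shutdown would end it early instead, which is the path the BackgroundService takes when the host stops.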

Service Registration

Program.cs

using Confluent.Kafka;
using KafkaWebApi.BackgroundServices;
using KafkaWebApi.Models;
using KafkaWebApi.Services;
using Microsoft.Extensions.Options;

var builder = WebApplication.CreateBuilder(args);

//---------------------------------------------------------
// Add services to the container.

builder.Services.AddSingleton(TimeProvider.System);

builder.Services.AddControllers();

// Bind the Kafka settings
builder.Services.Configure<KafkaProducerOptions>(builder.Configuration.GetSection(KafkaProducerOptions.SectionName));
builder.Services.Configure<KafkaConsumerOptions>(builder.Configuration.GetSection(KafkaConsumerOptions.SectionName));

// Register ProducerConfig as a singleton
builder.Services.AddSingleton(serviceProvider =>
{
    var kafkaSettings = serviceProvider.GetRequiredService<IOptions<KafkaProducerOptions>>().Value;
    return new ProducerConfig
    {
        BootstrapServers = kafkaSettings.BootstrapServers
    };
});

// Register ConsumerConfig as a singleton
builder.Services.AddSingleton(serviceProvider =>
{
    var kafkaSettings = serviceProvider.GetRequiredService<IOptions<KafkaConsumerOptions>>().Value;
    return new ConsumerConfig
    {
        BootstrapServers = kafkaSettings.BootstrapServers,
        GroupId = kafkaSettings.GroupId,
        AutoOffsetReset = AutoOffsetReset.Earliest
    };
});

// Register the Kafka services
builder.Services.AddSingleton<IKafkaProducerService, KafkaProducerService>();
builder.Services.AddHostedService<KafkaConsumerBackgroundService>();

// Learn more about configuring Swagger/OpenAPI at https://aka.ms/aspnetcore/swashbuckle
builder.Services.AddEndpointsApiExplorer();
builder.Services.AddSwaggerGen(options =>
{
    var basePath = AppContext.BaseDirectory;
    var xmlFiles = Directory.EnumerateFiles(basePath, searchPattern: "*.xml", SearchOption.TopDirectoryOnly);

    foreach (var xmlFile in xmlFiles)
    {
        options.IncludeXmlComments(xmlFile);
    }
});


//---------------------------------------------------------

var app = builder.Build();

// Configure the HTTP request pipeline.
if (app.Environment.IsDevelopment())
{
    app.UseSwagger();
    app.UseSwaggerUI();
}

app.UseHttpsRedirection();

app.UseAuthorization();

app.MapControllers();

app.Run();
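The ConsumerConfig registered above sets AutoOffsetReset = AutoOffsetReset.Earliest. Kafka consults that setting only in two cases: the group has no committed offset for a partition (for example, the first run with a new GroupId), or the committed offset is out of range. Otherwise the consumer resumes from its committed position. StartOffset below is a hypothetical helper that mirrors this decision for Earliest and Latest only. It ignores the Error option and does not call the real client.

```csharp
using System;

// Hypothetical mirror of the auto.offset.reset decision for one partition.
// committedOffset is null when the consumer group has never committed on this partition.
static long StartOffset(long? committedOffset, long logStartOffset, long logEndOffset, bool resetToEarliest)
{
    // A valid committed offset always wins: the group resumes where it left off.
    if (committedOffset is long committed && committed >= logStartOffset && committed <= logEndOffset)
    {
        return committed;
    }

    // No committed offset, or one that is out of range: fall back to the reset policy.
    return resetToEarliest ? logStartOffset : logEndOffset;
}

// New group on a partition holding offsets 0..9 (log end offset 10).
Console.WriteLine(StartOffset(null, 0, 10, resetToEarliest: true));   // 0: read the existing backlog
Console.WriteLine(StartOffset(null, 0, 10, resetToEarliest: false));  // 10: only messages produced from now on
// An existing group that committed offset 7 resumes there regardless of the policy.
Console.WriteLine(StartOffset(7, 0, 10, resetToEarliest: false));     // 7
```

This is why, with Earliest, a brand-new group also receives messages that were produced before the service started.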

Project Structure

 

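Running the Service

Oh, one more thing! Before running the service, create the topic "test-topic" in one of the Kafka management tools.

After starting the service, confirm that KafkaConsumerBackgroundService is running and that the WebApi service is up.

Send a few messages through Swagger and confirm that the producer sent them and the consumer received them.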
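Besides Swagger, the endpoint can also be called from code. The action binds [FromBody] string message, so the request body has to be a JSON string literal with a JSON content type: the text "hello kafka" including its quotes. An unquoted plain-text body would typically be rejected. The sketch below builds that request with the HttpClient types. The base address is a hypothetical placeholder, so substitute the URL your project actually listens on.

```csharp
using System;
using System.Net.Http;
using System.Text;
using System.Text.Json;
using System.Threading.Tasks;

// Builds the POST request for KafkaController.SendMessageAsync (route: api/Kafka/send).
// The body is serialized as a JSON string because the action uses [FromBody] string.
static HttpRequestMessage BuildSendRequest(Uri baseAddress, string message)
{
    return new HttpRequestMessage(HttpMethod.Post, new Uri(baseAddress, "api/Kafka/send"))
    {
        Content = new StringContent(JsonSerializer.Serialize(message), Encoding.UTF8, "application/json")
    };
}

// Hypothetical address; replace it with the URL from your launchSettings.json.
var baseAddress = new Uri("https://localhost:7001/");
using var request = BuildSendRequest(baseAddress, "hello kafka");
var body = await request.Content!.ReadAsStringAsync();
Console.WriteLine($"{request.Method} {request.RequestUri}");
Console.WriteLine(body);

// To actually send it while the WebApi is running:
// using var client = new HttpClient();
// using var response = await client.SendAsync(request);
// response.EnsureSuccessStatusCode();
```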

Open Redpanda Console to inspect the contents of test-topic.

Practice complete.

 

Related Links

That's all.

I write purely as a hobby; writing code and articles is how I relieve work stress.