歡迎光臨
每天分享高質量文章

在Asp.Net Core中整合Kafka

  在我們的業務中,我們通常需要在自己的業務子系統之間相互傳送訊息,一端去傳送訊息另一端去消費當前訊息,這就涉及到使用訊息佇列MQ的一些內容,訊息佇列成熟的框架有多種,這裡你可以讀這篇文章來瞭解這些MQ的不同,這篇文章的主要目的是用來系統講述如何在Asp.Net Core中使用Kafka,整篇文章將介紹如何寫訊息傳送方程式碼、消費方程式碼、配套的工具的使用,希望讀完這篇文章之後對整個訊息的執行機制有一定的理解,在這裡透過一張圖來簡要瞭解一下訊息佇列中的一些概念。

圖一 Kafka訊息佇列

一 安裝NUGET包

在寫程式碼之前首先要做的就是安裝nuget包了,我們這裡使用的是Confluent.Kafka 1.0.0-RC4版本,具體專案要根據具體的時間來確定取用包的版本,這些包可能更新比較快。

圖二 取用Kafka包依賴

二 訊息傳送方(Producer)

1 在專案中新增所有觸發事件的介面 IIntegrationEvent,後面所有的觸發事件都是繼承自這個介面。

///
    /// 整合事件的介面定義
    ///
    public interface IIntegrationEvent {
        string Key { get; set; }
    }

2 定義Kafka生產者

///
    /// Kafka 生產者的 Domain Service
    ///
    public class KafkaProducer : DomainService {
        private readonly IConfiguration _config;
        private readonly ILogger _logger;
        public KafkaProducer(IConfiguration config,
                             ILogger logger) {
            _config = config;
            _logger = logger;
        }
        ///
        /// 傳送事件
        ///
        ///
        public void Produce(IIntegrationEvent @event) {
            var topic = _config.GetValue($”Kafka:Topics:{@event.GetType().Name}”);
            var producerConfig = new ProducerConfig {
                BootstrapServers = _config.GetValue(“Kafka:BootstrapServers”),
                MessageTimeoutMs = _config.GetValue(“Kafka:MessageTimeoutMs”)
            };
            var builder = new ProducerBuilder(producerConfig);
            using (var producer = builder.Build()) {
                try {
                    var json = JsonConvert.SerializeObject(@event);
                    var dr = producer.ProduceAsync(topic, new Message { Key = @event.Key, Value = json }).GetAwaiter().GetResult();
                    _logger.LogDebug(“傳送事件 {0} 到 {1} 成功”, dr.Value, dr.TopicPartitionOffset);
                } catch (ProduceException ex) {
                    _logger.LogError(ex, “傳送事件到 {0} 失敗,原因 {1} “, topic, ex.Error.Reason);
                }
            }
        }
    }

在這裡我們的Producer根據業務的需要定義在領域服務中,這裡面最關鍵的就是Produce方法了,該方法的引數是繼承自IIntegrationEvent 介面的各種各樣事件,在這個方法中,我們獲取配置在appsetting.json中配置的各種Topic以及Kafka伺服器的地址,具體的配置如下方截圖所示。

圖三 配置伺服器地址以及各種Topic

透過當前配置我們就知道我們的訊息要發往何處,然後我們就可以建立一個producer來將我們的事件(實際上是定義的資料結構)序列化成Json,然後透過非同步的方式發送出去,這裡需要註意我們建立的Producer要放在一個using塊中,這樣在建立完成併傳送訊息之後就會釋放當前生產者。這裡如果傳送失敗會在當前日誌中記錄傳送的值以及錯誤的原因從而便於進行除錯。這裡舉出其中的一個事件RepairContractFinishedEvent為例來說明。

///
    /// 維修合同完成的事件
    ///
    public class RepairContractFinishedEvent : IIntegrationEvent {
        public RepairContract RepairContract { get; set; }
        //一個維修合同會對應多個調整單
        public List RepairContractAdjusts { get; set; }
        public string Key { get; set; }
    }

這個裡面RepairContract以及List集合都是我們定義的一種資料結構。

最後我們來看看在具體的領域層中我們該如何觸發此事件的,這裡我們也定義了一個叫做IRepairContractEventManager介面的領域服務,併在裡面定義了一個叫做Finished的介面,然後在RepairContractEventManager中實現該方法。

public class RepairContractEventManager : DomainService, IRepairContractEventManager {
       private readonly KafkaProducer _producer;
       private readonly IRepository _repairContractRepository;
       private readonly IRepository _repairContractAdjustRepository;
       public RepairContractEventManager(KafkaProducer producer,
                                         IRepository repairContractRepository,
                                         IRepository repairContractAdjustRepository) {
           _producer = producer;
           _repairContractRepository = repairContractRepository;
           _repairContractAdjustRepository = repairContractAdjustRepository;
       }
       public void Finished(Guid repairContractId) {
           var repairContract = _repairContractRepository.GetAll()
               .Include(c => c.RepairContractWorkItems).ThenInclude(w => w.Materials)
               .SingleOrDefaultAsync(c => c.Id == repairContractId).GetAwaiter().GetResult();
           var repairContractAdjusts = _repairContractAdjustRepository.GetAll()
               .Include(a => a.WorkItems).ThenInclude(w => w.Materials)
               .Where(a => a.RepairContractId == repairContractId).ToListAsync().GetAwaiter().GetResult();
           var @event = new RepairContractFinishedEvent {
               Key = repairContract?.Code,
               RepairContract = repairContract,
               RepairContractAdjusts = repairContractAdjusts
           };
           _producer.Produce(@event);
       }
   }

 這段程式碼就是組裝RepairContractFinishedEvent的具體實現過程,然後呼叫我們之前建立的KafkaProducer物件然後將訊息發送出去,這樣在需要觸發當前RepairContractFinishedEvent 的地方來註入IRepairContractEventManager介面,然後調對應的Finished方法,這樣就完成了整個訊息的傳送的過程了。

三 檢視訊息的傳送

在傳送完訊息後我們可以到Kafka 叢集 Control Center中查詢我們傳送的所有訊息。選擇其中的一條訊息,雙擊,然後選擇INSPECT來檢視傳送的訊息

圖四 Kafka Control Center中檢視傳送訊息

四 訊息的接收方(Consumer)

在正確建立訊息的傳送方後緊接著就是定義訊息的接收方了,訊息的接收方顧名思義就是消費剛才訊息的一方,這裡的步驟和傳送類似,但是也有很大的不同,訊息的消費方核心是一個後臺服務,並且在單獨的執行緒中監聽來自傳送方的訊息,併進行消費,這裡我們先定義一個叫做KafkaConsumerHostedService的基類,我們具體來看看程式碼。

///
    /// Kafka 消費者的後臺服務基礎類
    ///
    /// 事件型別
    public abstract class KafkaConsumerHostedService : BackgroundService where T : IIntegrationEvent {
        protected readonly IServiceProvider _services;
        protected readonly IConfiguration _config;
        protected readonly ILogger> _logger;
        public KafkaConsumerHostedService(IServiceProvider services, IConfiguration config, ILogger> logger) {
            _services = services;
            _config = config;
            _logger = logger;
        }
        ///
        /// 消費該事件,比如呼叫 Application Service 持久化資料等
        ///
        ///事件內容
        protected abstract void DoWork(T @event);
        ///
        /// 構造 Kafka 消費者實體,監聽指定 Topic,獲得最新的事件
        ///
        ///終止標識
        ///
        protected override async Task ExecuteAsync(CancellationToken stoppingToken) {
            await Task.Factory.StartNew(() => {
                var topic = _config.GetValue($”Kafka:Topics:{typeof(T).Name}”);
                var consumerConfig = new ConsumerConfig {
                    BootstrapServers = _config.GetValue(“Kafka:BootstrapServers”),
                    AutoOffsetReset = AutoOffsetReset.Earliest,
                    GroupId = _config.GetValue(“Application:Name”),
                    EnableAutoCommit = true,
                };
                var builder = new ConsumerBuilder(consumerConfig);
                using (var consumer = builder.Build()) {
                    consumer.Subscribe(topic);
                    while (!stoppingToken.IsCancellationRequested) {
                        try {
                            var result = consumer.Consume(stoppingToken);
                            var @event = JsonConvert.DeserializeObject(result.Value);
                            DoWork(@event);
                            //consumer.StoreOffset(result);
                        } catch (OperationCanceledException ex) {
                            consumer.Close();
                            _logger.LogDebug(ex, “Kafka 消費者結束,退出後臺執行緒”);
                        } catch (AbpValidationException ex) {
                            _logger.LogError(ex, $”Kafka {GetValidationErrorNarrative(ex)}”);
                        } catch (ConsumeException ex) {
                            _logger.LogError(ex, “Kafka 消費者產生異常”);
                        } catch (KafkaException ex) {
                            _logger.LogError(ex, “Kafka 產生異常”);
                        } catch (ValidationException ex) {
                            _logger.LogError(ex, “Kafka 訊息驗證失敗”);
                        } catch (Exception ex) {
                            _logger.LogError(ex, “Kafka 捕獲意外異常”);
                        }
                    }
                }
            }, stoppingToken, TaskCreationOptions.LongRunning, TaskScheduler.Default);
        }
        private string GetValidationErrorNarrative(AbpValidationException validationException) {
            var detailBuilder = new StringBuilder();
            detailBuilder.AppendLine(“驗證過程中檢測到以下錯誤”);
            foreach (var validationResult in validationException.ValidationErrors) {
                detailBuilder.AppendFormat(” – {0}”, validationResult.ErrorMessage);
                detailBuilder.AppendLine();
            }
            return detailBuilder.ToString();
        }
    }

這段程式碼中我們會建立一個consumer,這裡我們會在一個While迴圈中去訂閱特定Topic訊息,這裡的BootstrapServers是和傳送方保持一致,並且也是在當前應用程式中的appsetting.json中進行配置的,而且這裡的consumer.Consume方法是一個阻塞式方法,當傳送方傳送特定事件後,這裡會接收到同樣名稱的Topic的訊息,然後將接收到的Json資料進行反序列化,然後交由後面的DoWork方法進行處理。這裡還是以之前生成者傳送的RepairContractFinished事件為例,這裡也需要定義一個RepairContractFinishedEventHandler來處理生產者傳送的訊息。

public class RepairContractFinishedEventHandler : KafkaConsumerHostedService {
        public RepairContractFinishedEventHandler(IServiceProvider services,
            IConfiguration config, ILogger> logger)
            : base(services, config, logger) {
        }
        ///
        /// 呼叫 Application Service,新增或更新維修合同及關聯物體
        ///
        ///待消費的事件
        protected override void DoWork(RepairContractFinishedEvent @event) {
            using (var scope = _services.CreateScope()) {
                var service = scope.ServiceProvider.GetRequiredService();
                service.AddOrUpdateRepairContract(@event.RepairContract, @event.RepairContractAdjusts);
            }
        }
    }

這裡需要特別註意的是在這裡我麼也需要定義一個繼承自IIntegrationEvent介面的事件,這裡也是定義一種資料結構,並且這裡的資料結構和生成者定義的要保持一致,否則消費方在反序列化的時候會丟失不能夠匹配的資訊。

public class RepairContractFinishedEvent : IIntegrationEvent {
        public RepairContractDto RepairContract { get; set; }
        public List RepairContractAdjusts { get; set; }
        public string Key { get; set; }
    }

另外在DoWork方法中我們也需要註意程式碼也需要用using包裹,從而在消費方消費完後釋放掉當前的應用服務。最後需要註意的就是我們的每一個Handle都是一個後臺服務,我們需要在Asp.Net Core的Startup的ConfigureServices進行配置,從而將當前的後臺服務新增到Asp.Net Core依賴註入容器中。

///
     /// 註冊整合事件的處理器
     ///
     ///
     private void AddIntegrationEventHandlers(IServiceCollection services) {
         services.AddHostedService();
         services.AddHostedService();
         services.AddHostedService();
         services.AddHostedService();
         services.AddHostedService();
         services.AddHostedService();
         services.AddHostedService();
         services.AddHostedService();
         services.AddHostedService();
     }

最後我們也看看我們的appsetting.json的配置檔案關於kafka的配置。

“Kafka”: {
    “BootstrapServers”: “127.0.0.1:9092”,
    “MessageTimeoutMs”: 5000,
    “Topics”: {
      “RepairContractFinishedEvent”: “repair-contract-finished”,
      “AddOrUpdateProductCategoryEvent”: “add-update-product-category”,
      “AddOrUpdateDealerEvent”: “add-update-dealer”,
      “ClaimApproveEvent”: “claim-approve”,
      “ProductTransferDataEvent”: “product-update”,
      “PartUpdateEvent”: “part-update”,
      “VehicleSoldFinishedEvent”: “vehiclesold-finished”,
      “CustomerFinishedEvent”: “customer-update”,
      “VehicleInformationUpdateStatusEvent”: “add-update-vehicle-info”,
      “AddCustomerEvent”: “add-customer”
    }
  },

這裡需要註意的是傳送方和接收方必須保證Topic一致,並且配置的伺服器名稱埠保持一致,這樣才能夠保證訊息的準確傳送和接收。最後對於服務端,這裡推薦一個VSCode的外掛kafka,能夠建立併傳送訊息,這樣就方便我們來傳送我們需要的資料了,這裡同樣需要我們先建立一個.kafka的檔案,然後配置Kafka服務的地址和埠號。

圖五 利用VSCode Kafka外掛傳送訊息

已同步到看一看
贊(0)

分享創造快樂