原文:USING AZURE SERVICE BUS QUEUES WITH ASP.NET CORE SERVICES
作者:damienbod[1]
譯文:如何在ASP.NET Core中使用Azure Service Bus Queue
地址:https://www.cnblogs.com/lwqlun/p/10760227.html
作者:Lamond Lu
原始碼:https://github.com/lamondlu/AzureServiceBusMessaging
本文展示瞭如何使用Azure Service Bus Queue, 實現2個ASP.NET Core Api應用之間的訊息傳輸。
配置Azure Service Bus Queue
你可以從官網檔案中瞭解到如何配置一個Azure Service Bus Queue.
https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-create-namespace-portal
這裡我們使用Queue或者Topic來實現訊息傳輸。Queue是一種訊息傳輸型別,一旦一個訊息被一個消費者接收了,該訊息就會從Queue中被移除。
與Queue不同,Topic提供的是一對多的通訊方式。
架構圖
整個應用的實現如下:
•Api 1負責傳送訊息•Api 2負責監聽Azure Service Bus,並處理接收到的訊息
實現一個Service Bus Queue
這裡我們首先需要引入Microsoft.Azure.ServiceBus[2] 程式集。Microsoft.Azure.ServiceBus[3]是Azure Service Bus的客戶端庫。針對Service Bus的連線字串我們儲存在專案的User Secret中。當部署專案的時候,我們可以使用Azure Key Valut來設定這個Secret值。
在Visual Studio中,右鍵點選API1, API2專案屬性,選擇Manage User Secrets就可以管理當前專案使用的所有私密資訊。
為了傳送向Azure Service Bus Queue傳送訊息,我們需要建立一個SendMessage
方法,並接收一個訊息引數。這裡我們建立了一個我們自己的訊息內容型別MyPayload
, 將當前該MyPayload
物件序列化成Json字串, 新增到一個Message
物件中。
using Microsoft.Azure.ServiceBus;
using Microsoft.Extensions.Configuration;
using Newtonsoft.Json;
using System.Text;
using System.Threading.Tasks;
namespace ServiceBusMessaging
{
public class ServiceBusSender
{
private readonly QueueClient _queueClient;
private readonly IConfiguration _configuration;
private const string QUEUE_NAME = "simplequeue";
public ServiceBusSender(IConfiguration configuration)
{
_configuration = configuration;
_queueClient = new QueueClient(
_configuration
.GetConnectionString("ServiceBusConnectionString"),
QUEUE_NAME);
}
public async Task SendMessage(MyPayload payload)
{
string data = JsonConvert.SerializeObject(payload);
Message message = new Message(Encoding.UTF8.GetBytes(data));
await _queueClient.SendAsync(message);
}
}
}
在API 1和API 2中,我們需要將ServiceBusSender
註冊到應用程式的IOC容器中。這裡為了測試方便,我們同時註冊Swagger
服務。
public void ConfigureServices(IServiceCollection services)
{
services.AddMvc();
services.AddScoped();
services.AddSwaggerGen(c =>
{
c.SwaggerDoc("v1", new Info
{
Version = "v1",
Title = "Payload View API",
});
});
}
接下來,我們就可以在控制器中透過建構式註入的方式使用這個服務了。
在API1中,我們建立一個POST方法,這個方法會將API接收到Payload
物件傳送到Azure Service Bus Queue中。
[HttpPost]
[ProducesResponseType(typeof(Payload), StatusCodes.Status200OK)]
[ProducesResponseType(typeof(Payload), StatusCodes.Status409Conflict)]
public async Task Create([FromBody][Required]Payload request)
{
if (data.Any(d => d.Id == request.Id))
{
return Conflict($"data with id {request.Id} already exists");
}
data.Add(request);
// Send this to the bus for the other services
await _serviceBusSender.SendMessage(new MyPayload
{
Goals = request.Goals,
Name = request.Name,
Delete = false
});
return Ok(request);
}
從Queue中獲取訊息
為了監聽Azure Service Bus Queue, 並處理接收到的訊息,我們建立了一個新類ServiceBusConsumer
,ServiceBusConsumer
實現了IServiceBusConsumer
介面。
Queue的連線字串是使用IConfiguration
讀取的。RegisterOnMessageHandlerAndReceiveMessages
方法負責註冊訊息處理程式ProcessMessagesAsync
處理訊息。ProcessMessagesAsync
方法會將得到的訊息轉換成物件,並呼叫IProcessData
介面完成最終的訊息處理。
using Microsoft.Azure.ServiceBus;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace ServiceBusMessaging
{
public interface IServiceBusConsumer
{
void RegisterOnMessageHandlerAndReceiveMessages();
Task CloseQueueAsync();
}
public class ServiceBusConsumer : IServiceBusConsumer
{
private readonly IProcessData _processData;
private readonly IConfiguration _configuration;
private readonly QueueClient _queueClient;
private const string QUEUE_NAME = "simplequeue";
private readonly ILogger _logger;
public ServiceBusConsumer(IProcessData processData,
IConfiguration configuration,
ILogger logger)
{
_processData = processData;
_configuration = configuration;
_logger = logger;
_queueClient = new QueueClient(
_configuration.GetConnectionString("ServiceBusConnectionString"), QUEUE_NAME);
}
public void RegisterOnMessageHandlerAndReceiveMessages()
{
var messageHandlerOptions = new MessageHandlerOptions(ExceptionReceivedHandler)
{
MaxConcurrentCalls = 1,
AutoComplete = false
};
_queueClient.RegisterMessageHandler(ProcessMessagesAsync, messageHandlerOptions);
}
private async Task ProcessMessagesAsync(Message message, CancellationToken token)
{
var myPayload = JsonConvert.DeserializeObject(Encoding.UTF8.GetString(message.Body));
_processData.Process(myPayload);
await _queueClient.CompleteAsync(message.SystemProperties.LockToken);
}
private Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs)
{
_logger.LogError(exceptionReceivedEventArgs.Exception, "Message handler encountered an exception");
var context = exceptionReceivedEventArgs.ExceptionReceivedContext;
_logger.LogDebug($"- Endpoint: {context.Endpoint}");
_logger.LogDebug($"- Entity Path: {context.EntityPath}");
_logger.LogDebug($"- Executing Action: {context.Action}");
return Task.CompletedTask;
}
public async Task CloseQueueAsync()
{
await _queueClient.CloseAsync();
}
}
}
其中IProcessData
介面存在於類庫專案ServiceBusMessaging
中,它是用來處理訊息的。
public interface IProcessData
{
void Process(MyPayload myPayload);
}
在Api 2中,我們建立一個ProcessData
類,它實現了IProcessData
介面。
public class ProcessData : IProcessData
{
public void Process(MyPayload myPayload)
{
DataServiceSimi.Data.Add(new Payload
{
Name = myPayload.Name,
Goals = myPayload.Goals
});
}
}
這裡為了簡單測試,我們建立了一個靜態類
DataServiceSimi
,其中存放了API2中所有儲存Payload
物件。同時,我們還建立了一個新的控制器ViewPayloadMessagesController
,在其中添加了一個GET Action,並傳回了靜態類DataServiceSimi
中的所有資料。
[Route("api/[controller]")]
[ApiController]
public class ViewPayloadMessagesController : ControllerBase
{
[HttpGet]
[ProducesResponseType(StatusCodes.Status200OK)]
public ActionResult> Get()
{
return Ok(DataServiceSimi.Data);
}
}
最後我們還需要將ProcessData
註冊到API2的IOC容器中。
public void ConfigureServices(IServiceCollection services)
{
services.AddMvc();
services.AddSingleton();
services.AddTransient();
}
最終效果
現在我們分別啟用2個Api專案,併在Api 1的Swagger檔案介面,呼叫POST請求,新增一個Payload
操作完成之後,我們訪問Api 2的/api/ViewPayloadMessages, 獲得結果如下,Api 1發出的訊息出現在了Api 2的結果集中,這說明Api 2從Azure Service Bus Queue中獲取了訊息,並儲存在了自己的靜態類DataServiceSimi
中。
References
[1]
damienbod: https://damienbod.com/author/damienbod/[2]
Microsoft.Azure.ServiceBus: https://www.nuget.org/packages/Microsoft.Azure.ServiceBus[3]
Microsoft.Azure.ServiceBus: https://www.nuget.org/packages/Microsoft.Azure.ServiceBus
朋友會在“發現-看一看”看到你“在看”的內容