0%

/// <summary>
/// Event bus implementation that publishes events to, and consumes events from,
/// a RabbitMQ exchange. Events are serialized as JSON with embedded type names and
/// routed using the full name of the event's runtime type.
/// </summary>
public class RabbitMQEventBus : BaseEventBus
{
    // SECURITY NOTE(review): TypeNameHandling.All lets the JSON payload choose the CLR type
    // that gets instantiated during deserialization. That is only safe when every publisher
    // on the exchange is trusted; consider supplying a restrictive SerializationBinder.
    // It is kept here because the consumer relies on the "$type" metadata written by PublishAsync.
    private static readonly JsonSerializerSettings SerializerSettings =
        new JsonSerializerSettings { TypeNameHandling = TypeNameHandling.All };

    private readonly IConnectionFactory connectionFactory;
    private readonly IConnection connection;
    private readonly IModel channel;
    private readonly string exchangeName;
    private readonly string exchangeType;
    private readonly string queueName;
    private readonly bool autoAck;
    private readonly ILogger logger;
    private bool disposed;

    /// <summary>
    /// Creates the bus: opens a connection and channel, declares the exchange,
    /// declares the queue and starts consuming from it.
    /// </summary>
    /// <param name="connectionFactory">Factory used to open the RabbitMQ connection.</param>
    /// <param name="logger">Logger used for lifecycle and error messages.</param>
    /// <param name="context">Handler execution context passed to the base class.</param>
    /// <param name="exchangeName">Name of the exchange to declare and publish to.</param>
    /// <param name="exchangeType">Exchange type; defaults to fanout.</param>
    /// <param name="queueName">
    /// Queue to consume from. When null or empty, <c>QueueDeclare()</c> is called without
    /// arguments and the queue name returned by the broker is used.
    /// </param>
    /// <param name="autoAck">
    /// Passed to <c>BasicConsume</c>; when false, each message is acknowledged explicitly
    /// after its handlers have completed.
    /// </param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="connectionFactory"/> or <paramref name="logger"/> is null.
    /// </exception>
    public RabbitMQEventBus(IConnectionFactory connectionFactory,
        ILogger<RabbitMQEventBus> logger,
        IEventHandlerExecutionContext context,
        string exchangeName,
        string exchangeType = ExchangeType.Fanout,
        string queueName = null,
        bool autoAck = false)
        : base(context)
    {
        // Validate before any broker resource is opened so a bad argument cannot leak a connection.
        if (connectionFactory == null)
        {
            throw new ArgumentNullException(nameof(connectionFactory));
        }

        if (logger == null)
        {
            throw new ArgumentNullException(nameof(logger));
        }

        this.connectionFactory = connectionFactory;
        this.logger = logger;
        this.exchangeType = exchangeType;
        this.exchangeName = exchangeName;
        this.autoAck = autoAck;

        try
        {
            this.connection = this.connectionFactory.CreateConnection();
            this.channel = this.connection.CreateModel();
            this.channel.ExchangeDeclare(this.exchangeName, this.exchangeType);
            this.queueName = this.InitializeEventConsumer(queueName);
        }
        catch
        {
            // The instance is never handed to the caller when construction fails, so nobody
            // else could dispose these; release them here before rethrowing.
            this.channel?.Dispose();
            this.connection?.Dispose();
            throw;
        }

        logger.LogInformation($"RabbitMQEventBus构造函数调用完成。Hash Code:{this.GetHashCode()}.");
    }

    /// <summary>
    /// Serializes the event to JSON and publishes it to the exchange, using the full name
    /// of the event's runtime type as the routing key.
    /// </summary>
    /// <param name="event">The event to publish.</param>
    /// <param name="cancellationToken">
    /// When already cancelled, nothing is published and a cancelled task is returned.
    /// </param>
    /// <returns>A completed task once the publish call has returned.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="event"/> is null.</exception>
    public override Task PublishAsync<TEvent>(TEvent @event, CancellationToken cancellationToken = default(CancellationToken))
    {
        if (@event == null)
        {
            throw new ArgumentNullException(nameof(@event));
        }

        if (cancellationToken.IsCancellationRequested)
        {
            return Task.FromCanceled(cancellationToken);
        }

        var json = JsonConvert.SerializeObject(@event, SerializerSettings);
        var eventBody = Encoding.UTF8.GetBytes(json);

        this.channel.BasicPublish(this.exchangeName,
            @event.GetType().FullName,
            null,
            eventBody);

        return Task.CompletedTask;
    }

    /// <summary>
    /// Registers <typeparamref name="TEventHandler"/> for <typeparamref name="TEvent"/> and binds
    /// the queue to the exchange with the event type's full name as the routing key.
    /// Does nothing when that handler is already registered.
    /// </summary>
    public override void Subscribe<TEvent, TEventHandler>()
    {
        if (this.eventHandlerExecutionContext.HandlerRegistered<TEvent, TEventHandler>())
        {
            return;
        }

        this.eventHandlerExecutionContext.RegisterHandler<TEvent, TEventHandler>();
        this.channel.QueueBind(this.queueName, this.exchangeName, typeof(TEvent).FullName);
    }

    /// <summary>
    /// Disposes the channel and the connection (when <paramref name="disposing"/> is true)
    /// and then calls the base implementation. Subsequent calls are no-ops.
    /// </summary>
    /// <param name="disposing">True when called from <c>Dispose()</c> rather than a finalizer.</param>
    protected override void Dispose(bool disposing)
    {
        if (this.disposed)
        {
            return;
        }

        if (disposing)
        {
            this.channel.Dispose();
            this.connection.Dispose();
            this.logger.LogInformation($"RabbitMQEventBus已经被Dispose。Hash Code:{this.GetHashCode()}.");
        }

        this.disposed = true;
        base.Dispose(disposing);
    }

    /// <summary>
    /// Declares the queue, attaches a consumer that dispatches received events to the
    /// handler execution context, and starts consuming.
    /// </summary>
    /// <param name="queue">Requested queue name; null or empty requests a broker-named queue.</param>
    /// <returns>The name of the queue being consumed.</returns>
    private string InitializeEventConsumer(string queue)
    {
        var localQueueName = queue;
        if (string.IsNullOrEmpty(localQueueName))
        {
            localQueueName = this.channel.QueueDeclare().QueueName;
        }
        else
        {
            // Arguments: durable = true, exclusive = false, autoDelete = false, no extra arguments.
            this.channel.QueueDeclare(localQueueName, true, false, false, null);
        }

        var consumer = new EventingBasicConsumer(this.channel);
        consumer.Received += async (model, eventArgument) =>
        {
            // This lambda is an async void event handler: an exception that escapes after the
            // first await cannot be observed by the invoker. Catch everything here and log it.
            try
            {
                var eventBody = eventArgument.Body;
                var json = Encoding.UTF8.GetString(eventBody);
                var @event = (IEvent)JsonConvert.DeserializeObject(json, SerializerSettings);

                await this.eventHandlerExecutionContext.HandleEventAsync(@event);

                if (!this.autoAck)
                {
                    this.channel.BasicAck(eventArgument.DeliveryTag, false);
                }
            }
            catch (Exception ex)
            {
                // The message is deliberately not acknowledged on failure; no ack, nack or
                // reject is sent from here.
                this.logger.LogError(
                    ex,
                    "RabbitMQEventBus failed to process a received message. Delivery tag: {DeliveryTag}.",
                    eventArgument.DeliveryTag);
            }
        };

        this.channel.BasicConsume(localQueueName, autoAck: this.autoAck, consumer: consumer);
        return localQueueName;
    }
}