rabbitmq优雅的出口应该等待事件处理者
我有麻烦做正确的方式。 我有一个.net核心应用程序在docker集装箱。 当docker集装箱停下来,我需要等待我的邮件处理(上传到FTP),并closures连接。 但是,现在我可以closures连接,但不会等待我的eventHandler完成。 任何想法的解决scheme?
它看起来很像: 等待RabbitMQ线程在Windows Service OnStop()中完成 。
虽然我不使用Windows服务,但基本概念将是一样的,但我不想让自己离队。
这是我的代码看起来像:
public class AmqpConsumer : IAmqpConsumer { private readonly IConnection _connection; private readonly ILogger<AmqpConsumer> _logger; public AmqpConsumer(ILogger<AmqpConsumer> logger, IOptions<AmqpMessageBrokerConfig> config, IConnectionFactory connectionFactory) { _logger = logger; connectionFactory.Uri = config.Value.Uri; _connection = connectionFactory.CreateConnection(); } public void CloseConnecion() { _logger.LogInformation("Close connection."); _connection.Close(); _logger.LogInformation("Connection closed."); } public async Task ConsumeAsync<T>(AmqpConsumeConfiguration config, Action<string> eventHandler, CancellationToken cancellationToken) { if (eventHandler == null) { throw new ArgumentNullException(nameof(eventHandler)); } if (cancellationToken == null) { throw new ArgumentNullException(nameof(cancellationToken)); } _logger.LogInformation("Start consuming"); using (var channel = _connection.CreateModel()) { channel.ExchangeDeclare(config.Exchange, config.Type); channel.QueueDeclare(config.Queue, true, false, false, null); channel.QueueBind(config.Queue, config.Exchange, config.RoutingKey); _logger.LogInformation("Exchange, queue declared and binded"); var consumer = new EventingBasicConsumer(channel); _logger.LogInformation("Consumer created"); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); eventHandler(message); }; _logger.LogInformation("Received handler added"); channel.BasicConsume(config.Queue, true, consumer); _logger.LogInformation("Basic consume called"); try { await Task.Delay(Timeout.Infinite, cancellationToken); } catch (TaskCanceledException ex) { _logger.LogError($"Error occured {ex}"); } } } public void Dispose() { _connection?.Dispose(); } }
并在启动时我有:
public void Configure(IApplicationBuilder app, IHostingEnvironment env, IApplicationLifetime lifetime) { app.UseMvc(); var messageBroker = app.ApplicationServices.GetService<IMessageBroker>(); messageBroker.StartAsync(new CancellationToken()); lifetime.ApplicationStopping.Register(HandleShutdown(app.ApplicationServices.GetService<IAmqpConsumer>())); } private Action HandleShutdown(IAmqpConsumer consumer) { return consumer.CloseConnecion; }
我在想我可以全局使用cancellationToken
,然后在HandleShutdown
方法中调用IsCancellation Requested。 但是,再次,这仍然不会处理当前eventHandler(FTP上传),并closures应用程序。 所以也许我需要添加一些方法的布尔值,任何build议都是值得欢迎的。
- 除巧克力以外的其他获得.Net 4.5 sdk的手段
- 如何在使用Dockerfile构build时解决错误“没有这样的文件或目录”错误
- 无法将Npgsql连接到在Docker上运行的Postgres DB
- 使用Windows身份validation将在Linux容器中运行的.NET API与在VM中运行的SQL Server连接起来
- 遇到一个致命的错误。 需要库libhostpolicy.so
- 在Windows Server Core Docker上安装.NET Framework 3.5
- .NET Core SDK与Docker的运行时版本
- 启动时出现Dotnetcore容器错误,询问是否运行SDK命令
- 在Visual Studio 2017中显示多个Docker容器的交互式控制台