RabbitMQ: graceful shutdown should wait for the event handler to finish

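I'm having trouble getting this right. I have a .NET Core application running in a Docker container. When the container is stopped, I need to wait for my message handling (an upload to FTP) to finish and then close the connection. Right now I can close the connection, but it does not wait for my eventHandler to complete. Any ideas for a solution?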

It looks a lot like this question: Waiting for RabbitMQ threads to complete in Windows Service OnStop().

I'm not using a Windows service, but the basic concept should be the same. However, I would rather not dequeue messages myself.

This is what my code looks like:

    public class AmqpConsumer : IAmqpConsumer
    {
        private readonly IConnection _connection;
        private readonly ILogger<AmqpConsumer> _logger;

        public AmqpConsumer(ILogger<AmqpConsumer> logger, IOptions<AmqpMessageBrokerConfig> config, IConnectionFactory connectionFactory)
        {
            _logger = logger;
            connectionFactory.Uri = config.Value.Uri;
            _connection = connectionFactory.CreateConnection();
        }

        public void CloseConnecion()
        {
            _logger.LogInformation("Close connection.");
            _connection.Close();
            _logger.LogInformation("Connection closed.");
        }

        public async Task ConsumeAsync<T>(AmqpConsumeConfiguration config, Action<string> eventHandler, CancellationToken cancellationToken)
        {
            if (eventHandler == null)
            {
                throw new ArgumentNullException(nameof(eventHandler));
            }

            if (cancellationToken == null)
            {
                throw new ArgumentNullException(nameof(cancellationToken));
            }

            _logger.LogInformation("Start consuming");

            using (var channel = _connection.CreateModel())
            {
                channel.ExchangeDeclare(config.Exchange, config.Type);
                channel.QueueDeclare(config.Queue, true, false, false, null);
                channel.QueueBind(config.Queue, config.Exchange, config.RoutingKey);
                _logger.LogInformation("Exchange, queue declared and binded");

                var consumer = new EventingBasicConsumer(channel);
                _logger.LogInformation("Consumer created");

                consumer.Received += (model, ea) =>
                {
                    var body = ea.Body;
                    var message = Encoding.UTF8.GetString(body);
                    eventHandler(message);
                };
                _logger.LogInformation("Received handler added");

                channel.BasicConsume(config.Queue, true, consumer);
                _logger.LogInformation("Basic consume called");

                try
                {
                    await Task.Delay(Timeout.Infinite, cancellationToken);
                }
                catch (TaskCanceledException ex)
                {
                    _logger.LogError($"Error occured {ex}");
                }
            }
        }

        public void Dispose()
        {
            _connection?.Dispose();
        }
    }

And in Startup I have:

    public void Configure(IApplicationBuilder app, IHostingEnvironment env, IApplicationLifetime lifetime)
    {
        app.UseMvc();

        var messageBroker = app.ApplicationServices.GetService<IMessageBroker>();
        messageBroker.StartAsync(new CancellationToken());

        lifetime.ApplicationStopping.Register(HandleShutdown(app.ApplicationServices.GetService<IAmqpConsumer>()));
    }

    private Action HandleShutdown(IAmqpConsumer consumer)
    {
        return consumer.CloseConnecion;
    }

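I was thinking I could use a global cancellationToken and then check IsCancellationRequested in the HandleShutdown method. But then again, that still would not wait for the current eventHandler (the FTP upload) before the application closes. So maybe I need to add a boolean flag to some of the methods. Any suggestions are welcome.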
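
To make the "flag" idea concrete, here is a minimal sketch of what I have in mind, written without any RabbitMQ types so it can be tried in isolation. It counts the handlers that are currently running and lets the shutdown code block until that count drops to zero or a timeout expires. `InFlightTracker`, `TryRun` and `StopAndWait` are hypothetical names I made up for illustration; they are not part of RabbitMQ.Client or ASP.NET Core, and I have not tested this against a real broker.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper (not a RabbitMQ.Client type): tracks handlers that are
// still running so that shutdown code can wait for them before closing.
public sealed class InFlightTracker
{
    private readonly object _gate = new object();

    // Completed once shutdown has been requested and no handler is running.
    private readonly TaskCompletionSource<bool> _drained =
        new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

    private int _inFlight;
    private bool _stopping;

    // Runs the handler unless shutdown has already begun.
    // Returns false when the handler was skipped.
    public bool TryRun(Action handler)
    {
        lock (_gate)
        {
            if (_stopping) return false;
            _inFlight++;
        }

        try
        {
            handler();
            return true;
        }
        finally
        {
            lock (_gate)
            {
                _inFlight--;
                if (_stopping && _inFlight == 0) _drained.TrySetResult(true);
            }
        }
    }

    // Stops accepting new work, then waits up to the timeout for running
    // handlers. Returns true if everything finished in time.
    public bool StopAndWait(TimeSpan timeout)
    {
        lock (_gate)
        {
            _stopping = true;
            if (_inFlight == 0) _drained.TrySetResult(true);
        }

        return _drained.Task.Wait(timeout);
    }
}
```

The Received callback would then call something like tracker.TryRun(() => eventHandler(message)), and the shutdown callback would call tracker.StopAndWait(...) before CloseConnecion(). Two caveats I am aware of: docker stop sends SIGTERM and by default only waits 10 seconds before killing the container, so the timeout has to fit inside that window (or the stop timeout has to be raised); and because BasicConsume is called with autoAck set to true, a message that is delivered after shutdown has started and skipped by TryRun has already been acknowledged and would be lost. Avoiding that would need manual acknowledgements, which this sketch does not cover.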