Docker上的Apache Kafka – 在数百个游泳池后停止

我在运行Kafka时遇到了一个问题,即在Java应用程序中用作事件存储。

  • 卡夫卡版本:0.10.1.0
  • Java lib版本:0.10.1.0
  • Docker版本:当前边缘(17.06.0-ce,build 02c1d87) (稳定发布时的问题)
  • Java 1.8
  • Docker组合版本:3.3

卡夫卡在147次请求后突然停下来。 任何configuration,docker设置,主机等的变化不会导致不同的结果。

Docker撰写文件:

version: '3.3' services: queue: image: spotify/kafka ports: - "9092:9092" - "2181:2181" environment: - ADVERTISED_HOST=queue - ADVERTISED_PORT=9092 # kafka-manager: # image: sheepkiller/kafka-manager # ports: # - "9000:9000" # environment: # - ZK_HOSTS=queue:2181 # depends_on: # - queue # links: # - queue postgresql: image: postgres:9.4 ports: - "5432:5432" environment: - POSTGRES_USER=db_user - POSTGRES_PASSWORD=db_pass 

为了调查这个问题,我们创build了一个简单的testing(问题在主应用程序中开始发生) 这是设置:

 @Test public void testKafkaPublisherWithEventStore() { String topicId = UUID.randomUUID().toString(); System.out.println("Topic ID: " + topicId); System.out.println("Entity ID: " + this.aggregateEntity.identity()); KafkaPublisher publisher = new KafkaPublisher( "generalTestTopic_" + topicId, new SubscriberPriorityQueue(new SubscriberPriorityQueueComparator()), this.props, new TestEventConverter() ); //Attach subscribers this.subscriberQueue = new SubscriberPriorityQueue(new SubscriberPriorityQueueComparator()); TestEventStore eventStore = new TestEventStore(TestEntity.class, this.props, new TestEventConverter()); this.subscriberQueue.add(new GeneralSubscriberQueueElement(eventStore, 1)); this.subscriberQueue.forEach(publisher::attachSubscriber); int eventsCount = 0; int publishEventsTimes = 300; try { //Make sure listeners attached before publishing events Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("Publishing new events"); for (int j = 0; j < publishEventsTimes; j++) { eventsCount++; publisher.publish(new TestEvent1(this.aggregateEntity.identity(), 1, new Date())); eventsCount++; publisher.publish(new TestEvent2(this.aggregateEntity.identity(), 2, new Date())); } try { int counter = 0; int eventContainerSize = 0; do { counter++; Thread.sleep(100); eventContainerSize = eventStore.get(this.aggregateEntity.identity()).events().size(); System.out.println("Event store caught " + eventContainerSize + " events after " + counter + " tries"); } while(eventContainerSize < eventsCount); } catch (InterruptedException e) { e.printStackTrace(); } assertEquals(eventsCount, eventStore.get(this.aggregateEntity.identity()).events().size()); } 

在本地设置,由于许多打开的文件错误,问题发生很晚,但设置,甚至可笑的高限制,并没有帮助docker。

我不知道其他细节会有用。