将消息发布到docker中运行的Kafka

我在docker集装箱内运行卡夫卡。 我使用以下命令启动我的容器

docker run --rm -p 2181:2181 -p 9092:9092 -p 8081:8081 --env ADVERTISED_HOST=\`docker-machine ip \\`docker-machine active\\`` --env ADVERTISED_PORT=9092 -v /Users/abhishek.srivastava/MyProjects/KafkaTest/target/scala-2.11:/app -it -- name kafka spotify/kafka bash 

我已经写了一个简单的程序,我可以在容器中复制并执行它,它完美的工作。

 object KafkaProducerString { def SendStringMessage(msg: String) : Unit = { val inputRecord = new ProducerRecord[String, String]("test", null, msg) val producer: KafkaProducer[String, String] = CreateProducerString val rm = producer.send(inputRecord).get(10, SECONDS) println(s"offset: ${rm.offset()} partition: ${rm.partition()} topic: ${rm.topic()}") producer.close() } private def CreateProducerString: KafkaProducer[String, String] = { val props = new Properties() props.put("bootstrap.servers", "localhost:9092") props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer") props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer") props.put("batch.size", "0") props.put("client.id", "1") val producer = new KafkaProducer[String, String](props) producer } } 

但是,如果我从容器外(从我的Mac)运行这个相同的程序。 [我用docker-machine ip的输出replace“localhost”]

我得到这个错误

 [error] (run-main-0) java.util.concurrent.TimeoutException: Timeout after waiting for 10000 ms. java.util.concurrent.TimeoutException: Timeout after waiting for 10000 ms. at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:50) at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:25) at com.abhi.KafkaProducerString$.SendStringMessage(KafkaProducerString.scala:23) at com.abhi.KafkaMain$$anonfun$main$1.apply$mcVI$sp(KafkaMain.scala:19) at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160) at com.abhi.KafkaMain$.main(KafkaMain.scala:17) at com.abhi.KafkaMain.main(KafkaMain.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) 

我的理解是,对于一个卡夫卡制片人是遥远的,我需要打开的唯一的端口是2181(动物园pipe理员)和9092(卡夫卡),你可以看到我打开了这些。

但仍然相同的程序,当在容器外部执行超时,但在容器内(与本地主机)工作。

编辑::根据下面的build议,我尝试了以下

 docker run --rm -p 127.0.0.1:2181:2181 -p 127.0.0.1:9092:9092 -p 127.0.0.1:8081:8081 --env ADVERTISED_HOST=`docker-machine ip \`docker-machine active\`` --env ADVERTISED_PORT=9092 -v /Users/abhishek.srivastava/MyProjects/KafkaTest/target/scala-2.11:/app -it -- name kafka kafka_9.0 bash 

 docker run --rm -p 0.0.0.0:2181:2181 -p 0.0.0.0:9092:9092 -p 0.0.0.0:8081:8081 --env ADVERTISED_HOST=`docker-machine ip \`docker-machine active\`` --env ADVERTISED_PORT=9092 -v /Users/abhishek.srivastava/MyProjects/KafkaTest/target/scala-2.11:/app -it -- name kafka kafka_9.0 bash 

但是这些都没有解决问题。 我得到完全相同的问题

您将不得不将docker容器绑定到本地机器。 这可以通过使用docker run来完成:

 docker run --rm -p 127.0.0.1:2181:2181 -p 127.0.0.1:9092:9092 -p 127.0.0.1:8081:8081 .... 

或者,您可以使用绑定IP运行docker run:

 docker run --rm -p 0.0.0.0:2181:2181 -p 0.0.0.0:9092:9092 -p 0.0.0.0:8081:8081 ..... 

如果您想让Docker容器在您的networking上可路由,您可以使用:

 docker run --rm -p <private-IP>:2181:2181 -p <private-IP>:9092:9092 -p <private-IP>:8081:8081 .... 

或者,最后你可以通过使用下面的命令来避免你的networking接口被装箱:

 docker run --rm -p 2181:2181 -p 9092:9092 -p 8081:8081 --net host .... 

虽然我自己也面临类似的问题,但我可以试着解释这种行为。

卡夫卡制片人将会在Zookeeper的分区负责人发布该主题之前查找分区负责人。 Zookeeper将拥有由在Docker容器内运行的Kafka服务器标记的主机主机条目。

由于这个事实,服务器标记的IP将是Docker的内部IP,而不是主机IP。 这当然不能从客户端机器解决,因此超时。

可能的解决scheme是将advertised.host.name设置为Docker机器的主机IP。 但是,这会引起另一个问题(就像我面对的一样!)

由服务器提取代理元数据将立即开始失败。 这是因为现在Zookeeper条目具有主机IP,它不能从容器内parsing。 因此,任何消费者应用程序现在都将开始获取LEADER_NOT_AVAILABLE警告。

这是一个僵局,解决scheme主要取决于采用的主机解决策略 。 我想知道人们会怎么build议去这里。