诊断Kafka连接问题

我试图尽可能在我的卡夫卡连接设置尽可能多的诊断,但它仍然导致神秘的问题。 特别是,我所做的第一件事是使用Kafka Admin Client来获取clusterId,因为如果这个操作失败了,没有别的可能会成功。

def getKafkaClusterId(describeClusterResult: DescribeClusterResult): Try[String] = { try { val clusterId = describeClusterResult.clusterId().get(futureTimeout.length / 2, futureTimeout.unit) Success(clusterId) } catch { case cause: Exception => Failure(cause) } } 

在testing这通常工作,一切都很好。 一般情况下,只有当端点不可达时才会失败。 它失败了,因为未来超时,所以我没有其他的诊断过去了。 为了testing这些问题,我通常telnet到terminal,例如

 $ telnet blah 9094 Trying blah... Connected to blah. Escape character is '^]'. Connection closed by foreign host. 

通常,如果我可以telnet到一个Kafka经纪人,我可以从我的服务器连接到Kafka。 所以我的问题是:

  1. 如果我可以通过telnet访问Kafka经纪人,这意味着什么,但是我无法通过Kafka Admin Client
  2. 还有哪些诊断技术可以解决卡夫卡经纪人连接问题?

在这种特殊情况下,我通过Docker Swarm在AWS上运行Kafka,并试图找出为什么我的服务器无法成功连接。 当我尝试远程login时,我可以在代理日志中看到,所以我知道代理可以访问。 但是,当我的服务器试图连接到3个经纪人中的任何一个,日志完全沉默。

如果你没有正确configurationlisteners和advertised.listeners,基本上Kafka只是不听。 即使telnet正在侦听您configuration的端口,Kafka客户端库也会以静默方式失败。

我认为这是卡夫卡devise中的一个缺陷,导致不必要的混淆。

这是一篇很好的文章,解释了您首次连接到Kafka经纪人时所发生的步骤

https://community.hortonworks.com/articles/72429/how-kafka-producer-work-internally.html

如果您可以远程login到引导程序服务器,那么它正在侦听客户端连接和请求。

然而,客户端不知道哪个真正的代理是主题的每个分区的领导者,所以他们总是发送给引导程序服务器的第一个请求是元数据请求,以获得所有主题元数据的完整列表。 客户端使用来自引导程序服务器的元数据响应来知道它可以在哪里创build与每个试图生成的主题的每个主题分区的活动领导者的每个Kafka经纪人的新连接。

那就是你的错误configuration的经纪人问题发挥的地方。 当您错误地configuration了advertised.listener端口时,第一个元数据请求的结果是redirect客户端以连接到无法访问的IP地址或主机名。 这是第二个连接超时,而不是你telnet到端口的第一个连接。

另一种考虑的方式是,您必须configuration一个Kafka服务器,作为引导服务器和常规的发布/订阅消息代理,因为它向客户端提供了这两种服务。 您的configuration正确的发布/子服务器,但不正确地作为引导服务器,因为内部和外部的IP地址在AWS(也在docker集装箱或NAT或代理之后)是不同的。

在引导服务器通常与客户端最终连接的相同代理的小型集群中,这可能看起来反常直观,但实际上这是一个非常有用的架构devise,它允许kafka进行扩展并无缝地进行故障转移,而不需要提供静态列表您的引导程序服务器列表上有20个或更多的代理,或者维护额外的负载平衡器和运行状况检查,以知道哪个代理redirect客户端请求。