Kafka times out because of Docker startup delay

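I'm brand new to Kafka and Docker, and I've been handed a problem to fix. Our continuous integration tests for Kafka (Apache) queues run just fine on local machines, but on the Jenkins CI server they occasionally fail with this kind of error: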

    %3|1508247800.270|FAIL|art#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: Connection refused
    %3|1508247800.270|ERROR|art#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: Connection refused
    %3|1508247800.270|ERROR|art#producer-1| [thrd:localhost:9092/bootstrap]: 1/1 brokers are down

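The working theory is that the Docker image takes time to start up, and by the time it is ready the Kafka producer has already given up. The offending code is: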

    producer_properties = {
        'bootstrap.servers': self._job_queue.bootstrap_server,
        'client.id': self._job_queue.client_id,
    }
    try:
        self._producer = kafka.Producer(**producer_properties)
    except Exception:
        print("Bang!")

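The error lines above appear while the producer is being created. However, no exception is raised, and the call returns a valid-looking producer, so I can't programmatically test for the presence of the broker endpoint. Is there an API to check the status of a broker?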

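It appears that the client does not throw an exception if the connection to the broker fails. It actually tries to connect to the bootstrap servers when the producer first tries to send a message. If the connection fails, it keeps retrying against any of the brokers passed in the bootstrap list. Eventually, if a broker comes up, the send goes through (and we can check the status in the delivery callback function). The confluent-kafka Python library is built on the librdkafka library, and this client doesn't seem to be properly documented. Some of the Kafka producer options specified by the Kafka protocol don't appear to be supported by librdkafka.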

Here is the callback example code I used:

    from confluent_kafka import Producer


    def notifyme(err, msg):
        # Delivery report: err is None on success, otherwise it describes the failure.
        print(err, msg.key(), msg.value())


    p = Producer({
        'bootstrap.servers': '127.0.0.1:9092',
        'retry.backoff.ms': 100,
        'message.send.max.retries': 20,
        'reconnect.backoff.jitter.ms': 2000,
    })
    try:
        p.produce(topic='sometopic', value='this is data', on_delivery=notifyme)
    except Exception as e:
        print(e)
    p.flush()
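Building on the callback example above, the delivery report can be turned into a yes/no answer. The sketch below is a hypothetical helper, `produce_and_confirm` (not part of confluent-kafka), assuming a producer with the confluent-kafka interface: `produce()` accepts an `on_delivery` callback, and `flush(timeout)` serves delivery callbacks and returns the number of messages still queued. If no broker is reachable, the message simply stays queued, so a non-zero return from `flush()` is the signal.

```python
from typing import Any, Dict, Optional


def produce_and_confirm(producer: Any, topic: str, value: bytes,
                        timeout_s: float = 10.0) -> bool:
    """Produce one message and report whether the broker acknowledged it.

    Assumes a confluent_kafka.Producer-like object: produce() queues the
    message and flush(timeout) runs delivery callbacks and returns the
    number of messages still waiting in the queue.
    """
    outcome: Dict[str, Any] = {"error": None, "delivered": False}

    def on_delivery(err: Optional[Any], msg: Any) -> None:
        # Invoked from inside flush()/poll(); err is None on success.
        outcome["error"] = err
        outcome["delivered"] = err is None

    producer.produce(topic, value=value, on_delivery=on_delivery)
    remaining = producer.flush(timeout_s)
    # A non-zero count means the message never left the local queue,
    # e.g. because no broker was reachable within timeout_s.
    return remaining == 0 and outcome["delivered"]
```

With a real client this might be called as `produce_and_confirm(Producer({'bootstrap.servers': '127.0.0.1:9092'}), 'sometopic', b'this is data', timeout_s=5.0)`. A message left in the queue is still retried in the background until librdkafka's `message.timeout.ms` expires.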

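Also, to check that the broker is there, you could simply telnet to the broker's IP and port (9092 in this example). On the ZooKeeper instance used by the Kafka cluster, you can check the contents of the znodes under /brokers/ids.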
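The telnet check can be scripted for a CI job. Below is a minimal sketch using only the standard library; `wait_for_port` is a hypothetical helper that polls until a TCP connection to the broker's port succeeds or a deadline passes. It only shows that something is listening on the port, which may still be slightly before the broker is fully ready (and with Docker's published ports, the proxy may accept a connection before Kafka itself is listening).

```python
import socket
import time


def wait_for_port(host: str, port: int, timeout_s: float = 60.0,
                  interval_s: float = 1.0) -> bool:
    """Return True once a TCP connection to host:port succeeds, False on timeout.

    This is the scripted equivalent of `telnet host port`: it proves that
    something accepts connections, not that Kafka is serving requests.
    """
    deadline = time.monotonic() + timeout_s
    while True:
        try:
            with socket.create_connection((host, port), timeout=interval_s):
                return True
        except OSError:
            # Connection refused or timed out: wait and try again.
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval_s)
```

For example, `wait_for_port('127.0.0.1', 9092, timeout_s=120)` could run before the producer is constructed. For the ZooKeeper route, the third-party kazoo client's `KazooClient.get_children('/brokers/ids')` returns the ids of the registered brokers, assuming kazoo is available.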

Here is the code that seems to work for me. If it looks a bit Frankenstein, you're right, it is! If there is a cleaner solution, I look forward to seeing it:

    import time
    import uuid
    from threading import Event
    from typing import Dict

    import confluent_kafka as kafka  # pylint: disable=no-name-in-module
    from confluent_kafka.cimpl import KafkaError

    # more imports...

    LOG = # ...

    # Default number of times to retry connection to Kafka Broker
    _DEFAULT_RETRIES = 3

    # Default time in seconds to wait between connection attempts
    _DEFAULT_RETRY_DELAY_S = 5.0

    # Number of times to scan for an error after initiating the connection. It appears that calling
    # flush() once on a producer after construction isn't sufficient to catch the 'broker not available'
    # error. At least twice seems to work.
    _NUM_ERROR_SCANS = 2


    class JobProducer(object):

        def __init__(self, connection_retries: int = _DEFAULT_RETRIES,
                     retry_delay_s: float = _DEFAULT_RETRY_DELAY_S) -> None:
            """
            Constructs a producer.
            :param connection_retries: how many times to retry the connection before raising a RuntimeError.
                If 0, retry forever.
            :param retry_delay_s: how long to wait between retries in seconds.
            """
            self.__error_event = Event()
            self._job_queue = JobQueue()
            self._producer = self.__wait_for_broker(connection_retries, retry_delay_s)
            self._topic = self._job_queue.topic

        def produce_job(self, job_definition: Dict) -> None:
            """
            Produce a job definition on the queue
            :param job_definition: definition of the job to be executed
            """
            value = ...  # Conversion to JSON
            key = str(uuid.uuid4())
            LOG.info('Produced message: %s', value)

            self.__error_event.clear()
            self._producer.produce(self._topic, value=value, key=key, on_delivery=self._on_delivery)
            self._producer.flush(self._job_queue.flush_timeout)

        @staticmethod
        def _on_delivery(error, message):
            if error:
                LOG.error('Failed to produce job %s, with error: %s', message.key(), error)

        def __create_producer(self) -> kafka.Producer:
            producer_properties = {
                'bootstrap.servers': self._job_queue.bootstrap_server,
                'error_cb': self.__on_error,
                'client.id': self._job_queue.client_id,
            }
            return kafka.Producer(**producer_properties)

        def __wait_for_broker(self, retries: int, delay: float) -> kafka.Producer:
            retry_count = 0
            while True:
                self.__error_event.clear()
                producer = self.__create_producer()
                # Need to call flush() several times with a delay between to ensure errors are caught.
                if not self.__error_event.is_set():
                    for _ in range(_NUM_ERROR_SCANS):
                        producer.flush(0.1)
                        if self.__error_event.is_set():
                            break
                        time.sleep(0.1)
                    else:
                        # Success: no errors.
                        return producer

                # If we get to here, the error callback was invoked.
                retry_count += 1
                if retries == 0:
                    msg = '({})'.format(retry_count)
                else:
                    if retry_count <= retries:
                        msg = '({}/{})'.format(retry_count, retries)
                    else:
                        raise RuntimeError('JobProducer timed out')

                LOG.warning('JobProducer: could not connect to broker, will retry %s', msg)
                time.sleep(delay)

        def __on_error(self, error: KafkaError) -> None:
            LOG.error('KafkaError: %s', error.str())
            self.__error_event.set()
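One way to make this less Frankenstein-like is to separate the retry policy from the readiness check. The sketch below is a hypothetical `retry_until_ready` helper (not a confluent-kafka API) that keeps `JobProducer`'s semantics: `retries == 0` means retry forever, otherwise it raises `RuntimeError` after `retries` failed retries. The probe is any callable that raises while the broker is not ready.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def retry_until_ready(probe: Callable[[], T], retries: int = 3,
                      delay_s: float = 5.0) -> T:
    """Call probe() until it stops raising, then return its result.

    Mirrors JobProducer.__wait_for_broker: retries == 0 retries forever;
    otherwise RuntimeError after `retries` failed retries (retries + 1
    attempts in total).
    """
    failures = 0
    while True:
        try:
            return probe()
        except Exception as exc:  # the probe signals "not ready" by raising
            failures += 1
            if retries and failures > retries:
                raise RuntimeError(
                    "broker not ready after {} retries".format(retries)) from exc
            time.sleep(delay_s)
```

Assuming a confluent-kafka release that provides `Producer.list_topics(timeout=...)`, which raises `KafkaException` when it cannot fetch cluster metadata, a probe could be `lambda: producer.list_topics(timeout=5.0)`. That asks the broker for metadata directly instead of scanning `error_cb` after repeated `flush()` calls.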