Apache kafka消费者对意外的消息进行无限循环轮询

我正在做卡夫卡消费者实施的集成testing。 我使用wurstmeister / kafka docker镜像和Apache Kafka客户端。 当我向某个主题发送“意外”消息时,嗡嗡的情景就是这样。 在运行模式下, kafkaConsumer.poll(POLLING_TIMEOUT)似乎进入无限循环。 当我debugging但是,当我暂停和运行时,它工作。

发送预期的消息时,我没有这个问题(不要在反序列化时抛出exception)。

这是我的kafka的docker docker-composeconfiguration:

 kafka: image: wurstmeister/kafka links: - zookeeper ports: - "9092:9092" environment: KAFKA_ADVERTISED_HOST_NAME: localhost KAFKA_ADVERTISED_PORT: 9092 KAFKA_CREATE_TOPICS: "ProductLocation:1:1,ProductInformation:1:1,InventoryAvailableToSell:1:1" KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 volumes: - /var/run/docker.sock:/var/run/docker.sock 

Java通用消费者:

 @Override public Collection<T> consume() { String eventToBePublishedName = ERROR_WHILE_RESETTING_OFFSET; boolean success = false; try { kafkaConsumer.resume(kafkaAssignments); if (isPollingTypeFull) { // dummy poll because its needed before resetting offset. // https://stackoverflow.com/questions/41008610/kafkaconsumer-0-10-java-api-error-message-no-current-assignment-for-partition kafkaConsumer.poll(POLLING_TIMEOUT); resetOffset(); } else if (!offsetGotResetFirstTime) { resetOffset(); offsetGotResetFirstTime = true; } eventToBePublishedName = ERROR_WHILE_POLLING; ConsumerRecords<Object, T> records; List<T> output = new ArrayList<>(); do { records = kafkaConsumer.poll(POLLING_TIMEOUT); records.forEach(cr -> { T val = cr.value(); if (val != null) { output.add(cr.value()); } }); } while (records.count() > 0); eventToBePublishedName = CONSUMING; success = true; kafkaConsumer.pause(kafkaAssignments); return output; } finally { applicationEventPublisher.publishEvent( new OperationResultApplicationEvent( this, OperationType.ConsumingOfMessages, eventToBePublishedName, success)); } } 

反序列化:

 public T deserialize(String topic, byte[] data) { try { JsonNode jsonNode = mapper.readTree(data); JavaType javaType = mapper.getTypeFactory().constructType(getValueClass()); JsonNode value = jsonNode.get("value"); return mapper.readValue(value.toString(), javaType); } catch (IllegalArgumentException | IOException | SerializationException e) { LOGGER.error("Can't deserialize data [" + Arrays.toString(data) + "] from topic [" + topic + "]", e); return null; } } 

在我的集成testing中,我通过发送一个带时间标记的主题名称来为每个testing创build一个主题。 这创造了新的话题,并使testing无状态。

这是我如何configuration卡夫卡消费者:

 Properties properties = new Properties(); properties.put("bootstrap.servers", kafkaConfiguration.getServer()); properties.put("group.id", kafkaConfiguration.getGroupId()); properties.put("key.deserializer", kafkaConfiguration.getKeyDeserializer().getName()); properties.put("value.deserializer", kafkaConfiguration.getValueDeserializer().getName()); 

抓住exception,并提高你承诺的偏移量+1跳过“毒丸”消息。

如果您遇到这种情况,请在使用后close消费者,或在使用后pause使用,然后resume开始使用。