Tag: apache kafka

在AWS EBS microservice docker环境中使用Kafka,以避免丢失用户请求并处理更多并发点击

目前,我正在使用AWS EBS微服务docker环境来部署在Scala和Akka中编写的微服务。 如果微服务docker的任何人崩溃,并重新启动。 在这种情况下,我们将失去用户的请求,服务将不会返回这些情况下的任何回应。 我目前的架构可以处理高达1000个并发请求,没有任何问题。 为了避免这个问题,我打算使用Kafka存储和检索所有请求和响应。 所以我想使用Kafka来pipe理我的所有Web服务的请求和响应,并包含一个单独的服务或Web套接字来处理所有请求,并将响应再次存储到Kafka。 在这种情况下,如果我的核心进程泊坞窗崩溃或重新启动。 在任何时候都不会失去任何要求和回应。 它将再次开始阅读卡夫卡的请求并处理它。 所有的Web服务都会将请求存储在Kafka的相关主题中,并获得相关响应主题的响应并返回到API响应。 我发现以下库在Scala Web服务中使用Kafka。 https://github.com/akka/reactive-kafka/ 请检查附加的架构图,我将使用它来有效地处理来自客户端应用程序的大量并发请求。 这是一个好方法吗? 我需要改变我的架构中的任何东西吗? 在对Kafka和微服务端口进行了更多的研究之后,我创build了这个架构。 请让我知道这个架构是否有问题。

在部署到Docker的情况下,在Golang中实现的Apache Kafka消费者恐慌

这里是我尝试实现一个简单的微服务,它应该读取来自kafka服务器的消息并通过HTTP发送。 它运行正常,当我从terminal运行它,但是当部署在泊坞窗上它恐慌 panic: runtime error: invalid memory address or nil pointer dereference [signal SIGSEGV: segmentation violation code=0x1 addr=0x40 pc=0x7b6345] goroutine 12 [running]: main.kafkaRoutine.func1(0xc420174060, 0x0, 0x0) /go/src/github.com/deathcore666/ProperConsumerServiceYo/kafka.go:36 +0x95 created by main.kafkaRoutine /go/src/github.com/deathcore666/ProperConsumerServiceYo/kafka.go:32 +0x1ad kafka.go第32行和第36行是go func(pc sarama.PartitionConsumer)函数的地方。 我对编程相对较新,所以任何帮助,将不胜感激。 谢谢! main.go: func main() { var ( listen = flag.String("listen", ":8080", "HTTP listen address") proxy = flag.String("proxy", "", "Optional […]

为wurstmeister / kafka docker图像定义Zookeeper znode

我想将kafka znode设置为另一个znode而不是root。 例如kafka的默认znode是: /admin /brokers /cluster /config 但我想重新组织它们并将它们移动到kafka znode中 /kafka/admin /kafka/brokers /kafka/cluster /kafka/config docker-compose有没有默认zookeeper znode的属性? 谢谢

Kafkastream应用程序停止与org.apache.kafka.common.errors.TimeoutException

我已经在Dockercomposer php的帮助下安装了Kafka 1.0.0,并且我和两个经纪人成功地运行了这个Kafka。 我用分区手动创build了一个主题,并插入了事件。 现在我正在通过指向这个Kafka来运行带有1.0.0 Kafka Stream的应用程序。 在运行我的应用程序一段时间后,以下消息显示在日志中,并从运行停止。 除生产者request.timeout.ms外,其他所有configuration参数均为默认参数,生产者request.timeout.ms为120秒。 在下面的消息停止之前,我观察了几次'Trying to rejoin the consumer group now. org.apache.kafka.streams.errors.TaskMigratedException:' and 'Caused by: org.apache.kafka.clients.consumer.CommitFailedException:' 'Trying to rejoin the consumer group now. org.apache.kafka.streams.errors.TaskMigratedException:' and 'Caused by: org.apache.kafka.clients.consumer.CommitFailedException:'消息在日志中。 可能的原因是什么? 请帮帮我。 停止之前的消息: 2017-12-07 06:17:03,122 WARN oakcpiSender [kafka-producer-network-thread | sample-app-0.0.1-7f99fa3f-4487-48dc-af3f-9296ee513452-StreamThread-1-producer] [Producer clientId=sample-app-0.0.1-7f99fa3f-4487-48dc-af3f-9296ee513452-StreamThread-1-producer] Got error produce response with sample id 14099 on topic-partition […]

使用属性文件的kafka主题configuration

我想configuration一些主题和独特的保留设置Kafka。 所以当我启动Kafka时,它将使用这些设置加载server.properties文件。 我发现的唯一方法是启动Kafka,然后使用kafka-topics.sh脚本启动和configuration主题。 例: bin/kafka-topics.sh –zookeeper zk.yoursite.com –alter –topic as-access –config retention.ms=86400000 我在Docker上使用Kafka,所以我不需要将入口点设置为启动Kafka的实际脚本,而是需要创build自己的脚本来启动Kafka并运行一些shell命令来configuration这些主题。 另外,如果我需要自己创build这些主题,则需要开始创build一些关于已经存在的主题的逻辑。 我不想走那条路

无法连接在docker中运行的Kafka

我在docker compose yml文件中为广告的侦听器configuration了下面的configuration 我的docker-compose.yml version: '2' services: zookeeper: image: confluentinc/cp-zookeeper:latest network_mode: host environment: ZOOKEEPER_CLIENT_PORT: 32181 ZOOKEEPER_TICK_TIME: 2000 extra_hosts: – "moby:127.0.0.1" kafka: image: confluentinc/cp-kafka:latest network_mode: host depends_on: – zookeeper environment: KAFKA_BROKER_ID: 1 KAFKA_ZOOKEEPER_CONNECT: localhost:32181 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://lnc52c9:29092 KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 extra_hosts: – "moby:127.0.0.1" 我使用下面的属性连接这个kafka props.put("bootstrap.servers", "lnc52c9:29092"); 当我尝试在该服务器中产生消息时,我收到下面的错误 警告o.apache.kafka.clients.NetworkClient – 使用关联ID 1获取元数据时出错:{foo = LEADER_NOT_AVAILABLE} 主机可以从我的机器ping通,因为它们都在同一个networking中,没有防火墙问题。 当我在docker容器中使用命令hostname ,我可以看到相同的主机名。 我错过了什么吗?

stream浪者箱子和docker提供者:巨人和Kafka

最近我被引入了Docker容器的概念,发现自己正在寻找一个能够与Kafka一起运行Titan的应用程序。 我似乎无法find我需要在Docker Hub中,我不知道是否有一个可行的选项,有一个Vagrant框(VirtualBox),其中手动安装Kafka(根据安装步骤)和一个Docker镜像泰坦( https://hub.docker.com/r/elubow/titan-gremlin/ )从VagrantFileconfiguration站了起来。 你有什么看法? 那里有更好的select吗? 我的最终范围是写一个卡夫卡消费者样本所消耗的一些样本消息给泰坦。 这是一个合理的build立实验呢? 谢谢, 问候, ILARIA

Logstash无法使用Kafka接收数据

我正在使用从https://hub.docker.com/r/sebp/elk/的Docker ELK容器,根据logstash日志一切都很好,正在运行。 现在我尝试从Kafka接收数据,并使用以下configuration写入ES: input { kafka { topic_id => "collectortopic" zk_connect => "172.17.0.2:2181" type => "kafka-input" } } output { elasticsearch { hosts => ["172.17.0.4:9200"] codec => json } stdout { codec => json } } 但是,我的设置中的任何东西似乎都是错误的,因为我无法收到任何数据。 在控制台中没有Logstash输出,也没有Kibana中的任何数据,因为没有创buildlogstash索引,根据插件文档,这应该是默认的行为。 我的卡夫卡设置是好的,因为我可以发送和接收数据与其他应用程序,zk_connect也是正确的,否则我得到例外… 有任何想法吗? 提前致谢! PS:Kafka 0.9.1,Logstash 2.3.3,ES 2.3.3

无法连接到发现卡夫卡容器,基本连接问题

与docker和kafka磕磕绊绊的基础,不能得到客户端连接 到目前为止我所做的 1)在Windows 10上安装Docker窗口。2)打开kitematic,并searchkafka,并select了spotify kafka(wurstmeister图像未启动)。 3)容器启动,我可以看到图像在容器日志中运行。 4)IP和端口报告docker端口9092 – 和访问端口为本地主机:32768 docker ps show this 7bf9f9278e64 spotify / kafka:latest“supervisord -n”2小时前Up 57分钟0.0.0.0:32769->2181/tcp,0.0.0.0:32768->9092/tcp kafka docker-machine active,返回没有活动的主机 我的groovy类(从一个例子中剪切粘贴类似这样的连接 class KafkaProducer { String topicName = "wills topic" Producer<String, String> producer def init () { Properties props = new Properties() props.put("bootstrap.servers", "192.168.1.89:32768" ) //Assign localhost id and external port (9092 int) props.put("acks", […]

使用Kafka代理的Dockerized Spring Cloud Stream服务无法连接到Zookeeper

我正在使用一个源和一个接收器服务testing一个示例Spring云stream应用程序(运行在Ubuntu Linux机器上)。 我的所有服务都是docker集装箱,我想用卡夫卡作为邮件经纪人。 在docker-compose.yml的相关部分下面: zookeeper: image: confluent/zookeeper container_name: zookeeper ports: – "2181:2181" kafka: image: wurstmeister/kafka:0.9.0.0-1 container_name: kafka ports: – "9092:9092" links: – zookeeper:zk environment: – KAFKA_ADVERTISED_HOST_NAME=192.168.33.101 – KAFKA_ADVERTISED_PORT=9092 – KAFKA_DELETE_TOPIC_ENABLE=true – KAFKA_LOG_RETENTION_HOURS=1 – KAFKA_MESSAGE_MAX_BYTES=10000000 – KAFKA_REPLICA_FETCH_MAX_BYTES=10000000 – KAFKA_GROUP_MAX_SESSION_TIMEOUT_MS=60000 – KAFKA_NUM_PARTITIONS=2 – KAFKA_DELETE_RETENTION_MS=1000 . . . # not shown: eureka service registry, spring cloud config service, […]