使用Spark处理通过kafka进行的数据stream并使用Python进行编程

总而言之,我必须开始说它是一个旅程,这个问题可能需要你已经在我已经在哪里。 好吧,潜水

所以我做了一个docker容器,提供了所有我需要提交的作业来启动(安装在这个容器上),这是用python编写的。 我用监督员把他们踢出去,把控制台的输出连接到一个日志用于排除故障,基本上我想运行多个脚本来说明多个场景stream入一个单一的kafka话题。

我为什么要这样做? 我正在使用Telegraf来收集指标,并将来自多个实例的所有指标发送到名为“telegraf”的kafka中的单个主题。 我知道….它非常有创意。 此外,根据给定实例分割主题现在是不相关的,因为每个实例都有不同的度量标准,所以我仍然需要对它们进行不同的处理。

无论如何,给你一个例子,Telegraf收集我想要的一些docker统计,其中两个是“docker_container_cpu”和“docker_container_mem”。 现在,我现在的解决scheme是让我的脚本根据即将推出的指标以不同的方式处理指标数据。但是,我想要为每个帐户针对可能stream入卡夫卡的不同情况编写大量脚本。 所以一个docker_container_cpu的脚本和另一个docker_container_mem的脚本。 这两个都将听取卡夫卡的话题,既提交和同时运行的作业。

command=bash -c "spark-submit --jars $SPARK_HOME/jars/spark-streaming_2.11-$SPARK_VERSION.jar,$SPARK_HOME/jars/spark-streaming-kafka-0-8-assembly_2.11-$SPARK_VERSION.jar $SPARK_HOME/scripts/aggregateCPU.py localhost:32181 telegraf &> $SPARK_HOME/logs/cpu.log &" command=bash -c " sleep 30s; spark-submit --jars $SPARK_HOME/jars/spark-streaming_2.11-$SPARK_VERSION.jar,$SPARK_HOME/jars/spark-streaming-kafka-0-8-assembly_2.11-$SPARK_VERSION.jar $SPARK_HOME/scripts/aggregateMem.py localhost:32181 telegraf &> $SPARK_HOME/logs/mem.log &" 

那么你问的问题是什么?

这个…..

  bash-4.3# cat logs/mem.log 17/08/02 19:31:59 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 17/08/02 19:32:02 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041. 17/08/02 19:32:02 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042. 17/08/02 19:32:02 WARN Utils: Service 'SparkUI' could not bind on port 4042. Attempting port 4043. 17/08/02 19:32:02 WARN Utils: Service 'SparkUI' could not bind on port 4043. Attempting port 4044. 

每当我尝试提交这两个作业时,我都会收到这些错误。

所以…如果你想知道如何重现我的代码,这里是你会发现相关的代码片段。

 def setupKafkaStream(): sc = SparkContext(appName="PythonStreamingDirectKafkaWordCount") ssc = StreamingContext(sc, 10) brokers, topic = sys.argv[1:] kvs = KafkaUtils.createStream(ssc, brokers, "spark-streaming-consumer", {topic: 1}) kvs.foreachRDD(handler) ssc.start() ssc.awaitTermination() return kafkaStream 

这两个脚本创build“Spark Streaming上下文”显然这是一个问题。 我只是不知道为什么。 我想利用火花集群处理的所有这些指标的力量,所以我的计划是有多个脚本订阅相关的主题,然后火花可以分裂工作,因为它认为合适。

我想知道我是否仅限于一个“Spark Streaming Context”,或者这个东西和我想象的一样强大。

PS:如果有火花维护者正在倾听,感谢你用scala重写整个库,现在让我们做一个更大的飞跃,并用python重写。 :)我们爱Python。 爪哇,无论多么大的整容(呃…斯卡拉),相比之下是缓慢的。