Docker container with Spark Streaming application cannot bind to localhost (container) port

I am trying to connect my Spark Streaming application, which runs in a Spark container, to a Flume agent running in a separate Flume container. The Flume agent's Spark sink is pointed at the Spark container on port 10000 (roughly as in the sketch below), and my Spark input stream polls localhost:10000 from inside the Spark container.
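The relevant sink section of my Flume configuration looks roughly like this sketch (the agent and channel names are placeholders, not a verbatim copy of /var/tmp/flume.conf):

    # Rough sketch of the Spark sink declaration; agent and channel names are
    # placeholders, not a verbatim copy of /var/tmp/flume.conf.
    agent.sinks.sparkSink.type = org.apache.spark.streaming.flume.sink.SparkSink
    # "spark" is the Spark container's service name, i.e. the sink is pointed
    # at an address outside the Flume container.
    agent.sinks.sparkSink.hostname = spark
    agent.sinks.sparkSink.port = 10000
    agent.sinks.sparkSink.channel = memoryChannel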

I link and run the containers with docker-compose. After they start, the Flume container's logs show the following error:

    2017-10-27 04:51:46,819 (lifecycleSupervisor-1-1) [ERROR - org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:253)] Unable to start SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@45d84bfa counterGroup:{ name:null counters:{} } } - Exception follows.
    org.jboss.netty.channel.ChannelException: Failed to bind to: spark/172.18.0.4:10000
        at org.jboss.netty.bootstrap.ServerBootstrap.bind(ServerBootstrap.java:297)
        at org.apache.avro.ipc.NettyServer.<init>(NettyServer.java:106)
        at org.apache.avro.ipc.NettyServer.<init>(NettyServer.java:119)
        at org.apache.avro.ipc.NettyServer.<init>(NettyServer.java:74)
        at org.apache.avro.ipc.NettyServer.<init>(NettyServer.java:68)
        at org.apache.spark.streaming.flume.sink.SparkSink.start(SparkSink.scala:90)
        at org.apache.flume.sink.DefaultSinkProcessor.start(DefaultSinkProcessor.java:46)
        at org.apache.flume.SinkRunner.start(SinkRunner.java:79)
        at org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:251)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
    Caused by: java.net.BindException: Cannot assign requested address
        at sun.nio.ch.Net.bind0(Native Method)
        at sun.nio.ch.Net.bind(Net.java:433)
        at sun.nio.ch.Net.bind(Net.java:425)
        at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:223)
        at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74)
        at org.jboss.netty.channel.socket.nio.NioServerSocketPipelineSink.bind(NioServerSocketPipelineSink.java:140)
        at org.jboss.netty.channel.socket.nio.NioServerSocketPipelineSink.handleServerSocket(NioServerSocketPipelineSink.java:90)
        at org.jboss.netty.channel.socket.nio.NioServerSocketPipelineSink.eventSunk(NioServerSocketPipelineSink.java:64)
        at org.jboss.netty.channel.Channels.bind(Channels.java:569)
        at org.jboss.netty.channel.AbstractChannel.bind(AbstractChannel.java:189)
        at org.jboss.netty.bootstrap.ServerBootstrap$Binder.channelOpen(ServerBootstrap.java:342)
        at org.jboss.netty.channel.Channels.fireChannelOpen(Channels.java:170)
        at org.jboss.netty.channel.socket.nio.NioServerSocketChannel.<init>(NioServerSocketChannel.java:80)
        at org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory.newChannel(NioServerSocketChannelFactory.java:158)
        at org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory.newChannel(NioServerSocketChannelFactory.java:86)
        at org.jboss.netty.bootstrap.ServerBootstrap.bind(ServerBootstrap.java:276)
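If I read this right, "Cannot assign requested address" is the JVM's generic bind failure when a server socket is asked to listen on an IP that no local interface owns; here the sink is trying to bind to spark/172.18.0.4, the Spark container's address, from inside the Flume container. A minimal standalone reproduction of just that bind failure (the class is made up for illustration, not part of my setup) would be:

    import java.net.InetAddress;
    import java.net.ServerSocket;

    // Hypothetical demo, not part of my application: binding a server socket
    // to an address that no local interface owns throws the same
    // java.net.BindException ("Cannot assign requested address") as the sink.
    public class BindDemo {
        public static void main(String[] args) throws Exception {
            // 172.18.0.4 is the spark container's address from the log above;
            // from any other container or machine it is not a local address.
            ServerSocket server = new ServerSocket(
                    10000, 50, InetAddress.getByName("172.18.0.4"));
            System.out.println("Bound to " + server.getLocalSocketAddress());
        }
    }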

And the Spark container shows this error:

    Caused by: java.io.IOException: Error connecting to localhost/127.0.0.1:10000
        at org.apache.avro.ipc.NettyTransceiver.getChannel(NettyTransceiver.java:280)
        at org.apache.avro.ipc.NettyTransceiver.<init>(NettyTransceiver.java:206)
        at org.apache.avro.ipc.NettyTransceiver.<init>(NettyTransceiver.java:141)
        at org.apache.spark.streaming.flume.FlumePollingReceiver$$anonfun$onStart$1.apply(FlumePollingInputDStream.scala:83)
        at org.apache.spark.streaming.flume.FlumePollingReceiver$$anonfun$onStart$1.apply(FlumePollingInputDStream.scala:82)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
        at org.apache.spark.streaming.flume.FlumePollingReceiver.onStart(FlumePollingInputDStream.scala:82)
        at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:149)
        at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:131)
        at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:607)
        at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:597)
        at org.apache.spark.SparkContext$$anonfun$33.apply(SparkContext.scala:2028)
        at org.apache.spark.SparkContext$$anonfun$33.apply(SparkContext.scala:2028)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:99)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
    Caused by: java.net.ConnectException: Connection refused: localhost/127.0.0.1:10000
        at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
        at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
        at org.jboss.netty.channel.socket.nio.NioClientBoss.connect(NioClientBoss.java:150)
        at org.jboss.netty.channel.socket.nio.NioClientBoss.processSelectedKeys(NioClientBoss.java:105)
        at org.jboss.netty.channel.socket.nio.NioClientBoss.process(NioClientBoss.java:79)
        at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:318)
        at org.jboss.netty.channel.socket.nio.NioClientBoss.run(NioClientBoss.java:42)

My docker-compose file:

    version: '3'
    services:
      nodeapp:
        build: .
        ports:
          - "3000:3000"
        depends_on:
          - redis
          - mongo
      spark:
        image: spark
        depends_on:
          - redis
      flume:
        image: flume
        environment:
          - FLUME_AGENT_NAME=agent
          - FLUME_CONF_FILE=/var/tmp/flume.conf
        depends_on:
          - redis
          - spark
      redis:
        image: redis
      mongo:
        image: mongo:3.4.9

My Spark program:

    import java.io.Serializable;

    import org.apache.flume.source.avro.AvroFlumeEvent;
    import org.apache.spark.SparkConf;
    import org.apache.spark.SparkContext;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import org.apache.spark.streaming.flume.FlumeUtils;
    import org.apache.spark.streaming.flume.SparkFlumeEvent;

    import redis.clients.jedis.Jedis;

    public class FlumeSinkCons implements Serializable {

        private static final long serialVersionUID = 1L;

        public static void main(String[] args) throws InterruptedException {
            SparkConf conf = new SparkConf().setAppName("skelApp").setMaster("local[*]");
            SparkContext ssc = new SparkContext(conf);
            JavaSparkContext sc = new JavaSparkContext(ssc);
            // 1 ms batch interval
            JavaStreamingContext jsc = new JavaStreamingContext(sc, Durations.milliseconds(1));

            // Poll the Flume Spark sink; this is where localhost:10000 comes from.
            JavaReceiverInputDStream<SparkFlumeEvent> flumeStream =
                    FlumeUtils.createPollingStream(jsc, "localhost", 10000);

            JavaDStream<String> text = flumeStream.map(new Function<SparkFlumeEvent, String>() {
                private static final long serialVersionUID = -6662695075661748763L;

                public String call(SparkFlumeEvent event) throws Exception {
                    AvroFlumeEvent avroEvent = event.event();
                    String msg = new String(avroEvent.getBody().array());
                    App.pubredis(msg); // publish the event body to Redis
                    return msg;
                }
            });

            text.print();
            jsc.start();
            jsc.awaitTermination();
        }
    }
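From what I have read about compose networking, services on the default network can reach each other by service name, so I suspect the receiver should poll the flume service rather than localhost. An untested variation of the createPollingStream call above would be:

    // Untested sketch: on the compose network the service name "flume"
    // resolves to the Flume container, so the polling receiver would connect
    // there instead of to localhost inside the Spark container (this also
    // assumes the sink binds to an address local to the Flume container).
    JavaReceiverInputDStream<SparkFlumeEvent> flumeStream =
            FlumeUtils.createPollingStream(jsc, "flume", 10000);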