在部署到Docker的情况下,在Golang中实现的Apache Kafka消费者恐慌

这里是我尝试实现一个简单的微服务,它应该读取来自kafka服务器的消息并通过HTTP发送。 它运行正常,当我从terminal运行它,但是当部署在泊坞窗上它恐慌

panic: runtime error: invalid memory address or nil pointer dereference [signal SIGSEGV: segmentation violation code=0x1 addr=0x40 pc=0x7b6345] goroutine 12 [running]: main.kafkaRoutine.func1(0xc420174060, 0x0, 0x0) /go/src/github.com/deathcore666/ProperConsumerServiceYo/kafka.go:36 +0x95 created by main.kafkaRoutine /go/src/github.com/deathcore666/ProperConsumerServiceYo/kafka.go:32 +0x1ad 

kafka.go第32行和第36行是go func(pc sarama.PartitionConsumer)函数的地方。 我对编程相对较新,所以任何帮助,将不胜感激。 谢谢!

main.go:

 func main() { var ( listen = flag.String("listen", ":8080", "HTTP listen address") proxy = flag.String("proxy", "", "Optional comma-separated list of URLs to proxy uppercase requests") ) flag.Parse() logger := log.NewLogfmtLogger(os.Stderr) var svc KafkaService svc = kafkaService{} svc = proxyingMiddleware(context.Background(), *proxy, logger)(svc) svc = loggingMiddleware(logger)(svc) consumehandler := httptransport.NewServer( makeConsumeEndpoint(svc), decodeConsumeRequest, encodeResponse, ) http.Handle("/consume", consumehandler) logger.Log("msg", "HTTP", "addr", *listen) logger.Log("err", http.ListenAndServe(*listen, nil))} 

service.go:

  package main import ( "context" "errors" "time" ) //KafkaService yolo type KafkaService interface { Consume(context.Context, string) (string, error) } //ErrEmpty yolo var ErrEmpty = errors.New("No topic provided") type kafkaService struct{} //Consumer logic implemented here func (kafkaService) Consume(_ context.Context, topic string) (string, error) { if topic == "" { return "", ErrEmpty } var inChan = make(chan string) var readyChan = make(chan struct{}) var result string var brokers = []string{"192.168.88.208:9092"} //var brokersLocal = []string{"localhost:9092"} go kafkaRoutine(inChan, topic, brokers) go func() { for { select { case msg := <-inChan: result = result + msg + "\n" case <-time.After(time.Second * 1): readyChan <- struct{}{} } } }() <-readyChan close(inChan) return result, nil } //ServiceMiddleware is a chainable thing for the service type ServiceMiddleware func(KafkaService) KafkaService 

kafka.go:

 package main import ( "fmt" "time" "github.com/Shopify/sarama" ) func kafkaRoutine(inChan chan string, topic string, brokers []string) { config := sarama.NewConfig() config.Consumer.Return.Errors = true consumer, err := sarama.NewConsumer(brokers, config) if err != nil { panic(err) } topics, _ := consumer.Topics() if !(containsTopic(topics, topic)) { inChan <- "There is no such a topic" fmt.Println("kafkaroutine exited") return } partitionList, err := consumer.Partitions(topic) for _, partition := range partitionList { pc, _ := consumer.ConsumePartition(topic, partition, sarama.OffsetOldest) go func(pc sarama.PartitionConsumer) { loop: for { select { case msg := <-pc.Messages(): inChan <- string(msg.Value) case <-time.After(time.Second * 1): break loop } } }(pc) } fmt.Println("Kafka GoRoutine exited") } func containsTopic(topics []string, topic string) bool { for _, v := range topics { if topic == v { return true } } return false } 

在kafka.go的第27行,你忽略了从ConsumePartition()返回的错误。 这很可能是因为它返回了一个错误而不是一个有效的分区使用者,但是当你试图使用分区使用者时,它会忽略它,所以它会崩溃。