当前位置: 动力学知识库 > 问答 > 编程问答 >

apache kafka - go sarama client can not consume the message

问题描述:

I use sarama client to produce and consume message ,but can not consume message normally.

the logs shows following:

[sarama]2016/07/27 20:37:00 Initializing new client

[sarama]2016/07/27 20:37:00 client/metadata fetching metadata for all topics from broker hx-kafka-broker-1.tuniu.org:9092

[sarama]2016/07/27 20:37:01 Connected to broker at hx-kafka-broker-1.tuniu.org:9092 (unregistered)

[sarama]2016/07/27 20:37:03 client/brokers registered new broker #2 at 10.10.190.219:9092

[sarama]2016/07/27 20:37:03 client/brokers registered new broker #3 at 10.10.190.231:9092

[sarama]2016/07/27 20:37:03 Successfully initialized new client

[sarama]2016/07/27 20:37:03 producer/broker/3 starting up

[sarama]2016/07/27 20:37:03 producer/broker/3 state change to [open] on topic.ops.falcon/1

[sarama]2016/07/27 20:37:03 Connected to broker at 10.10.190.231:9092 (registered as #3)

[sarama]2016/07/27 20:37:04 partition=1, offset=5

[sarama]2016/07/27 20:37:04 Initializing new client

[sarama]2016/07/27 20:37:04 ClientID is the default of 'sarama', you should consider setting it to something application-specific.

[sarama]2016/07/27 20:37:04 ClientID is the default of 'sarama', you should consider setting it to something application-specific.

[sarama]2016/07/27 20:37:04 client/metadata fetching metadata for all topics from broker hx-kafka-broker-1.tuniu.org:9092

[sarama]2016/07/27 20:37:04 Connected to broker at hx-kafka-broker-1.tuniu.org:9092 (unregistered)

[sarama]2016/07/27 20:37:05 client/brokers registered new broker #2 at 10.10.190.219:9092

[sarama]2016/07/27 20:37:05 client/brokers registered new broker #3 at 10.10.190.231:9092

[sarama]2016/07/27 20:37:05 Successfully initialized new client

[sarama]2016/07/27 20:37:05 ClientID is the default of 'sarama', you should consider setting it to something application-specific.

[sarama]2016/07/27 20:37:05 Connected to broker at 10.10.190.219:9092 (registered as #2)

[sarama]2016/07/27 20:37:05 consumer/broker/2 added subscription to topic.ops.falcon/0

[sarama]2016/07/27 20:37:05 ClientID is the default of 'sarama', you should consider setting it to something application-specific.

[sarama]2016/07/27 20:37:06 Connected to broker at 10.10.190.231:9092 (registered as #3)

[sarama]2016/07/27 20:37:06 consumer/broker/3 added subscription to topic.ops.falcon/1

code as following:

 package main

import (

"fmt"

"log"

"os"

"strings"

"sync"

"github.com/Shopify/sarama"

)

var (

wg sync.WaitGroup

logger = log.New(os.Stderr, "[sarama]", log.LstdFlags)

)

func main() {

//

sarama.Logger = logger

config := sarama.NewConfig()

config.ClientID = "saramaId"

config.Producer.RequiredAcks = sarama.WaitForAll

config.Producer.Partitioner = sarama.NewRandomPartitioner

msg := &sarama.ProducerMessage{}

msg.Topic = "topic.ops.falcon"

msg.Partition = int32(-1)

msg.Key = sarama.StringEncoder("key")

msg.Value = sarama.ByteEncoder("hello,world")

producer, err := sarama.NewSyncProducer(strings.Split("hx-kafka-broker-1.test.org:9092", ","), config)

if err != nil {

logger.Println("Failed to produce message: %s", err)

os.Exit(500)

}

defer producer.Close()

partition, offset, err := producer.SendMessage(msg)

if err != nil {

logger.Println("Failed to produce message: ", err)

}

logger.Printf("partition=%d, offset=%d\n", partition, offset)

//

// sarama.Logger = logger

consumer, err := sarama.NewConsumer(strings.Split("hx-kafka-broker-1.test.org:9092", ","), nil)

if err != nil {

logger.Println("Failed to start consumer:%s", err)

}

partitionList, err := consumer.Partitions("topic.ops.falcon")

if err != nil {

logger.Println("Failed to get the list of partitions: ", err)

}

for partition := range partitionList {

pc, err := consumer.ConsumePartition("topic.ops.falcon", int32(partition), sarama.OffsetNewest)

if err != nil {

logger.Printf("Failed to start consumer for partition %d: %s\n", partition, err)

}

defer pc.AsyncClose()

wg.Add(1)

// fmt.Println("abc")

go func(sarama.PartitionConsumer) {

defer wg.Done()

for msg := range pc.Messages() {

fmt.Printf("Partition:%d, Offset:%d, Key:%s, Value:%s", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))

fmt.Println("123")

}

}(pc)

}

wg.Wait()

logger.Println("Done consuming topic hello")

consumer.Close()

}

分享给朋友:
您可能感兴趣的文章:
随机阅读: