Hi All, I am trying to consume data from the existing kafka server.
*I am able to consume it successfully through python code*. But i want the same to be working with golang. i have tried sarama, confluent-kafka-go & github.com/segmentio/kafka-go. i am getting the below error: 2019/12/16 06:28:32 client/metadata fetching metadata for all topics from broker *.*.*.*:9093 2019/12/16 06:28:32 Failed to connect to broker *.*.*.*:9093: tls: first record does not look like a TLS handshake 2019/12/16 06:28:32 client/metadata got error from broker -1 while fetching metadata: tls: first record does not look like a TLS handshake 2019/12/16 06:28:32 client/metadata no available broker to send metadata request to 2019/12/16 06:28:32 client/brokers resurrecting 3 dead seed brokers 2019/12/16 06:28:32 Closing Client 2019/12/16 06:28:32 unable to create kafka client: "kafka: client has run out of available brokers to talk to (Is your cluster reachable?)" package main // Run with: // go build examples/base-client/*.go // ./base-client import ( "crypto/tls" "crypto/x509" "io/ioutil" "log" "os" "os/signal" "sync" "github.com/Shopify/sarama" ) func main() { logger := log.New(os.Stderr, "", log.LstdFlags) sarama.Logger = logger tlsConfig, err := NewTLSConfig("bo-rsa.pem", "bo-rsa.key", "ca.pem") if err != nil { log.Fatal(err) } // This can be used on test server if domain does not match cert: // tlsConfig.InsecureSkipVerify = true consumerConfig := sarama.NewConfig() consumerConfig.Net.TLS.Enable = true consumerConfig.Net.TLS.Config = tlsConfig client, err := sarama.NewClient([]string{"*.*.*.*:9093","*.*.*.*:9093"," *.*.*.*:9093"}, consumerConfig) if err != nil { log.Fatalf("unable to create kafka client: %q", err) } consumer, err := sarama.NewConsumerFromClient(client) if err != nil { log.Fatal(err) } defer consumer.Close() consumerLoop(consumer, "rlcmData__ONOS__CTC31") } // NewTLSConfig generates a TLS configuration used to authenticate on server with // certificates. // Parameters are the three pem files path we need to authenticate: client cert, client key and CA cert. func NewTLSConfig(clientCertFile, clientKeyFile, caCertFile string) (*tls.Config, error) { tlsConfig := tls.Config{} // Load client cert cert, err := tls.LoadX509KeyPair(clientCertFile, clientKeyFile) if err != nil { return &tlsConfig, err } tlsConfig.Certificates = []tls.Certificate{cert} tlsConfig.InsecureSkipVerify =true // Load CA cert caCert, err := ioutil.ReadFile(caCertFile) if err != nil { return &tlsConfig, err } caCertPool := x509.NewCertPool() caCertPool.AppendCertsFromPEM(caCert) tlsConfig.RootCAs = caCertPool tlsConfig.BuildNameToCertificate() return &tlsConfig, err } func consumerLoop(consumer sarama.Consumer, topic string) { partitions, err := consumer.Partitions(topic) if err != nil { log.Println("unable to fetch partition IDs for the topic", topic, err) return } // Trap SIGINT to trigger a shutdown. signals := make(chan os.Signal, 1) signal.Notify(signals, os.Interrupt) var wg sync.WaitGroup for partition := range partitions { wg.Add(1) go func() { consumePartition(consumer, int32(partition), signals) wg.Done() }() } wg.Wait() } func consumePartition(consumer sarama.Consumer, partition int32, signals chan os.Signal) { log.Println("Receving on partition", partition) partitionConsumer, err := consumer.ConsumePartition("test", partition, sarama.OffsetNewest) if err != nil { log.Println(err) return } defer func() { if err := partitionConsumer.Close(); err != nil { log.Println(err) } }() consumed := 0 ConsumerLoop: for { select { case msg := <-partitionConsumer.Messages(): log.Printf("Consumed message offset %d\nData: %s\n", msg.Offset, msg.Value) consumed++ case <-signals: break ConsumerLoop } } log.Printf("Consumed: %d\n", consumed) } -- You received this message because you are subscribed to the Google Groups "golang-nuts" group. To unsubscribe from this group and stop receiving emails from it, send an email to golang-nuts+unsubscr...@googlegroups.com. To view this discussion on the web visit https://groups.google.com/d/msgid/golang-nuts/a2c005d7-3dda-457c-8b5e-213e3b6a3aac%40googlegroups.com.