Interesting. Well, I'd say open an issue on Sarama's GitHub; you'll probably find answers/ideas faster there.
On Wed, Jul 25, 2018 at 7:51 PM Craig Ching <craigch...@gmail.com> wrote:

> This didn’t fix my problem unfortunately. Both timestamps are 0.
>
> > On Jul 24, 2018, at 15:22, Craig Ching <craigch...@gmail.com> wrote:
> >
> > Hey, thanks for that Dmitriy! I'll have a look.
> >
> >> On Tue, Jul 24, 2018 at 11:18 AM Dmitriy Vsekhvalnov <dvsekhval...@gmail.com> wrote:
> >>
> >> Not really associated with Sarama.
> >>
> >> But your issue sounds pretty much the same as one I faced some time ago
> >> and fixed, here it is: https://github.com/Shopify/sarama/issues/885
> >>
> >> Try using msg.BlockTimestamp instead of msg.Timestamp and see if it helps.
> >>
> >> On Tue, Jul 24, 2018 at 3:26 AM Craig Ching <craigch...@gmail.com> wrote:
> >>
> >> > Hi Dmitriy,
> >> >
> >> > Are you associated with the Sarama project? If so, understand that part
> >> > of what I want is to learn about Sarama and the Kafka message format ;)
> >> >
> >> > The problem I'm having is that if I turn on:
> >> >
> >> >     log.message.timestamp.type=LogAppendTime
> >> >
> >> > in the broker, then produce on topic1 with the console producer, I will
> >> > see timestamps in the Sarama client. If I produce on topic2 with telegraf
> >> > (incidentally, I think telegraf is a Sarama producer), then I don't see
> >> > timestamps in the Sarama client. In both cases, if I consume using the
> >> > console consumer (with --property print.timestamp=true) I *do* see
> >> > timestamps.
> >> >
> >> > I'm happy to debug this issue myself and submit a PR to Sarama, but I am
> >> > missing some fundamentals of how to decode the Kafka message format and
> >> > would really like some pointers.
> >> >
> >> > Cheers,
> >> > Craig
> >> >
> >> > P.S. Here is the Sarama code I'm using to test:
> >> >
> >> > package main
> >> >
> >> > import (
> >> >     "fmt"
> >> >     "log"
> >> >     "os"
> >> >     "os/signal"
> >> >     "time"
> >> >
> >> >     "github.com/Shopify/sarama"
> >> > )
> >> >
> >> > func main() {
> >> >
> >> >     // Initialize Sarama logging
> >> >     sarama.Logger = log.New(os.Stdout, "[Sarama] ",
> >> >         log.Ldate|log.Lmicroseconds|log.Lshortfile)
> >> >
> >> >     signals := make(chan os.Signal, 1)
> >> >     signal.Notify(signals, os.Interrupt)
> >> >
> >> >     config := sarama.NewConfig()
> >> >     config.Consumer.Return.Errors = true
> >> >     config.ClientID = "consumer-test"
> >> >     config.Metadata.RefreshFrequency = time.Duration(5) * time.Minute
> >> >     config.Metadata.Full = true
> >> >     // config.Version = sarama.V0_10_0_0
> >> >     config.Version = sarama.V1_1_0_0
> >> >     // config.Version = sarama.V0_10_2_1
> >> >     config.Consumer.Offsets.Initial = sarama.OffsetOldest
> >> >
> >> >     brokers := []string{"localhost:9092"}
> >> >     // brokers := []string{"measurement-kafka-broker.service.tgt-pe-prod-ttc.consul.c-prod.ost.cloud.target.internal:9092"}
> >> >
> >> >     client, err := sarama.NewConsumer(brokers, config)
> >> >     if err != nil {
> >> >         panic(err)
> >> >     }
> >> >
> >> >     // topic := "topic1"
> >> >     topic := "topic2"
> >> >     // topic := "metric-influx-measurement"
> >> >     // How to decide partition, is it a fixed value...?
> >> >     consumer, err := client.ConsumePartition(topic, 0, sarama.OffsetOldest)
> >> >     if err != nil {
> >> >         panic(err)
> >> >     }
> >> >
> >> >     defer func() {
> >> >         if err := client.Close(); err != nil {
> >> >             panic(err)
> >> >         }
> >> >     }()
> >> >
> >> >     // Count how many messages were processed
> >> >     msgCount := 0
> >> >
> >> >     go func() {
> >> >         for {
> >> >             select {
> >> >             case err := <-consumer.Errors():
> >> >                 fmt.Println(err)
> >> >             case msg := <-consumer.Messages():
> >> >                 msgCount++
> >> >                 fmt.Println(msg.Timestamp)
> >> >                 fmt.Println("Received messages", string(msg.Key), string(msg.Value))
> >> >             case <-signals:
> >> >                 fmt.Println("Interrupt is detected")
> >> >                 break
> >> >             }
> >> >         }
> >> >     }()
> >> >     <-signals
> >> > }
> >> >
> >> > On Mon, Jul 23, 2018 at 10:43 AM Dmitriy Vsekhvalnov <dvsekhval...@gmail.com> wrote:
> >> >
> >> > > Hey Craig,
> >> > >
> >> > > What exact problem do you have with the Sarama client?
> >> > >
> >> > > On Mon, Jul 23, 2018 at 5:11 PM Craig Ching <craigch...@gmail.com> wrote:
> >> > >
> >> > > > Hi!
> >> > > >
> >> > > > I'm working on debugging a problem with how message timestamps are
> >> > > > handled in the Sarama client. In some cases, the Sarama client won't
> >> > > > associate a timestamp with a message while the Kafka console consumer
> >> > > > does. I've found the documentation on the message format here:
> >> > > >
> >> > > > https://kafka.apache.org/documentation/#messageformat
> >> > > >
> >> > > > But the information there is very sparse. For instance, what are
> >> > > > 'firstTimestamp' and 'maxTimestamp'? It seems that when I'm debugging
> >> > > > Sarama, firstTimestamp is set to -1 and maxTimestamp appears to be the
> >> > > > timestamp I want. Is there some state about the message that I need to
> >> > > > understand in order to have maxTimestamp be used? Any further
> >> > > > documentation or guidance on this would be very helpful!
> >> > > >
> >> > > > On another note, I am trying to debug this through the Scala/Java
> >> > > > console consumer, but I'm having a hard time getting IntelliJ set up.
> >> > > > Is there anything special I need to do, or documentation I should
> >> > > > read, to set this up for debugging?
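
For reference, below is a minimal sketch of the fallback suggested upthread
(https://github.com/Shopify/sarama/issues/885): read msg.BlockTimestamp whenever
msg.Timestamp comes back as the zero value. It did not resolve the case in this
thread (both fields were 0 there), but it is the usual first thing to try when
the broker is stamping batches with log.message.timestamp.type=LogAppendTime,
because (if I'm reading the record-batch spec right) the per-record time is
derived from firstTimestamp plus a per-record delta, while maxTimestamp carries
the largest timestamp in the batch (the broker's append time under
LogAppendTime), which matches the firstTimestamp = -1 / useful maxTimestamp
behaviour described above. The broker address, topic name and the
effectiveTimestamp helper are illustrative, not from the thread.

package main

import (
    "fmt"
    "log"
    "time"

    "github.com/Shopify/sarama"
)

// effectiveTimestamp is a hypothetical helper: prefer the per-record
// timestamp, and fall back to the batch (block) timestamp when the
// record-level timestamp was never populated.
func effectiveTimestamp(msg *sarama.ConsumerMessage) time.Time {
    if !msg.Timestamp.IsZero() {
        return msg.Timestamp
    }
    return msg.BlockTimestamp
}

func main() {
    config := sarama.NewConfig()
    config.Version = sarama.V1_1_0_0 // timestamps need a >= 0.10 protocol version

    client, err := sarama.NewConsumer([]string{"localhost:9092"}, config)
    if err != nil {
        log.Fatal(err)
    }
    defer client.Close()

    consumer, err := client.ConsumePartition("topic2", 0, sarama.OffsetOldest)
    if err != nil {
        log.Fatal(err)
    }
    defer consumer.Close()

    for msg := range consumer.Messages() {
        fmt.Println(effectiveTimestamp(msg), string(msg.Key), string(msg.Value))
    }
}

Running this against the two topics from the thread (console-producer vs.
telegraf) should show whether the batch-level timestamp is present even when
the record-level one is not.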