Should be ok to read the topic. I cannot spot any error in your
configs/program either.

However, I am not entirely sure if I understand the problem correctly.

>> The problem is in the first run, where GlobalKTable reads null records (I
>> have a json serializer and it reads a record with null value and, of
>> course, the records written formerly are not null).

Can you elaborate? What is the behavior you expect? On startup,
GlobalKTable consumes the specified topic to the current end-offsets to
pre-populate itself before any other processing starts.

-Matthias

On 5/16/18 6:30 AM, Daniele Tria wrote:
> Dear community, I'm struggling with an issue regarding `GlobalKTable`: it
> is required to read from a topic A, where I add key-mapped records from
> another stream. This topic A is populated within the same application by the
> `stream.to` construct with some events that I need to manipulate before they
> can be sinked. So, the `GlobalKTable` has to materialize the records read
> in its store.
> The problem is in the first run, where GlobalKTable reads null records (I
> have a json serializer and it reads a record with null value and, of
> course, the records written formerly are not null). In the next runs,
> records are correctly read from topic and then saved in the store.
> Here is the code snippet:
>
> val streamProperties = {
>   val settings = new Properties
>   settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "myApp")
>   settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
>   settings.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
>     Serdes.Bytes.getClass.getName)
>   settings.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
>     Serdes.Bytes.getClass.getName)
>   settings.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
>   settings.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
>   settings.put(StreamsConfig.STATE_DIR_CONFIG,
>     "in-memory-avg-store/myApp/global")
>   settings.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
>     StreamsConfig.AT_LEAST_ONCE)
>   settings
> }
>
> var globalKTable: Option[GlobalKTable[String, Event]] = None
>
> val serdeEvent: Serde[Event] = JsonSerde[Event]
> val eventTopic = "eventTopicGlobalTable"
>
> val builder: StreamsBuilder = new StreamsBuilder()
>
> val stream: KStream[String, Event] = eventStream.selectKey(
>   new KeyValueMapper[Array[Byte], Event, String] {
>     override def apply(key: Array[Byte], value: Event): String = {
>       value.id
>     }
>   })
>
> stream.to(eventTopic, Produced.`with`[String, Event](Serdes.String(),
>   serdeEvent))
>
> globalKTable match {
>   case None => {
>     globalKTable = Some(builder.globalTable[String, Event](eventTopic,
>       Consumed.`with`[String, Event](Serdes.String(), serdeEvent),
>       Materialized.as[String, Event, KeyValueStore[Bytes,
>         Array[Byte]]]("eventStore")))
>   }
>   case _ => ()
> }
>
> val streams: KafkaStreams = new KafkaStreams(builder.build(),
>   streamProperties)
>
> streams.start()
>
>
> So my question is why is this happening? Am I not allowed to read from a
> sinked topic in the same run? Or is it something related to my
> configuration?
>
> Thank you so much for your work.
>
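
The startup behavior described in the reply (the table is pre-populated only up to the end offsets that exist when it starts) can be illustrated with a toy model. This is a dependency-free sketch, not Kafka Streams code: `BootstrapSketch`, `Record`, and `bootstrap` are hypothetical names, the topic is modeled as a plain in-memory log, and tombstones and the updates a global table keeps applying after startup are left out. It does not claim to explain the null values reported above; it only shows why a first run against a still-empty topic can start with an empty store.

```scala
// Toy model, no Kafka dependency: a topic is an append-only log of records.
object BootstrapSketch {
  final case class Record(key: String, value: String)

  // Replay the records in [0, endOffset) into a map; the last write per key wins.
  def bootstrap(log: Vector[Record], endOffset: Int): Map[String, String] =
    log.take(endOffset).foldLeft(Map.empty[String, String]) { (store, r) =>
      store.updated(r.key, r.value)
    }

  def main(args: Array[String]): Unit = {
    // First run: the topic is still empty when the table starts.
    val logAtStartup = Vector.empty[Record]
    val endOffsetAtStartup = logAtStartup.size // captured once, at startup

    // The same application then writes a record to the topic.
    val logLater = logAtStartup :+ Record("id-1", "event-1")

    // Bootstrapping only up to the startup end offset misses the later record.
    val firstRun = bootstrap(logLater, endOffsetAtStartup)
    println(firstRun.get("id-1")) // None

    // On the next run the record is already in the topic at startup.
    val secondRun = bootstrap(logLater, logLater.size)
    println(secondRun.get("id-1")) // Some(event-1)
  }
}
```

In this model the end offset is read once before replaying, which mirrors the "consume to the current end-offsets" step; anything appended afterwards is not part of the initial pre-population.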