Dear community, I'm struggling with an issue regarding `GlobalKTable`: that is demanded to read from a topic A, where I add key-mapped records from another stream. This topic A is populated within the same application by ` stream.to` costruct with some events that I need to manipulate before they can be sinked. So, The `GlobalKTable` has to materialize the records read in its store. The problem is in the first run, where GlobalKTable reads null records (I have a json serializer and it reads a record with null value and, of course, the records written formerly are not null). In the next runs, records are correctly read from topic and then saved in the store. Here is the snippet code:
val streamProperties = { val settings = new Properties settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "myApp") settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") settings.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Bytes.getClass.getName) settings.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Bytes.getClass.getName) settings.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true") settings.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") settings.put(StreamsConfig.STATE_DIR_CONFIG, "in-memory-avg-store/myApp/global") settings.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.AT_LEAST_ONCE) settings } var globalKTable: Option[GlobalKTable[String, Event]] = None val serdeEvent: Serde[Event] = JsonSerde[Event] val eventTopic = "eventTopicGlobalTable" val builder: StreamsBuilder = new StreamsBuilder() val stream: KStream[String, Event] = eventStream.selectKey(new KeyValueMapper[Array[Byte], Event, String] { override def apply(key: Array[Byte], value: Event): String = { value.id } }) stream.to(eventTopic, Produced.`with`[String, Event](Serdes.String(), serdeEvent)) globalKTable match { case None => { globalKTable = Some(builder.globalTable[String, Event](eventTopic, Consumed.`with`[String, Event](Serdes.String(), serdeEvent), Materialized.as[String, Event, KeyValueStore[Bytes, Array[Byte]]]("eventStore"))) } case _ => () } val streams: KafkaStreams = new KafkaStreams(builder.build(), streamProperties) streams.start() So my question is why is this happening? Am I not allowed to read from a sinked topic in the same run? Either, Is it something related to my configuration? Thank you so much for your work.