Dear community, I'm struggling with an issue regarding a `GlobalKTable`
that has to read from a topic A, to which I write re-keyed records from
another stream. Topic A is populated within the same application via the
`stream.to` construct, with events that I need to manipulate before they
can be written to the sink. The `GlobalKTable` then has to materialize the
records it reads into its store.
The problem occurs on the first run, where the GlobalKTable reads null
records (I have a JSON serializer, and it reads a record with a null value
even though, of course, the records written earlier are not null). On
subsequent runs, the records are read correctly from the topic and then
saved in the store.
Here is the code snippet:

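import java.util.Properties
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.{Serde, Serdes}
import org.apache.kafka.common.utils.Bytes
import org.apache.kafka.streams.{KafkaStreams, StreamsBuilder, StreamsConfig}
import org.apache.kafka.streams.kstream.{Consumed, GlobalKTable, KStream, KeyValueMapper, Materialized, Produced}
import org.apache.kafka.streams.state.KeyValueStore

val streamProperties = {
  val settings = new Properties
  settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "myApp")
  settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
  settings.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Bytes().getClass.getName)
  settings.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Bytes().getClass.getName)
  settings.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
  settings.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
  settings.put(StreamsConfig.STATE_DIR_CONFIG, "in-memory-avg-store/myApp/global")
  settings.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.AT_LEAST_ONCE)
  settings
}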

var globalKTable: Option[GlobalKTable[String, Event]] = None

val serdeEvent: Serde[Event] = JsonSerde[Event]
val eventTopic = "eventTopicGlobalTable"

val builder: StreamsBuilder = new StreamsBuilder()

val stream: KStream[String, Event] = eventStream.selectKey(
  new KeyValueMapper[Array[Byte], Event, String] {
    override def apply(key: Array[Byte], value: Event): String = {
      value.id
    }
  })

stream.to(eventTopic,
  Produced.`with`[String, Event](Serdes.String(), serdeEvent))

globalKTable match {
  case None => {
    globalKTable = Some(builder.globalTable[String, Event](eventTopic,
      Consumed.`with`[String, Event](Serdes.String(), serdeEvent),
      Materialized.as[String, Event, KeyValueStore[Bytes, Array[Byte]]]("eventStore")))
  }
  case _ => ()
}

val streams: KafkaStreams = new KafkaStreams(builder.build(), streamProperties)

streams.start()


So my question is: why is this happening? Am I not allowed to read from a
topic that is written to in the same run? Or is it something related to my
configuration?

Thank you so much for your work.
