It should be ok to read the topic, and I cannot spot any error in your
configs/program either. However, I am not entirely sure I understand
the problem correctly.
>> The problem is in the first run, where the GlobalKTable reads null
>> records (I have a JSON serializer and it reads a record with a null
>> value; the records written formerly are, of course, not null).
Can you elaborate? What is the behavior you expect?
On startup, the GlobalKTable consumes the specified topic up to the
current end offsets to pre-populate itself before any other processing
starts.
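
If you want to verify that, you can query the global store once
`streams.start()` has returned and the instance has reached RUNNING
state. A minimal sketch (assuming the "eventStore" store name and the
`Event` type from your snippet below):

import org.apache.kafka.streams.state.QueryableStoreTypes

// The global thread bootstraps the topic up to its end offsets before
// processing starts, so everything written to the topic in an earlier
// run should already be materialized here.
val store = streams.store("eventStore",
  QueryableStoreTypes.keyValueStore[String, Event]())

val iter = store.all()
try {
  while (iter.hasNext) {
    val kv = iter.next()
    println(s"${kv.key} -> ${kv.value}")
  }
} finally {
  iter.close() // KeyValueIterator must be closed to release resources
}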
-Matthias
On 5/16/18 6:30 AM, Daniele Tria wrote:
> Dear community, I'm struggling with an issue regarding `GlobalKTable`: it
> is supposed to read from a topic A, to which I add key-mapped records from
> another stream. Topic A is populated within the same application via the
> `stream.to` construct, with some events that I need to manipulate before
> they can be written to the sink. The `GlobalKTable` then has to materialize
> the records it reads into its store.
> The problem is in the first run, where the GlobalKTable reads null records
> (I have a JSON serializer and it reads a record with a null value; the
> records written formerly are, of course, not null). In the next runs,
> records are correctly read from the topic and then saved in the store.
> Here is the code snippet:
>
> val streamProperties = {
>   val settings = new Properties
>   settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "myApp")
>   settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
>   settings.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
>     Serdes.Bytes.getClass.getName)
>   settings.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
>     Serdes.Bytes.getClass.getName)
>   settings.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
>   settings.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
>   settings.put(StreamsConfig.STATE_DIR_CONFIG,
>     "in-memory-avg-store/myApp/global")
>   settings.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
>     StreamsConfig.AT_LEAST_ONCE)
>   settings
> }
>
> var globalKTable: Option[GlobalKTable[String, Event]] = None
>
> val serdeEvent: Serde[Event] = JsonSerde[Event]
> val eventTopic = "eventTopicGlobalTable"
>
> val builder: StreamsBuilder = new StreamsBuilder()
>
> val stream: KStream[String, Event] = eventStream.selectKey(
>   new KeyValueMapper[Array[Byte], Event, String] {
>     override def apply(key: Array[Byte], value: Event): String = value.id
>   })
>
> stream.to(eventTopic,
>   Produced.`with`[String, Event](Serdes.String(), serdeEvent))
>
> globalKTable match {
>   case None =>
>     globalKTable = Some(builder.globalTable[String, Event](eventTopic,
>       Consumed.`with`[String, Event](Serdes.String(), serdeEvent),
>       Materialized.as[String, Event,
>         KeyValueStore[Bytes, Array[Byte]]]("eventStore")))
>   case _ => ()
> }
>
> val streams: KafkaStreams =
>   new KafkaStreams(builder.build(), streamProperties)
>
> streams.start()
>
>
> So my question is: why is this happening? Am I not allowed to read, within
> the same run, from a topic that the same topology writes to? Or is it
> something related to my configuration?
>
> Thank you so much for your work.
>