val stream: KStream[String, Event] = eventStream.selectKey(
  new KeyValueMapper[Array[Byte], Event, String] {
    override def apply(key: Array[Byte], value: Event): String = {
      value.id
    }
  })
stream.to(eventTopic,
  Produced.`with`[String, Event](Serdes.String(), serdeEvent))

globalKTable match {
  case None => {
    globalKTable = Some(builder.globalTable[String, Event](eventTopic,
      Consumed.`with`[String, Event](Serdes.String(), serdeEvent),
      Materialized.as[String, Event,
        KeyValueStore[Bytes, Array[Byte]]]("eventStore")))
  }
  case _ => ()
}

>> What do you mean by "the records written formerly are not null"?

In the same job I expect the GlobalKTable to read from the topic
'eventTopic', which is filled with non-null records from the KStream
'stream' after it has been key-mapped. That is the job's routine, and the
code above implements it. So, consider that I package my application and
run it in an empty environment (new topics): the GlobalKTable reads a
null record from 'eventTopic' and my application breaks, because the null
record makes my deserializer fail (since I know that I am filling this
topic with non-null records, I do not handle null values). Then, if I run
my job again after the failure, without cleaning my environment (i.e. on
the same topics), my GlobalKTable reads the records as expected (non-null
records).
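For illustration only: a JSON deserializer can be made tolerant of such
null payloads by returning null instead of parsing an empty byte array.
A minimal sketch, assuming a Jackson-backed serde; the class name and
wiring below are hypothetical, not the `JsonSerde` used in this thread:

import java.util

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.kafka.common.serialization.Deserializer

// Hypothetical null-tolerant JSON deserializer: a tombstone (<key, null>)
// arrives as a null byte array, so return null instead of letting the
// JSON parser fail on the empty payload.
class NullTolerantJsonDeserializer[T](clazz: Class[T]) extends Deserializer[T] {
  private val mapper = new ObjectMapper().registerModule(DefaultScalaModule)

  override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()

  override def deserialize(topic: String, data: Array[Byte]): T =
    if (data == null || data.isEmpty) null.asInstanceOf[T]
    else mapper.readValue(data, clazz)

  override def close(): Unit = ()
}

Wrapped via Serdes.serdeFrom(serializer, deserializer), such a
deserializer lets the table treat <key, null> as a delete instead of
crashing the application.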
2018-05-18 0:04 GMT+02:00 Matthias J. Sax <matth...@confluent.io>:

> >> So, are you implying that `GlobalKTable` can not consume records from
> >> a topic populated at runtime by the same job, but it needs instead a
> >> pre-populated topic or some other application populating it?
>
> No, I did not intend to imply this. During runtime, GlobalKTable will
> still consume data from its input topic and update itself accordingly.
>
> I am still unclear what you mean by
>
> > The problem is in the first run, where GlobalKTable reads null records
>
> What are `null records`?
>
> > (I have a json serializer and it reads a record with null value and,
> > of course, the records written formerly are not null)
>
> What do you mean by "the records written formerly are not null"?
>
> Records with `<key, null>` are so-called tombstone records and are
> interpreted as deletes. Thus, if you have tombstones in the input topic,
> it's by design that they delete data from the table.
>
> > In the next runs, records are correctly read from the topic and then
> > saved in the store.
>
> You mean, you stop the application and restart it? This should not be
> required. How long did you run the application in the first run? During
> the first run, can you confirm that the app writes data into the
> GlobalKTable topic? (I want to understand whether the write or the read
> path is the issue.)
>
> -Matthias
>
> On 5/17/18 2:04 AM, Daniele Tria wrote:
> > Hi Matthias, thanks for the reply.
> >
> >>> Can you elaborate? What is the behavior you expect?
> >
> > I want my GlobalKTable to read records from a topic A that is
> > populated within the same job (so I want my table changing at
> > runtime). The producer of A takes a key-mapped stream. Next I perform
> > a left join between a KStream and the GlobalKTable on multiple
> > partitions.
> >
> >>> On startup, GlobalKTable consumes the specified topic to the current
> >>> end-offsets to pre-populate itself before any other processing
> >>> starts.
> >
> > So, are you implying that `GlobalKTable` can not consume records from
> > a topic populated at runtime by the same job, but it needs instead a
> > pre-populated topic or some other application populating it?
> > If so, I can not execute the next left join, because the GlobalKTable
> > records must be consumed at runtime.
> >
> > Is there any possibility of using another method to obtain the
> > expected result?
> >
> > Thank you,
> >
> > Daniele
> >
> > 2018-05-16 17:47 GMT+02:00 Matthias J. Sax <matth...@confluent.io>:
> >
> >> Should be ok to read the topic. I cannot spot any error in your
> >> configs/program either.
> >>
> >> However, I am not entirely sure if I understand the problem correctly.
> >>
> >>>> The problem is in the first run, where GlobalKTable reads null
> >>>> records (I have a json serializer and it reads a record with null
> >>>> value and, of course, the records written formerly are not null).
> >>
> >> Can you elaborate? What is the behavior you expect?
> >>
> >> On startup, GlobalKTable consumes the specified topic to the current
> >> end-offsets to pre-populate itself before any other processing starts.
> >>
> >> -Matthias
> >>
> >> On 5/16/18 6:30 AM, Daniele Tria wrote:
> >>> Dear community, I'm struggling with an issue regarding
> >>> `GlobalKTable`: it is supposed to read from a topic A, to which I add
> >>> key-mapped records from another stream. This topic A is populated
> >>> within the same application by the `stream.to` construct with some
> >>> events that I need to manipulate before they can be written to the
> >>> sink. The `GlobalKTable` then has to materialize the records it
> >>> reads in its store.
> >>> The problem is in the first run, where GlobalKTable reads null
> >>> records (I have a json serializer and it reads a record with null
> >>> value and, of course, the records written formerly are not null). In
> >>> the next runs, records are correctly read from the topic and then
> >>> saved in the store.
> >>> Here is the snippet code:
> >>>
> >>> val streamProperties = {
> >>>   val settings = new Properties
> >>>   settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "myApp")
> >>>   settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
> >>>   settings.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
> >>>     Serdes.Bytes.getClass.getName)
> >>>   settings.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
> >>>     Serdes.Bytes.getClass.getName)
> >>>   settings.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
> >>>   settings.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
> >>>   settings.put(StreamsConfig.STATE_DIR_CONFIG,
> >>>     "in-memory-avg-store/myApp/global")
> >>>   settings.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
> >>>     StreamsConfig.AT_LEAST_ONCE)
> >>>   settings
> >>> }
> >>>
> >>> var globalKTable: Option[GlobalKTable[String, Event]] = None
> >>>
> >>> val serdeEvent: Serde[Event] = JsonSerde[Event]
> >>> val eventTopic = "eventTopicGlobalTable"
> >>>
> >>> val builder: StreamsBuilder = new StreamsBuilder()
> >>>
> >>> val stream: KStream[String, Event] = eventStream.selectKey(
> >>>   new KeyValueMapper[Array[Byte], Event, String] {
> >>>     override def apply(key: Array[Byte], value: Event): String = {
> >>>       value.id
> >>>     }
> >>>   })
> >>>
> >>> stream.to(eventTopic,
> >>>   Produced.`with`[String, Event](Serdes.String(), serdeEvent))
> >>>
> >>> globalKTable match {
> >>>   case None => {
> >>>     globalKTable = Some(builder.globalTable[String, Event](eventTopic,
> >>>       Consumed.`with`[String, Event](Serdes.String(), serdeEvent),
> >>>       Materialized.as[String, Event,
> >>>         KeyValueStore[Bytes, Array[Byte]]]("eventStore")))
> >>>   }
> >>>   case _ => ()
> >>> }
> >>>
> >>> val streams: KafkaStreams =
> >>>   new KafkaStreams(builder.build(), streamProperties)
> >>>
> >>> streams.start()
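Given Matthias's point above that <key, null> tombstones delete table
entries by design, a minimal sketch of one way to keep them out of the
table topic, reusing `stream`, `eventTopic`, and `serdeEvent` from the
snippet; whether tombstones are actually being written here is left open
by the thread:

import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.kstream.{Predicate, Produced}

// Drop <key, null> records before the sink so the GlobalKTable's
// deserializer never sees an empty payload. Note that this also
// suppresses intentional deletes: tombstones are how a table encodes
// removal, so only filter them if the topic is insert/update-only.
stream
  .filter(new Predicate[String, Event] {
    override def test(key: String, value: Event): Boolean = value != null
  })
  .to(eventTopic, Produced.`with`[String, Event](Serdes.String(), serdeEvent))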
> >>>
> >>> So my question is: why is this happening? Am I not allowed to read
> >>> from a sink topic within the same run? Or is it something related to
> >>> my configuration?
> >>>
> >>> Thank you so much for your work.
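For completeness, the KStream-GlobalKTable left join that Daniele
describes but never shows would look roughly like this in the same API;
this is a sketch, where `orderStream`, `Order`, and `EnrichedOrder` are
hypothetical names and `globalKTable` is the Option from the snippet
above:

import org.apache.kafka.streams.kstream.{KStream, KeyValueMapper, ValueJoiner}

// Hypothetical types, for illustration only.
case class Order(orderId: String, eventId: String)
case class EnrichedOrder(order: Order, event: Option[Event])

// GlobalKTable joins do not require co-partitioning, so this works even
// when `orderStream` and the table topic have different partition counts.
val joined: KStream[String, EnrichedOrder] = orderStream.leftJoin(
  globalKTable.get,
  new KeyValueMapper[String, Order, String] {
    // Map each stream record to the table key (the Event id).
    override def apply(key: String, order: Order): String = order.eventId
  },
  new ValueJoiner[Order, Event, EnrichedOrder] {
    // `event` is null when no matching key exists in the table.
    override def apply(order: Order, event: Event): EnrichedOrder =
      EnrichedOrder(order, Option(event))
  }
)

The join looks up the table's local store, and because the GlobalKTable
keeps consuming its input topic at runtime (as Matthias notes above),
records written to 'eventTopicGlobalTable' during the run become visible
to the join as they arrive.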