// Re-key the stream by the event id so the records can feed the GlobalKTable topic.
val stream: KStream[String, Event] = eventStream.selectKey(new
  KeyValueMapper[Array[Byte], Event, String] {
    override def apply(key: Array[Byte], value: Event): String = value.id
  })

stream.to(eventTopic, Produced.`with`[String, Event](Serdes.String(), serdeEvent))

// Create the GlobalKTable only once, materialized into the "eventStore" store.
globalKTable match {
  case None =>
    globalKTable = Some(builder.globalTable[String, Event](eventTopic,
      Consumed.`with`[String, Event](Serdes.String(), serdeEvent),
      Materialized.as[String, Event, KeyValueStore[Bytes, Array[Byte]]]("eventStore")))
  case _ => ()
}
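
For context, the table is later joined against another stream (see the thread
below). A minimal sketch of such a KStream-GlobalKTable left join, under
assumptions: the topic "otherTopic" and the EnrichedEvent type are illustrative,
not taken from this job.

import org.apache.kafka.streams.kstream.{KeyValueMapper, ValueJoiner}

case class EnrichedEvent(event: Event, matched: Option[Event])

// Hypothetical second input stream, keyed like the table.
val otherStream: KStream[String, Event] =
  builder.stream[String, Event]("otherTopic",
    Consumed.`with`[String, Event](Serdes.String(), serdeEvent))

val joined: KStream[String, EnrichedEvent] = otherStream.leftJoin(
  globalKTable.get,
  new KeyValueMapper[String, Event, String] {      // map each stream record to a table key
    override def apply(key: String, value: Event): String = value.id
  },
  new ValueJoiner[Event, Event, EnrichedEvent] {   // combine stream and table values
    override def apply(left: Event, right: Event): EnrichedEvent =
      EnrichedEvent(left, Option(right))           // `right` is null when there is no match
  })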

>> What do you mean by "the records written formerly are not null"?

In the same job, I expect the GlobalKTable to read from the topic
'eventTopic', which is filled with non-null records from the KStream
'stream' after key-mapping. That is the job's routine, and the code above
implements it.

So, consider that I package my application and run it in an empty
environment (new topics): the GlobalKTable reads a null record from
'eventTopic' and my application breaks, because the null record makes my
deserializer fail (since I know I am filling this topic with non-null
records, the deserializer does not handle null values). Then, if I run the
job again (after the failure) without cleaning my environment (i.e., on the
same topics), the GlobalKTable reads the records as expected (non-null
records).
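
One defensive option, sketched below under assumptions (the wrapper class is
illustrative and not part of the actual job; `serdeEvent`'s deserializer would
be the `inner` argument): return null for null or empty payloads instead of
letting the inner JSON deserializer fail.

import java.util
import org.apache.kafka.common.serialization.{Deserializer, Serdes}

// Hypothetical wrapper: tolerate null/empty payloads (e.g. tombstones)
// by returning null instead of failing inside the inner deserializer.
class NullTolerantDeserializer[T](inner: Deserializer[T]) extends Deserializer[T] {
  override def configure(configs: util.Map[String, _], isKey: Boolean): Unit =
    inner.configure(configs, isKey)
  override def deserialize(topic: String, data: Array[Byte]): T =
    if (data == null || data.isEmpty) null.asInstanceOf[T]  // skip null records
    else inner.deserialize(topic, data)
  override def close(): Unit = inner.close()
}

// Usage sketch: Serdes.serdeFrom(serdeEvent.serializer(),
//   new NullTolerantDeserializer(serdeEvent.deserializer()))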

2018-05-18 0:04 GMT+02:00 Matthias J. Sax <matth...@confluent.io>:

> >> So, are you implying that `GlobalKTable` cannot consume records from a
> >> topic populated at runtime by the same job,
> >> but that it instead needs a pre-populated topic, or some other
> >> application populating it?
>
> No, I did not intend to imply this. During runtime, GlobalKTable will
> still consume data from its input topic and update itself accordingly.
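>
> For illustration, a minimal sketch (store name taken from your code, the
> lookup key is hypothetical) showing that the global store is queryable and
> keeps updating while the app runs:
>
> import org.apache.kafka.streams.state.QueryableStoreTypes
>
> // Query the materialized global store after streams.start():
> val store = streams.store("eventStore",
>   QueryableStoreTypes.keyValueStore[String, Event]())
> val maybeEvent: Event = store.get("some-event-id")  // null if absent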
>
> I am still unclear what you mean by
>
> > The problem is in the first run, where GlobalKTable reads null records
>
> What are `null records`?
>
> > (I have a json serializer and it reads a record with null value and, of
> > course, the records written formerly are not null)
>
> What do you mean by "the records written formerly are not null" ?
>
> Records with `<key,null>` are so-called tombstone records and are
> interpreted as deletes. Thus, if you have tombstones in the input topic,
> it's by design that they delete data from the table.
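>
> For illustration, a tombstone is just a record produced with a null value.
> A standalone sketch (topic and key are only examples):
>
> import java.util.Properties
> import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
> import org.apache.kafka.common.serialization.StringSerializer
>
> val props = new Properties
> props.put("bootstrap.servers", "localhost:9092")
> val producer = new KafkaProducer[String, String](props,
>   new StringSerializer, new StringSerializer)
> // <key, null> -- a table reading this topic deletes the entry for "42".
> producer.send(new ProducerRecord[String, String]("eventTopicGlobalTable", "42", null))
> producer.close()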
>
> > In the next runs, records are correctly read from the topic and then
> > saved in the store.
>
> You mean, you stop the application and restart it? This should not be
> required. How long did you run the application in the first run? During
> the first run, can you confirm that the app writes data into the
> GlobalKTable topic? (I want to understand whether the write or the read
> path is the issue.)
>
>
> -Matthias
>
> On 5/17/18 2:04 AM, Daniele Tria wrote:
> > Hi Matthias, thanks for the reply.
> >
> >>> Can you elaborate? What is the behavior you expect?
> >
> > I want my GlobalKTable reading records from a topic A that is populated
> > within the same job
> > (so I want my table changing at runtime). The producer of A takes a
> > key-mapped stream.
> > Next I perform a left join between a KStream and the GlobalKTable on
> > multiple partitions.
> >
> >>> On startup, GlobalKTable consumes the specified topic to the current
> >>> end-offsets to pre-populate itself before any other processing starts.
> >
> > So, are you implying that `GlobalKTable` cannot consume records from a
> > topic populated at runtime by the same job,
> > but that it instead needs a pre-populated topic, or some other
> > application populating it?
> > If so, I cannot execute the next left join, because globalKTable records
> > must be consumed at runtime.
> >
> > Is there any possibility of using another method to obtain the expected
> > result?
> >
> > Thank you,
> >
> > Daniele
> >
> > 2018-05-16 17:47 GMT+02:00 Matthias J. Sax <matth...@confluent.io>:
> >
> >> Should be ok to read the topic. I cannot spot any error in your
> >> configs/program either.
> >>
> >> However, I am not entirely sure, if I understand the problem correctly.
> >>
> >>>> The problem is in the first run, where GlobalKTable reads null
> >>>> records (I have a json serializer and it reads a record with null
> >>>> value and, of course, the records written formerly are not null).
> >>
> >> Can you elaborate? What is the behavior you expect?
> >>
> >> On startup, GlobalKTable consumes the specified topic to the current
> >> end-offsets to pre-populate itself before any other processing starts.
> >>
> >>
> >> -Matthias
> >>
> >> On 5/16/18 6:30 AM, Daniele Tria wrote:
> >>> Dear community, I'm struggling with an issue regarding `GlobalKTable`:
> >>> it is meant to read from a topic A, to which I add key-mapped records
> >>> from another stream. This topic A is populated within the same
> >>> application by the `stream.to` construct, with some events that I need
> >>> to manipulate before they can be written to the sink. So, the
> >>> `GlobalKTable` has to materialize the records it reads in its store.
> >>> The problem is in the first run, where GlobalKTable reads null records
> >>> (I have a json serializer and it reads a record with null value and, of
> >>> course, the records written formerly are not null). In the next runs,
> >>> records are correctly read from the topic and then saved in the store.
> >>> Here is the code snippet:
> >>>
> >>> val streamProperties = {
> >>>   val settings = new Properties
> >>>   settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "myApp")
> >>>   settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
> >>>   settings.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
> >>>     Serdes.Bytes.getClass.getName)
> >>>   settings.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
> >>>     Serdes.Bytes.getClass.getName)
> >>>   settings.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
> >>>   settings.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
> >>>   settings.put(StreamsConfig.STATE_DIR_CONFIG,
> >>>     "in-memory-avg-store/myApp/global")
> >>>   settings.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
> >>>     StreamsConfig.AT_LEAST_ONCE)
> >>>   settings
> >>> }
> >>>
> >>> var globalKTable: Option[GlobalKTable[String, Event]] = None
> >>>
> >>> val serdeEvent: Serde[Event] = JsonSerde[Event]
> >>> val eventTopic = "eventTopicGlobalTable"
> >>>
> >>> val builder: StreamsBuilder = new StreamsBuilder()
> >>>
> >>> val stream: KStream[String, Event] = eventStream.selectKey(new
> >>>   KeyValueMapper[Array[Byte], Event, String] {
> >>>     override def apply(key: Array[Byte], value: Event): String = {
> >>>       value.id
> >>>     }
> >>>   })
> >>>
> >>> stream.to(eventTopic, Produced.`with`[String, Event](Serdes.String(),
> >>>   serdeEvent))
> >>>
> >>> globalKTable match {
> >>>   case None => {
> >>>     globalKTable = Some(builder.globalTable[String, Event](eventTopic,
> >>>       Consumed.`with`[String, Event](Serdes.String(), serdeEvent),
> >>>       Materialized.as[String, Event, KeyValueStore[Bytes,
> >>>         Array[Byte]]]("eventStore")))
> >>>   }
> >>>   case _ => ()
> >>> }
> >>>
> >>> val streams: KafkaStreams = new KafkaStreams(builder.build(),
> >>>   streamProperties)
> >>>
> >>> streams.start()
> >>>
> >>>
> >>> So my question is: why is this happening? Am I not allowed to read
> >>> from a topic written by the same run? Or is it something related to my
> >>> configuration?
> >>>
> >>> Thank you so much for your work.
> >>>
> >>
> >>
> >
>
>
