>> So, are you implying that `GlobalKTable` can not consume records from a
>> topic populated at runtime by the same job, but it needs instead a
>> pre-populated topic or some other application populating it?
No, I did not intend to imply this. During runtime, GlobalKTable will still
consume data from its input topic and update itself accordingly.

I am still unclear what you mean by

> The problem is in the first run, where GlobalKTable reads null records

What are "null records"?

> (I have a json serializer and it reads a record with null value and, of
> course, the records written formerly are not null)

What do you mean by "the records written formerly are not null"?

Records with `<key, null>` are so-called tombstone records and are
interpreted as deletes. Thus, if you have tombstones in the input topic,
it's by design that they delete data from the table.

> In the next runs, records are correctly read from topic and then saved
> in the store.

You mean, you stop the application and restart it? This should not be
required. How long did you run the application in the first run?

During the first run, can you confirm that the app writes data into the
GlobalKTable topic? (I want to understand whether the write path or the
read path is the issue; a standalone consumer like the sketch after the
quoted thread can help verify this.)

-Matthias

On 5/17/18 2:04 AM, Daniele Tria wrote:
> Hi Matthias, thanks for the reply.
>
>>> Can you elaborate? What is the behavior you expect?
>
> I want my GlobalKTable reading records from a topic A, that is populated
> within the same job (so I want my table changing at runtime). The
> producer of A takes a key-mapped stream. Next I perform a left join
> between a KStream and the GlobalKTable on multiple partitions.
>
>>> On startup, GlobalKTable consumes the specified topic to the current
>>> end-offsets to pre-populate itself before any other processing starts.
>
> So, are you implying that `GlobalKTable` can not consume records from a
> topic populated at runtime by the same job, but it needs instead a
> pre-populated topic or some other application populating it?
> If so, I can not execute the next left join because GlobalKTable records
> must be consumed at runtime.
>
> Is there any possibility of using another method to obtain the expected
> result?
>
> Thank you,
>
> Daniele
>
> 2018-05-16 17:47 GMT+02:00 Matthias J. Sax <matth...@confluent.io>:
>
>> Should be ok to read the topic. I cannot spot any error in your
>> configs/program either.
>>
>> However, I am not entirely sure, if I understand the problem correctly.
>>
>>>> The problem is in the first run, where GlobalKTable reads null records
>>>> (I have a json serializer and it reads a record with null value and,
>>>> of course, the records written formerly are not null).
>>
>> Can you elaborate? What is the behavior you expect?
>>
>> On startup, GlobalKTable consumes the specified topic to the current
>> end-offsets to pre-populate itself before any other processing starts.
>>
>> -Matthias
>>
>> On 5/16/18 6:30 AM, Daniele Tria wrote:
>>> Dear community, I'm struggling with an issue regarding `GlobalKTable`:
>>> it is supposed to read from a topic A, where I add key-mapped records
>>> from another stream. This topic A is populated within the same
>>> application by the `stream.to` construct with some events that I need
>>> to manipulate before they can be written out. So, the `GlobalKTable`
>>> has to materialize the records it reads in its store.
>>> The problem is in the first run, where GlobalKTable reads null records
>>> (I have a json serializer and it reads a record with null value and,
>>> of course, the records written formerly are not null). In the next
>>> runs, records are correctly read from topic and then saved in the
>>> store.
>>> Here is the code snippet:
>>>
>>> val streamProperties = {
>>>   val settings = new Properties
>>>   settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "myApp")
>>>   settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
>>>   settings.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
>>>     Serdes.Bytes.getClass.getName)
>>>   settings.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
>>>     Serdes.Bytes.getClass.getName)
>>>   settings.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
>>>   settings.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
>>>   settings.put(StreamsConfig.STATE_DIR_CONFIG,
>>>     "in-memory-avg-store/myApp/global")
>>>   settings.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
>>>     StreamsConfig.AT_LEAST_ONCE)
>>>   settings
>>> }
>>>
>>> var globalKTable: Option[GlobalKTable[String, Event]] = None
>>>
>>> val serdeEvent: Serde[Event] = JsonSerde[Event]
>>> val eventTopic = "eventTopicGlobalTable"
>>>
>>> val builder: StreamsBuilder = new StreamsBuilder()
>>>
>>> val stream: KStream[String, Event] = eventStream.selectKey(
>>>   new KeyValueMapper[Array[Byte], Event, String] {
>>>     override def apply(key: Array[Byte], value: Event): String = {
>>>       value.id
>>>     }
>>>   })
>>>
>>> stream.to(eventTopic, Produced.`with`[String, Event](Serdes.String(),
>>>   serdeEvent))
>>>
>>> globalKTable match {
>>>   case None =>
>>>     globalKTable = Some(builder.globalTable[String, Event](eventTopic,
>>>       Consumed.`with`[String, Event](Serdes.String(), serdeEvent),
>>>       Materialized.as[String, Event,
>>>         KeyValueStore[Bytes, Array[Byte]]]("eventStore")))
>>>   case _ => ()
>>> }
>>>
>>> val streams: KafkaStreams = new KafkaStreams(builder.build(),
>>>   streamProperties)
>>>
>>> streams.start()
>>>
>>> So my question is: why is this happening? Am I not allowed to read, in
>>> the same run, from a topic that the application itself writes to? Or is
>>> it something related to my configuration?
>>>
>>> Thank you so much for your work.
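
For reference, a minimal sketch of the standalone consumer Matthias suggests
for checking the write path, i.e., whether `eventTopicGlobalTable` actually
contains non-null values or tombstones. It assumes the topic and broker from
the snippet above; the group id and poll timeout are arbitrary placeholders:

import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.serialization.StringDeserializer
import scala.collection.JavaConverters._

val props = new Properties
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
props.put(ConsumerConfig.GROUP_ID_CONFIG, "event-topic-inspector") // placeholder
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

// read keys and values as raw strings to see exactly what was produced
val consumer = new KafkaConsumer[String, String](props,
  new StringDeserializer, new StringDeserializer)
consumer.subscribe(Collections.singletonList("eventTopicGlobalTable"))

// a record printed with value=null is a tombstone, which the GlobalKTable
// interprets as a delete
for (record <- consumer.poll(5000L).asScala) {
  println(s"key=${record.key} value=${record.value}")
}
consumer.close()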
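
And a sketch of the left join Daniele describes, joining a KStream against
the GlobalKTable built in the snippet. Here `otherStream` is a hypothetical
KStream[String, Event], and returning a pair of events is just an
illustrative choice; because the lookup goes against the fully replicated
global store, no co-partitioning is required, so it works across multiple
partitions:

import org.apache.kafka.streams.kstream.{KeyValueMapper, KStream, ValueJoiner}

val table = globalKTable.get // the GlobalKTable created above

val joined: KStream[String, (Event, Event)] = otherStream.leftJoin(
  table,
  // map each stream record to the key used to look up the global table
  new KeyValueMapper[String, Event, String] {
    override def apply(key: String, value: Event): String = value.id
  },
  // combine the stream value with the table value (null if no match)
  new ValueJoiner[Event, Event, (Event, Event)] {
    override def apply(streamValue: Event, tableValue: Event): (Event, Event) =
      (streamValue, tableValue)
  })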