Thanks John! I don't think transformValues will work here because I need to remove records which already have manual data?
Either way, it doesn't matter too much as I just write them straight to Kafka.

Thanks for your help!

On Mon, Jan 20, 2020 at 4:48 PM John Roesler <vvcep...@apache.org> wrote:

> Hi Yair,
>
> You should be fine!
>
> Merging does preserve copartitioning.
>
> Also, processing on that partition is single-threaded, so you don’t have to
> worry about races on the same key in your transformer.
>
> Actually, you might want to use transformValues to inform Streams that you
> haven’t changed the key. Otherwise, it would need to repartition the result
> before you could do further stateful processing.
>
> I hope this helps!
>
> Thanks,
> John
>
> On Mon, Jan 20, 2020, at 05:27, Yair Halberstadt wrote:
> > Hi
> > I asked this question on Stack Overflow and was wondering if anyone here
> > could answer it:
> > https://stackoverflow.com/questions/59820243/does-merging-two-kafka-streams-preserve-co-partitioning
> >
> > I have 2 co-partitioned Kafka topics. One contains automatically generated
> > data, and the other manual overrides.
> >
> > I want to merge them and filter out any automatically generated data that
> > has already been manually overridden, and then forward everything to a
> > combined ChangeLog topic.
> >
> > To do so I create a stream from each topic, and [merge the streams](https://kafka.apache.org/23/javadoc/org/apache/kafka/streams/kstream/KStream.html#merge-org.apache.kafka.streams.kstream.KStream-)
> > using the DSL API.
> >
> > I then apply the following transform, which stores any manual data and
> > deletes any automatic data which has already been manually overridden
> > (Scala, but should be pretty easy to understand if you know Java):
> >
> > ```scala
> > import org.apache.kafka.streams.KeyValue
> > import org.apache.kafka.streams.kstream.Transformer
> > import org.apache.kafka.streams.processor.ProcessorContext
> > import org.apache.kafka.streams.state.KeyValueStore
> >
> > class FilterManuallyClassifiedTransformer(manualOverridesStoreName: String)
> >   extends Transformer[Long, Data, KeyValue[Long, Data]] {
> >
> >   // Array[Byte] used as dummy value since we don't use the value
> >   var store: KeyValueStore[Long, Array[Byte]] = _
> >
> >   override def init(context: ProcessorContext): Unit = {
> >     store = context
> >       .getStateStore(manualOverridesStoreName)
> >       .asInstanceOf[KeyValueStore[Long, Array[Byte]]]
> >   }
> >
> >   override def close(): Unit = {}
> >
> >   override def transform(key: Long, value: Data): KeyValue[Long, Data] = {
> >     if (value.getIsManual) {
> >       // Remember that this key has a manual override, then forward it.
> >       store.put(key, Array.emptyByteArray)
> >       new KeyValue(key, value)
> >     } else if (store.get(key) == null) {
> >       // No manual override seen for this key: forward the automatic record.
> >       new KeyValue(key, value)
> >     } else {
> >       // Returning null drops the record.
> >       null
> >     }
> >   }
> > }
> > ```
> >
> > If I understand correctly, there is no guarantee this will work unless
> > manual data and automatic data with the same key are in the same partition.
> > Otherwise the manual override might be stored in a different state store
> > from the one that the automatic data checks.
> >
> > And even if they are stored in the same StateStore, there might be race
> > conditions, where an automatic record checks the state store, then the
> > manual override is added to the state store, then the manual override is
> > written to the output topic, then the automatic record is written to the
> > output topic, leading to the automatic data overwriting the manual override.
> >
> > Is that correct?
> >
> > And if so, will `merge` preserve the co-partitioning guarantee I need?
> >
> > Thanks for your help
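
For anyone reading this thread later, here is a rough plain-Scala sketch of the filtering decision discussed above. It does not use Kafka at all. It assumes a single partition processed in order (as John describes), replaces the state store with an in-memory set, and uses hypothetical names (`Data` with an `isManual` field, `ManualOverrideFilter`) that are not from the original code.

```scala
import scala.collection.mutable

// Hypothetical stand-in for the thread's Data type; only the manual flag matters here.
final case class Data(payload: String, isManual: Boolean)

// Models the transformer's decision for ONE partition, processed sequentially.
// A mutable Set stands in for the KeyValueStore[Long, Array[Byte]] in the thread.
final class ManualOverrideFilter {
  private val manualKeys = mutable.Set.empty[Long]

  // Some(record) means "forward"; None means "drop" (the transformer returns null).
  def process(key: Long, value: Data): Option[(Long, Data)] =
    if (value.isManual) {
      manualKeys += key // remember the override, then forward it
      Some(key -> value)
    } else if (!manualKeys.contains(key)) {
      Some(key -> value) // no override seen yet for this key
    } else {
      None // automatic record arriving after a manual override
    }
}

object ManualOverrideFilterDemo {
  def main(args: Array[String]): Unit = {
    val filter = new ManualOverrideFilter
    // Records in the order they would arrive on the merged stream's partition.
    val merged = Seq(
      1L -> Data("auto-a", isManual = false),
      1L -> Data("manual-a", isManual = true),
      1L -> Data("auto-b", isManual = false),
      2L -> Data("auto-c", isManual = false)
    )
    val forwarded = merged.flatMap { case (k, v) => filter.process(k, v) }
    forwarded.foreach(println)
  }
}
```

In this sketch, "auto-b" is dropped because key 1 already has a manual override by the time it arrives, while "auto-a" (which arrived earlier) and "auto-c" (a different key) are forwarded. The sequential loop is the assumption that rules out the race described in the original question.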