Thanks John!

I don't think transformValues will work here, because I need to remove
records which already have manual data.
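
That said, one way transformValues might still be usable is to return null
for records that should be dropped and filter the nulls out afterwards. A
rough, untested sketch, assuming the kafka-streams-scala wrapper on Kafka
2.3; the names MarkOverriddenTransformer, OverrideFilter and dropOverridden
are made up for illustration, and the Data trait is only a stand-in for the
real value class:

```scala
import org.apache.kafka.streams.kstream.{ValueTransformerWithKey, ValueTransformerWithKeySupplier}
import org.apache.kafka.streams.processor.ProcessorContext
import org.apache.kafka.streams.scala.kstream.KStream
import org.apache.kafka.streams.state.KeyValueStore

// Stand-in for the real value class (assumption): only getIsManual is used.
trait Data { def getIsManual: Boolean }

// Hypothetical value-only variant of FilterManuallyClassifiedTransformer.
// It cannot drop a record itself, so it marks overridden records with null.
class MarkOverriddenTransformer(storeName: String)
  extends ValueTransformerWithKey[Long, Data, Data] {

  private var store: KeyValueStore[Long, Array[Byte]] = _

  override def init(context: ProcessorContext): Unit = {
    store = context
      .getStateStore(storeName)
      .asInstanceOf[KeyValueStore[Long, Array[Byte]]]
  }

  override def transform(readOnlyKey: Long, value: Data): Data = {
    if (value.getIsManual) {
      // Remember the manual override and pass the record through.
      store.put(readOnlyKey, Array.emptyByteArray)
      value
    } else if (store.get(readOnlyKey) == null) {
      // No manual override seen for this key yet.
      value
    } else {
      // Already overridden: mark for removal by the filter below.
      null
    }
  }

  override def close(): Unit = {}
}

object OverrideFilter {
  // Assumes a store named storeName is already registered with the builder.
  def dropOverridden(merged: KStream[Long, Data], storeName: String): KStream[Long, Data] = {
    val supplier = new ValueTransformerWithKeySupplier[Long, Data, Data] {
      override def get(): ValueTransformerWithKey[Long, Data, Data] =
        new MarkOverriddenTransformer(storeName)
    }
    merged
      .transformValues(supplier, storeName)
      .filter((_, value) => value != null) // drop the marked records
  }
}
```

Neither transformValues nor filter changes the key, so Streams should not
need to repartition before any later stateful step.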

Either way it doesn't matter too much as I just write them straight to
Kafka.

Thanks for your help!

On Mon, Jan 20, 2020 at 4:48 PM John Roesler <vvcep...@apache.org> wrote:

> Hi Yair,
>
> You should be fine!
>
> Merging does preserve copartitioning.
>
> Also processing on that partition is single-threaded, so you don’t have to
> worry about races on the same key in your transformer.
>
> Actually, you might want to use transformValues to inform Streams that you
> haven’t changed the key. Otherwise, it would need to repartition the result
> before you could do further stateful processing.
>
> I hope this helps!
>
> Thanks,
> John
>
> On Mon, Jan 20, 2020, at 05:27, Yair Halberstadt wrote:
> > Hi
> > I asked this question on Stack Overflow and was wondering if anyone here
> > could answer it:
> >
> > https://stackoverflow.com/questions/59820243/does-merging-two-kafka-streams-preserve-co-partitioning
> >
> >
> > I have 2 co-partitioned Kafka topics. One contains automatically
> > generated data, and the other manual overrides.
> >
> > I want to merge them and filter out any automatically generated data that
> > has already been manually overridden, and then forward everything to a
> > combined ChangeLog topic.
> >
> > To do so I create a stream from each topic, and
> > [merge the streams](https://kafka.apache.org/23/javadoc/org/apache/kafka/streams/kstream/KStream.html#merge-org.apache.kafka.streams.kstream.KStream-)
> > using the DSL API.
> >
> > I then apply the following transform, which stores any manual data, and
> > deletes any automatic data which has already been manually overridden
> > (Scala, but it should be pretty easy to understand if you know Java):
> >
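> > ```scala
> > import org.apache.kafka.streams.KeyValue
> > import org.apache.kafka.streams.kstream.Transformer
> > import org.apache.kafka.streams.processor.ProcessorContext
> > import org.apache.kafka.streams.state.KeyValueStore
> >
> > class FilterManuallyClassifiedTransformer(manualOverridesStoreName: String)
> >   extends Transformer[Long, Data, KeyValue[Long, Data]] {
> >
> >   // Array[Byte] used as dummy value since we don't use the value
> >   var store: KeyValueStore[Long, Array[Byte]] = _
> >
> >   override def init(context: ProcessorContext): Unit = {
> >     store = context
> >       .getStateStore(manualOverridesStoreName)
> >       .asInstanceOf[KeyValueStore[Long, Array[Byte]]]
> >   }
> >
> >   override def close(): Unit = {}
> >
> >   override def transform(key: Long, value: Data): KeyValue[Long, Data] = {
> >     if (value.getIsManual) {
> >       // Remember that this key has a manual override, then forward it.
> >       store.put(key, Array.emptyByteArray)
> >       new KeyValue(key, value)
> >     }
> >     else if (store.get(key) == null) {
> >       // No manual override seen for this key: forward the automatic data.
> >       new KeyValue(key, value)
> >     }
> >     else {
> >       // Already manually overridden: returning null drops the record.
> >       null
> >     }
> >   }
> > }
> > ```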
> >
> > If I understand correctly, there is no guarantee this will work unless
> > manual data and automatic data with the same key are in the same
> > partition. Otherwise the manual override might be stored in a different
> > state store to the one that the automatic data checks.
> >
> > And even if they are stored in the same state store there might be race
> > conditions, where an automatic record checks the state store, then the
> > manual override is added to the state store, then the manual override is
> > written to the output topic, then the automatic record is written to the
> > output topic, leading to the automatic data overwriting the manual
> > override.
> >
> > Is that correct?
> >
> > And if so will `merge` preserve the co-partitioning guarantee I need?
> >
> > Thanks for your help
> >
>
