Hi all,
I'm struggling with a data transformation using PyFlink and the DataStream API. I'm open to using the Table API if that is more suitable.

My PyFlink job ingests three streams from different Kafka topics using the Kafka connector. These contain JSON messages, which I parse into Python dataclasses defined in the job. This works OK. I also filter some fields and flatten some nested lists.

I'm trying to get from the following three data streams of messages:

    C = {'c_id': 'c_1', … }
    M = {'m_id': 'm_1', 'c_id': 'c_1', … }
    O = {'o_id': 'o_1', 'm_id': 'm_1', … }

to a structure like this:

    Output = {'c_id': 'c_1', 'm_ids': ['m_1', 'm_4', … ], 'o_ids': ['o_1', 'o_6', … ] }

where:

- C is a finite stream of messages, each with a unique id c_id.
- M is a larger finite stream of messages, each with a unique id m_id. Each one may match one of the c_id values in C.
- O is a continuous stream of messages, each with a unique id o_id. Each one may match one of the m_id values in M.

So the end result is a new object, one for each message in the original C stream, which stores the ids of the matching M and O messages and is updated with each new O message. This will eventually be written to a Kafka sink.

For example, with stateful, non-streaming code you could use nested for loops:

    # Join C and M: collect the m_ids that belong to each c_id
    output = []
    for c in C:
        out = {'c_id': c.c_id, 'm_ids': [], 'o_ids': []}
        for m in M:
            if m.c_id == c.c_id:
                out['m_ids'].append(m.m_id)
        output.append(out)

    # Join O: attach each o_id to the output that holds its m_id
    for o in O:
        for out in output:
            if o.m_id in out['m_ids']:
                out['o_ids'].append(o.o_id)

What is the best way to reproduce this transformation in PyFlink?

For the first transform, joining C and M, I have looked at using broadcast state to provide the stream C as a MapState to the stream M, and then using a KeyedBroadcastProcessFunction to iterate over each value in C and add the m_id if it matches.
https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/datastream/fault-tolerance/broadcast_state/

I also looked at using a CoFlatMapFunction to connect the two streams, but the problem was that the shared state required the first stream to be processed entirely before the other.

Neither of these feels like a stream-native solution to this problem. Would I be better off trying something like a window join? Is that definitely supported by the Python API? There aren't many examples online of anything similar, particularly in Python. I'm open to whichever approach is easiest for this kind of problem.

Thank you for any help you can provide.
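
P.S. To make the target behaviour concrete, here is a minimal plain-Python sketch (not PyFlink) of the event-at-a-time version of the transformation described above. The class name IncrementalJoin and the on_c / on_m / on_o handlers are hypothetical names for illustration only. It also assumes that M or O messages arriving before their parent should be buffered until the parent shows up, which may or may not be the right policy.

```python
from collections import defaultdict


class IncrementalJoin:
    """Plain-Python stand-in for the state a streaming job would keep."""

    def __init__(self):
        self.outputs = {}                   # c_id -> output record
        self.m_to_c = {}                    # m_id -> c_id, used to route O events
        self.pending_m = defaultdict(list)  # c_id -> m_ids seen before their C
        self.pending_o = defaultdict(list)  # m_id -> o_ids seen before their M

    def on_c(self, c):
        out = {'c_id': c['c_id'], 'm_ids': [], 'o_ids': []}
        self.outputs[c['c_id']] = out
        # Replay any M events that arrived before this C (assumed policy).
        for m_id in self.pending_m.pop(c['c_id'], []):
            self._attach_m(out, m_id)
        return out

    def on_m(self, m):
        out = self.outputs.get(m['c_id'])
        if out is None:
            self.pending_m[m['c_id']].append(m['m_id'])
            return None
        self._attach_m(out, m['m_id'])
        return out

    def on_o(self, o):
        c_id = self.m_to_c.get(o['m_id'])
        if c_id is None:
            self.pending_o[o['m_id']].append(o['o_id'])
            return None
        out = self.outputs[c_id]
        out['o_ids'].append(o['o_id'])
        return out

    def _attach_m(self, out, m_id):
        out['m_ids'].append(m_id)
        self.m_to_c[m_id] = out['c_id']
        # Replay any O events that arrived before this M (assumed policy).
        out['o_ids'].extend(self.pending_o.pop(m_id, []))


# Events deliberately arrive out of order: O before M before C.
join = IncrementalJoin()
events = [
    ('O', {'o_id': 'o_1', 'm_id': 'm_1'}),
    ('M', {'m_id': 'm_1', 'c_id': 'c_1'}),
    ('C', {'c_id': 'c_1'}),
    ('M', {'m_id': 'm_4', 'c_id': 'c_1'}),
    ('O', {'o_id': 'o_6', 'm_id': 'm_4'}),
]
handlers = {'C': join.on_c, 'M': join.on_m, 'O': join.on_o}
for kind, event in events:
    updated = handlers[kind](event)
    if updated is not None:
        print(updated)  # each print is one "updated Output" I would send to the sink
```

I assume the four dictionaries would have to become Flink state of some kind, but I'm not sure which operator fits, especially since O messages carry only an m_id and not a c_id to key on.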