Hi Martin,

Let me make sure I understand your question first. You have two streams, a Data Stream and a Control Stream, and you want to select records from the Data Stream based on the key set received from the Control Stream.
If I have understood your question correctly, I think SideInput is what you want:

https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Side+Inputs+for+DataStream+API#FLIP-17SideInputsforDataStreamAPI-StoringSide-InputData

It lets you define one stream as a SideInput and assign it to the other stream; the data in the SideInput stream will then be broadcast.

So far, I am not aware of any way to solve this without SideInput.

Best,
Tony Wei

2017-08-31 16:10 GMT+08:00 Martin Eden <martineden...@gmail.com>:

> Hi all,
>
> I am trying to implement the following using Flink:
>
> I have 2 input message streams:
>
> 1. Data Stream:
> KEY VALUE TIME
> .
> .
> .
> C V6 6
> B V6 6
> A V5 5
> A V4 4
> C V3 3
> A V3 3
> B V3 3
> B V2 2
> A V1 1
>
> 2. Control Stream:
> Lambda ArgumentKeys TIME
> .
> .
> .
> f2 [A, C] 4
> f1 [A, B, C] 1
>
> I want to apply the lambdas coming in the control stream to the selection
> of keys that are coming in the data stream.
>
> Since we have 2 streams I naturally thought of connecting them using
> .connect. For this I need to key both of them by a certain criteria. And
> here lies the problem, how can I make sure the messages with keys A,B,C
> specified in the control stream end up in the same task as well as the
> control message (f1, [A, B, C]) itself. Basically I don't know how to key
> by to achieve this.
>
> I suspect a custom partitioner is required that partitions the data stream
> based on the messages in the control stream? Is this even possible?
>
> Any suggestions welcomed!
>
> Thanks,
> M
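
As a rough illustration of the broadcast idea in the reply above, here is a minimal plain-Java sketch. It does not use any Flink API; the class `BroadcastControlSketch` and its `Task`, `broadcast`, `route`, and `run` members are hypothetical names made up for this example. It also assumes that a newer control message replaces the previous one, and that the control message with TIME 4 arrives before the data record with TIME 4; neither is stated in the question. The point is only that if every parallel task receives every control message, the data stream can be partitioned by its own key and no special partitioner is needed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Plain-Java simulation, NOT Flink API: every name here is hypothetical.
public class BroadcastControlSketch {

    // One parallel task instance; it remembers the latest control message it saw.
    static final class Task {
        private String lambda;                       // e.g. "f1"; null until a control message arrives
        private Set<String> keys = new HashSet<>();  // ArgumentKeys of that control message

        void onControl(String lambdaName, Set<String> argumentKeys) {
            // Assumption: a newer control message replaces the previous one.
            lambda = lambdaName;
            keys = argumentKeys;
        }

        void onData(String key, String value, List<String> out) {
            // Only records whose key is in the current key set are selected.
            if (lambda != null && keys.contains(key)) {
                out.add(lambda + "(" + key + "=" + value + ")");
            }
        }
    }

    // Control messages are delivered to every task.
    static void broadcast(Task[] tasks, String lambdaName, String... argumentKeys) {
        for (Task t : tasks) {
            t.onControl(lambdaName, new HashSet<>(Arrays.asList(argumentKeys)));
        }
    }

    // Data records are delivered to exactly one task, chosen by key hash.
    static void route(Task[] tasks, String key, String value, List<String> out) {
        int index = Math.floorMod(key.hashCode(), tasks.length);
        tasks[index].onData(key, value, out);
    }

    // Replays the sample messages from the question in TIME order.
    static List<String> run(int parallelism) {
        Task[] tasks = new Task[parallelism];
        for (int i = 0; i < parallelism; i++) {
            tasks[i] = new Task();
        }
        List<String> out = new ArrayList<>();

        broadcast(tasks, "f1", "A", "B", "C"); // control, TIME 1
        route(tasks, "A", "V1", out);          // TIME 1
        route(tasks, "B", "V2", out);          // TIME 2
        route(tasks, "B", "V3", out);          // TIME 3
        route(tasks, "A", "V3", out);          // TIME 3
        route(tasks, "C", "V3", out);          // TIME 3
        broadcast(tasks, "f2", "A", "C");      // control, TIME 4 (assumed to arrive first)
        route(tasks, "A", "V4", out);          // TIME 4
        route(tasks, "A", "V5", out);          // TIME 5
        route(tasks, "B", "V6", out);          // TIME 6, dropped: B is not in [A, C]
        route(tasks, "C", "V6", out);          // TIME 6
        return out;
    }

    public static void main(String[] args) {
        run(2).forEach(System.out::println);
    }
}
```

Because each task holds the full key set, the result does not depend on which task a given data key hashes to. In a real job the relative arrival order of control and data messages is not guaranteed, so the ordering assumed here would need to be handled explicitly.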