Yes, LargeMessageId is globally unique, so I shouldn't need a composite key.
So both of you are suggesting I do the following:

InputStream
  (1) .keyBy(LargeMessageId)
  (2) .flatMap(new MyReassemblyFunction())
  (3) .keyBy(MyKey)
  (4) .???

Let me explain my doubt (perhaps due to my lack of understanding). By the way, I expect to run this job with parallelism > 1. My understanding of the above is as follows:

First operator (1): The first keyBy(LargeMessageId) will partition the input stream by LargeMessageId. Right here, messages with the same MyKey value will be spread across these partitions. Isn't that already a problem?

Second operator (2): Run flatMap(new MyReassemblyFunction()) on these partitions. Here the fragments of each LargeMessageId will produce exactly one LargeMessage.

Third operator (3): At this point I don't understand the point of the second keyBy(MyKey). My understanding is that it will further partition the already partitioned input stream (from (1) above) and will not help me, as I need to process all LargeMessages for a given MyKey in order. Is there an implicit assumption here that the flatMap operation (2) above will automatically join the partitioned sub-streams from the first keyBy into a single stream?

Ajay

From: Fabian Hueske <fhue...@gmail.com>
Date: Monday, February 4, 2019 at 9:17 AM
To: "Aggarwal, Ajay" <ajay.aggar...@netapp.com>
Cc: Congxian Qiu <qcx978132...@gmail.com>, "user@flink.apache.org" <user@flink.apache.org>
Subject: Re: Reverse of KeyBy

Hi,

Calling keyBy twice will not work, because the second call overrides the first.
You can keyBy on a composite key (MyKey, LargeMessageId).

You can do the following:

InputStream
  .keyBy( (MyKey, LargeMessageId) ) // use composite key
  .flatMap(new MyReassemblyFunction())
  .keyBy(MyKey)
  .???

If LargeMessageId is unique across MyKey (there are not two large messages with the same LargeMessageId and different MyKey values), you don't need a composite key but can use keyBy(LargeMessageId).

Best,
Fabian

On Mon, Feb 4, 2019 at 15:05, Aggarwal, Ajay <ajay.aggar...@netapp.com> wrote:

Thank you for your suggestion. But per my understanding, if I keyBy(LargeMessageId) first, then I can't guarantee the order of LargeMessages per MyKey, because MyKey messages will get spread over multiple partitions by LargeMessageId. Am I correct?

From: Congxian Qiu <qcx978132...@gmail.com>
Date: Sunday, February 3, 2019 at 6:40 AM
To: "Aggarwal, Ajay" <ajay.aggar...@netapp.com>
Cc: "user@flink.apache.org" <user@flink.apache.org>
Subject: Re: Reverse of KeyBy

Hi Aggarwal,

How about keyBy(LargeMessageId) first, then assembling these fragments back into LargeMessages, then keyBy(MyKey)?

Best,
Congxian

On Saturday, Feb 2, 2019 at 5:42 AM, Aggarwal, Ajay <ajay.aggar...@netapp.com> wrote:

I am new to Flink. I am trying to figure out if there is an operator that provides the reverse functionality of keyBy. Using keyBy you can split a stream into disjoint partitions. Is there a way to bring those partitions back into a single stream? Let me explain using my use case below.

My input stream contains messages with the following information:

{
  MyKey
  LargeMessageId
  LargeMessageFragment
  LargeMessageTimestamp // yes, the same timestamp is repeated with each fragment
  (… there are other fields, but I am leaving them out as they are not important for this discussion)
}

My LargeMessage is split into fragments at the source. I have 2 main requirements:

1. Reassemble these fragments back into LargeMessages.
2. For each MyKey value, process the LargeMessages in order, based on the time associated with them.

So I am thinking:

InputStream
  .keyBy(MyKey)
  .keyBy(LargeMessageId)
  .flatMap(new MyReassemblyFunction())
  .???

At this point I need to throw all assembled LargeMessages for a given MyKey back into a common partition, so I can try to process them in order.
This is where I am stuck. Any help from the experts will be much appreciated.

Ajay
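The keyBy(LargeMessageId), reassembly, keyBy(MyKey) dataflow suggested in the thread can be sketched as a plain-Java simulation (this is not Flink code) of two shuffles over a fixed number of subtasks. It is a minimal sketch under stated assumptions: the record types, the hypothetical `fragmentIndex`/`fragmentCount` fields used to detect a complete LargeMessage, the made-up sample data, and the hash-modulo subtask assignment are all illustrative stand-ins (Flink's actual key-to-subtask mapping goes through key groups). It shows the point behind Ajay's doubt: the second keyBy(MyKey) re-partitions the combined output of every reassembly subtask, so all LargeMessages for one MyKey meet in a single subtask. What it does not restore is arrival order, so each key's messages are then sorted by LargeMessageTimestamp.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class ReverseOfKeyBySketch {

    // Hypothetical record types. fragmentIndex and fragmentCount are ASSUMED fields: the thread
    // does not say how MyReassemblyFunction knows that a LargeMessage is complete.
    record Fragment(String myKey, String largeMessageId, int fragmentIndex, int fragmentCount,
                    String payload, long timestamp) {}

    record LargeMessage(String myKey, String largeMessageId, String body, long timestamp) {}

    // Illustrative stand-in for keyBy's key-to-subtask assignment (Flink really uses key groups).
    static int subtaskFor(String key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    // Simulates keyBy(LargeMessageId) -> reassembly -> keyBy(MyKey) -> per-key ordering.
    // Returns one map per second-stage subtask: MyKey -> LargeMessages sorted by timestamp.
    static List<Map<String, List<LargeMessage>>> run(List<Fragment> input, int parallelism) {
        // Stage 1: each subtask buffers fragments only for the LargeMessageIds routed to it.
        List<Map<String, List<Fragment>>> buffers = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) buffers.add(new HashMap<>());
        List<LargeMessage> reassembled = new ArrayList<>(); // union of all stage-1 outputs
        for (Fragment f : input) {
            Map<String, List<Fragment>> buf = buffers.get(subtaskFor(f.largeMessageId(), parallelism));
            List<Fragment> parts = buf.computeIfAbsent(f.largeMessageId(), id -> new ArrayList<>());
            parts.add(f);
            if (parts.size() == f.fragmentCount()) { // all fragments seen: emit one LargeMessage
                parts.sort(Comparator.comparingInt(Fragment::fragmentIndex));
                StringBuilder body = new StringBuilder();
                for (Fragment p : parts) body.append(p.payload());
                reassembled.add(new LargeMessage(f.myKey(), f.largeMessageId(), body.toString(), f.timestamp()));
                buf.remove(f.largeMessageId());
            }
        }

        // Stage 2: keyBy(MyKey) re-shuffles the whole reassembled stream, whichever stage-1
        // subtask produced each message; it does not subdivide the stage-1 partitions.
        List<Map<String, List<LargeMessage>>> byKey = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) byKey.add(new TreeMap<>());
        for (LargeMessage m : reassembled) {
            byKey.get(subtaskFor(m.myKey(), parallelism))
                 .computeIfAbsent(m.myKey(), k -> new ArrayList<>())
                 .add(m);
        }

        // Arrival order across upstream subtasks is not guaranteed, so restore per-key order by
        // timestamp. Sorting works because this input is bounded; a stream would need buffering.
        for (Map<String, List<LargeMessage>> sub : byKey) {
            for (List<LargeMessage> msgs : sub.values()) {
                msgs.sort(Comparator.comparingLong(LargeMessage::timestamp));
            }
        }
        return byKey;
    }

    // LargeMessageIds for one MyKey, in processing order, across all second-stage subtasks.
    static List<String> orderedIds(List<Map<String, List<LargeMessage>>> result, String myKey) {
        List<String> ids = new ArrayList<>();
        for (Map<String, List<LargeMessage>> sub : result) {
            for (LargeMessage m : sub.getOrDefault(myKey, List.of())) ids.add(m.largeMessageId());
        }
        return ids;
    }

    // Number of second-stage subtasks that received at least one message for myKey.
    static long subtasksHolding(List<Map<String, List<LargeMessage>>> result, String myKey) {
        return result.stream().filter(sub -> sub.containsKey(myKey)).count();
    }

    // Made-up sample: fragments of different LargeMessages arrive interleaved and out of order.
    static List<Fragment> sampleInput() {
        return List.of(
            new Fragment("A", "m2", 1, 2, "lo", 20),
            new Fragment("B", "m3", 0, 1, "x", 5),
            new Fragment("A", "m1", 0, 2, "he", 10),
            new Fragment("A", "m2", 0, 2, "hel", 20),
            new Fragment("A", "m1", 1, 2, "y", 10));
    }

    public static void main(String[] args) {
        List<Map<String, List<LargeMessage>>> result = run(sampleInput(), 4);
        System.out.println("A -> " + orderedIds(result, "A") + " on " + subtasksHolding(result, "A") + " subtask(s)");
        System.out.println("B -> " + orderedIds(result, "B") + " on " + subtasksHolding(result, "B") + " subtask(s)");
    }
}
```

Running `main` prints `A -> [m1, m2] on 1 subtask(s)` and `B -> [m3] on 1 subtask(s)`: m2 is reassembled before m1, but sorting by timestamp puts m1 first. In an actual Flink job, that final ordering step would likely be a `KeyedProcessFunction` that keeps messages in keyed state and emits them from event-time timers once the watermark passes their timestamp; the simulation simply sorts because its input is bounded.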