Hi,

Calling keyBy twice will not work, because the second call overrides the first. Instead, you can keyBy on a composite key (MyKey, LargeMessageId).
You can do the following:

    InputStream
      .keyBy( (MyKey, LargeMessageId) )  // use composite key
      .flatMap(new MyReassemblyFunction())
      .keyBy(MyKey)
      .???

If LargeMessageId is unique across MyKey (there are no two large messages with the same LargeMessageId and different MyKey values), you don't need a composite key but can use keyBy(LargeMessageId).

Best,
Fabian

On Mon, Feb 4, 2019 at 15:05, Aggarwal, Ajay <ajay.aggar...@netapp.com> wrote:

> Thank you for your suggestion. But per my understanding, if I KeyBy
> (LargeMessageId) first then I can’t guarantee the order of LargeMessages per
> MyKey, because MyKey messages will get spread over multiple partitions by
> LargeMessageId. Am I correct?
>
> From: Congxian Qiu <qcx978132...@gmail.com>
> Date: Sunday, February 3, 2019 at 6:40 AM
> To: "Aggarwal, Ajay" <ajay.aggar...@netapp.com>
> Cc: "user@flink.apache.org" <user@flink.apache.org>
> Subject: Re: Reverse of KeyBy
>
> Hi Aggarwal
> How about keyBy(LargeMessageID) first, then assemble these fragments
> back into LargeMessages, then keyBy(MyKey)?
>
> Best,
> Congxian
>
> On Sat, Feb 2, 2019 at 5:42 AM, Aggarwal, Ajay <ajay.aggar...@netapp.com> wrote:
>
> I am new to Flink. I am trying to figure out if there is an operator that
> provides reverse functionality of KeyBy. Using KeyBy you can split a
> stream into disjoint partitions. Is there a way to bring those partitions
> back into a single stream?
>
> Let me explain using my use case below.
>
> My Input stream contains messages with following information
>
> {
>   MyKey
>   LargeMessageId
>   LargeMessageFragment
>   LargeMessageTimestamp // yes same timestamp repeated with each fragment
>
>   (… there are other fields, but I am leaving them out as they are not
>   important for this discussion)
> }
>
> My LargeMessage is fragmented at source into fragments. I have 2 main
> requirements
>
> 1. Reassemble these fragments back into LargeMessages
> 2. For each MyKey value, process the LargeMessages in the order based
>    on time associated with them.
>
> So I am thinking
>
>     InputStream
>       .KeyBy (MyKey)
>       .KeyBy (LargeMessageId)
>       .flatMap(new MyReassemblyFunction())
>       . ???
>
> At this point I need to throw all assembled LargeMessages for a given
> MyKey back into a common partition, so I can try to process them in order.
> This is where I am stuck. Any help from the experts will be much
> appreciated.
>
> Ajay
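
For illustration, the composite-key idea discussed in this thread can be sketched in plain Java. This is only a simulation of the grouping logic, not Flink API code: the class ReassemblySketch, the process method, and the index/total fragment fields are assumptions made for the example (the thread does not say how fragments are numbered or how a message is known to be complete). It groups fragments by (MyKey, LargeMessageId), reassembles each message once all of its fragments have arrived, regroups the results by MyKey alone, and orders them by timestamp.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java simulation of the keying logic; no Flink classes are used.
class ReassemblySketch {

    // One input record. index and total are assumed fields (not in the thread).
    static final class Fragment {
        final String myKey;
        final String largeMessageId;
        final int index;      // assumed: position of this fragment in the message
        final int total;      // assumed: number of fragments in the message
        final String payload;
        final long timestamp; // same timestamp repeated on every fragment

        Fragment(String myKey, String largeMessageId, int index, int total,
                 String payload, long timestamp) {
            this.myKey = myKey;
            this.largeMessageId = largeMessageId;
            this.index = index;
            this.total = total;
            this.payload = payload;
            this.timestamp = timestamp;
        }
    }

    // A fully reassembled message.
    static final class LargeMessage {
        final String myKey;
        final String largeMessageId;
        final String body;
        final long timestamp;

        LargeMessage(String myKey, String largeMessageId, String body, long timestamp) {
            this.myKey = myKey;
            this.largeMessageId = largeMessageId;
            this.body = body;
            this.timestamp = timestamp;
        }
    }

    static Map<String, List<LargeMessage>> process(List<Fragment> input) {
        // Step 1: group fragments by the composite key (myKey, largeMessageId).
        Map<List<String>, TreeMap<Integer, Fragment>> pending = new LinkedHashMap<>();
        // Step 2 target: reassembled messages grouped by myKey alone.
        Map<String, List<LargeMessage>> perKey = new LinkedHashMap<>();

        for (Fragment f : input) {
            List<String> compositeKey = Arrays.asList(f.myKey, f.largeMessageId);
            TreeMap<Integer, Fragment> parts =
                    pending.computeIfAbsent(compositeKey, k -> new TreeMap<>());
            parts.put(f.index, f);

            if (parts.size() == f.total) {
                // All fragments arrived: concatenate them in index order.
                StringBuilder body = new StringBuilder();
                for (Fragment p : parts.values()) {
                    body.append(p.payload);
                }
                pending.remove(compositeKey);
                perKey.computeIfAbsent(f.myKey, k -> new ArrayList<>())
                      .add(new LargeMessage(f.myKey, f.largeMessageId,
                                            body.toString(), f.timestamp));
            }
        }

        // Step 3: within each myKey, order the messages by timestamp.
        for (List<LargeMessage> msgs : perKey.values()) {
            msgs.sort(Comparator.comparingLong((LargeMessage m) -> m.timestamp));
        }
        return perKey;
    }

    public static void main(String[] args) {
        List<Fragment> input = Arrays.asList(
                new Fragment("k1", "m2", 0, 2, "C", 200L),
                new Fragment("k1", "m1", 1, 2, "B", 100L),
                new Fragment("k1", "m2", 1, 2, "D", 200L),
                new Fragment("k1", "m1", 0, 2, "A", 100L));
        for (Map.Entry<String, List<LargeMessage>> e : process(input).entrySet()) {
            for (LargeMessage m : e.getValue()) {
                System.out.println(e.getKey() + " " + m.largeMessageId
                        + " " + m.body + " @" + m.timestamp);
            }
        }
    }
}
```

Note that the sketch sorts only after it has seen the whole input. On an unbounded stream, the step after the second keyBy (the ".???" above) would presumably have to buffer messages for some time before emitting them in order, and the sketch does not model that.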