Yes, LargeMessageId is globally unique, so I shouldn't need a composite key.

So both of you are suggesting that I do the following:

InputStream
  .keyBy(LargeMessageId)                 // (1)
  .flatMap(new MyReassemblyFunction())   // (2)
  .keyBy(MyKey)                          // (3)
  .???                                   // (4)

Let me explain my doubt (perhaps it comes from a lack of understanding). By the
way, I expect to run this job with parallelism > 1. Here is my understanding of
the above:

First operator (1): The first keyBy(LargeMessageId) will partition the input
stream by LargeMessageId. At this point, messages with the same MyKey value
will be spread across these partitions. Isn't that already a problem?
Second operator (2): flatMap(new MyReassemblyFunction()) runs on these
partitions. Here the fragments of each LargeMessageId produce exactly one
LargeMessage.
Third operator (3): At this point I don't understand the purpose of the second
keyBy(MyKey). My understanding is that it will further partition the already
partitioned stream (from (1) above), which will not help me, as I need to
process all LargeMessages for a given MyKey in order.

Is there an implicit assumption here that the flatMap operation (2) above will
automatically merge the partitioned sub-streams from the first keyBy back into
a single stream?
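To make this doubt concrete, here is a toy, plain-Java model of keyBy with parallelism > 1. It is an illustration under simplifying assumptions, not Flink code: it routes each record to partition floorMod(key.hashCode(), parallelism), whereas Flink's real routing first hashes keys into key groups. What it shows is that a keyBy picks the target partition from the record's current key only, regardless of which upstream partition the record came from. The class and method names are hypothetical, and step (2) is skipped by pretending messages are already reassembled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Toy model of keyBy, NOT Flink's implementation: a record goes to partition
// floorMod(key.hashCode(), parallelism). Flink hashes keys into key groups first,
// but in both cases the target depends only on the record's current key.
final class KeyByToyModel {

    // Hypothetical message type with the fields that matter for keying.
    record LargeMessage(String myKey, String largeMessageId, long timestamp) {}

    // Re-partitions every record of every upstream partition by the given key.
    static <T, K> List<List<T>> keyBy(List<List<T>> upstream, Function<T, K> keySelector, int parallelism) {
        List<List<T>> downstream = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            downstream.add(new ArrayList<>());
        }
        for (List<T> partition : upstream) {
            for (T record : partition) {
                // The upstream partition index plays no role in choosing the target.
                int target = Math.floorMod(keySelector.apply(record).hashCode(), parallelism);
                downstream.get(target).add(record);
            }
        }
        return downstream;
    }

    public static void main(String[] args) {
        int parallelism = 3;
        // Step (2) is skipped: these are treated as already reassembled LargeMessages.
        List<LargeMessage> input = List.of(
                new LargeMessage("A", "m1", 1), new LargeMessage("B", "m2", 2),
                new LargeMessage("A", "m3", 3), new LargeMessage("A", "m4", 4));

        // (1) keyBy(LargeMessageId): messages with MyKey "A" can be spread over partitions.
        List<List<LargeMessage>> byId = keyBy(List.of(input), LargeMessage::largeMessageId, parallelism);
        // (3) keyBy(MyKey): every message with MyKey "A" ends up in the same partition.
        List<List<LargeMessage>> byMyKey = keyBy(byId, LargeMessage::myKey, parallelism);

        System.out.println("after keyBy(LargeMessageId): " + byId);
        System.out.println("after keyBy(MyKey):          " + byMyKey);
    }
}
```

Note that the MyKey partition receives records from several upstream partitions, so their arrival order carries no meaning (Flink only preserves order between each pair of sending and receiving subtasks). Ordering LargeMessages by timestamp per MyKey therefore still needs an explicit step after the second keyBy.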

Ajay


From: Fabian Hueske <fhue...@gmail.com>
Date: Monday, February 4, 2019 at 9:17 AM
To: "Aggarwal, Ajay" <ajay.aggar...@netapp.com>
Cc: Congxian Qiu <qcx978132...@gmail.com>, "user@flink.apache.org" 
<user@flink.apache.org>
Subject: Re: Reverse of KeyBy

Hi,

Calling keyBy twice will not work, because the second call overrides the first.
You can keyBy on a composite key (MyKey, LargeMessageId).

You can do the following:

InputStream
  .keyBy((MyKey, LargeMessageId))   // use composite key
  .flatMap(new MyReassemblyFunction())
  .keyBy(MyKey)
  .???

If LargeMessageId is unique across MyKey values (i.e., no two large messages
share the same LargeMessageId with different MyKey values), you don't need a
composite key and can simply use keyBy(LargeMessageId).
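The uniqueness condition above can be checked with a small plain-Java sketch (hypothetical names, not Flink API). A Java record has value-based equals() and hashCode(), so it can serve as a composite (MyKey, LargeMessageId) key. Grouping by it yields the same groups as grouping by LargeMessageId alone exactly when no LargeMessageId appears under two different MyKey values.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Compares grouping by a composite (MyKey, LargeMessageId) key with grouping
// by LargeMessageId alone. All names here are hypothetical.
final class CompositeKeyDemo {

    // Hypothetical fragment type; only the fields relevant to keying are modeled.
    record Fragment(String myKey, String largeMessageId, String payload) {}

    // Records get value-based equals()/hashCode(), so this works as a composite key.
    record FragmentKey(String myKey, String largeMessageId) {}

    static Map<FragmentKey, List<Fragment>> groupByComposite(List<Fragment> fragments) {
        return fragments.stream().collect(Collectors.groupingBy(
                f -> new FragmentKey(f.myKey(), f.largeMessageId())));
    }

    static Map<String, List<Fragment>> groupById(List<Fragment> fragments) {
        return fragments.stream().collect(Collectors.groupingBy(Fragment::largeMessageId));
    }

    public static void main(String[] args) {
        List<Fragment> unique = List.of(
                new Fragment("A", "m1", "part-1"), new Fragment("A", "m1", "part-2"),
                new Fragment("B", "m2", "part-1"));
        // Ids are globally unique: both keyings produce the same groups.
        System.out.println(groupByComposite(unique).size() + " vs " + groupById(unique).size());

        List<Fragment> reused = List.of(
                new Fragment("A", "m1", "part-1"), new Fragment("B", "m1", "part-1"));
        // Id "m1" reused under two MyKey values: only the composite key keeps them apart.
        System.out.println(groupByComposite(reused).size() + " vs " + groupById(reused).size());
    }
}
```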

Best, Fabian


On Mon, Feb 4, 2019 at 15:05, Aggarwal, Ajay <ajay.aggar...@netapp.com> wrote:
Thank you for your suggestion. But as I understand it, if I keyBy(LargeMessageId)
first, then I can't guarantee the order of LargeMessages per MyKey, because
messages with the same MyKey will be spread over multiple partitions by
LargeMessageId. Am I correct?


From: Congxian Qiu <qcx978132...@gmail.com>
Date: Sunday, February 3, 2019 at 6:40 AM
To: "Aggarwal, Ajay" <ajay.aggar...@netapp.com>
Cc: "user@flink.apache.org" <user@flink.apache.org>
Subject: Re: Reverse of KeyBy

Hi Aggarwal,

How about keyBy(LargeMessageId) first, then assembling these fragments back
into LargeMessages, and then keyBy(MyKey)?

Best,
Congxian


On Sat, Feb 2, 2019 at 5:42 AM, Aggarwal, Ajay <ajay.aggar...@netapp.com> wrote:

I am new to Flink. I am trying to figure out if there is an operator that
provides the reverse functionality of keyBy. Using keyBy, you can split a
stream into disjoint partitions. Is there a way to bring those partitions back
into a single stream?

Let me explain using my use case below.

My input stream contains messages with the following information:

{
    MyKey
    LargeMessageId
    LargeMessageFragment
    LargeMessageTimestamp // yes, the same timestamp is repeated with each fragment
    (… there are other fields, but I am leaving them out as they are not
    important for this discussion)
}

My LargeMessages are split into fragments at the source. I have two main
requirements:

  1.  Reassemble these fragments back into LargeMessages.
  2.  For each MyKey value, process the LargeMessages in order of the
      timestamps associated with them.
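Requirement 1 can be sketched in plain Java as a per-LargeMessageId buffer that emits one LargeMessage once all of its fragments are present, which is roughly what a step like MyReassemblyFunction would do. The thread does not say how the end of a LargeMessage is detected, so fragmentIndex and fragmentCount are hypothetical fields standing in for the omitted "other fields". In a Flink job, the buffer would typically live in keyed state (for example MapState in a RichFlatMapFunction) rather than in a HashMap; all names here are assumptions.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

final class ReassemblySketch {

    // ASSUMPTION: fragmentIndex and fragmentCount are hypothetical fields that tell
    // the job where a fragment belongs and when a LargeMessage is complete.
    record Fragment(String myKey, String largeMessageId, long timestamp,
                    int fragmentIndex, int fragmentCount, String data) {}

    record LargeMessage(String myKey, String largeMessageId, long timestamp, String body) {}

    // Buffered fragment payloads per LargeMessageId (keyed state in a real Flink job).
    private final Map<String, String[]> pending = new HashMap<>();

    /** Buffers one fragment; returns the LargeMessage once all of its fragments are present. */
    Optional<LargeMessage> accept(Fragment f) {
        String[] parts = pending.computeIfAbsent(f.largeMessageId(), id -> new String[f.fragmentCount()]);
        parts[f.fragmentIndex()] = f.data();
        for (String part : parts) {
            if (part == null) {
                return Optional.empty();          // still waiting for at least one fragment
            }
        }
        pending.remove(f.largeMessageId());       // complete: release the buffer
        return Optional.of(new LargeMessage(
                f.myKey(), f.largeMessageId(), f.timestamp(), String.join("", parts)));
    }

    public static void main(String[] args) {
        ReassemblySketch reassembler = new ReassemblySketch();
        List<LargeMessage> out = new ArrayList<>();
        List<Fragment> fragments = List.of(
                new Fragment("A", "m1", 100, 1, 2, "world"),   // fragments may arrive out of order
                new Fragment("A", "m1", 100, 0, 2, "hello "));
        for (Fragment f : fragments) {
            reassembler.accept(f).ifPresent(out::add);
        }
        System.out.println(out);
    }
}
```

In this sketch, a LargeMessage that never receives all of its fragments stays buffered forever; a real job would need some cleanup policy (for example a timer or state TTL).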

So I am thinking of:

InputStream
  .keyBy(MyKey)
  .keyBy(LargeMessageId)
  .flatMap(new MyReassemblyFunction())
  .???

At this point I need to send all assembled LargeMessages for a given MyKey
back into a common partition, so that I can process them in order. This is
where I am stuck. Any help from the experts will be much appreciated.
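One common way to fill a step like the ".???" above in Flink is a KeyedProcessFunction after keyBy(MyKey) that buffers LargeMessages in keyed state and uses event-time timers driven by watermarks to release them in timestamp order. The plain-Java sketch below models only that buffering logic, assuming a watermark is available that promises no older LargeMessage will arrive later (the thread does not say how lateness is bounded). The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

// Plain-Java model of per-MyKey ordering by timestamp.
// ASSUMPTION: a watermark means "no LargeMessage with an older timestamp will arrive".
final class PerKeyOrderingSketch {

    record LargeMessage(String myKey, String largeMessageId, long timestamp) {}

    // One min-heap per MyKey, ordered by timestamp (keyed state in a real Flink job).
    private final Map<String, PriorityQueue<LargeMessage>> buffers = new HashMap<>();

    void add(LargeMessage m) {
        buffers.computeIfAbsent(m.myKey(),
                        k -> new PriorityQueue<LargeMessage>(Comparator.comparingLong(LargeMessage::timestamp)))
                .add(m);
    }

    /**
     * Emits every buffered message with timestamp <= watermark, oldest first within
     * each MyKey. The order between different MyKey values is unspecified.
     */
    List<LargeMessage> advanceWatermark(long watermark) {
        List<LargeMessage> emitted = new ArrayList<>();
        for (PriorityQueue<LargeMessage> queue : buffers.values()) {
            while (!queue.isEmpty() && queue.peek().timestamp() <= watermark) {
                emitted.add(queue.poll());
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        PerKeyOrderingSketch sorter = new PerKeyOrderingSketch();
        sorter.add(new LargeMessage("A", "m3", 30));
        sorter.add(new LargeMessage("A", "m1", 10));
        sorter.add(new LargeMessage("B", "m2", 20));
        // Watermark 25: m1 and m2 are released; m3 (timestamp 30) stays buffered.
        System.out.println(sorter.advanceWatermark(25));
        System.out.println(sorter.advanceWatermark(100));
    }
}
```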

Ajay