Hi,

Calling keyBy twice will not work, because the second call overrides the
first: it simply re-partitions the stream by the new key.
Instead, you can keyBy on a composite key (MyKey, LargeMessageId).
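
The following plain-Java sketch shows why the composite key works. It is an illustration only, not Flink API code. A Java record provides value-based equals/hashCode, so all fragments of one large message produce equal keys and therefore land in the same partition. The partition function is a deliberately simplified stand-in, because Flink's real assignment goes through key groups. `CompositeKey`, `partitionFor` and the sample values are hypothetical. Records require Java 16 or later.

```java
import java.util.List;

// Plain-Java illustration only; this is NOT Flink API code.
class CompositeKeyDemo {

    // Hypothetical composite key. A record gets value-based equals/hashCode,
    // which is what a key used for hash partitioning needs.
    record CompositeKey(String myKey, long largeMessageId) {}

    // Simplified stand-in for partition assignment (NOT Flink's key-group
    // algorithm): equal keys always map to the same partition index.
    static int partitionFor(Object key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    public static void main(String[] args) {
        int parallelism = 4;
        // Made-up sample: three fragments of one large message plus one
        // fragment of another large message with the same MyKey.
        List<CompositeKey> fragmentKeys = List.of(
                new CompositeKey("key-1", 42L),
                new CompositeKey("key-1", 42L),
                new CompositeKey("key-1", 42L),
                new CompositeKey("key-1", 43L));
        for (CompositeKey k : fragmentKeys) {
            // The first three lines print the same partition; the last may differ.
            System.out.println(k + " -> partition " + partitionFor(k, parallelism));
        }
    }
}
```

Different large messages with the same MyKey can land in different partitions. For that reason, the per-MyKey order has to be re-established after the second keyBy.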

You can do the following:

InputStream
  .keyBy((MyKey, LargeMessageId))      // use the composite key
  .flatMap(new MyReassemblyFunction()) // reassemble the fragments
  .keyBy(MyKey)                        // regroup large messages by MyKey
  .???
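
Flink preserves order only between a given sender and receiver. After the second keyBy, the large messages for one MyKey can therefore arrive from several upstream partitions in arbitrary order. One possible shape for the `.???` step, assuming event-time watermarks are available, is a per-MyKey buffer ordered by LargeMessageTimestamp that releases messages once the watermark has passed them. In Flink this would usually be built with a KeyedProcessFunction that keeps the buffer in keyed state and registers event-time timers. The sketch below only models that logic in plain Java. `PerKeySortBuffer`, `advanceWatermark` and the sample values are hypothetical, and late data (timestamps already behind the watermark) is not handled.

```java
import java.util.*;

// Plain-Java model of a per-key, event-time sort buffer; NOT Flink API code.
class PerKeySortBuffer {

    // Hypothetical reassembled message; field names follow the thread.
    record LargeMessage(String myKey, long largeMessageId, long timestamp) {}

    private static final Comparator<LargeMessage> BY_TIMESTAMP =
            Comparator.comparingLong(LargeMessage::timestamp);

    // One timestamp-ordered queue per MyKey.
    private final Map<String, PriorityQueue<LargeMessage>> buffers = new HashMap<>();

    void add(LargeMessage m) {
        buffers.computeIfAbsent(m.myKey(), k -> new PriorityQueue<>(BY_TIMESTAMP)).add(m);
    }

    // Releases, per key and in timestamp order, every buffered message whose
    // timestamp is <= the watermark (no earlier message is expected anymore).
    List<LargeMessage> advanceWatermark(long watermark) {
        List<LargeMessage> released = new ArrayList<>();
        for (PriorityQueue<LargeMessage> queue : buffers.values()) {
            while (!queue.isEmpty() && queue.peek().timestamp() <= watermark) {
                released.add(queue.poll());
            }
        }
        return released;
    }

    public static void main(String[] args) {
        PerKeySortBuffer buffer = new PerKeySortBuffer();
        // Made-up reassembled messages arriving out of timestamp order.
        buffer.add(new LargeMessage("A", 2L, 200L));
        buffer.add(new LargeMessage("A", 1L, 100L));
        buffer.add(new LargeMessage("A", 3L, 300L));
        System.out.println(buffer.advanceWatermark(250L)); // ids 1 and 2, in that order
        System.out.println(buffer.advanceWatermark(400L)); // id 3
    }
}
```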

If LargeMessageId is unique across MyKey (i.e., no two large messages share
the same LargeMessageId with different MyKey values), you don't need a
composite key and can simply use keyBy(LargeMessageId).
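
The uniqueness condition above can be checked against a sample of observed (MyKey, LargeMessageId) pairs. The plain-Java helper below is hypothetical and not part of Flink. A sample can only show that the condition is violated. It cannot prove that the condition holds for the whole stream, so that guarantee has to come from how the source assigns ids.

```java
import java.util.*;

// Hypothetical plain-Java helper (not part of Flink).
class IdUniquenessCheck {

    record Pair(String myKey, long largeMessageId) {}

    // Returns true if no LargeMessageId appears with two different MyKey values,
    // i.e. keyBy(LargeMessageId) would group the same fragments as the composite key.
    static boolean idUniqueAcrossKeys(List<Pair> observed) {
        Map<Long, String> keyForId = new HashMap<>();
        for (Pair p : observed) {
            String previous = keyForId.putIfAbsent(p.largeMessageId(), p.myKey());
            if (previous != null && !previous.equals(p.myKey())) {
                return false; // same id under different MyKey values: composite key needed
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(idUniqueAcrossKeys(List.of(
                new Pair("A", 1L), new Pair("A", 1L), new Pair("B", 2L)))); // true
        System.out.println(idUniqueAcrossKeys(List.of(
                new Pair("A", 1L), new Pair("B", 1L)))); // false
    }
}
```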

Best, Fabian


On Mon, Feb 4, 2019 at 15:05, Aggarwal, Ajay <
ajay.aggar...@netapp.com> wrote:

> Thank you for your suggestion. But as I understand it, if I keyBy
> (LargeMessageId) first, then I can’t guarantee the order of LargeMessages
> per MyKey, because messages with the same MyKey will get spread over
> multiple partitions by LargeMessageId. Am I correct?
>
> *From: *Congxian Qiu <qcx978132...@gmail.com>
> *Date: *Sunday, February 3, 2019 at 6:40 AM
> *To: *"Aggarwal, Ajay" <ajay.aggar...@netapp.com>
> *Cc: *"user@flink.apache.org" <user@flink.apache.org>
> *Subject: *Re: Reverse of KeyBy
>
> Hi Aggarwal,
>    How about keyBy(LargeMessageId) first, then reassembling these fragments
> back into LargeMessages, and then keyBy(MyKey)?
>
> Best,
> Congxian
>
> Aggarwal, Ajay <ajay.aggar...@netapp.com> wrote on Sat, Feb 2, 2019 at 5:42 AM:
>
> I am new to Flink. I am trying to figure out if there is an operator that
> provides the reverse functionality of keyBy. Using keyBy, you can split a
> stream into disjoint partitions. Is there a way to bring those partitions
> back into a single stream?
>
> Let me explain using my use case below.
>
> My input stream contains messages with the following information:
>
> {
>     MyKey
>     LargeMessageId
>     LargeMessageFragment
>     LargeMessageTimestamp // yes, the same timestamp is repeated with each fragment
>
>     (… there are other fields, but I am leaving them out as they are not
>     important for this discussion)
> }
>
> Each LargeMessage is split into fragments at the source. I have two main
> requirements:
>
>    1. Reassemble these fragments back into LargeMessages.
>    2. For each MyKey value, process the LargeMessages in order, based on
>    the time associated with them (LargeMessageTimestamp).
>
> So I am thinking:
>
> InputStream
>   .keyBy(MyKey)
>   .keyBy(LargeMessageId)
>   .flatMap(new MyReassemblyFunction())
>   .???
>
> At this point I need to throw all assembled LargeMessages for a given
> MyKey back into a common partition, so I can try to process them in order.
> This is where I am stuck. Any help from the experts will be much
> appreciated.
>
> Ajay
