Hi,

If you are using checkpoints, I think a simple way is to use a ListState to
store all incoming records.
Then, in endInput(), drain all records from the ListState into a sorter and
sort them.
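
A rough sketch of that idea in plain Java, without Flink on the classpath.
The `buffer` list only stands in for the ListState, and `processElement` /
`endInput` mirror the operator callbacks. The class name `BufferingSorter` and
the `Consumer` used as output are made up for illustration. In a real operator
the buffer would be registered as state so that it is covered by checkpoints,
and for a big stream the in-memory sort would probably have to be replaced by
an external sorter.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical stand-in for a buffering sort operator; this is not Flink API. */
public class BufferingSorter<T> {
    // Stands in for ListState<T>; in Flink this would be checkpointed state.
    private final List<T> buffer = new ArrayList<>();
    private final Comparator<? super T> order;
    private final Consumer<? super T> output;

    public BufferingSorter(Comparator<? super T> order, Consumer<? super T> output) {
        this.order = order;
        this.output = output;
    }

    /** Called once per incoming record: only buffer it, emit nothing yet. */
    public void processElement(T record) {
        buffer.add(record);
    }

    /** Called once when the bounded input is finished: sort, emit, clear. */
    public void endInput() {
        buffer.sort(order);
        buffer.forEach(output);
        buffer.clear();
    }

    public static void main(String[] args) {
        List<Integer> out = new ArrayList<>();
        BufferingSorter<Integer> sorter =
            new BufferingSorter<>(Comparator.<Integer>naturalOrder(), out::add);
        for (int v : new int[] {5, -3, 42, 0}) {
            sorter.processElement(v);
        }
        sorter.endInput();
        System.out.println(out); // [-3, 0, 5, 42]
    }
}
```

Nothing is emitted before endInput(), so downstream operators see the records
only after the whole input has been consumed.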

Best,
Jingsong Lee

On Fri, Jan 31, 2020 at 3:10 PM Łukasz Jędrzejewski <l...@touk.pl> wrote:

> Hi,
>
> Thank you very much for the suggestions. I will check out the
> UnilateralSortMerger. However, in our case we are using checkpoints.
>
> Kind regards,
> Łukasz
> On 31.01.2020 at 07:54, Jingsong Li wrote:
>
> Hi Łukasz,
>
> First, we are planning to design and implement the BoundedStream story,
> which will be discussed further in 1.11 or 1.12.
>
> SortedMapState was discussed in FLINK-6219 [1], but there are some
> problems that cannot be solved well, so it has not been introduced.
>
> If it is a pure BoundedStream without checkpoints, using state is not
> recommended, because state is usually used for checkpoints and will
> cause more overhead.
>
> SortOperator was introduced for the table BaseRow type, so I recommend that
> you use the UnilateralSortMerger to construct your own SortOperator.
>
> [1] https://issues.apache.org/jira/browse/FLINK-6219
>
> Best,
> Jingsong Lee
>
> On Fri, Jan 31, 2020 at 2:08 AM Łukasz Jędrzejewski <l...@touk.pl> wrote:
>
>> Hi all,
>>
>> In Flink 1.9 a couple of changes were introduced to deal with bounded
>> streams, e.g. the BoundedOneInput interface. I'm wondering whether it would
>> be doable to do some kind of global sort after receiving the end-of-input
>> event on a finished data stream source, using only the DataStream API.
>>
>> We have made some experiments with BoundedOneInput: buffering elements,
>> sorting them after receiving the end-of-input event, and finally
>> emitting the sorted elements. It seems to be working as expected, though
>> we are having trouble sorting a big stream efficiently. One problem is
>> the lack of an appropriate state type, something like SortedMapState. When
>> using MapState, the elements are stored in a kind of byte order. I
>> think it could be possible to do some key modification to achieve the
>> correct byte order, but it's not trivial for every type (string, int,
>> tuples, and so on). Do you plan to add such a sorted state?
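
To illustrate the key modification mentioned above for the int case only, here
is a minimal sketch in plain Java. It assumes that the store compares keys as
unsigned bytes, left to right, which is my reading of "a kind of byte order"
and not something confirmed here. The class `OrderPreservingKeys` and its
methods are hypothetical helpers, not Flink API, and a real job would still
need a key serializer that writes exactly these bytes.

```java
import java.nio.ByteBuffer;

/** Hypothetical helper for order-preserving int keys; not part of Flink. */
public class OrderPreservingKeys {

    /** Encodes an int so that unsigned byte-wise order equals numeric order. */
    public static byte[] encodeInt(int value) {
        // Flipping the sign bit maps MIN_VALUE..MAX_VALUE onto 0x00000000..0xFFFFFFFF.
        // ByteBuffer is big-endian by default, so the most significant byte comes first.
        return ByteBuffer.allocate(Integer.BYTES).putInt(value ^ Integer.MIN_VALUE).array();
    }

    /** Reverses encodeInt. */
    public static int decodeInt(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getInt() ^ Integer.MIN_VALUE;
    }

    /** Unsigned lexicographic comparison: the order the store is assumed to use. */
    public static int compareBytes(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int cmp = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
            if (cmp != 0) {
                return cmp;
            }
        }
        return Integer.compare(a.length, b.length);
    }

    public static void main(String[] args) {
        int[] samples = {Integer.MIN_VALUE, -1, 0, 1, Integer.MAX_VALUE};
        for (int i = 0; i + 1 < samples.length; i++) {
            // Each sample should sort strictly before its successor in byte order.
            boolean ordered =
                compareBytes(encodeInt(samples[i]), encodeInt(samples[i + 1])) < 0;
            System.out.println(samples[i] + " < " + samples[i + 1] + " in byte order: " + ordered);
        }
    }
}
```

Without the sign-bit flip, negative ints would sort after positive ones,
because their first byte is 0x80 or higher. Strings and tuples need their own
encodings, which is where this gets non-trivial.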
>>
>> In the Flink Table API there is a SortOperator, but it is restricted to
>> BinaryRow. Would it be possible to adapt this functionality to the streaming
>> API for arbitrary types? What do you think?
>>
>> Thanks,
>> Łukasz
>>
>>
>
> --
> Best, Jingsong Lee
>
>

-- 
Best, Jingsong Lee
