Hi,

If you are using checkpoints, I think a simple way is to use a ListState to store all incoming records. Then, in endInput(), drain all records from the ListState into a sorter to sort them.
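A minimal, Flink-free sketch of this pattern, for illustration only. `BufferThenSort` and its methods are hypothetical names, not Flink API. The `List` stands in for the `ListState` (in a real operator it would come from the operator state store so the buffer is part of each checkpoint), `processElement`/`endInput` mirror the operator callback and the `BoundedOneInput` hook, and the in-memory `List.sort` stands in for the sorter; for large inputs that sorter would need to spill to disk, as UnilateralSortMerger does.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.function.Consumer;

/**
 * Hypothetical model of "buffer everything, sort in endInput()".
 * The List field stands in for ListState<T>; the Consumer stands in for the operator's output.
 */
class BufferThenSort<T> {
    private final List<T> buffer = new ArrayList<>();   // stand-in for ListState<T>
    private final Comparator<? super T> comparator;
    private final Consumer<? super T> out;              // stand-in for the operator's Output

    BufferThenSort(Comparator<? super T> comparator, Consumer<? super T> out) {
        this.comparator = comparator;
        this.out = out;
    }

    /** Mirrors processElement(): only buffer, never emit, while the input is still open. */
    void processElement(T value) {
        buffer.add(value);
    }

    /** Mirrors BoundedOneInput#endInput(): drain the buffer into a sorter, then emit in order. */
    void endInput() {
        List<T> drained = new ArrayList<>(buffer);      // drain the buffered records into a local list
        buffer.clear();
        drained.sort(comparator);                       // in-memory stand-in for a spilling sorter
        for (T value : drained) {
            out.accept(value);
        }
    }

    public static void main(String[] args) {
        List<Integer> emitted = new ArrayList<>();
        BufferThenSort<Integer> op =
                new BufferThenSort<>(Comparator.<Integer>naturalOrder(), emitted::add);
        for (int v : new int[] {5, -3, 9, 0, 2}) {
            op.processElement(v);
        }
        System.out.println(emitted);  // prints [] (nothing is emitted before end of input)
        op.endInput();
        System.out.println(emitted);  // prints [-3, 0, 2, 5, 9]
    }
}
```

Emitting only in endInput() is what makes the output sorted. For one global order across the whole stream, the sorting operator would also have to run with parallelism 1 (or the input would need to be range-partitioned first).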
Best,
Jingsong Lee

On Fri, Jan 31, 2020 at 3:10 PM Łukasz Jędrzejewski <l...@touk.pl> wrote:

> Hi,
>
> Thank you very much for the suggestions. I will check out the
> UnilateralSortMerger. However, in our case we are using checkpoints.
>
> Kind regards,
> Łukasz
>
> On 31.01.2020 at 07:54, Jingsong Li wrote:
>
> > Hi Łukasz,
> >
> > First, we are planning to design and implement the BoundedStream story,
> > which will be discussed further in 1.11 or 1.12.
> >
> > SortedMapState was discussed in FLINK-6219 [1], but there are some
> > problems that cannot be solved well, so it has not been introduced.
> >
> > If it is a pure BoundedStream without checkpoints, it is not recommended
> > to use state, because state is usually used for checkpoints, which will
> > cause more overhead.
> >
> > SortOperator was introduced for the Table API's BaseRow; I recommend that
> > you use the UnilateralSortMerger to construct your own SortOperator.
> >
> > [1] https://issues.apache.org/jira/browse/FLINK-6219
> >
> > Best,
> > Jingsong Lee
> >
> > On Fri, Jan 31, 2020 at 2:08 AM Łukasz Jędrzejewski <l...@touk.pl> wrote:
> >
> >> Hi all,
> >>
> >> In Flink 1.9 a couple of changes were introduced to deal with bounded
> >> streams, e.g. the BoundedOneInput interface. I'm wondering whether it
> >> would be doable to do some kind of global sort after receiving the
> >> end-input event from a finished data stream source, using only the
> >> DataStream API?
> >>
> >> We have run some experiments with BoundedOneInput: buffering elements,
> >> sorting them after receiving the end-input event, and finally emitting
> >> the sorted elements. It seems to work as expected, though we are having
> >> trouble sorting a big stream efficiently. One problem is the lack of an
> >> appropriate state type, something like SortedMapState. With MapState,
> >> the elements are stored in a kind of byte order. I think it could be
> >> possible to modify the keys to achieve the correct byte order, but it's
> >> not trivial for every type (string, int, tuples, and so on). Do you plan
> >> to add such a sorted state?
> >>
> >> In the Flink Table API there is SortOperator, but it is restricted to
> >> BinaryRow. Would it be possible to adapt this functionality in the
> >> streaming API for arbitrary types? What do you think?
> >>
> >> Thanks,
> >> Łukasz
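The key-modification idea from the quoted question (making MapState's byte order match the logical order) can be illustrated for two simple types. This is a sketch under stated assumptions: that the backend compares keys as unsigned, lexicographic byte strings (RocksDB's default comparator is bytewise; the heap backend gives no ordering at all), and that the encoded bytes are exactly what gets compared, which depends on the serializer in use. `OrderedKeys` and its methods are hypothetical names. It also hints at why the general case is hard: composite (tuple) keys with variable-length fields need escaping or terminators, which this sketch does not attempt.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical order-preserving key encodings: comparing the encoded bytes as unsigned,
 * lexicographic byte strings gives the same order as comparing the original values.
 */
final class OrderedKeys {
    private OrderedKeys() {}

    /** Big-endian int with the sign bit flipped, so negative values sort before positive ones. */
    static byte[] encodeInt(int v) {
        int u = v ^ 0x80000000;  // maps Integer.MIN_VALUE..MAX_VALUE onto 0x00000000..0xFFFFFFFF
        return new byte[] {(byte) (u >>> 24), (byte) (u >>> 16), (byte) (u >>> 8), (byte) u};
    }

    /** Inverse of encodeInt. */
    static int decodeInt(byte[] b) {
        int u = ((b[0] & 0xFF) << 24) | ((b[1] & 0xFF) << 16) | ((b[2] & 0xFF) << 8) | (b[3] & 0xFF);
        return u ^ 0x80000000;
    }

    /**
     * UTF-8 bytes compare in Unicode code-point order. Safe as a whole key, but as a field
     * inside a composite key it would need a terminator or escaping (not handled here).
     */
    static byte[] encodeString(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    /** Unsigned lexicographic comparison, i.e. how a bytewise-ordered store compares keys. */
    static int compareUnsigned(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int cmp = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
            if (cmp != 0) {
                return cmp;
            }
        }
        return Integer.compare(a.length, b.length);  // a shorter prefix sorts first
    }

    public static void main(String[] args) {
        int[] values = {7, -1, Integer.MIN_VALUE, 0, Integer.MAX_VALUE, -42};
        List<byte[]> keys = new ArrayList<>();
        for (int v : values) {
            keys.add(encodeInt(v));
        }
        keys.sort(OrderedKeys::compareUnsigned);     // sort by encoded bytes only
        List<Integer> decoded = new ArrayList<>();
        for (byte[] k : keys) {
            decoded.add(decodeInt(k));
        }
        System.out.println(decoded);  // prints [-2147483648, -42, -1, 0, 7, 2147483647]
    }
}
```

Without the sign-bit flip, a plain big-endian encoding would place every negative int after every non-negative one, which is the kind of mismatch the question describes.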