Hello Andrey,

Thanks for your quick response. I have tried your above code but it didn't suit my requirement. I need global ordering of my records while using multiple Kafka partitions. Please suggest a workaround for this. As mentioned in this <https://cwiki.apache.org/confluence/display/FLINK/Time+and+Order+in+Streams> link, is it possible to buffer data for some amount of time and then sort it, or is there any other way?
-----------------------------------------------
*Amol Suryawanshi*
Java Developer
am...@iprogrammer.com

*iProgrammer Solutions Pvt. Ltd.*

*Office 103, 104, 1st Floor Pride Portal, Shivaji Housing Society, Bahiratwadi, Near Hotel JW Marriott, Off Senapati Bapat Road, Pune - 411016, MH, INDIA.*
*Phone: +91 9689077510 | Skype: amols_iprogrammer*
www.iprogrammer.com <sac...@iprogrammer.com>
------------------------------------------------

On Tue, Jun 19, 2018 at 10:19 PM, Andrey Zagrebin <and...@data-artisans.com> wrote:
> Hi Amol,
>
> I think you could try (based on your stack overflow code)
> org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
> like this:
>
> DataStream<Document> streamSource = env
>     .addSource(kafkaConsumer)
>     .setParallelism(4)
>     .assignTimestampsAndWatermarks(
>         new BoundedOutOfOrdernessTimestampExtractor<Document>(Time.milliseconds(3500)) {
>             @Override
>             public long extractTimestamp(Document element) {
>                 Map<String, Object> timeStamp = (Map<String, Object>) element.get("ts");
>                 return (long) timeStamp.get("value");
>             }
>         });
>
> In general, if records are sorted by anything in a Kafka partition, the
> parallel subtasks of the Flink Kafka source will consume these records and
> push them to user operators in the same order. There is at most one
> consuming subtask per Kafka partition, but several partitions might be
> served by one subtask. It means that there are the same guarantees as in
> Kafka: ordering per partition but not across them, and no global ordering.
>
> The case of global and per-window ordering is already described by Sihua.
> Global ordering might be impractical in a distributed system.
>
> If a subtask of your Flink operator consumes from several partitions, or
> there is no ordering at all, you can try the above approach with
> BoundedOutOfOrdernessTimestampExtractor to get approximate ordering
> across these partitions, per key or for all records. It is similar to
> ordering within a window.
> It means there could still be late records arriving after the
> out-of-orderness period, which can break the ordering. This operator
> buffers records in state to maintain the order, but only for the
> out-of-orderness period, which also increases latency.
>
> Cheers,
> Andrey
>
> > On 19 Jun 2018, at 14:12, sihua zhou <summerle...@163.com> wrote:
> >
> > Hi Amol,
> >
> > I'm not sure whether this is possible, especially when you need to
> > operate on the records with multiple parallelism.
> >
> > IMO, in theory, we can only get an ordered stream when there is a single
> > partition of Kafka and we operate on it with a single parallelism in
> > Flink. Even in this case, if you only want to order the records in a
> > window, then you need to store the records in state and order them when
> > the window is triggered. But if you want to order the records with a
> > single `keyBy()` (non-window), I think that's maybe impossible in
> > practice, because you need to store all the incoming records and re-sort
> > all the data for every incoming record, and you also need to send a
> > retraction message for the previous result (because every incoming record
> > might change the global order of the records).
> >
> > Best, Sihua
> >
> > On 06/19/2018 19:19, Amol S - iProgrammer <am...@iprogrammer.com> wrote:
> > Hi,
> >
> > I have used the Flink streaming API in my application, where the source
> > of the stream is Kafka. My Kafka producer publishes data in ascending
> > order of time to different Kafka partitions, and the consumer reads data
> > from these partitions. However, some Kafka partitions may be slow due to
> > some operation and produce late results. Is there any way to maintain
> > order in this stream even though the data arrives out of order? I have
> > tried BoundedOutOfOrdernessTimestampExtractor but it didn't serve the
> > purpose.
> > While digging into this problem I came across your documentation
> > (https://cwiki.apache.org/confluence/display/FLINK/Time+and+Order+in+Streams)
> > and tried to implement it, but it didn't work. I also tried the Table
> > API's orderBy, but it seems orderBy is not supported in Flink 1.5.
> > Please suggest any workaround for this.
> >
> > I have raised the same concern on Stack Overflow:
> >
> > https://stackoverflow.com/questions/50904615/ordering-of-streams-while-reading-data-from-multiple-kafka-partitions
> >
> > Thanks,
> > *Amol Suryawanshi*
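The buffering approach Andrey describes (hold records for the out-of-orderness period, then emit them in timestamp order) can be sketched outside Flink with a plain priority queue. The following is a minimal illustrative sketch, not Flink's actual implementation; the class and method names (`BoundedReorderBuffer`, `Record`, `add`, `flush`) are invented for the example.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Illustrative sketch only, NOT Flink code: records are held in a priority
// queue and released in timestamp order once the highest timestamp seen
// exceeds them by the out-of-orderness bound (a simple "watermark").
public class BoundedReorderBuffer {

    static final class Record {
        final long ts;
        final String value;
        Record(long ts, String value) { this.ts = ts; this.value = value; }
    }

    private final long maxOutOfOrderness;
    private final PriorityQueue<Record> buffer =
            new PriorityQueue<>(Comparator.comparingLong((Record r) -> r.ts));
    private long maxTimestampSeen = Long.MIN_VALUE;

    BoundedReorderBuffer(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
    }

    // Feed one record; returns the records that are now safe to emit,
    // i.e. those the watermark (maxTimestampSeen - bound) has passed.
    List<Record> add(Record r) {
        buffer.add(r);
        maxTimestampSeen = Math.max(maxTimestampSeen, r.ts);
        List<Record> out = new ArrayList<>();
        while (!buffer.isEmpty()
                && buffer.peek().ts <= maxTimestampSeen - maxOutOfOrderness) {
            out.add(buffer.poll());
        }
        return out;
    }

    // Drain whatever is still buffered, e.g. at end of input.
    List<Record> flush() {
        List<Record> out = new ArrayList<>();
        while (!buffer.isEmpty()) {
            out.add(buffer.poll());
        }
        return out;
    }

    public static void main(String[] args) {
        // Timestamps arriving out of order, e.g. interleaved from two partitions.
        long[] arrivalOrder = {5, 1, 8, 3, 20, 40};
        BoundedReorderBuffer reorder = new BoundedReorderBuffer(10);
        List<String> emitted = new ArrayList<>();
        for (long ts : arrivalOrder) {
            for (Record r : reorder.add(new Record(ts, "v" + ts))) {
                emitted.add(r.value);
            }
        }
        for (Record r : reorder.flush()) {
            emitted.add(r.value);
        }
        System.out.println(emitted); // [v1, v3, v5, v8, v20, v40]
    }
}
```

Note that the caveat from the thread still applies: a record arriving more than the bound (10 time units here) behind the maximum timestamp seen would be released immediately and could still appear out of order, and a larger bound trades latency for more complete ordering.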