Re: Ordering of stream from different kafka partitions

2018-06-21 Thread Amol S - iProgrammer
Hello Andrey, Thanks for the help. I am trying to implement the code you gave above: sourceStream.setParallelism(4).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<>(…) {…}).windowAll(TumblingEventTimeWindows.of(Time...)).p

Re: Ordering of stream from different kafka partitions

2018-06-20 Thread Andrey Zagrebin
Hi Amol, > In above code also it will sort the records in specific time window only. All windows will be emitted as the watermark passes the end of the window. The watermark only increases, so the non-overlapping windows should also be sorted by time and, as a consequence, the records across windows
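
The argument above (windows fire in watermark order, each window is sorted internally, so the concatenated output is sorted overall) can be illustrated with a small plain-Java sketch. This is not Flink code: the class `WindowSortSketch`, its method names, and the simplified firing rule (a window fires once the watermark reaches its end) are assumptions made for illustration, and the watermark is modelled as the highest timestamp seen minus a fixed out-of-orderness bound.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.TreeMap;

/** Plain-Java illustration (hypothetical, not Flink API) of tumbling windows sorted on close. */
final class WindowSortSketch {

    /**
     * Buffers timestamps into tumbling windows and emits each window, sorted,
     * once the watermark (max timestamp seen minus the bound) reaches its end.
     */
    static List<Long> sortWithWindows(List<Long> arrivals, long windowSize, long maxOutOfOrderness) {
        TreeMap<Long, List<Long>> openWindows = new TreeMap<>(); // window start -> buffered events
        List<Long> emitted = new ArrayList<>();
        long watermark = Long.MIN_VALUE;

        for (long ts : arrivals) {
            long windowStart = Math.floorDiv(ts, windowSize) * windowSize;
            if (windowStart + windowSize <= watermark) {
                continue; // too late: this window has already fired, so the event is dropped
            }
            openWindows.computeIfAbsent(windowStart, k -> new ArrayList<>()).add(ts);
            watermark = Math.max(watermark, ts - maxOutOfOrderness); // the watermark only increases
            fireReadyWindows(openWindows, emitted, windowSize, watermark);
        }
        // End of input: fire everything that is still buffered.
        fireReadyWindows(openWindows, emitted, windowSize, Long.MAX_VALUE);
        return emitted;
    }

    private static void fireReadyWindows(TreeMap<Long, List<Long>> openWindows, List<Long> emitted,
                                         long windowSize, long watermark) {
        // Windows are visited in ascending start order, so output stays ordered across windows.
        while (!openWindows.isEmpty() && openWindows.firstKey() + windowSize <= watermark) {
            List<Long> window = openWindows.pollFirstEntry().getValue();
            Collections.sort(window); // order inside one window
            emitted.addAll(window);
        }
    }

    public static void main(String[] args) {
        List<Long> arrivals = Arrays.asList(3L, 1L, 2L, 12L, 11L, 7L, 25L, 21L);
        System.out.println(sortWithWindows(arrivals, 10L, 5L));
    }
}
```

With a window size of 10 and a bound of 5, the sample input in `main` comes out as 1, 2, 3, 7, 11, 12, 21, 25. Events that arrive later than the bound allows are dropped in this sketch, which is why the ordering guarantee only holds for a bounded amount of out-of-orderness.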

Re: Ordering of stream from different kafka partitions

2018-06-20 Thread Amol S - iProgrammer
Hello Andrey, In above code also it will sort the records in specific time window only. Anyway, we agreed to create N partitions with N consumers based on some key, as order is maintained per Kafka partition. I have some questions about this. 1. How should I create N consumers
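
The key-based approach mentioned above relies on every record with the same key landing in the same partition, where arrival order is kept. A minimal plain-Java sketch of that idea follows. The class `KeyPartitionSketch`, the `key:sequence` record format, and the `hashCode`-based partitioner are all hypothetical; Kafka's own default partitioner uses a different hash function.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Plain-Java illustration (hypothetical) of per-key ordering via key-based partitioning. */
final class KeyPartitionSketch {

    /** Hypothetical partitioner: the same key always maps to the same partition index. */
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    /** Distributes "key:sequence" records over partitions, keeping arrival order in each one. */
    static List<List<String>> distribute(List<String> records, int numPartitions) {
        List<List<String>> partitions = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            partitions.add(new ArrayList<>());
        }
        for (String record : records) {
            String key = record.substring(0, record.indexOf(':'));
            partitions.get(partitionFor(key, numPartitions)).add(record);
        }
        return partitions;
    }

    public static void main(String[] args) {
        List<String> records = Arrays.asList("a:1", "b:1", "a:2", "c:1", "b:2", "a:3");
        System.out.println(distribute(records, 4));
    }
}
```

Each partition preserves the order of the records for its keys, but nothing here orders records across different partitions, which is the limitation discussed in the rest of the thread.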

Re: Ordering of stream from different kafka partitions

2018-06-20 Thread Andrey Zagrebin
Hi, Good point, sorry for the confusion. BoundedOutOfOrdernessTimestampExtractor of course does not buffer records; you need to apply windowing (e.g. TumblingEventTimeWindows) for that, then sort the window output by time and emit the records in sorted order. You can also use windowAll which alread
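
To make the distinction above concrete, here is a plain-Java sketch of what a bounded-out-of-orderness watermark generator does conceptually: it tracks the highest timestamp seen and reports a watermark that trails it by a fixed bound, while every record is forwarded immediately in arrival order. The class `BoundedWatermarkSketch` and its methods are hypothetical names for illustration, not the Flink class itself.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java illustration (hypothetical, not Flink API) of a bounded-out-of-orderness watermark. */
final class BoundedWatermarkSketch {
    private final long maxOutOfOrderness;
    private long maxTimestampSeen = Long.MIN_VALUE;

    BoundedWatermarkSketch(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
    }

    /** Observes a timestamp and forwards it unchanged: nothing is buffered or reordered. */
    long onEvent(long timestamp) {
        maxTimestampSeen = Math.max(maxTimestampSeen, timestamp);
        return timestamp;
    }

    /** The watermark trails the highest timestamp seen by the configured bound. */
    long currentWatermark() {
        return maxTimestampSeen == Long.MIN_VALUE
                ? Long.MIN_VALUE
                : maxTimestampSeen - maxOutOfOrderness;
    }

    public static void main(String[] args) {
        BoundedWatermarkSketch tracker = new BoundedWatermarkSketch(2L);
        List<Long> forwarded = new ArrayList<>();
        for (long ts : new long[] {5L, 3L, 9L, 7L}) {
            forwarded.add(tracker.onEvent(ts));
        }
        System.out.println(forwarded + " watermark=" + tracker.currentWatermark());
    }
}
```

The forwarded list is still 5, 3, 9, 7, so the output is exactly as unordered as the input. Sorting needs a separate buffering step, such as a window that collects records until the watermark passes its end.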

Re: Ordering of stream from different kafka partitions

2018-06-20 Thread sihua zhou
Hi, I think a global ordering is a bit impractical in production, but in theory you can still do that. You need to - First, fix the operator's parallelism to 1 (except the source node). - If you want to sort the records within a bounded time, then you can keyBy() a constant and window it,

Re: Ordering of stream from different kafka partitions

2018-06-20 Thread Amol S - iProgrammer
Hello Andrey, Thanks for your quick response. I have tried your code above, but it didn't suit my requirement. I need global ordering of my records across multiple Kafka partitions. Please suggest a workaround for this. As mentioned in this

Re: Ordering of stream from different kafka partitions

2018-06-19 Thread Andrey Zagrebin
Hi Amol, I think you could try (based on your Stack Overflow code) org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor like this: DataStream streamSource = env .addSource(kafkaConsumer) .setParallelism(4) .assignTimestampsAndWatermarks( new
