Hi Yuvraj,

This looks like a race condition to me. Would it be OK for you to switch to either event time or ingestion time [1]?
I have also cc'ed @Aljosha, who might be able to give you a bit more insight.

Best,
Dawid

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/event_time.html#event-time--processing-time--ingestion-time

On 24/09/18 13:26, yuvraj singh wrote:
> this is my code
>
> DataStream<CityWithGeoHashes> cityWithGeoHashesDataStream =
>     filteredGeohashDataStream
>         .keyBy(FilteredGeoHashes::getCity)
>         .window(ProcessingTimeSessionWindows.withGap(Time.seconds(4)))
>         .process(new ProcessWindowFunction<FilteredGeoHashes,
>                 CityWithGeoHashes, String, TimeWindow>() {
>             @Override
>             public void process(String city, Context context,
>                     Iterable<FilteredGeoHashes> iterable,
>                     Collector<CityWithGeoHashes> collector) throws Exception {
>                 Map<String, List<FilteredGeoHashes>> geoHashesPerCategory =
>                     StreamSupport.stream(iterable.spliterator(), false)
>                         .collect(Collectors.groupingBy(FilteredGeoHashes::getCategory));
>                 collector.collect(new CityWithGeoHashes(city, geoHashesPerCategory));
>             }
>         })
>         .name("city-geohashes-processor")
>         .uid("city-geohashes-processor");
>
> On Mon, Sep 24, 2018 at 4:56 PM yuvraj singh
> <19yuvrajsing...@gmail.com> wrote:
>
> > Hi all,
> >
> > I am stuck with this error, please help me.
> > I am using a session window.
> >
> > 2018-09-23 07:15:08,097 INFO  org.apache.flink.runtime.taskmanager.Task
> >     - city-geohashes-processor (24/48) (26aed9a769743191c7cb0257087e490a)
> >     switched from RUNNING to FAILED.
> > java.lang.UnsupportedOperationException: The end timestamp of a
> > processing-time window cannot become earlier than the current
> > processing time by merging. Current processing time: 1537667108063
> > window: TimeWindow{start=1537667100054, end=1537667107237}
> >     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:325)
> >     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:311)
> >     at org.apache.flink.streaming.runtime.operators.windowing.MergingWindowSet.addWindow(MergingWindowSet.java:212)
> >     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:311)
> >     at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
> >
> > Thanks
> > Yuvraj Singh
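To make the error message in the quoted log easier to read, here is a minimal plain-Java sketch of the kind of check it describes: a merged processing-time window whose last timestamp is not after the current processing time is rejected. This is a simplified model and not Flink's actual code; the class `SessionMergeCheck`, the nested `Window` type, and the method `alreadyExpired` are hypothetical names, and the exact comparison (last timestamp = end - 1, compared with `<=`) is an assumption. Only the three numbers are taken from the log. The sketch does not explain why the window ended up in that state, which is the suspected race.

```java
// Simplified model of the processing-time check behind the logged error.
// Hypothetical names throughout; this is NOT Flink's implementation.
public class SessionMergeCheck {

    /** A half-open time window [start, end), in epoch milliseconds. */
    public static final class Window {
        public final long start;
        public final long end;

        public Window(long start, long end) {
            this.start = start;
            this.end = end;
        }

        /** Last timestamp that still belongs to the window (assumed: end - 1). */
        public long maxTimestamp() {
            return end - 1;
        }
    }

    /**
     * Returns true if the window's last timestamp is not after the given
     * processing time, i.e. the window would already be due when it is created.
     */
    public static boolean alreadyExpired(Window w, long currentProcessingTime) {
        return w.maxTimestamp() <= currentProcessingTime;
    }

    public static void main(String[] args) {
        // Values copied from the log in this thread.
        Window merged = new Window(1537667100054L, 1537667107237L);
        long now = 1537667108063L;

        // prints: expired=true lagMs=826
        System.out.println("expired=" + alreadyExpired(merged, now)
                + " lagMs=" + (now - merged.end));
    }
}
```

With the logged values, the merged window ended 826 ms before the reported processing time, which is the condition the exception message complains about.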