Hi Yuvraj,

It looks as some race condition for me. Would it be ok for you to switch
to either Event or Ingestion time[1]?

I also cced @Aljosha who might give you a bit more insights

Best,

Dawid


[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/event_time.html#event-time--processing-time--ingestion-time

On 24/09/18 13:26, yuvraj singh wrote:
> this is my code 
>
>
> DataStream<CityWithGeoHashes> cityWithGeoHashesDataStream =
>         filteredGeohashDataStream.keyBy(FilteredGeoHashes::getCity).window(
>                 ProcessingTimeSessionWindows.withGap(Time.seconds(4)))
>                 .process(new ProcessWindowFunction<FilteredGeoHashes, 
> CityWithGeoHashes, String, TimeWindow>() {
>                     @Override public void process(String city, Context 
> context, Iterable<FilteredGeoHashes> iterable, Collector<CityWithGeoHashes> 
> collector)
>                             throws Exception {
>                         Map<String, List<FilteredGeoHashes>> 
> geoHashesPerCategory = StreamSupport.stream(iterable.spliterator(), false)
>                                 
> .collect(Collectors.groupingBy(FilteredGeoHashes::getCategory));
>                         collector.collect(new CityWithGeoHashes(city, 
> geoHashesPerCategory));
>                     }
>                 }).name("city-geohashes-processor")
>                 .uid("city-geohashes-processor");
>
> On Mon, Sep 24, 2018 at 4:56 PM yuvraj singh
> <19yuvrajsing...@gmail.com <mailto:19yuvrajsing...@gmail.com>> wrote:
>
>
>     Hi all , 
>
>
>     I  am stuck with this error 
>
>
>     please help me .
>
>
>     I am using sessionwindow 
>
>
>     2018-09-23 07:15:08,097 INFO 
>     org.apache.flink.runtime.taskmanager.Task                     -
>     city-geohashes-processor (24/48)
>     (26aed9a769743191c7cb0257087e490a) switched from RUNNING to FAILED.
>
>     java.lang.UnsupportedOperationException: The end timestamp of a
>     processing-time window cannot become earlier than the current
>     processing time by merging. Current processing time: 1537667108063
>     window: TimeWindow{start=1537667100054, end=1537667107237}
>
>             at
>     
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:325)
>
>             at
>     
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:311)
>
>             at
>     
> org.apache.flink.streaming.runtime.operators.windowing.MergingWindowSet.addWindow(MergingWindowSet.java:212)
>
>             at
>     
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:311)
>
>             at
>     
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
>
>
>
>
>     Thanks 
>
>     Yuvraj Singh 
>

Attachment: signature.asc
Description: OpenPGP digital signature

Reply via email to