This is my code:

DataStream<CityWithGeoHashes> cityWithGeoHashesDataStream =
        filteredGeohashDataStream
                .keyBy(FilteredGeoHashes::getCity)
                .window(ProcessingTimeSessionWindows.withGap(Time.seconds(4)))
                .process(new ProcessWindowFunction<FilteredGeoHashes, CityWithGeoHashes, String, TimeWindow>() {
                    @Override
                    public void process(String city, Context context,
                                        Iterable<FilteredGeoHashes> iterable,
                                        Collector<CityWithGeoHashes> collector) throws Exception {
                        Map<String, List<FilteredGeoHashes>> geoHashesPerCategory =
                                StreamSupport.stream(iterable.spliterator(), false)
                                        .collect(Collectors.groupingBy(FilteredGeoHashes::getCategory));
                        collector.collect(new CityWithGeoHashes(city, geoHashesPerCategory));
                    }
                })
                .name("city-geohashes-processor")
                .uid("city-geohashes-processor");
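
For context on the error quoted below: as far as I can tell from the message, the merged session window ends before the current processing time. The following is only a plain-Java sketch of that condition, not Flink's actual code. The class SessionMergeSketch, its Window type, the isBehindClock check, and the two arrival times are my own assumptions, chosen so that a 4-second gap reproduces the window from the log.

```java
public class SessionMergeSketch {

    /** Simplified stand-in for a time window covering [start, end). */
    static final class Window {
        final long start;
        final long end;

        Window(long start, long end) {
            this.start = start;
            this.end = end;
        }

        /** Last timestamp that still belongs to the window. */
        long maxTimestamp() {
            return end - 1;
        }

        /** True if the two windows overlap or touch, so a session would merge them. */
        boolean intersects(Window other) {
            return start <= other.end && end >= other.start;
        }

        /** Smallest window that covers both windows. */
        Window cover(Window other) {
            return new Window(Math.min(start, other.start), Math.max(end, other.end));
        }
    }

    /** Each element starts its own session window of length gapMillis. */
    static Window assign(long timestamp, long gapMillis) {
        return new Window(timestamp, timestamp + gapMillis);
    }

    /** Assumed form of the failing check: the window is already over at this clock reading. */
    static boolean isBehindClock(Window merged, long currentProcessingTime) {
        return merged.maxTimestamp() <= currentProcessingTime;
    }

    public static void main(String[] args) {
        long gap = 4000L; // Time.seconds(4) from the job above

        // Hypothetical arrival times that yield the window reported in the log.
        Window first = assign(1537667100054L, gap);
        Window second = assign(1537667103237L, gap);
        Window merged = first.intersects(second) ? first.cover(second) : second;

        long now = 1537667108063L; // "Current processing time" from the log

        System.out.println("merged window: [" + merged.start + ", " + merged.end + ")");
        System.out.println("behind clock: " + isBehindClock(merged, now));
        System.out.println("ms behind: " + (now - merged.end));
    }
}
```

With these numbers the merged window ended 826 ms before the reported processing time, which matches the values in the exception.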


On Mon, Sep 24, 2018 at 4:56 PM yuvraj singh <19yuvrajsing...@gmail.com>
wrote:

> Hi all,
>
> I am stuck with this error. Please help me.
>
> I am using a session window.
>
> 2018-09-23 07:15:08,097 INFO  org.apache.flink.runtime.taskmanager.Task - city-geohashes-processor (24/48) (26aed9a769743191c7cb0257087e490a) switched from RUNNING to FAILED.
> java.lang.UnsupportedOperationException: The end timestamp of a processing-time window cannot become earlier than the current processing time by merging. Current processing time: 1537667108063 window: TimeWindow{start=1537667100054, end=1537667107237}
>         at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:325)
>         at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:311)
>         at org.apache.flink.streaming.runtime.operators.windowing.MergingWindowSet.addWindow(MergingWindowSet.java:212)
>         at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:311)
>         at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
>
> Thanks
>
> Yuvraj Singh
>
