Thanks for your answer, Aljoscha.

The source does stop: when I comment out all the transformed streams and just print the input, the program completes. But this is a custom SourceFunction; could that be related? Maybe I should implement emitWatermark? I'm using ingestion time, so I assumed this wasn't needed.
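For reference, here is a minimal sketch of the run/cancel pattern being discussed, written in plain Java with no Flink dependencies. `FiniteSource` and its `Consumer`-based `run` method are hypothetical stand-ins, not Flink's SourceFunction or SourceContext API, and this is not the source from the linked PoC. It only illustrates that a finite source ends by returning from `run`, while `cancel` is a separate request to stop early.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for a streaming source; not Flink's SourceFunction.
class FiniteSource {
    private final int limit;
    // volatile so that a cancel() issued from another thread is seen by the run() loop
    private volatile boolean running = true;

    FiniteSource(int limit) {
        this.limit = limit;
    }

    // Emits 0..limit-1 and then returns; returning from run() is what ends the source.
    void run(Consumer<Integer> out) {
        for (int i = 0; i < limit && running; i++) {
            out.accept(i);
        }
    }

    // Requests an early stop; it is not needed for normal completion.
    void cancel() {
        running = false;
    }
}
```

Under this pattern, a source that completes normally never has `cancel` called on it, so not seeing that call does not by itself indicate a problem.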
Greetings,

Juan

On Mon, Nov 21, 2016 at 9:17 AM, Aljoscha Krettek <aljos...@apache.org> wrote:

> Might it be that your initial source never stops? A loop will only
> terminate if both the original source stops and the loop timeout is reached.
>
> On Mon, 21 Nov 2016 at 07:58 Juan Rodríguez Hortalá <
> juan.rodriguez.hort...@gmail.com> wrote:
>
>> Hi,
>>
>> I wrote a proof of concept for a Java version of mapWithState with
>> time-based state eviction:
>> https://github.com/juanrh/flink-state-eviction/blob/a6bb0d4ca0908d2f4350209a4a41e381e99c76c5/src/main/java/com/github/juanrh/streaming/MapWithStateIterPoC.java
>> The idea is:
>>
>> - Convert an input KeyedStream with key K and value V into a KeyedStream
>> of Either<V, K>, with the original values as Left.
>> - Replace a ValueState<S> with a ValueState for a POJO that stores,
>> besides S, the timestamp of the last time the state was accessed.
>> - Define an IterativeStream from the Either stream, and apply a
>> transformation function that periodically sends "tombstone" events as
>> Right events in the closeWith of the IterativeStream. When a tombstone is
>> received, delete the state with clear if the time since it was last
>> accessed is bigger than a configured time to live.
>>
>> This seems to work so far, but there are some things that look weird to
>> me:
>>
>> - The program never seems to stop, even though I have defined the
>> IterativeStream with
>> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/datastream/DataStream.html#iterate-long-
>> The timeout value seems to be ignored. I'm using a custom source
>> function, but it seems that the method SourceFunction.cancel() is not
>> being called.
>>
>> - I'm getting several messages "WARN MetricGroup: Name collision: Group
>> already contains a Metric with the name 'numRecordsOut'. Metric will not be
>> reported. (null)". What does that mean?
>>
>> Thanks,
>>
>> Juan
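The eviction rule described in the quoted message can be sketched in plain Java, independent of Flink. This is only an illustration under stated assumptions: `EvictingState`, `update`, `onTombstone`, and `contains` are hypothetical names, a `HashMap` stands in for keyed ValueState, and timestamps are passed in explicitly rather than read from a clock. It is not the code from the linked PoC.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical plain-Java model of tombstone-driven eviction; no Flink types are used.
class EvictingState<K, S> {
    // State value together with the timestamp of its last access.
    private static final class Entry<S> {
        final S value;
        final long lastAccess;

        Entry(S value, long lastAccess) {
            this.value = value;
            this.lastAccess = lastAccess;
        }
    }

    private final Map<K, Entry<S>> entries = new HashMap<>();
    private final long ttlMillis;

    EvictingState(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    // "Left" path: a regular value updates the state and refreshes its timestamp.
    void update(K key, S value, long now) {
        entries.put(key, new Entry<>(value, now));
    }

    // "Right" path: a tombstone clears the state only if it has been idle
    // for strictly longer than the time to live. Returns true on eviction.
    boolean onTombstone(K key, long now) {
        Entry<S> entry = entries.get(key);
        if (entry != null && now - entry.lastAccess > ttlMillis) {
            entries.remove(key);
            return true;
        }
        return false;
    }

    boolean contains(K key) {
        return entries.containsKey(key);
    }
}
```

A tombstone that arrives before the time to live has elapsed leaves the state untouched, so tombstones can be sent periodically without tracking which keys are actually idle.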