Hi,
right now the only way of shutting down a running pipeline is to cancel it.
You can do that in the JobManager dashboard or using the bin/flink command
(e.g. bin/flink cancel <JobID>). And the watermark extraction period does
not depend on the watch interval; it can be configured using
env.getConfig().setAutoWatermarkInterval(long).
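For example (a minimal sketch; the 1000 ms interval is only an
illustration, not a recommended value):

StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// emit watermarks from periodic assigners every 1000 ms, independently
// of any file-monitoring watch interval
env.getConfig().setAutoWatermarkInterval(1000L);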
Cheers,
Aljoscha

On Thu, 15 Dec 2016 at 00:00 Yassine MARZOUGUI <y.marzou...@mindlytix.com> wrote:

> Hi Aljoscha,
>
> Thanks a lot for the explanation. Using readFile with PROCESS_CONTINUOUSLY
> solves it. Two more questions though:
>
> 1. Is it possible to gracefully stop the job once it has read the input
> once?
> 2. Does the watermark extraction period depend on the watch interval, or
> should any watch interval (except -1L) work the same way?
>
> In my case the input is indeed finite and static, but it contains hundreds
> of GBs, which made the window state grow quickly beyond the memory
> capacity; the fact that the window contents were fired periodically helped
> keep it small.
>
> Best,
> Yassine
>
> 2016-12-14 10:38 GMT+01:00 Aljoscha Krettek <aljos...@apache.org>:
>
> Hi Yassine,
> for a bit more detailed explanation: we internally changed how the timer
> system works; this timer system is also used to periodically extract
> watermarks. Due to this change, in your case watermarks are no longer
> extracted.
>
> Internally, your call resolves to something like this:
>
> env.readFile(FileInputFormat<OUT> inputFormat, String filePath,
>         FileProcessingMode watchType, long interval)
>
> with the FileProcessingMode set to PROCESS_ONCE.
>
> To get back the old behaviour, you can call this method directly with
> PROCESS_CONTINUOUSLY. This will keep the pipeline running and will also
> ensure that watermarks keep being extracted.
>
> In your case it is not strictly wrong to emit only one large watermark at
> the end, because your processing is finite. I admit the change from Flink
> 1.1 seems a bit strange, but this should only occur in toy examples where
> the data is finite.
>
> Does that help?
>
> Cheers,
> Aljoscha
>
> On Tue, 13 Dec 2016 at 18:17 Aljoscha Krettek <aljos...@apache.org> wrote:
>
> Hi Yassine,
> I managed to reproduce the problem. The cause is that we recently changed
> how the timer service is cleaned up, and now the watermark timers are not
> firing anymore.
>
> I'll keep you posted and hope to find a solution fast.
>
> Cheers,
> Aljoscha
>
> On Sun, 11 Dec 2016 at 22:10 Yassine MARZOUGUI <y.marzou...@mindlytix.com> wrote:
>
> Hi Aljoscha,
>
> Please excuse me for the late response; I've been busy the whole past
> week.
> I used the custom watermark debugger (with 1.1, I changed
> super.processWatermark(mark) to super.output.emitWatermark(mark)).
> Surprisingly, with 1.2 only one watermark is printed at the end of the
> stream, with the value WM: Watermark @ 9223372036854775807
> (Long.MAX_VALUE), whereas with 1.1 watermarks are printed periodically.
> I am using the following revision of 1.2-SNAPSHOT:
> https://github.com/apache/flink/tree/4e336c692b74f218ba09844a46f49534e3a210e9
>
> I uploaded the dataset I'm using as input here:
> https://drive.google.com/file/d/0BzERCAJnxXocNGpMTGMzX09id1U/view?usp=sharing
> (the first column corresponds to the timestamp). Thank you for your help.
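>
> For reference, the continuous-monitoring variant that solved it for me
> looks roughly like the sketch below (the 1000 ms watch interval and the
> input path are placeholders from my setup, not recommendations):
>
> import org.apache.flink.api.java.io.TextInputFormat;
> import org.apache.flink.core.fs.Path;
> import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
>
> String path = "file:///E:\\data\\anonymized.csv";
> TextInputFormat format = new TextInputFormat(new Path(path));
> // PROCESS_CONTINUOUSLY keeps the source alive and re-scans the path
> // every 1000 ms, so periodic watermark extraction keeps running
> DataStream<String> lines = env.readFile(
>         format, path, FileProcessingMode.PROCESS_CONTINUOUSLY, 1000L);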
>
> You can find the full code below:
>
> import com.opencsv.CSVParser;
> import org.apache.flink.api.common.functions.RichFlatMapFunction;
> import org.apache.flink.api.java.tuple.Tuple;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.api.java.tuple.Tuple3;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.TimeCharacteristic;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
> import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
> import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
> import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
> import org.apache.flink.streaming.api.watermark.Watermark;
> import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
> import org.apache.flink.streaming.api.windowing.time.Time;
> import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
> import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
> import org.apache.flink.util.Collector;
>
> import java.util.*;
>
> /**
>  * Created by ymarzougui on 11/1/2016.
>  */
> public class SortedSessionsAssigner {
>     public static void main(String[] args) throws Exception {
>         final StreamExecutionEnvironment env =
>                 StreamExecutionEnvironment.getExecutionEnvironment();
>         env.setParallelism(1);
>         env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>
>         // parse each CSV line into (timestamp, key, value) and assign
>         // ascending event-time timestamps from the first column
>         DataStream<Tuple3<Long, String, String>> waterMarked = env
>                 .readTextFile("file:///E:\\data\\anonymized.csv")
>                 .flatMap(new RichFlatMapFunction<String, Tuple3<Long, String, String>>() {
>                     public CSVParser csvParser;
>
>                     @Override
>                     public void open(Configuration config) {
>                         csvParser = new CSVParser(',', '"');
>                     }
>
>                     @Override
>                     public void flatMap(String in,
>                             Collector<Tuple3<Long, String, String>> clctr) throws Exception {
>                         String[] result = csvParser.parseLine(in);
>                         clctr.collect(Tuple3.of(Long.parseLong(result[0]),
>                                 result[1], result[2]));
>                     }
>                 })
>                 .assignTimestampsAndWatermarks(
>                         new AscendingTimestampExtractor<Tuple3<Long, String, String>>() {
>                     @Override
>                     public long extractAscendingTimestamp(Tuple3<Long, String, String> tuple3) {
>                         return tuple3.f0;
>                     }
>                 });
>
>         // count occurrences of the third field within 5-minute session windows
>         DataStream<Tuple2<TreeMap<String, Double>, Long>> sessions = waterMarked
>                 .keyBy(1)
>                 .window(EventTimeSessionWindows.withGap(Time.minutes(5)))
>                 .apply(new WindowFunction<Tuple3<Long, String, String>,
>                         Tuple2<TreeMap<String, Double>, Long>, Tuple, TimeWindow>() {
>                     @Override
>                     public void apply(Tuple tuple, TimeWindow timeWindow,
>                             Iterable<Tuple3<Long, String, String>> iterable,
>                             Collector<Tuple2<TreeMap<String, Double>, Long>> collector)
>                             throws Exception {
>                         TreeMap<String, Double> treeMap = new TreeMap<String, Double>();
>                         Long session_count = 0L;
>                         for (Tuple3<Long, String, String> tuple3 : iterable) {
>                             treeMap.put(tuple3.f2,
>                                     treeMap.getOrDefault(tuple3.f2, 0.0) + 1);
>                             session_count += 1;
>                         }
>                         collector.collect(Tuple2.of(treeMap, session_count));
>                     }
>                 }).setParallelism(8);
>
>         waterMarked.transform("WatermarkDebugger", waterMarked.getType(),
>                 new WatermarkDebugger<Tuple3<Long, String, String>>());
>
>         //sessions.writeAsCsv("file:///E:\\data\\sessions.csv",
>         //        FileSystem.WriteMode.OVERWRITE).setParallelism(1);
>
>         env.execute("Sorted Sessions Assigner");
>     }
>     public static class WatermarkDebugger<T>
>             extends AbstractStreamOperator<T>
>             implements OneInputStreamOperator<T, T> {
>         private static final long serialVersionUID = 1L;
>
>         @Override
>         public void processElement(StreamRecord<T> element) throws Exception {
>             System.out.println("ELEMENT: " + element);
>             output.collect(element);
>         }
>
>         @Override
>         public void processWatermark(Watermark mark) throws Exception {
>             // 1.2-SNAPSHOT
>             super.processWatermark(mark);
>             // 1.1-SNAPSHOT
>             //super.output.emitWatermark(mark);
>             System.out.println("WM: " + mark);
>         }
>     }
>
> }
>
> Best,
> Yassine
>
> 2016-12-06 5:57 GMT+01:00 Aljoscha Krettek <aljos...@apache.org>:
>
> Hi,
> could you please try adding this custom watermark debugger to see what's
> going on with the element timestamps and watermarks:
>
> public static class WatermarkDebugger<T>
>         extends AbstractStreamOperator<T>
>         implements OneInputStreamOperator<T, T> {
>     private static final long serialVersionUID = 1L;
>
>     @Override
>     public void processElement(StreamRecord<T> element) throws Exception {
>         System.out.println("ELEMENT: " + element);
>         output.collect(element);
>     }
>
>     @Override
>     public void processWatermark(Watermark mark) throws Exception {
>         super.processWatermark(mark);
>         System.out.println("WM: " + mark);
>     }
> }
>
> You can use it like this:
>
> input.transform("WatermarkDebugger", input.getType(),
>         new WatermarkDebugger<Tuple2<String, Integer>>());
>
> That should give us something to work with.
>
> Cheers,
> Aljoscha
>
> On Mon, 5 Dec 2016 at 18:54 Robert Metzger <rmetz...@apache.org> wrote:
>
> I'll add Aljoscha and Kostas Kloudas to the conversation. They have the
> best overview of the changes to the window operator between 1.1 and 1.2.
>
> On Mon, Dec 5, 2016 at 11:33 AM, Yassine MARZOUGUI
> <y.marzou...@mindlytix.com> wrote:
>
> I forgot to mention: the watermark extractor is the one included in the
> Flink API.
>
> 2016-12-05 11:31 GMT+01:00 Yassine MARZOUGUI <y.marzou...@mindlytix.com>:
>
> Hi Robert,
>
> Yes, I am using the same code, just switching the version in pom.xml to
> 1.2-SNAPSHOT and the cluster binaries to the compiled latest master (at
> the time of the question). Here is the watermark assignment:
>
> .assignTimestampsAndWatermarks(
>         new AscendingTimestampExtractor<Tuple3<Long, String, String>>() {
>     @Override
>     public long extractAscendingTimestamp(Tuple3<Long, String, String> tuple3) {
>         return tuple3.f0;
>     }
> })
>
> Best,
> Yassine
>
> 2016-12-05 11:24 GMT+01:00 Robert Metzger <rmetz...@apache.org>:
>
> Hi Yassine,
> are you sure your watermark extractor is the same between the two
> versions? It sounds a bit like the watermarks for the 1.2 code are not
> generated correctly.
>
> Regards,
> Robert
>
> On Sat, Dec 3, 2016 at 9:01 AM, Yassine MARZOUGUI
> <y.marzou...@mindlytix.com> wrote:
>
> Hi all,
>
> With 1.1-SNAPSHOT, EventTimeSessionWindows fire as soon as the window
> boundaries are detected, but with 1.2-SNAPSHOT the state keeps increasing
> in memory and the window results are not emitted until the whole stream
> has been processed. Is this a temporary behaviour due to the ongoing
> development on 1.2-SNAPSHOT, or a bug?
>
> I am using code similar to the following:
>
> env.setParallelism(1);
>
> DataStream<T> sessions = env
>         .readTextFile()
>         .flatMap()
>         .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<>())
>         .keyBy(1)
>         .window(EventTimeSessionWindows.withGap(Time.minutes(5)))
>         .apply().setParallelism(32);
>
> sessions.flatMap(flatMapFunction1).setParallelism(32).writeAsCsv();
> sessions.flatMap(flatMapFunction2).setParallelism(32).writeAsCsv();
>
> Best,
> Yassine