Hi,
right now, the only way of shutting down a running pipeline is to cancel
it. You can do that in the JobManager dashboard or using the bin/flink
command. And the watermark extraction period does not depend on the watch
interval. It can be configured using
env.getConfig().setAutoWatermarkInterval(long).

Cheers,
Aljoscha

On Thu, 15 Dec 2016 at 00:00 Yassine MARZOUGUI <y.marzou...@mindlytix.com>
wrote:

> Hi Aljoscha,
>
> Thanks a lot for the explanation. Using readFile with PROCESS_CONTINUOUSLY
> solves it. Two more questions though:
>
> 1. Is it possible to gracefully stop the job once it has read the input
> once?
> 2. Does the watermark extraction period depend on the watch interval, or
> should any watch interval (except -1L) work the same way?
>
> In my case the input is indeed finite and static, but contains hundreds of
> GBs, which made the window state grow quickly beyond the memory capacity,
> and the fact that the window contents were fired periodically helped
> keeping it small.
>
> Best,
> Yassine
>
> 2016-12-14 10:38 GMT+01:00 Aljoscha Krettek <aljos...@apache.org>:
>
> Hi Yassine,
> for a bit more detailed explanation: We internally changed how the timer
> system works, this timer system is also used to periodically extract
> watermarks. Due to this change, in your case we don't extract watermarks
> anymore.
>
> Internally, your call resolves to something like this:
>
> Env.readFile(FileInputFormat<OUT> inputFormat, String
> filePath, FileProcessingMode watchType, long interval)
>
> with the FileProcessingMode being set to PROCESS_ONCE.
>
> To get back the old behaviour you can call this method directly with
> PROCESS_CONTINUOUSLY. This will keep the pipeline running and will also
> ensure that watermarks keep being extracted.
>
> In your case, it is not strictly wrong to emit only one large watermark in
> the end because your processing is finite. I admit that the change from
> Flink 1.1 seems a bit strange but this should only occur in toy examples
> where the data is finite.
>
> Does that help?
>
> Cheers,
> Aljoscha
>
> On Tue, 13 Dec 2016 at 18:17 Aljoscha Krettek <aljos...@apache.org> wrote:
>
> Hi Yassine,
> I managed to reproduce the problem. The cause is that we recently changed
> how the timer service is being cleaned up and now the watermark timers are
> not firing anymore.
>
> I'll keep you posted and hope to find a solution fast.
>
> Cheers,
> Aljoscha
>
> On Sun, 11 Dec 2016 at 22:10 Yassine MARZOUGUI <y.marzou...@mindlytix.com>
> wrote:
>
> Hi Aljoscha,
>
> Please excuse me for the late response; I've been busy for the whole
> previous week.
> I used the custom watermark debugger (with 1.1, I changed 
> super.processWatermark(mark)
> to super.output.emitWatermark(mark)), surprisingly with 1.2, only one
> watremark is printed at the end of the stream with the value WM: Watermark
> @ 9223372036854775807 (Long.MAX_VALUE), whereas with 1.1, watermarks are
> printed periodically. I am  using the following revision of 1.2-SNAPSHOT :
> https://github.com/apache/flink/tree/4e336c692b74f218ba09844a46f49534e3a210e9
> .
>
> I uploaded the dataset I'm using as an input here :
> https://drive.google.com/file/d/0BzERCAJnxXocNGpMTGMzX09id1U/view?usp=sharing
>  ,the first column corresponds to the timestamp.
>
> You can find the code below. Thanks you for your help.
>
> import com.opencsv.CSVParser;
> import org.apache.flink.api.common.functions.RichFlatMapFunction;
> import org.apache.flink.api.java.tuple.Tuple;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.api.java.tuple.Tuple3;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.TimeCharacteristic;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import
> org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
> import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
> import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
> import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
> import org.apache.flink.streaming.api.watermark.Watermark;
> import
> org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
> import org.apache.flink.streaming.api.windowing.time.Time;
> import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
> import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
> import org.apache.flink.util.Collector;
> import java.util.*;
>
> /**
>  * Created by ymarzougui on 11/1/2016.
>  */
> public class SortedSessionsAssigner {
>     public static void main(String[] args) throws Exception {
>         final StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
>         env.setParallelism(1);
>         env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>
>         DataStream<Tuple3<Long,String,String>> waterMarked =
> env.readTextFile("file:///E:\\data\\anonymized.csv")
>                 .flatMap(new RichFlatMapFunction<String,
> Tuple3<Long,String,String>>() {
>                     public CSVParser csvParser;
>
>                     @Override
>                     public void open(Configuration config) {
>                         csvParser = new CSVParser(',', '"');
>                     }
>
>                     @Override
>                     public void flatMap(String in,
> Collector<Tuple3<Long,String,String>> clctr) throws Exception {
>                         String[] result = csvParser.parseLine(in);
>                         clctr.collect(Tuple3.of(Long.parseLong(result[0]),
> result[1], result[2]));
>                     }
>                 })
>                 .assignTimestampsAndWatermarks(new
> AscendingTimestampExtractor<Tuple3<Long,String,String>>() {
>                     @Override
>                     public long
> extractAscendingTimestamp(Tuple3<Long,String,String> tuple3) {
>                         return tuple3.f0;
>                     }
>                 });
>
>         DataStream<Tuple2<TreeMap<String, Double>, Long>> sessions =
> waterMarked
>                 .keyBy(1)
>                 .window(EventTimeSessionWindows.withGap(Time.minutes(5)))
>                 .apply(new
> WindowFunction<Tuple3<Long,String,String>,Tuple2<TreeMap<String, Double>,
> Long>, Tuple, TimeWindow>() {
>
>                     @Override
>                     public void apply(Tuple tuple, TimeWindow timeWindow,
> Iterable<Tuple3<Long, String, String>> iterable,
> Collector<Tuple2<TreeMap<String, Double>, Long>> collector) throws
> Exception {
>                         TreeMap<String,Double> treeMap = new
> TreeMap<String, Double>();
>                         Long session_count = 0L;
>                         for (Tuple3<Long, String, String> tuple3 :
> iterable){
>                             treeMap.put(tuple3.f2,
> treeMap.getOrDefault(tuple3.f2, 0.0) + 1);
>                             session_count += 1;
>                         }
>                         collector.collect(Tuple2.of(treeMap,
> session_count));
>
>                     }
>                 }).setParallelism(8);
>
>         waterMarked.transform("WatermarkDebugger", waterMarked.getType(),
> new WatermarkDebugger<Tuple3<Long, String, String>>());
>
>         //sessions.writeAsCsv("file:///E:\\data\\sessions.csv",
> FileSystem.WriteMode.OVERWRITE).setParallelism(1);
>
>         env.execute("Sorted Sessions Assigner");
>
>     }
>
>     public static class WatermarkDebugger<T>
>             extends AbstractStreamOperator<T> implements
> OneInputStreamOperator<T, T> {
>         private static final long serialVersionUID = 1L;
>
>         @Override
>         public void processElement(StreamRecord<T> element) throws
> Exception {
>             System.out.println("ELEMENT: " + element);
>             output.collect(element);
>         }
>
>         @Override
>         public void processWatermark(Watermark mark) throws Exception {
>             // 1.2-snapshot
>             super.processWatermark(mark);
>             // 1.1-snapshot
>             //super.output.emitWatermark(mark);
>             System.out.println("WM: " + mark);
>         }
>     }
>
> }
>
> Best,
> Yassine
>
> 2016-12-06 5:57 GMT+01:00 Aljoscha Krettek <aljos...@apache.org>:
>
> Hi,
> could you please try adding this custom watermark debugger to see what's
> going on with the element timestamps and watermarks:
>
> public static class WatermarkDebugger<T>
>         extends AbstractStreamOperator<T> implements
> OneInputStreamOperator<T, T> {
>     private static final long serialVersionUID = 1L;
>
>     @Override
>     public void processElement(StreamRecord<T> element) throws Exception {
>         System.out.println("ELEMENT: " + element);
>         output.collect(element);
>     }
>
>     @Override
>     public void processWatermark(Watermark mark) throws Exception {
>         super.processWatermark(mark);
>         System.out.println("WM: " + mark);
>     }
> }
>
> you can use it like this:
> input.transform("WatermarkDebugger", input.getType(), new
> WatermarkDebugger<Tuple2<String, Integer>>());
>
> That should give us something to work with.
>
> Cheers,
> Aljoscha
>
> On Mon, 5 Dec 2016 at 18:54 Robert Metzger <rmetz...@apache.org> wrote:
>
> I'll add Aljoscha and Kostas Kloudas to the conversation. They have the
> best overview over the changes to the window operator between 1.1. and 1.2.
>
> On Mon, Dec 5, 2016 at 11:33 AM, Yassine MARZOUGUI <
> y.marzou...@mindlytix.com> wrote:
>
> I forgot to mention : the watermark extractor is the one included in Flink
> API.
>
> 2016-12-05 11:31 GMT+01:00 Yassine MARZOUGUI <y.marzou...@mindlytix.com>:
>
> Hi robert,
>
> Yes, I am using the same code, just swithcing the version in pom.xml to
> 1.2-SNAPSHOT and the cluster binaries to the compiled lastest master (at
> the time of the question)). Here is the watermark assignment :
>
> .assignTimestampsAndWatermarks(new
> AscendingTimestampExtractor<Tuple3<Long,String,String>>() {
>     @Override
>         public long extractAscendingTimestamp(Tuple3<Long,String,String>
> tuple3) {
>             return tuple3.f0;
>         }
> })
>
> Best,
> Yassine
>
> 2016-12-05 11:24 GMT+01:00 Robert Metzger <rmetz...@apache.org>:
>
> Hi Yassine,
> are you sure your watermark extractor is the same between the two
> versions. It sounds a bit like the watermarks for the 1.2 code are not
> generated correctly.
>
> Regards,
> Robert
>
>
> On Sat, Dec 3, 2016 at 9:01 AM, Yassine MARZOUGUI <
> y.marzou...@mindlytix.com> wrote:
>
> Hi all,
>
> With 1.1-SNAPSHOT, EventTimeSessionWindows fire as soon as the windows
> boundaries are detected, but with 1.2-SNAPDHOT the state keeps increasing
> in memory and the windows results are not emitted until the whole stream is
> processed. Is this a temporary behaviour due to the developments in
> 1.2-SNAPSHOT, or a bug?
>
> I am using a code similar to the follwoing:
>
> env.setParallelism(1);
>
> DataStream<T> sessions = env
>     .readTextFile()
>     .flatMap()
>     .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<>())
>     .keyBy(1)
>     .window(EventTimeSessionWindows.withGap(Time.minutes(5)))
>     .apply().setParallelism(32)
>
> sessions.flatMap(flatMapFunction1).setParallelism(32).writeAsCsv();
> sessions.flatMap(flatMapFunction2).setParallelism(32).writeAsCsv();
>
> Best,
> Yassine
>
>
>
>
>
>
>
>

Reply via email to