Hi Gordon,

When I use getInput (input created from a collection), the watermarks are
always Long.MAX_VALUE:
WM: Watermark @ 9223372036854775807

This is understandable, as the input source has finished, so a watermark of
value Long.MAX_VALUE is emitted.
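(Internally, that final watermark is Watermark.MAX_WATERMARK, i.e.
new Watermark(Long.MAX_VALUE), which Flink sends once a finite source has been
exhausted.)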

When I use getKafkaInput, I get this watermark:
WM: Watermark @ 1489532509000

This corresponds to Tue Mar 14 2017 16:01:49, which seems right (the last
record's timestamp, 2017-03-14 16:01:50, minus 1 sec due to the
maxOutOfOrderness value).
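
For reference, here is a minimal sketch of how a bounded-out-of-orderness
assigner arrives at that value (simplified, not the exact Flink source;
parse() is a stand-in for the date parsing in my job):

    // the assigner tracks the largest timestamp seen so far; the emitted
    // watermark trails it by the configured out-of-orderness bound (1s here)
    private long currentMaxTimestamp = Long.MIN_VALUE;
    private final long maxOutOfOrdernessMillis = 1000L;

    public long extractTimestamp(Product p) {
        long ts = parse(p.f2);  // event time of the record
        currentMaxTimestamp = Math.max(currentMaxTimestamp, ts);
        return ts;
    }

    public Watermark getCurrentWatermark() {
        // e.g. 2017-03-14 16:01:50 minus 1s = 2017-03-14 16:01:49
        return new Watermark(currentMaxTimestamp - maxOutOfOrdernessMillis);
    }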

If I *don't* use the control stream, I also get the correct watermark, and
this time the window function is called and correct aggregated values are
generated.

Thanks,
Tarandeep



On Thu, Mar 16, 2017 at 10:25 PM, Tzu-Li (Gordon) Tai <tzuli...@apache.org>
wrote:

> Hi Tarandeep,
>
> Thanks for clarifying.
>
> For the next step, I would recommend taking a look at
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/debugging_event_time.html
> and try to find out what exactly is wrong with the watermark progression.
> Flink 1.2 exposes watermarks as a metric, and that should help in figuring
> out why the windows aren’t firing.
>
> Also, I see you have added a “WatermarkDebugger” in your job. Have you
> checked whether the watermarks printed there are identical when using
> getInput vs. getKafkaInput?
>
> Cheers,
> Gordon
>
>
> On March 17, 2017 at 12:32:51 PM, Tarandeep Singh (tarand...@gmail.com)
> wrote:
>
> Anyone?
> Any suggestions on what could be going wrong, or what I am doing wrong?
>
> Thanks,
> Tarandeep
>
>
> On Thu, Mar 16, 2017 at 7:34 AM, Tarandeep Singh <tarand...@gmail.com>
> wrote:
>
>> Data is read from Kafka, and yes, I use a different group id every time I
>> run the code. I have used breakpoints and print statements to verify that.
>>
>> Also, if I don't connect with the control stream, the window function works.
>>
>> - Tarandeep
>>
>> On Mar 16, 2017, at 1:12 AM, Tzu-Li (Gordon) Tai <tzuli...@apache.org>
>> wrote:
>>
>> Hi Tarandeep,
>>
>> I haven’t looked at the rest of the code yet, but my first guess is that
>> you might not be reading any data from Kafka at all:
>>
>> private static DataStream<String> readKafkaStream(String topic, StreamExecutionEnvironment env) throws IOException {
>>
>>         Properties properties = new Properties();
>>         properties.setProperty("bootstrap.servers", "localhost:9092");
>>         properties.setProperty("zookeeper.connect", "localhost:2181");
>>         properties.setProperty("group.id", "group-0009");
>>         properties.setProperty("auto.offset.reset", "smallest");
>>         return env.addSource(new FlinkKafkaConsumer08<>(topic, new SimpleStringSchema(), properties));
>>     }
>>
>>
>> Have you tried using a different “group.id” every time you re-run the job?
>> Note that the “auto.offset.reset” value is only respected when there
>> aren’t any offsets for the group committed in Kafka.
>> So you might not actually be reading the complete “small_input.csv”
>> dataset, unless you use a different group.id every time.
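>>
>> For example (just a sketch), generating a fresh consumer group per run
>> guarantees that “auto.offset.reset” kicks in and the topic is re-read from
>> the beginning:
>>
>>         // hypothetical tweak: a unique group.id per run means no committed
>>         // offsets exist, so "smallest" always takes effect
>>         properties.setProperty("group.id", "debug-" + java.util.UUID.randomUUID());
>>         properties.setProperty("auto.offset.reset", "smallest");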
>>
>> Cheers,
>> Gordon
>>
>> On March 16, 2017 at 2:39:10 PM, Tarandeep Singh (tarand...@gmail.com)
>> wrote:
>>
>> Hi,
>>
>> I am using Flink 1.2 and reading a data stream from Kafka (using
>> FlinkKafkaConsumer08). I want to connect this data stream with another
>> stream (a control stream) so as to do some filtering on the fly. After
>> filtering, I apply a window function (a tumbling/sliding event-time window)
>> along with a fold function. However, the window function never gets called.
>>
>> Any help to debug/fix this is greatly appreciated!
>>
>> Below is reproducible code that one can run in an IDE like IntelliJ or on a
>> Flink cluster. You will need a running Kafka cluster (local or otherwise).
>> Create a topic and add the test data points:
>>
>> $KAFKA_HOME/bin/kafka-topics.sh --create --topic test --zookeeper
>> localhost:2181 --replication-factor 1 --partitions 1
>> $KAFKA_HOME/bin/kafka-console-producer.sh --broker-list localhost:9092
>> --topic test < small_input.csv
>>
>> where small_input.csv contains the following lines:
>>
>> p1,10.0f,2017-03-14 16:01:01
>> p1,10.0f,2017-03-14 16:01:02
>> p1,10.0f,2017-03-14 16:01:03
>> p1,10.0f,2017-03-14 16:01:04
>> p1,10.0f,2017-03-14 16:01:05
>> p1,10.0f,2017-03-14 16:01:10
>> p1,10.0f,2017-03-14 16:01:11
>> p1,10.0f,2017-03-14 16:01:12
>> p1,10.0f,2017-03-14 16:01:40
>> p1,10.0f,2017-03-14 16:01:50
>>
>> Now you can run the code given below. Note:
>>
>> 1) In this example, I am not reading the control stream from Kafka (but the
>> issue can be reproduced with this code as well).
>> 2) If, instead of reading the data stream from Kafka, I create the stream
>> from elements (i.e. use the getInput function instead of the getKafkaInput
>> function), the code works and the window function is fired.
>>
>> Thanks,
>> Tarandeep
>>
>>
>>
>> import org.apache.flink.api.common.functions.FoldFunction;
>> import org.apache.flink.api.common.functions.MapFunction;
>> import org.apache.flink.api.java.tuple.Tuple;
>> import org.apache.flink.api.java.tuple.Tuple1;
>> import org.apache.flink.api.java.tuple.Tuple2;
>> import org.apache.flink.api.java.tuple.Tuple3;
>> import org.apache.flink.streaming.api.TimeCharacteristic;
>> import org.apache.flink.streaming.api.datastream.DataStream;
>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>> import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
>> import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
>> import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
>> import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
>> import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
>> import org.apache.flink.streaming.api.watermark.Watermark;
>> import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
>> import org.apache.flink.streaming.api.windowing.time.Time;
>> import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
>> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
>> import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
>> import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
>> import org.apache.flink.util.Collector;
>>
>> import java.io.IOException;
>> import java.text.DateFormat;
>> import java.text.SimpleDateFormat;
>> import java.util.*;
>>
>> public class Test3 {
>>
>>     public static void main(String[] args) throws Exception {
>>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>         env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>
>>         //DataStream<Product> product = getInput(env);
>>         DataStream<Product> product = getKafkaInput(env);
>>         DataStream<Tuple1<String>> control= getControl(env);
>>
>>         DataStream<Product> filteredStream = product.keyBy(0)
>>                 .connect(control.keyBy(0))
>>                 .flatMap(new CoFlatMapFunImpl());
>>
>>         DataStream<Product> watermarkedStream = filteredStream.assignTimestampsAndWatermarks(
>>                 getTimestampAssigner(Time.seconds(1))).setParallelism(3);
>>
>>         watermarkedStream.transform("WatermarkDebugger", watermarkedStream.getType(), new WatermarkDebugger<Product>());
>>
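>>         // Expectation (my reading of the sample data): with 5s tumbling
>>         // event-time windows, records should land in [16:01:00, 16:01:05) -> 4,
>>         // [16:01:05, 16:01:10) -> 1, [16:01:10, 16:01:15) -> 3,
>>         // [16:01:40, 16:01:45) -> 1 and [16:01:50, 16:01:55) -> 1,
>>         // each firing once the watermark passes the window end.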
>>         watermarkedStream
>>                 .keyBy(0)
>>                 .window(TumblingEventTimeWindows.of(Time.seconds(5)))
>>                 .fold(new NameCount("", 0), new FoldFunImpl(), new WindowFunImpl())
>>                 .print();
>>
>>         env.execute();
>>     }
>>
>>     /**
>>      * If instead of reading from Kafka, create stream from elements, the
>>      * code works and window function is fired!
>>      */
>>     private static DataStream<Product> getInput(StreamExecutionEnvironment env) {
>>         return env.fromCollection(Arrays.asList(
>>             new Product("p1",10.0f,"2017-03-14 16:01:01"),
>>             new Product("p1",10.0f,"2017-03-14 16:01:02"),
>>             new Product("p1",10.0f,"2017-03-14 16:01:03"),
>>             new Product("p1",10.0f,"2017-03-14 16:01:04"),
>>             new Product("p1",10.0f,"2017-03-14 16:01:05"),
>>             new Product("p1",10.0f,"2017-03-14 16:01:10"),
>>             new Product("p1",10.0f,"2017-03-14 16:01:11"),
>>             new Product("p1",10.0f,"2017-03-14 16:01:12"),
>>             new Product("p1",10.0f,"2017-03-14 16:01:40"),
>>             new Product("p1",10.0f,"2017-03-14 16:01:50")
>>         ));
>>     }
>>
>>     private static DataStream<Product> getKafkaInput(StreamExecutionEnvironment env) throws IOException {
>>         DataStream<String> s = readKafkaStream("test", env);
>>
>>         return s.map(new MapFunction<String, Product>() {
>>             @Override
>>             public Product map(String s) throws Exception {
>>                 String[] fields = s.split(",");
>>                 return new Product(fields[0], Float.parseFloat(fields[1]), fields[2]);
>>             }
>>         });
>>     }
>>
>>     private static DataStream<Tuple1<String>> getControl(StreamExecutionEnvironment env) {
>>         return env.fromElements(new Tuple1<>("p1"));
>>     }
>>
>>     private static class CoFlatMapFunImpl extends RichCoFlatMapFunction<Product, Tuple1<String>, Product> {
>>
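>>         // note: a plain per-subtask field seeded with "p1", not Flink managed state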
>>         private Set<String> productNames = new HashSet<>(Arrays.asList("p1"));
>>
>>         @Override
>>         public void flatMap1(Product product, Collector<Product> collector) throws Exception {
>>             if (productNames.contains(product.f0)) {
>>                 collector.collect(product);
>>                 System.out.println("Retaining product " + product + " in 
>> data stream");
>>             }
>>         }
>>
>>         @Override
>>         public void flatMap2(Tuple1<String> t, Collector<Product> collector) throws Exception {
>>             productNames.add(t.f0);
>>             System.out.println("Adding product to set:" + t.f0);
>>         }
>>     }
>>
>>     private static class FoldFunImpl implements FoldFunction<Product, NameCount> {
>>         @Override
>>         public NameCount fold(NameCount current, Product p) throws Exception {
>>             current.f0 = p.f0;
>>             current.f1 += 1;
>>             return current;
>>         }
>>     }
>>
>>     /**
>>      * WINDOW FUNCTION NEVER GETS CALLED.
>>      */
>>     private static class WindowFunImpl extends RichWindowFunction<NameCount, NameCount, Tuple, TimeWindow> {
>>         @Override
>>         public void apply(Tuple key, TimeWindow timeWindow, Iterable<NameCount> iterable,
>>                           Collector<NameCount> collector) throws Exception {
>>             NameCount nc = iterable.iterator().next();
>>             collector.collect(nc);
>>             System.out.println("WINDOW: start time: " + new 
>> Date(timeWindow.getStart()) + " " + nc);
>>         }
>>     }
>>
>>     private static BoundedOutOfOrdernessTimestampExtractor<Product> getTimestampAssigner(final Time maxOutOfOrderness) {
>>         final DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
>>
>>         return new BoundedOutOfOrdernessTimestampExtractor<Product>(maxOutOfOrderness) {
>>             @Override
>>             public long extractTimestamp(Product p) {
>>                 long ts = 0L;
>>                 try {
>>                     ts = dateFormat.parse(p.f2).getTime();
>>                 } catch (Exception e) {}
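>>                 // note: parse failures are silently swallowed, falling back to ts = 0L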
>>                 return ts;
>>             }
>>         };
>>     }
>>
>>     public static class Product extends Tuple3<String,Float,String> {
>>         public Product() {}
>>         public Product(String name, Float price, String dateTime) {
>>             super(name, price, dateTime);
>>         }
>>     }
>>
>>     public static class NameCount extends Tuple2<String,Integer> {
>>         public NameCount() {}
>>         public NameCount(String name, Integer count) {
>>             super(name, count);
>>         }
>>     }
>>
>>     private static DataStream<String> readKafkaStream(String topic, StreamExecutionEnvironment env) throws IOException {
>>
>>         Properties properties = new Properties();
>>         properties.setProperty("bootstrap.servers", "localhost:9092");
>>         properties.setProperty("zookeeper.connect", "localhost:2181");
>>         properties.setProperty("group.id", "group-0009");
>>         properties.setProperty("auto.offset.reset", "smallest");
>>         return env.addSource(new FlinkKafkaConsumer08<>(topic, new SimpleStringSchema(), properties));
>>     }
>>
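>>     /**
>>      * Pass-through operator that prints every element and watermark it sees.
>>      */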
>>     public static class WatermarkDebugger<T>
>>             extends AbstractStreamOperator<T> implements OneInputStreamOperator<T, T> {
>>         private static final long serialVersionUID = 1L;
>>
>>         @Override
>>         public void processElement(StreamRecord<T> element) throws Exception {
>>             System.out.println("ELEMENT: " + element);
>>             output.collect(element);
>>         }
>>
>>         @Override
>>         public void processWatermark(Watermark mark) throws Exception {
>>             super.processWatermark(mark);
>>             System.out.println("WM: " + mark);
>>         }
>>     }
>> }
>>
>>
>>
>
