Anyone?
Any suggestions what could be going wrong or what I am doing wrong?

Thanks,
Tarandeep


On Thu, Mar 16, 2017 at 7:34 AM, Tarandeep Singh <tarand...@gmail.com>
wrote:

> Data is read from Kafka and yes I use different group id every time I run
> the code. I have put break points and print statements to verify that.
>
> Also, if I don't connect with control stream the window function works.
>
> - Tarandeep
>
> On Mar 16, 2017, at 1:12 AM, Tzu-Li (Gordon) Tai <tzuli...@apache.org>
> wrote:
>
> Hi Tarandeep,
>
> I haven’t looked at the rest of the code yet, but my first guess is that
> you might not be reading any data from Kafka at all:
>
> private static DataStream<String> readKafkaStream(String topic, 
> StreamExecutionEnvironment env) throws IOException {
>
>         Properties properties = new Properties();
>         properties.setProperty("bootstrap.servers", "localhost:9092");
>         properties.setProperty("zookeeper.connect", "localhost:2181");
>         properties.setProperty("group.id", "group-0009");
>         properties.setProperty("auto.offset.reset", "smallest");
>         return env.addSource(new FlinkKafkaConsumer08<>(topic, new 
> SimpleStringSchema(), properties));
>     }
>
>
> Have you tried using a different “group.id” everytime you’re re-running
> the job?
> Note that the “auto.offset.reset” value is only respected when there
> aren’t any offsets for the group committed in Kafka.
> So you might not actually be reading the complete “small_input.cv”
> dataset, unless you use a different group.id overtime.
>
> Cheers,
> Gordon
>
> On March 16, 2017 at 2:39:10 PM, Tarandeep Singh (tarand...@gmail.com)
> wrote:
>
> Hi,
>
> I am using flink-1.2 and reading data stream from Kafka (using
> FlinkKafkaConsumer08). I want to connect this data stream with another
> stream (read control stream) so as to do some filtering on the fly. After
> filtering, I am applying window function (tumbling/sliding event window)
> along with fold function. However, the window function does not get called.
>
> Any help to debug/fix this is greatly appreciated!
>
> Below is a reproducible code that one can run in IDE like IntelliJ or on
> flink cluster. You will need to have a running Kafka cluster (local or
> otherwise).
> Create a topic and add test data points-
>
> $KAFKA_HOME/bin/kafka-topics.sh --create --topic test --zookeeper
> localhost:2181 --replication-factor 1 --partitions 1
> $KAFKA_HOME/bin/kafka-console-producer.sh --broker-list localhost:9092
> --topic test < small_input.csv
>
> where small_input.csv contains the following lines-
>
> p1,10.0f,2017-03-14 16:01:01
> p1,10.0f,2017-03-14 16:01:02
> p1,10.0f,2017-03-14 16:01:03
> p1,10.0f,2017-03-14 16:01:04
> p1,10.0f,2017-03-14 16:01:05
> p1,10.0f,2017-03-14 16:01:10
> p1,10.0f,2017-03-14 16:01:11
> p1,10.0f,2017-03-14 16:01:12
> p1,10.0f,2017-03-14 16:01:40
> p1,10.0f,2017-03-14 16:01:50
>
> Now you can run the code given below. Note:
>
> 1) In this example, I am not reading control stream from Kafka (but issue
> can be reproduced with this code as well)
> 2) If instead of reading data stream from kafka, I create stream from
> elements (i.e. use getInput function instead of getKafkaInput function),
> the code works and window function is fired.
>
> Thanks,
> Tarandeep
>
>
>
> import org.apache.flink.api.common.functions.FoldFunction;
> import org.apache.flink.api.common.functions.MapFunction;
> import org.apache.flink.api.java.tuple.Tuple;
> import org.apache.flink.api.java.tuple.Tuple1;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.api.java.tuple.Tuple3;
> import org.apache.flink.streaming.api.TimeCharacteristic;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
> import 
> org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
> import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
> import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
> import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
> import org.apache.flink.streaming.api.watermark.Watermark;
> import 
> org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
> import org.apache.flink.streaming.api.windowing.time.Time;
> import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
> import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
> import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
> import org.apache.flink.util.Collector;
>
> import java.io.IOException;
> import java.text.DateFormat;
> import java.text.SimpleDateFormat;
> import java.util.*;
>
> public class Test3 {
>
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>         env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>
>         //DataStream<Product> product = getInput(env);
>         DataStream<Product> product = getKafkaInput(env);
>         DataStream<Tuple1<String>> control= getControl(env);
>
>         DataStream<Product> filteredStream = product.keyBy(0)
>                 .connect(control.keyBy(0))
>                 .flatMap(new CoFlatMapFunImpl());
>
>         DataStream<Product> watermarkedStream = 
> filteredStream.assignTimestampsAndWatermarks(
>                 getTimestampAssigner(Time.seconds(1))).setParallelism(3);
>
>         watermarkedStream.transform("WatermarkDebugger", 
> watermarkedStream.getType(), new WatermarkDebugger<Product>());
>
>         watermarkedStream
>                 .keyBy(0)
>                 .window(TumblingEventTimeWindows.of(Time.seconds(5)))
>                 .fold(new NameCount("", 0), new FoldFunImpl(), new 
> WindowFunImpl())
>                 .print();
>
>         env.execute();
>     }
>
>     /**
>      * If instead of reading from Kafka, create stream from elements, the
>      * code works and window function is fired!
>      */
>     private static DataStream<Product> getInput(StreamExecutionEnvironment 
> env) {
>         return env.fromCollection(Arrays.asList(
>             new Product("p1",10.0f,"2017-03-14 16:01:01"),
>             new Product("p1",10.0f,"2017-03-14 16:01:02"),
>             new Product("p1",10.0f,"2017-03-14 16:01:03"),
>             new Product("p1",10.0f,"2017-03-14 16:01:04"),
>             new Product("p1",10.0f,"2017-03-14 16:01:05"),
>             new Product("p1",10.0f,"2017-03-14 16:01:10"),
>             new Product("p1",10.0f,"2017-03-14 16:01:11"),
>             new Product("p1",10.0f,"2017-03-14 16:01:12"),
>             new Product("p1",10.0f,"2017-03-14 16:01:40"),
>             new Product("p1",10.0f,"2017-03-14 16:01:50")
>         ));
>     }
>
>     private static DataStream<Product> 
> getKafkaInput(StreamExecutionEnvironment env) throws IOException {
>         DataStream<String> s = readKafkaStream("test", env);
>
>         return s.map(new MapFunction<String, Product>() {
>             @Override
>             public Product map(String s) throws Exception {
>                 String[] fields = s.split(",");
>                 return new Product(fields[0], Float.parseFloat(fields[1]), 
> fields[2]);
>             }
>         });
>     }
>
>     private static DataStream<Tuple1<String>> 
> getControl(StreamExecutionEnvironment env) {
>         return env.fromElements(new Tuple1<>("p1"));
>     }
>
>     private static class CoFlatMapFunImpl extends 
> RichCoFlatMapFunction<Product, Tuple1<String>,Product> {
>
>         private Set<String> productNames = new HashSet<>(Arrays.asList("p1"));
>
>         @Override
>         public void flatMap1(Product product, Collector<Product> collector) 
> throws Exception {
>             if (productNames.contains(product.f0)) {
>                 collector.collect(product);
>                 System.out.println("Retaining product " + product + " in data 
> stream");
>             }
>         }
>
>         @Override
>         public void flatMap2(Tuple1<String> t, Collector<Product> collector) 
> throws Exception {
>             productNames.add(t.f0);
>             System.out.println("Adding product to set:" + t.f0);
>         }
>     }
>
>     private static class FoldFunImpl implements 
> FoldFunction<Product,NameCount> {
>         @Override
>         public NameCount fold(NameCount current, Product p) throws Exception {
>             current.f0 = p.f0;
>             current.f1 += 1;
>             return current;
>         }
>     }
>
>     /**
>      * WINDOW FUNCTION NEVER GETS CALLED.
>      */
>     private static class WindowFunImpl extends 
> RichWindowFunction<NameCount,NameCount,Tuple,TimeWindow> {
>         @Override
>         public void apply(Tuple key, TimeWindow timeWindow, 
> Iterable<NameCount> iterable,
>                           Collector<NameCount> collector) throws Exception {
>             NameCount nc = iterable.iterator().next();
>             collector.collect(nc);
>             System.out.println("WINDOW: start time: " + new 
> Date(timeWindow.getStart()) + " " + nc);
>         }
>     }
>
>     private static BoundedOutOfOrdernessTimestampExtractor<Product> 
> getTimestampAssigner(final Time maxOutOfOrderness) {
>         final DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd 
> HH:mm:ss");
>
>         return new 
> BoundedOutOfOrdernessTimestampExtractor<Product>(maxOutOfOrderness) {
>             @Override
>             public long extractTimestamp(Product p) {
>                 long ts = 0L;
>                 try {
>                     ts = dateFormat.parse(p.f2).getTime();
>                 } catch (Exception e) {}
>                 return ts;
>             }
>         };
>     }
>
>     public static class Product extends Tuple3<String,Float,String> {
>         public Product() {}
>         public Product(String name, Float price, String dateTime) {
>             super(name, price, dateTime);
>         }
>     }
>
>     public static class NameCount extends Tuple2<String,Integer> {
>         public NameCount() {}
>         public NameCount(String name, Integer count) {
>             super(name, count);
>         }
>     }
>
>     private static DataStream<String> readKafkaStream(String topic, 
> StreamExecutionEnvironment env) throws IOException {
>
>         Properties properties = new Properties();
>         properties.setProperty("bootstrap.servers", "localhost:9092");
>         properties.setProperty("zookeeper.connect", "localhost:2181");
>         properties.setProperty("group.id", "group-0009");
>         properties.setProperty("auto.offset.reset", "smallest");
>         return env.addSource(new FlinkKafkaConsumer08<>(topic, new 
> SimpleStringSchema(), properties));
>     }
>
>     public static class WatermarkDebugger<T>
>             extends AbstractStreamOperator<T> implements 
> OneInputStreamOperator<T, T> {
>         private static final long serialVersionUID = 1L;
>
>         @Override
>         public void processElement(StreamRecord<T> element) throws Exception {
>             System.out.println("ELEMENT: " + element);
>             output.collect(element);
>         }
>
>         @Override
>         public void processWatermark(Watermark mark) throws Exception {
>             super.processWatermark(mark);
>             System.out.println("WM: " + mark);
>         }
>     }
> }
>
>
>

Reply via email to