Data is read from Kafka, and yes, I use a different group id every time I run the
code. I have put breakpoints and print statements in to verify that.

Also, if I don't connect with the control stream, the window function works. 

- Tarandeep

> On Mar 16, 2017, at 1:12 AM, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
> 
> Hi Tarandeep,
> 
> I haven’t looked at the rest of the code yet, but my first guess is that you 
> might not be reading any data from Kafka at all:
> 
>> private static DataStream<String> readKafkaStream(String topic, StreamExecutionEnvironment env) throws IOException {
>> 
>>         Properties properties = new Properties();
>>         properties.setProperty("bootstrap.servers", "localhost:9092");
>>         properties.setProperty("zookeeper.connect", "localhost:2181");
>>         properties.setProperty("group.id", "group-0009");
>>         properties.setProperty("auto.offset.reset", "smallest");
>>         return env.addSource(new FlinkKafkaConsumer08<>(topic, new SimpleStringSchema(), properties));
>>     }
> 
> 
> Have you tried using a different “group.id” every time you re-run the job?
> Note that the “auto.offset.reset” value is only respected when there aren’t
> any offsets for the group committed in Kafka.
> So you might not actually be reading the complete “small_input.csv” dataset,
> unless you use a different group.id every time.
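> 
> For example (just a sketch; the “debug-” prefix is arbitrary), generating a
> throwaway group id per run guarantees there are no committed offsets for it,
> so “auto.offset.reset” always takes effect:
> 
> Properties properties = new Properties();
> properties.setProperty("bootstrap.servers", "localhost:9092");
> properties.setProperty("zookeeper.connect", "localhost:2181");
> // fresh group id every run => no committed offsets => auto.offset.reset applies
> properties.setProperty("group.id", "debug-" + java.util.UUID.randomUUID());
> // with no committed offsets, consume from the earliest available offset
> properties.setProperty("auto.offset.reset", "smallest");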
> 
> Cheers,
> Gordon
> 
>> On March 16, 2017 at 2:39:10 PM, Tarandeep Singh (tarand...@gmail.com) wrote:
>> 
>> Hi,
>> 
>> I am using Flink 1.2 and reading a data stream from Kafka (using 
>> FlinkKafkaConsumer08). I want to connect this data stream with another 
>> stream (a control stream) so as to do some filtering on the fly. After 
>> filtering, I am applying a window function (tumbling/sliding event-time window) 
>> along with a fold function. However, the window function never gets called.
>> 
>> Any help to debug/fix this is greatly appreciated!
>> 
>> Below is reproducible code that one can run in an IDE like IntelliJ or on a 
>> Flink cluster. You will need a running Kafka cluster (local or otherwise).
>> Create a topic and add test data points:
>> 
>> $KAFKA_HOME/bin/kafka-topics.sh --create --topic test --zookeeper localhost:2181 --replication-factor 1 --partitions 1
>> $KAFKA_HOME/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test < small_input.csv
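>> 
>> (Optional sanity check: to verify the messages actually landed in the topic,
>> you can read them back; the 0.8 console consumer connects via ZooKeeper.)
>> 
>> $KAFKA_HOME/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning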
>> 
>> where small_input.csv contains the following lines:
>> 
>> p1,10.0f,2017-03-14 16:01:01
>> p1,10.0f,2017-03-14 16:01:02
>> p1,10.0f,2017-03-14 16:01:03
>> p1,10.0f,2017-03-14 16:01:04
>> p1,10.0f,2017-03-14 16:01:05
>> p1,10.0f,2017-03-14 16:01:10
>> p1,10.0f,2017-03-14 16:01:11
>> p1,10.0f,2017-03-14 16:01:12
>> p1,10.0f,2017-03-14 16:01:40
>> p1,10.0f,2017-03-14 16:01:50
>> 
>> Now you can run the code given below. Note:
>> 
>> 1) In this example, I am not reading the control stream from Kafka (but the 
>> issue can be reproduced that way as well; a sketch follows this list).
>> 2) If, instead of reading the data stream from Kafka, I create the stream from 
>> elements (i.e. use the getInput function instead of the getKafkaInput function), 
>> the code works and the window function is fired.
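>> 
>> For reference, a minimal sketch of reading the control stream from Kafka
>> instead (the "control" topic name is hypothetical; it reuses the
>> readKafkaStream helper from the code below):
>> 
>>     private static DataStream<Tuple1<String>> getKafkaControl(StreamExecutionEnvironment env) throws IOException {
>>         // "control" is an arbitrary topic name for this sketch
>>         return readKafkaStream("control", env)
>>                 .map(new MapFunction<String, Tuple1<String>>() {
>>                     @Override
>>                     public Tuple1<String> map(String s) throws Exception {
>>                         return new Tuple1<>(s.trim());
>>                     }
>>                 });
>>     }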
>> 
>> Thanks,
>> Tarandeep
>> 
>> 
>> 
>> import org.apache.flink.api.common.functions.FoldFunction;
>> import org.apache.flink.api.common.functions.MapFunction;
>> import org.apache.flink.api.java.tuple.Tuple;
>> import org.apache.flink.api.java.tuple.Tuple1;
>> import org.apache.flink.api.java.tuple.Tuple2;
>> import org.apache.flink.api.java.tuple.Tuple3;
>> import org.apache.flink.streaming.api.TimeCharacteristic;
>> import org.apache.flink.streaming.api.datastream.DataStream;
>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>> import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
>> import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
>> import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
>> import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
>> import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
>> import org.apache.flink.streaming.api.watermark.Watermark;
>> import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
>> import org.apache.flink.streaming.api.windowing.time.Time;
>> import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
>> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
>> import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
>> import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
>> import org.apache.flink.util.Collector;
>> 
>> import java.io.IOException;
>> import java.text.DateFormat;
>> import java.text.SimpleDateFormat;
>> import java.util.*;
>> 
>> public class Test3 {
>> 
>>     public static void main(String[] args) throws Exception {
>>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>         env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>> 
>>         //DataStream<Product> product = getInput(env);
>>         DataStream<Product> product = getKafkaInput(env);
>>         DataStream<Tuple1<String>> control = getControl(env);
>> 
>>         DataStream<Product> filteredStream = product.keyBy(0)
>>                 .connect(control.keyBy(0))
>>                 .flatMap(new CoFlatMapFunImpl());
>> 
>>         DataStream<Product> watermarkedStream = filteredStream
>>                 .assignTimestampsAndWatermarks(getTimestampAssigner(Time.seconds(1)))
>>                 .setParallelism(3);
>> 
>>         watermarkedStream.transform("WatermarkDebugger", watermarkedStream.getType(),
>>                 new WatermarkDebugger<Product>());
>> 
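>>         // 5-second event-time tumbling windows per product name; the fold counts
>>         // records and the window function should print each window's result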
>>         watermarkedStream
>>                 .keyBy(0)
>>                 .window(TumblingEventTimeWindows.of(Time.seconds(5)))
>>                 .fold(new NameCount("", 0), new FoldFunImpl(), new WindowFunImpl())
>>                 .print();
>> 
>>         env.execute();
>>     }
>> 
>>     /**
>>      * If, instead of reading from Kafka, the stream is created from elements,
>>      * the code works and the window function is fired!
>>      */
>>     private static DataStream<Product> getInput(StreamExecutionEnvironment env) {
>>         return env.fromCollection(Arrays.asList(
>>             new Product("p1",10.0f,"2017-03-14 16:01:01"),
>>             new Product("p1",10.0f,"2017-03-14 16:01:02"),
>>             new Product("p1",10.0f,"2017-03-14 16:01:03"),
>>             new Product("p1",10.0f,"2017-03-14 16:01:04"),
>>             new Product("p1",10.0f,"2017-03-14 16:01:05"),
>>             new Product("p1",10.0f,"2017-03-14 16:01:10"),
>>             new Product("p1",10.0f,"2017-03-14 16:01:11"),
>>             new Product("p1",10.0f,"2017-03-14 16:01:12"),
>>             new Product("p1",10.0f,"2017-03-14 16:01:40"),
>>             new Product("p1",10.0f,"2017-03-14 16:01:50")
>>         ));
>>     }
>> 
>>     private static DataStream<Product> getKafkaInput(StreamExecutionEnvironment env) throws IOException {
>>         DataStream<String> s = readKafkaStream("test", env);
>> 
>>         return s.map(new MapFunction<String, Product>() {
>>             @Override
>>             public Product map(String s) throws Exception {
>>                 String[] fields = s.split(",");
>>                 return new Product(fields[0], Float.parseFloat(fields[1]), fields[2]);
>>             }
>>         });
>>     }
>> 
>>     private static DataStream<Tuple1<String>> getControl(StreamExecutionEnvironment env) {
>>         return env.fromElements(new Tuple1<>("p1"));
>>     }
>> 
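>>     /**
>>      * Filters the product stream: only products whose name is in productNames
>>      * (pre-seeded with "p1", or added via the control stream) pass through.
>>      */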
>>     private static class CoFlatMapFunImpl extends RichCoFlatMapFunction<Product, Tuple1<String>, Product> {
>> 
>>         private Set<String> productNames = new HashSet<>(Arrays.asList("p1"));
>> 
>>         @Override
>>         public void flatMap1(Product product, Collector<Product> collector) throws Exception {
>>             if (productNames.contains(product.f0)) {
>>                 collector.collect(product);
>>                 System.out.println("Retaining product " + product + " in data stream");
>>             }
>>         }
>> 
>>         @Override
>>         public void flatMap2(Tuple1<String> t, Collector<Product> collector) throws Exception {
>>             productNames.add(t.f0);
>>             System.out.println("Adding product to set:" + t.f0);
>>         }
>>     }
>> 
>>     private static class FoldFunImpl implements FoldFunction<Product, NameCount> {
>>         @Override
>>         public NameCount fold(NameCount current, Product p) throws Exception {
>>             current.f0 = p.f0;
>>             current.f1 += 1;
>>             return current;
>>         }
>>     }
>> 
>>     /**
>>      * WINDOW FUNCTION NEVER GETS CALLED.
>>      */
>>     private static class WindowFunImpl extends RichWindowFunction<NameCount, NameCount, Tuple, TimeWindow> {
>>         @Override
>>         public void apply(Tuple key, TimeWindow timeWindow, Iterable<NameCount> iterable,
>>                           Collector<NameCount> collector) throws Exception {
>>             NameCount nc = iterable.iterator().next();
>>             collector.collect(nc);
>>             System.out.println("WINDOW: start time: " + new Date(timeWindow.getStart()) + " " + nc);
>>         }
>>     }
>> 
>>     private static BoundedOutOfOrdernessTimestampExtractor<Product> getTimestampAssigner(
>>             final Time maxOutOfOrderness) {
>>         final DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
>> 
>>         return new BoundedOutOfOrdernessTimestampExtractor<Product>(maxOutOfOrderness) {
>>             @Override
>>             public long extractTimestamp(Product p) {
>>                 long ts = 0L;
>>                 try {
>>                     ts = dateFormat.parse(p.f2).getTime();
>>                 } catch (Exception e) { /* swallow parse errors; ts stays 0 */ }
>>                 return ts;
>>             }
>>         };
>>     }
>> 
>>     public static class Product extends Tuple3<String,Float,String> {
>>         public Product() {}
>>         public Product(String name, Float price, String dateTime) {
>>             super(name, price, dateTime);
>>         }
>>     }
>> 
>>     public static class NameCount extends Tuple2<String,Integer> {
>>         public NameCount() {}
>>         public NameCount(String name, Integer count) {
>>             super(name, count);
>>         }
>>     }
>> 
>>     private static DataStream<String> readKafkaStream(String topic,
>>             StreamExecutionEnvironment env) throws IOException {
>> 
>>         Properties properties = new Properties();
>>         properties.setProperty("bootstrap.servers", "localhost:9092");
>>         properties.setProperty("zookeeper.connect", "localhost:2181");
>>         properties.setProperty("group.id", "group-0009");
>>         properties.setProperty("auto.offset.reset", "smallest");
>>         return env.addSource(new FlinkKafkaConsumer08<>(topic, new SimpleStringSchema(), properties));
>>     }
>> 
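>>     /**
>>      * Identity operator that prints every element and watermark passing through,
>>      * to check whether watermarks ever advance past the window boundaries.
>>      */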
>>     public static class WatermarkDebugger<T>
>>             extends AbstractStreamOperator<T> implements OneInputStreamOperator<T, T> {
>>         private static final long serialVersionUID = 1L;
>> 
>>         @Override
>>         public void processElement(StreamRecord<T> element) throws Exception {
>>             System.out.println("ELEMENT: " + element);
>>             output.collect(element);
>>         }
>> 
>>         @Override
>>         public void processWatermark(Watermark mark) throws Exception {
>>             super.processWatermark(mark);
>>             System.out.println("WM: " + mark);
>>         }
>>     }
>> }
>> 
>> 
