Hi, I am using Flink 1.2 and reading a data stream from Kafka (using FlinkKafkaConsumer08). I want to connect this data stream with another stream (a control stream) so as to do some filtering on the fly. After filtering, I apply a window function (tumbling/sliding event-time window) along with a fold function. However, the window function never gets called.
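In outline, the job looks like this (condensed from the full, runnable program further down; all the class definitions are given there):

    DataStream<Product> filteredStream = product.keyBy(0)   // data stream keyed by product name
            .connect(control.keyBy(0))                       // control stream drives the filter
            .flatMap(new CoFlatMapFunImpl());                // retains only whitelisted products

    filteredStream
            .assignTimestampsAndWatermarks(getTimestampAssigner(Time.seconds(1)))
            .keyBy(0)
            .window(TumblingEventTimeWindows.of(Time.seconds(5)))
            .fold(new NameCount("", 0), new FoldFunImpl(), new WindowFunImpl())  // apply() never runs
            .print();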
Any help to debug/fix this is greatly appreciated! Below is reproducible code that can be run in an IDE like IntelliJ or on a Flink cluster. You will need a running Kafka cluster (local or otherwise). Create a topic and add test data points:

$KAFKA_HOME/bin/kafka-topics.sh --create --topic test --zookeeper localhost:2181 --replication-factor 1 --partitions 1
$KAFKA_HOME/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test < small_input.csv

where small_input.csv contains the following lines:

p1,10.0f,2017-03-14 16:01:01
p1,10.0f,2017-03-14 16:01:02
p1,10.0f,2017-03-14 16:01:03
p1,10.0f,2017-03-14 16:01:04
p1,10.0f,2017-03-14 16:01:05
p1,10.0f,2017-03-14 16:01:10
p1,10.0f,2017-03-14 16:01:11
p1,10.0f,2017-03-14 16:01:12
p1,10.0f,2017-03-14 16:01:40
p1,10.0f,2017-03-14 16:01:50

Now you can run the code given below. Notes:
1) In this example, I am not reading the control stream from Kafka (but the issue can be reproduced that way as well).
2) If, instead of reading the data stream from Kafka, I create the stream from elements (i.e. use the getInput function instead of the getKafkaInput function), the code works and the window function is fired.
3) One thing I still need to rule out is watermark progression across idle parallel subtasks; a sketch of that check follows the code.

Thanks,
Tarandeep

import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.util.Collector;

import java.io.IOException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.*;

public class Test3 {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // Use getInput(env) instead to reproduce the working, non-Kafka variant.
        //DataStream<Product> product = getInput(env);
        DataStream<Product> product = getKafkaInput(env);
        DataStream<Tuple1<String>> control = getControl(env);

        // Filter the keyed data stream against the keyed control stream.
        DataStream<Product> filteredStream = product.keyBy(0)
                .connect(control.keyBy(0))
                .flatMap(new CoFlatMapFunImpl());

        // Assign event-time timestamps and periodic watermarks (1s max out-of-orderness).
        DataStream<Product> watermarkedStream = filteredStream.assignTimestampsAndWatermarks(
                getTimestampAssigner(Time.seconds(1))).setParallelism(3);

        // Side pipeline that prints every element and watermark passing through.
        watermarkedStream.transform("WatermarkDebugger", watermarkedStream.getType(),
                new WatermarkDebugger<Product>());

        watermarkedStream
                .keyBy(0)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .fold(new NameCount("", 0), new FoldFunImpl(), new WindowFunImpl())
                .print();

        env.execute();
    }

    /**
     * If the stream is created from elements instead of being read from Kafka,
     * the code works and the window function is fired!
     */
    private static DataStream<Product> getInput(StreamExecutionEnvironment env) {
        return env.fromCollection(Arrays.asList(
                new Product("p1", 10.0f, "2017-03-14 16:01:01"),
                new Product("p1", 10.0f, "2017-03-14 16:01:02"),
                new Product("p1", 10.0f, "2017-03-14 16:01:03"),
                new Product("p1", 10.0f, "2017-03-14 16:01:04"),
                new Product("p1", 10.0f, "2017-03-14 16:01:05"),
                new Product("p1", 10.0f, "2017-03-14 16:01:10"),
                new Product("p1", 10.0f, "2017-03-14 16:01:11"),
                new Product("p1", 10.0f, "2017-03-14 16:01:12"),
                new Product("p1", 10.0f, "2017-03-14 16:01:40"),
                new Product("p1", 10.0f, "2017-03-14 16:01:50")
        ));
    }

    private static DataStream<Product> getKafkaInput(StreamExecutionEnvironment env) throws IOException {
        DataStream<String> s = readKafkaStream("test", env);
        return s.map(new MapFunction<String, Product>() {
            @Override
            public Product map(String s) throws Exception {
                String[] fields = s.split(",");
                return new Product(fields[0], Float.parseFloat(fields[1]), fields[2]);
            }
        });
    }

    private static DataStream<Tuple1<String>> getControl(StreamExecutionEnvironment env) {
        return env.fromElements(new Tuple1<>("p1"));
    }

    private static class CoFlatMapFunImpl extends RichCoFlatMapFunction<Product, Tuple1<String>, Product> {

        private Set<String> productNames = new HashSet<>(Arrays.asList("p1"));

        @Override
        public void flatMap1(Product product, Collector<Product> collector) throws Exception {
            if (productNames.contains(product.f0)) {
                collector.collect(product);
                System.out.println("Retaining product " + product + " in data stream");
            }
        }

        @Override
        public void flatMap2(Tuple1<String> t, Collector<Product> collector) throws Exception {
            productNames.add(t.f0);
            System.out.println("Adding product to set:" + t.f0);
        }
    }

    private static class FoldFunImpl implements FoldFunction<Product, NameCount> {
        @Override
        public NameCount fold(NameCount current, Product p) throws Exception {
            current.f0 = p.f0;
            current.f1 += 1;
            return current;
        }
    }

    /**
     * WINDOW FUNCTION NEVER GETS CALLED.
     */
    private static class WindowFunImpl extends RichWindowFunction<NameCount, NameCount, Tuple, TimeWindow> {
        @Override
        public void apply(Tuple key, TimeWindow timeWindow, Iterable<NameCount> iterable,
                          Collector<NameCount> collector) throws Exception {
            NameCount nc = iterable.iterator().next();
            collector.collect(nc);
            System.out.println("WINDOW: start time: " + new Date(timeWindow.getStart()) + " " + nc);
        }
    }

    private static BoundedOutOfOrdernessTimestampExtractor<Product> getTimestampAssigner(
            final Time maxOutOfOrderness) {
        final DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        return new BoundedOutOfOrdernessTimestampExtractor<Product>(maxOutOfOrderness) {
            @Override
            public long extractTimestamp(Product p) {
                long ts = 0L;
                try {
                    ts = dateFormat.parse(p.f2).getTime();
                } catch (Exception e) {
                    // fall back to timestamp 0 if the date cannot be parsed
                }
                return ts;
            }
        };
    }

    public static class Product extends Tuple3<String, Float, String> {
        public Product() {}
        public Product(String name, Float price, String dateTime) {
            super(name, price, dateTime);
        }
    }

    public static class NameCount extends Tuple2<String, Integer> {
        public NameCount() {}
        public NameCount(String name, Integer count) {
            super(name, count);
        }
    }

    private static DataStream<String> readKafkaStream(String topic, StreamExecutionEnvironment env)
            throws IOException {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("zookeeper.connect", "localhost:2181");
        properties.setProperty("group.id", "group-0009");
        properties.setProperty("auto.offset.reset", "smallest");
        return env.addSource(new FlinkKafkaConsumer08<>(topic, new SimpleStringSchema(), properties));
    }

    /** Prints each element and watermark that flows through this operator. */
    public static class WatermarkDebugger<T> extends AbstractStreamOperator<T>
            implements OneInputStreamOperator<T, T> {
        private static final long serialVersionUID = 1L;

        @Override
        public void processElement(StreamRecord<T> element) throws Exception {
            System.out.println("ELEMENT: " + element);
            output.collect(element);
        }

        @Override
        public void processWatermark(Watermark mark) throws Exception {
            super.processWatermark(mark);
            System.out.println("WM: " + mark);
        }
    }
}
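My current (unverified) suspicion, mentioned in note 3 above, is watermark progression. An event-time window only fires once the operator's watermark passes the window's end, and an operator's watermark is the minimum over all of its parallel input channels. BoundedOutOfOrdernessTimestampExtractor emits watermarks periodically, per subtask, so with setParallelism(3) on the assigner but only one Kafka partition, two of the three subtasks may never see an element and never advance their watermark, holding the combined watermark at Long.MIN_VALUE. A minimal sketch of how one could rule this out (an assumption to test, not a confirmed fix):

    // Run the whole job with a single parallel subtask, so that no idle
    // input channel can hold the combined watermark back:
    env.setParallelism(1);

    // ...or keep the job parallel but pin just the assigner to one subtask,
    // so every element reaches the same watermark generator:
    DataStream<Product> watermarkedStream = filteredStream
            .assignTimestampsAndWatermarks(getTimestampAssigner(Time.seconds(1)))
            .setParallelism(1);

If the window fires after either change, the Kafka path itself is fine and the issue is idle subtasks (or idle partitions) not advancing the watermark.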