Data is read from Kafka, and yes, I use a different group id every time I run the code. I have put breakpoints and print statements to verify that.
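For reference, this is roughly how the consumer properties are set up in my runs (a sketch; the timestamp suffix is just an illustrative way to get a unique group id per run):

    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "localhost:9092");
    properties.setProperty("zookeeper.connect", "localhost:2181");
    // fresh group id on every run, so no offsets are committed for the group
    // and auto.offset.reset=smallest is always honored
    properties.setProperty("group.id", "group-" + System.currentTimeMillis());
    properties.setProperty("auto.offset.reset", "smallest");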
Also, if I don't connect the control stream, the window function works.

- Tarandeep

> On Mar 16, 2017, at 1:12 AM, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
>
> Hi Tarandeep,
>
> I haven’t looked at the rest of the code yet, but my first guess is that you
> might not be reading any data from Kafka at all:
>
>> private static DataStream<String> readKafkaStream(String topic,
>>         StreamExecutionEnvironment env) throws IOException {
>>
>>     Properties properties = new Properties();
>>     properties.setProperty("bootstrap.servers", "localhost:9092");
>>     properties.setProperty("zookeeper.connect", "localhost:2181");
>>     properties.setProperty("group.id", "group-0009");
>>     properties.setProperty("auto.offset.reset", "smallest");
>>     return env.addSource(new FlinkKafkaConsumer08<>(topic,
>>             new SimpleStringSchema(), properties));
>> }
>
> Have you tried using a different “group.id” every time you re-run the job?
> Note that the “auto.offset.reset” value is only respected when there aren’t
> any offsets for the group committed in Kafka.
> So you might not actually be reading the complete “small_input.csv” dataset,
> unless you use a different group.id every time.
>
> Cheers,
> Gordon
>
>> On March 16, 2017 at 2:39:10 PM, Tarandeep Singh (tarand...@gmail.com) wrote:
>>
>> Hi,
>>
>> I am using Flink 1.2 and reading a data stream from Kafka (using
>> FlinkKafkaConsumer08). I want to connect this data stream with another
>> stream (a control stream) so as to do some filtering on the fly. After
>> filtering, I am applying a window function (tumbling/sliding event window)
>> along with a fold function. However, the window function does not get called.
>>
>> Any help to debug/fix this is greatly appreciated!
>>
>> Below is reproducible code that one can run in an IDE like IntelliJ or on a
>> Flink cluster. You will need a running Kafka cluster (local or otherwise).
>> Create a topic and add test data points:
>>
>> $KAFKA_HOME/bin/kafka-topics.sh --create --topic test --zookeeper localhost:2181 --replication-factor 1 --partitions 1
>> $KAFKA_HOME/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test < small_input.csv
>>
>> where small_input.csv contains the following lines:
>>
>> p1,10.0f,2017-03-14 16:01:01
>> p1,10.0f,2017-03-14 16:01:02
>> p1,10.0f,2017-03-14 16:01:03
>> p1,10.0f,2017-03-14 16:01:04
>> p1,10.0f,2017-03-14 16:01:05
>> p1,10.0f,2017-03-14 16:01:10
>> p1,10.0f,2017-03-14 16:01:11
>> p1,10.0f,2017-03-14 16:01:12
>> p1,10.0f,2017-03-14 16:01:40
>> p1,10.0f,2017-03-14 16:01:50
>>
>> Now you can run the code given below. Note:
>>
>> 1) In this example, I am not reading the control stream from Kafka (but the
>> issue can be reproduced with that setup as well).
>> 2) If, instead of reading the data stream from Kafka, I create the stream
>> from elements (i.e. use the getInput function instead of the getKafkaInput
>> function), the code works and the window function is fired.
>>
>> Thanks,
>> Tarandeep
>>
>> import org.apache.flink.api.common.functions.FoldFunction;
>> import org.apache.flink.api.common.functions.MapFunction;
>> import org.apache.flink.api.java.tuple.Tuple;
>> import org.apache.flink.api.java.tuple.Tuple1;
>> import org.apache.flink.api.java.tuple.Tuple2;
>> import org.apache.flink.api.java.tuple.Tuple3;
>> import org.apache.flink.streaming.api.TimeCharacteristic;
>> import org.apache.flink.streaming.api.datastream.DataStream;
>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>> import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
>> import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
>> import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
>> import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
>> import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
>> import org.apache.flink.streaming.api.watermark.Watermark;
>> import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
>> import org.apache.flink.streaming.api.windowing.time.Time;
>> import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
>> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
>> import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
>> import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
>> import org.apache.flink.util.Collector;
>>
>> import java.io.IOException;
>> import java.text.DateFormat;
>> import java.text.SimpleDateFormat;
>> import java.util.*;
>>
>> public class Test3 {
>>
>>     public static void main(String[] args) throws Exception {
>>         StreamExecutionEnvironment env =
>>                 StreamExecutionEnvironment.getExecutionEnvironment();
>>         env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>
>>         //DataStream<Product> product = getInput(env);
>>         DataStream<Product> product = getKafkaInput(env);
>>         DataStream<Tuple1<String>> control = getControl(env);
>>
>>         DataStream<Product> filteredStream = product.keyBy(0)
>>                 .connect(control.keyBy(0))
>>                 .flatMap(new CoFlatMapFunImpl());
>>
>>         DataStream<Product> watermarkedStream =
>>                 filteredStream.assignTimestampsAndWatermarks(
>>                         getTimestampAssigner(Time.seconds(1))).setParallelism(3);
>>
>>         watermarkedStream.transform("WatermarkDebugger",
>>                 watermarkedStream.getType(), new WatermarkDebugger<Product>());
>>
>>         watermarkedStream
>>                 .keyBy(0)
>>                 .window(TumblingEventTimeWindows.of(Time.seconds(5)))
>>                 .fold(new NameCount("", 0), new FoldFunImpl(), new WindowFunImpl())
>>                 .print();
>>
>>         env.execute();
>>     }
>>
>>     /**
>>      * If, instead of reading from Kafka, the stream is created from elements
>>      * (i.e. getInput below is used instead of getKafkaInput), the code works
>>      * and the window function is fired!
>>      */
>>     private static DataStream<Product> getInput(StreamExecutionEnvironment env) {
>>         return env.fromCollection(Arrays.asList(
>>                 new Product("p1", 10.0f, "2017-03-14 16:01:01"),
>>                 new Product("p1", 10.0f, "2017-03-14 16:01:02"),
>>                 new Product("p1", 10.0f, "2017-03-14 16:01:03"),
>>                 new Product("p1", 10.0f, "2017-03-14 16:01:04"),
>>                 new Product("p1", 10.0f, "2017-03-14 16:01:05"),
>>                 new Product("p1", 10.0f, "2017-03-14 16:01:10"),
>>                 new Product("p1", 10.0f, "2017-03-14 16:01:11"),
>>                 new Product("p1", 10.0f, "2017-03-14 16:01:12"),
>>                 new Product("p1", 10.0f, "2017-03-14 16:01:40"),
>>                 new Product("p1", 10.0f, "2017-03-14 16:01:50")
>>         ));
>>     }
>>
>>     private static DataStream<Product> getKafkaInput(StreamExecutionEnvironment env) throws IOException {
>>         DataStream<String> s = readKafkaStream("test", env);
>>
>>         return s.map(new MapFunction<String, Product>() {
>>             @Override
>>             public Product map(String s) throws Exception {
>>                 String[] fields = s.split(",");
>>                 return new Product(fields[0], Float.parseFloat(fields[1]), fields[2]);
>>             }
>>         });
>>     }
>>
>>     private static DataStream<Tuple1<String>> getControl(StreamExecutionEnvironment env) {
>>         return env.fromElements(new Tuple1<>("p1"));
>>     }
>>
>>     private static class CoFlatMapFunImpl extends
>>             RichCoFlatMapFunction<Product, Tuple1<String>, Product> {
>>
>>         private Set<String> productNames = new HashSet<>(Arrays.asList("p1"));
>>
>>         @Override
>>         public void flatMap1(Product product, Collector<Product> collector) throws Exception {
>>             if (productNames.contains(product.f0)) {
>>                 collector.collect(product);
>>                 System.out.println("Retaining product " + product + " in data stream");
>>             }
>>         }
>>
>>         @Override
>>         public void flatMap2(Tuple1<String> t, Collector<Product> collector) throws Exception {
>>             productNames.add(t.f0);
>>             System.out.println("Adding product to set: " + t.f0);
>>         }
>>     }
>>
>>     private static class FoldFunImpl implements FoldFunction<Product, NameCount> {
>>         @Override
>>         public NameCount fold(NameCount current, Product p) throws Exception {
>>             current.f0 = p.f0;
>>             current.f1 += 1;
>>             return current;
>>         }
>>     }
>>
>>     /**
>>      * WINDOW FUNCTION NEVER GETS CALLED.
>>      */
>>     private static class WindowFunImpl extends
>>             RichWindowFunction<NameCount, NameCount, Tuple, TimeWindow> {
>>         @Override
>>         public void apply(Tuple key, TimeWindow timeWindow, Iterable<NameCount> iterable,
>>                           Collector<NameCount> collector) throws Exception {
>>             NameCount nc = iterable.iterator().next();
>>             collector.collect(nc);
>>             System.out.println("WINDOW: start time: " + new Date(timeWindow.getStart()) + " " + nc);
>>         }
>>     }
>>
>>     private static BoundedOutOfOrdernessTimestampExtractor<Product> getTimestampAssigner(final Time maxOutOfOrderness) {
>>         final DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
>>
>>         return new BoundedOutOfOrdernessTimestampExtractor<Product>(maxOutOfOrderness) {
>>             @Override
>>             public long extractTimestamp(Product p) {
>>                 long ts = 0L;
>>                 try {
>>                     ts = dateFormat.parse(p.f2).getTime();
>>                 } catch (Exception e) {}
>>                 return ts;
>>             }
>>         };
>>     }
>>
>>     public static class Product extends Tuple3<String, Float, String> {
>>         public Product() {}
>>         public Product(String name, Float price, String dateTime) {
>>             super(name, price, dateTime);
>>         }
>>     }
>>
>>     public static class NameCount extends Tuple2<String, Integer> {
>>         public NameCount() {}
>>         public NameCount(String name, Integer count) {
>>             super(name, count);
>>         }
>>     }
>>
>>     private static DataStream<String> readKafkaStream(String topic, StreamExecutionEnvironment env) throws IOException {
>>         Properties properties = new Properties();
>>         properties.setProperty("bootstrap.servers", "localhost:9092");
>>         properties.setProperty("zookeeper.connect", "localhost:2181");
>>         properties.setProperty("group.id", "group-0009");
>>         properties.setProperty("auto.offset.reset", "smallest");
>>         return env.addSource(new FlinkKafkaConsumer08<>(topic, new SimpleStringSchema(), properties));
>>     }
>>
>>     public static class WatermarkDebugger<T>
>>             extends AbstractStreamOperator<T> implements OneInputStreamOperator<T, T> {
>>         private static final long serialVersionUID = 1L;
>>
>>         @Override
>>         public void processElement(StreamRecord<T> element) throws Exception {
>>             System.out.println("ELEMENT: " + element);
>>             output.collect(element);
>>         }
>>
>>         @Override
>>         public void processWatermark(Watermark mark) throws Exception {
>>             super.processWatermark(mark);
>>             System.out.println("WM: " + mark);
>>         }
>>     }
>> }
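
PS: for completeness, the variant without the control-stream connect, i.e. the one where the window function does fire, looks roughly like this (a sketch reusing the helpers from the code above):

    DataStream<Product> product = getKafkaInput(env);

    // no connect/CoFlatMap in between; watermarks are assigned directly
    // on the product stream
    product.assignTimestampsAndWatermarks(getTimestampAssigner(Time.seconds(1)))
            .keyBy(0)
            .window(TumblingEventTimeWindows.of(Time.seconds(5)))
            .fold(new NameCount("", 0), new FoldFunImpl(), new WindowFunImpl())
            .print();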