Anyone? Any suggestions on what could be going wrong, or what I am doing wrong?
Thanks,
Tarandeep

On Thu, Mar 16, 2017 at 7:34 AM, Tarandeep Singh <tarand...@gmail.com> wrote:

Data is read from Kafka, and yes, I use a different group id every time I run the code. I have put breakpoints and print statements to verify that.

Also, if I don't connect the control stream, the window function works.

- Tarandeep

On Mar 16, 2017, at 1:12 AM, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:

Hi Tarandeep,

I haven't looked at the rest of the code yet, but my first guess is that you might not be reading any data from Kafka at all:

private static DataStream<String> readKafkaStream(String topic, StreamExecutionEnvironment env) throws IOException {
    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "localhost:9092");
    properties.setProperty("zookeeper.connect", "localhost:2181");
    properties.setProperty("group.id", "group-0009");
    properties.setProperty("auto.offset.reset", "smallest");
    return env.addSource(new FlinkKafkaConsumer08<>(topic, new SimpleStringSchema(), properties));
}

Have you tried using a different "group.id" every time you re-run the job? Note that the "auto.offset.reset" value is only respected when there aren't any offsets for the group committed in Kafka. So you might not actually be reading the complete "small_input.csv" dataset, unless you use a different group.id every time.

Cheers,
Gordon
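A minimal sketch of Gordon's suggestion (the UUID-based group name is my own illustration, not something from the thread): derive a fresh group.id on every run, so Kafka never has committed offsets for the group and "auto.offset.reset" always takes effect:

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("zookeeper.connect", "localhost:2181");
// A unique consumer group per run (hypothetical naming scheme): with no
// committed offsets for this group, "smallest" is honored and the whole
// topic is re-read from the beginning.
properties.setProperty("group.id", "group-" + java.util.UUID.randomUUID());
properties.setProperty("auto.offset.reset", "smallest");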
On March 16, 2017 at 2:39:10 PM, Tarandeep Singh (tarand...@gmail.com) wrote:

Hi,

I am using flink-1.2 and reading a data stream from Kafka (using FlinkKafkaConsumer08). I want to connect this data stream with another stream (a control stream) so as to do some filtering on the fly. After filtering, I am applying a window function (tumbling/sliding event window) along with a fold function. However, the window function never gets called.

Any help to debug/fix this is greatly appreciated!

Below is a reproducible example that you can run in an IDE like IntelliJ or on a flink cluster. You will need a running Kafka cluster (local or otherwise). Create a topic and add test data points:

$KAFKA_HOME/bin/kafka-topics.sh --create --topic test --zookeeper localhost:2181 --replication-factor 1 --partitions 1
$KAFKA_HOME/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test < small_input.csv

where small_input.csv contains the following lines:

p1,10.0f,2017-03-14 16:01:01
p1,10.0f,2017-03-14 16:01:02
p1,10.0f,2017-03-14 16:01:03
p1,10.0f,2017-03-14 16:01:04
p1,10.0f,2017-03-14 16:01:05
p1,10.0f,2017-03-14 16:01:10
p1,10.0f,2017-03-14 16:01:11
p1,10.0f,2017-03-14 16:01:12
p1,10.0f,2017-03-14 16:01:40
p1,10.0f,2017-03-14 16:01:50

Now you can run the code given below. Note:

1) In this example I am not reading the control stream from Kafka (but the issue can be reproduced with that setup as well).
2) If, instead of reading the data stream from Kafka, I create the stream from elements (i.e. use the getInput function instead of the getKafkaInput function), the code works and the window function is fired.

Thanks,
Tarandeep

import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.util.Collector;

import java.io.IOException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.*;

public class Test3 {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        //DataStream<Product> product = getInput(env);
        DataStream<Product> product = getKafkaInput(env);
        DataStream<Tuple1<String>> control = getControl(env);

        DataStream<Product> filteredStream = product.keyBy(0)
                .connect(control.keyBy(0))
                .flatMap(new CoFlatMapFunImpl());

        DataStream<Product> watermarkedStream = filteredStream.assignTimestampsAndWatermarks(
                getTimestampAssigner(Time.seconds(1))).setParallelism(3);

        // Side branch that prints every element and watermark passing through.
        watermarkedStream.transform("WatermarkDebugger",
                watermarkedStream.getType(), new WatermarkDebugger<Product>());

        watermarkedStream
                .keyBy(0)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .fold(new NameCount("", 0), new FoldFunImpl(), new WindowFunImpl())
                .print();

        env.execute();
    }
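    /*
     * One thing worth checking here (an assumption on my part, not something
     * confirmed in this thread): an event-time window fires only once the
     * operator's watermark passes the window end, and that watermark is the
     * minimum over all parallel input subtasks. With .setParallelism(3) on
     * the timestamp assigner but a single-partition topic and a single key
     * ("p1"), two of the three assigner subtasks never see an element, never
     * advance past their initial Long.MIN_VALUE watermark, and can therefore
     * hold the window back indefinitely. A sketch of the same step with the
     * assigner parallelism matched to the data:
     *
     *     DataStream<Product> watermarkedStream = filteredStream
     *             .assignTimestampsAndWatermarks(getTimestampAssigner(Time.seconds(1)))
     *             .setParallelism(1);
     */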
    /**
     * If, instead of reading from Kafka, we create the stream from elements,
     * the code works and the window function is fired!
     */
    private static DataStream<Product> getInput(StreamExecutionEnvironment env) {
        return env.fromCollection(Arrays.asList(
                new Product("p1", 10.0f, "2017-03-14 16:01:01"),
                new Product("p1", 10.0f, "2017-03-14 16:01:02"),
                new Product("p1", 10.0f, "2017-03-14 16:01:03"),
                new Product("p1", 10.0f, "2017-03-14 16:01:04"),
                new Product("p1", 10.0f, "2017-03-14 16:01:05"),
                new Product("p1", 10.0f, "2017-03-14 16:01:10"),
                new Product("p1", 10.0f, "2017-03-14 16:01:11"),
                new Product("p1", 10.0f, "2017-03-14 16:01:12"),
                new Product("p1", 10.0f, "2017-03-14 16:01:40"),
                new Product("p1", 10.0f, "2017-03-14 16:01:50")
        ));
    }

    private static DataStream<Product> getKafkaInput(StreamExecutionEnvironment env) throws IOException {
        DataStream<String> s = readKafkaStream("test", env);

        return s.map(new MapFunction<String, Product>() {
            @Override
            public Product map(String s) throws Exception {
                String[] fields = s.split(",");
                return new Product(fields[0], Float.parseFloat(fields[1]), fields[2]);
            }
        });
    }

    private static DataStream<Tuple1<String>> getControl(StreamExecutionEnvironment env) {
        return env.fromElements(new Tuple1<>("p1"));
    }

    private static class CoFlatMapFunImpl extends RichCoFlatMapFunction<Product, Tuple1<String>, Product> {

        private Set<String> productNames = new HashSet<>(Arrays.asList("p1"));

        @Override
        public void flatMap1(Product product, Collector<Product> collector) throws Exception {
            if (productNames.contains(product.f0)) {
                collector.collect(product);
                System.out.println("Retaining product " + product + " in data stream");
            }
        }

        @Override
        public void flatMap2(Tuple1<String> t, Collector<Product> collector) throws Exception {
            productNames.add(t.f0);
            System.out.println("Adding product to set:" + t.f0);
        }
    }

    private static class FoldFunImpl implements FoldFunction<Product, NameCount> {
        @Override
        public NameCount fold(NameCount current, Product p) throws Exception {
            current.f0 = p.f0;
            current.f1 += 1;
            return current;
        }
    }
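    /*
     * A hedged side note on CoFlatMapFunImpl above (not related to whether
     * the window fires): productNames is a plain per-subtask HashSet, not
     * Flink state, so it is neither checkpointed nor shared across parallel
     * instances. Because both inputs are keyed on the product name, a
     * control element for a given key reaches exactly the subtask that also
     * receives that key's data elements, which is why the pattern still
     * filters correctly.
     */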
    /**
     * WINDOW FUNCTION NEVER GETS CALLED.
     */
    private static class WindowFunImpl extends RichWindowFunction<NameCount, NameCount, Tuple, TimeWindow> {
        @Override
        public void apply(Tuple key, TimeWindow timeWindow, Iterable<NameCount> iterable,
                          Collector<NameCount> collector) throws Exception {
            NameCount nc = iterable.iterator().next();
            collector.collect(nc);
            System.out.println("WINDOW: start time: " + new Date(timeWindow.getStart()) + " " + nc);
        }
    }

    private static BoundedOutOfOrdernessTimestampExtractor<Product> getTimestampAssigner(final Time maxOutOfOrderness) {
        final DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

        return new BoundedOutOfOrdernessTimestampExtractor<Product>(maxOutOfOrderness) {
            @Override
            public long extractTimestamp(Product p) {
                long ts = 0L;
                try {
                    ts = dateFormat.parse(p.f2).getTime();
                } catch (Exception e) {}
                return ts;
            }
        };
    }

    public static class Product extends Tuple3<String, Float, String> {
        public Product() {}
        public Product(String name, Float price, String dateTime) {
            super(name, price, dateTime);
        }
    }

    public static class NameCount extends Tuple2<String, Integer> {
        public NameCount() {}
        public NameCount(String name, Integer count) {
            super(name, count);
        }
    }

    private static DataStream<String> readKafkaStream(String topic, StreamExecutionEnvironment env) throws IOException {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("zookeeper.connect", "localhost:2181");
        properties.setProperty("group.id", "group-0009");
        properties.setProperty("auto.offset.reset", "smallest");
        return env.addSource(new FlinkKafkaConsumer08<>(topic, new SimpleStringSchema(), properties));
    }

    public static class WatermarkDebugger<T>
            extends AbstractStreamOperator<T> implements OneInputStreamOperator<T, T> {
        private static final long serialVersionUID = 1L;

        @Override
        public void processElement(StreamRecord<T> element) throws Exception {
            System.out.println("ELEMENT: " + element);
            output.collect(element);
        }

        @Override
        public void processWatermark(Watermark mark) throws Exception {
            super.processWatermark(mark);
            System.out.println("WM: " + mark);
        }
    }
}
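A possible explanation for note 2 above (my reading of Flink's semantics, not confirmed in the thread): when a bounded source such as fromCollection()/fromElements() finishes, Flink emits a final Long.MAX_VALUE watermark, which flushes every pending event-time window, so the fromCollection variant can look healthy no matter what the assigner produced. The Kafka source never finishes, so windows only fire if the running watermark genuinely advances, which is easy to observe in the "WM:" lines printed by WatermarkDebugger. As a quick diagnostic sketch (assuming the single-partition topic from the repro), one could run the whole job with parallelism 1 so that no idle subtask can hold the minimum watermark back:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Diagnostic only: a single subtask everywhere means every operator
// instance sees data, so the minimum watermark can advance.
env.setParallelism(1);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);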