Hi,

I am using Flink 1.2 and reading a data stream from Kafka (using
FlinkKafkaConsumer08). I want to connect this data stream with another
stream (a control stream) in order to do some filtering on the fly. After
filtering, I apply a window function (tumbling/sliding event-time window)
along with a fold function. However, the window function never gets called.

Any help to debug/fix this is greatly appreciated!

Below is reproducible code that one can run in an IDE like IntelliJ or on a
Flink cluster. You will need a running Kafka cluster (local or otherwise).
Create a topic and add the test data points:

$KAFKA_HOME/bin/kafka-topics.sh --create --topic test --zookeeper localhost:2181 --replication-factor 1 --partitions 1
$KAFKA_HOME/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test < small_input.csv

where small_input.csv contains the following lines:

p1,10.0f,2017-03-14 16:01:01
p1,10.0f,2017-03-14 16:01:02
p1,10.0f,2017-03-14 16:01:03
p1,10.0f,2017-03-14 16:01:04
p1,10.0f,2017-03-14 16:01:05
p1,10.0f,2017-03-14 16:01:10
p1,10.0f,2017-03-14 16:01:11
p1,10.0f,2017-03-14 16:01:12
p1,10.0f,2017-03-14 16:01:40
p1,10.0f,2017-03-14 16:01:50

Now you can run the code given below. Note:

1) In this example, I am not reading the control stream from Kafka (but the
issue can be reproduced that way as well).
2) If, instead of reading the data stream from Kafka, I create the stream
from elements (i.e. use the getInput function instead of the getKafkaInput
function), the code works and the window function fires.
3) My understanding is that an event-time window only fires once the
watermark passes the window's end timestamp; the WatermarkDebugger operator
in the code prints every element and watermark, so its output should show
whether the watermark advances at all. A sketch of a Kafka-side variant I
have not tried yet follows these notes.
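
For reference, the Kafka connector docs describe assigning the timestamp
extractor directly on the consumer, so that watermarks are generated per
Kafka partition. I have not tried this yet, so I cannot say whether it
changes the behavior here; a minimal sketch, reusing the properties from
readKafkaStream below and parsing the event time out of the raw CSV line:

FlinkKafkaConsumer08<String> consumer =
        new FlinkKafkaConsumer08<>("test", new SimpleStringSchema(), properties);
consumer.assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(1)) {
            private final DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            @Override
            public long extractTimestamp(String line) {
                try {
                    // the third CSV field is the event timestamp
                    return df.parse(line.split(",")[2]).getTime();
                } catch (Exception e) {
                    return 0L; // parse failure falls back to timestamp 0
                }
            }
        });
DataStream<String> s = env.addSource(consumer);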

Thanks,
Tarandeep



import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.util.Collector;

import java.io.IOException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.*;

public class Test3 {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
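        // Event time also switches on periodic watermark emission (the
        // auto-watermark interval defaults to 200 ms in this mode).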

        //DataStream<Product> product = getInput(env);
        DataStream<Product> product = getKafkaInput(env);
        DataStream<Tuple1<String>> control= getControl(env);

        DataStream<Product> filteredStream = product.keyBy(0)
                .connect(control.keyBy(0))
                .flatMap(new CoFlatMapFunImpl());

        DataStream<Product> watermarkedStream = filteredStream
                .assignTimestampsAndWatermarks(getTimestampAssigner(Time.seconds(1)))
                .setParallelism(3);
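        // Note: setParallelism(3) creates three parallel extractor instances; a
        // downstream operator's watermark only advances to the minimum watermark
        // across all of its inputs, so an instance that sees no elements can hold
        // the watermark back.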

        watermarkedStream.transform("WatermarkDebugger",
                watermarkedStream.getType(), new WatermarkDebugger<Product>());

        watermarkedStream
                .keyBy(0)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .fold(new NameCount("", 0), new FoldFunImpl(), new WindowFunImpl())
                .print();

        env.execute();
    }

    /**
     * If, instead of reading from Kafka, the stream is created from elements,
     * the code works and the window function fires!
     */
    private static DataStream<Product> getInput(StreamExecutionEnvironment env) {
        return env.fromCollection(Arrays.asList(
            new Product("p1",10.0f,"2017-03-14 16:01:01"),
            new Product("p1",10.0f,"2017-03-14 16:01:02"),
            new Product("p1",10.0f,"2017-03-14 16:01:03"),
            new Product("p1",10.0f,"2017-03-14 16:01:04"),
            new Product("p1",10.0f,"2017-03-14 16:01:05"),
            new Product("p1",10.0f,"2017-03-14 16:01:10"),
            new Product("p1",10.0f,"2017-03-14 16:01:11"),
            new Product("p1",10.0f,"2017-03-14 16:01:12"),
            new Product("p1",10.0f,"2017-03-14 16:01:40"),
            new Product("p1",10.0f,"2017-03-14 16:01:50")
        ));
    }

    private static DataStream<Product> getKafkaInput(StreamExecutionEnvironment env)
            throws IOException {
        DataStream<String> s = readKafkaStream("test", env);

        return s.map(new MapFunction<String, Product>() {
            @Override
            public Product map(String s) throws Exception {
                String[] fields = s.split(",");
                return new Product(fields[0], Float.parseFloat(fields[1]), fields[2]);
            }
        });
    }

    private static DataStream<Tuple1<String>> getControl(StreamExecutionEnvironment env) {
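        // fromElements is a finite source: when it finishes, Flink emits a final
        // Long.MAX_VALUE watermark on this stream, so the connected operator's
        // watermark is then governed by the data stream alone.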
        return env.fromElements(new Tuple1<>("p1"));
    }

    private static class CoFlatMapFunImpl extends
            RichCoFlatMapFunction<Product, Tuple1<String>, Product> {

        private Set<String> productNames = new HashSet<>(Arrays.asList("p1"));
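        // (Note: this is a plain Java field, not Flink keyed state; each parallel
        // instance keeps its own copy and it is not checkpointed.)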

        @Override
        public void flatMap1(Product product, Collector<Product> collector)
                throws Exception {
            if (productNames.contains(product.f0)) {
                collector.collect(product);
                System.out.println("Retaining product " + product + "
in data stream");
            }
        }

        @Override
        public void flatMap2(Tuple1<String> t, Collector<Product> collector)
                throws Exception {
            productNames.add(t.f0);
            System.out.println("Adding product to set:" + t.f0);
        }
    }

    private static class FoldFunImpl implements FoldFunction<Product, NameCount> {
        @Override
        public NameCount fold(NameCount current, Product p) throws Exception {
            current.f0 = p.f0;
            current.f1 += 1;
            return current;
        }
    }

    /**
     * WINDOW FUNCTION NEVER GETS CALLED.
     */
    private static class WindowFunImpl extends
            RichWindowFunction<NameCount, NameCount, Tuple, TimeWindow> {
        @Override
        public void apply(Tuple key, TimeWindow timeWindow, Iterable<NameCount> iterable,
                          Collector<NameCount> collector) throws Exception {
            NameCount nc = iterable.iterator().next();
            collector.collect(nc);
            System.out.println("WINDOW: start time: " + new
Date(timeWindow.getStart()) + " " + nc);
        }
    }

    private static BoundedOutOfOrdernessTimestampExtractor<Product> getTimestampAssigner(
            final Time maxOutOfOrderness) {
        final DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

        return new BoundedOutOfOrdernessTimestampExtractor<Product>(maxOutOfOrderness) {
            @Override
            public long extractTimestamp(Product p) {
                long ts = 0L;
                try {
                    ts = dateFormat.parse(p.f2).getTime();
                } catch (Exception e) {
                    // NOTE: a parse failure silently falls back to timestamp 0
                }
                return ts;
            }
        };
    }

    public static class Product extends Tuple3<String,Float,String> {
        public Product() {}
        public Product(String name, Float price, String dateTime) {
            super(name, price, dateTime);
        }
    }

    public static class NameCount extends Tuple2<String,Integer> {
        public NameCount() {}
        public NameCount(String name, Integer count) {
            super(name, count);
        }
    }

    private static DataStream<String> readKafkaStream(String topic,
            StreamExecutionEnvironment env) throws IOException {

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("zookeeper.connect", "localhost:2181");
        properties.setProperty("group.id", "group-0009");
        properties.setProperty("auto.offset.reset", "smallest");
        return env.addSource(new FlinkKafkaConsumer08<>(topic,
                new SimpleStringSchema(), properties));
    }

    public static class WatermarkDebugger<T> extends AbstractStreamOperator<T>
            implements OneInputStreamOperator<T, T> {
        private static final long serialVersionUID = 1L;

        @Override
        public void processElement(StreamRecord<T> element) throws Exception {
            System.out.println("ELEMENT: " + element);
            output.collect(element);
        }

        @Override
        public void processWatermark(Watermark mark) throws Exception {
            super.processWatermark(mark);
            System.out.println("WM: " + mark);
        }
    }
}
