Hi all,

I connected my data stream with my control stream and created an event-time
tumbling window, and everything works fine. But when I add a .broadcast()
call to the control stream, the window function no longer works.

I'm running this on my local machine. The code is here:

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = FlinkUtil.createExecutionEnvironment(args);
    DataStream<String> jsonEventStream = JsonEventStreamReader.readStream(env);
    // With .broadcast() appended here, the window function doesn't work.
    DataStream<String> controlStream =
        JsonEventStreamReader.readControlStream(env); //.broadcast();
    jsonEventStream
        .flatMap(new StrToTuplesFlatMapFunImpl())
        .connect(controlStream)
        .flatMap(new DataFilterFunImpl())
        .assignTimestampsAndWatermarks(getTimestampsWatermarksAssigner())
        .keyBy(0, 1, 2, 3)
        .timeWindow(Time.seconds(WINDOW_LENGTH))
        .allowedLateness(Time.seconds(WINDOW_LATENESS))
        .reduce(new ReduceFunImpl(), new WindowFunImpl())
        .addSink(new InfluxDBSink(INFLUXDB_DB));

    env.execute();
}
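
For context, here is a minimal sketch in plain Java of what an event-time tumbling window is generally expected to do: each element goes into the window that contains its timestamp, and that window can only fire once the watermark reaches the window's last timestamp. This is not Flink's implementation and not a diagnosis of the problem above. The class and method names (TumblingWindowSketch, windowStart, canFire) are hypothetical, and a zero window offset is assumed.

```java
public final class TumblingWindowSketch {

    // Start of the tumbling window that contains the given event timestamp.
    // Assumption: windows are aligned to the epoch (zero offset).
    static long windowStart(long timestampMillis, long windowSizeMillis) {
        return timestampMillis - Math.floorMod(timestampMillis, windowSizeMillis);
    }

    // A window covering [start, start + size) can fire once the watermark
    // has reached its last timestamp, start + size - 1.
    static boolean canFire(long windowStartMillis, long windowSizeMillis, long watermarkMillis) {
        return watermarkMillis >= windowStartMillis + windowSizeMillis - 1;
    }

    public static void main(String[] args) {
        long size = 5_000L;                      // 5-second window
        long start = windowStart(7_500L, size);  // element with event time 7.5 s
        System.out.println("window start: " + start);
        System.out.println("fires at watermark 9998: " + canFire(start, size, 9_998L));
        System.out.println("fires at watermark 9999: " + canFire(start, size, 9_999L));
    }
}
```

Under this model, a window that never produces output is one whose watermark never reaches the end of the window.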
