Hi Flink Team,

I am seeing that one of the out files on my task manager is dumping a lot of
data.
I am not sure why this is happening. All the data being dumped into the out
file is what the *parsedInput* stream should be receiving.



Here is the Flink program that is executing:

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(1);

DataStream<String> rawInput = env.addSource(new FlinkKafkaConsumer010<>(
                                        "event-ft",
                                        new SimpleStringSchema(),
                                        kafkaProps).setStartFromLatest());

DataStream<String> input2 = rawInput
                            .map(new KafkaMsgReads());

DataStream<EventRec> parsedInput = input2
                                    .flatMap(new Splitter())
                                    .assignTimestampsAndWatermarks(
                                        new BoundedOutOfOrdernessTimestampExtractor<FTRecord>(Time.seconds(2)) {
                                            @Override
                                            public long extractTimestamp(EventRec record) {
                                                return record.getmTimeStamp() / TT_SCALE_FACTOR;
                                            }
                                        })
                                    .rebalance()
                                    .map(new RawInputCounter());

parsedInput
        .keyBy("mflowHashLSB","mflowHashMSB")
        .window(SlidingEventTimeWindows.of(Time.milliseconds(1000), Time.milliseconds(950)))
        .allowedLateness(Time.seconds(1))
        .apply(new CRWindow());

parsedInput.writeUsingOutputFormat(new DiscardingOutputFormat<>());

env.execute();


Here is the definition of *CRWindow* class:


public static class CRWindow implements WindowFunction<FTRecord, FTFlow, Tuple, TimeWindow> {

    @Override
    public void apply(Tuple key, TimeWindow window, Iterable<FTRecord> ftRecords,
                      Collector<FTFlow> collector) {
        return;
    }

}


Also, is there any detailed documentation of the windowing mechanism
available? I am interested in using windowing with the ability to push
events from one window to a future window. Similar functionality
exists in Storm for pushing an event to a subsequent window.


Thanks

-Ashish
