tableEnv.executeSql(DDLSourceSQLManager.cdcCreateTableSQL("order_info"));
tableEnv
.toRetractStream(tableEnv.from("order_info"), Row.class)
.filter((FilterFunction<Tuple2<Boolean, Row>>)
booleanRowTuple2 -> booleanRowTuple2.f0)
.map((MapFunction<Tuple2<Boolean, Row>, Row>)
booleanRowTuple2 -> booleanRowTuple2.f1)
.assignTimestampsAndWatermarks(
WatermarkStrategy.<Row>forBoundedOutOfOrderness(Duration.ofSeconds(10))
.withTimestampAssigner(((element, recordTimestamp)
-> System.currentTimeMillis())))
.keyBy((KeySelector<Row, String>) row ->
row.getField("consignee").toString())
.window(TumblingEventTimeWindows.of(Time.seconds(100)))
.trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(5)))
.process(new ProcessWindowFunction<Row,
Tuple2<TimeWindow, Long>,
String,
TimeWindow>() {
@Override
public void process(String s, Context context,
Iterable<Row> elements, Collector<Tuple2<TimeWindow, Long>> out) throws
Exception {
Long count = 0L;
for (Row element : elements) {
count += 1;
}
out.collect(new Tuple2(context.window(),
count));
}
})
.print();
;
streamEnv.execute();
--
Sent from: http://apache-flink.147419.n8.nabble.com/