I have a simple Flink streaming program that uses a socket as its
continuous source, with a tumbling event-time window of 2 seconds.
For some reason my window process function is not triggered, and even
when I send events out of order, Flink does not drop the late ones.
I only see output when I kill the socket. Please find the code
snippet below:
final StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setRuntimeMode(RuntimeExecutionMode.STREAMING);

DataStream<Price> price = env
        .socketTextStream("localhost", 9998)
        .uid("price source")
        .map(new MapFunction<String, Price>() {
            @Override
            public Price map(String s) throws Exception {
                String[] f = s.split(",");
                return new Price(
                        f[0],
                        LocalDate.parse(f[1]),
                        new BigDecimal(f[2]),
                        new BigDecimal(f[3]),
                        f[4],
                        new BigDecimal(f[5]),
                        LocalDateTime.parse(f[6]));
            }
        });

DataStream<Price> priceStream = price
        .assignTimestampsAndWatermarks(
                WatermarkStrategy.<Price>forMonotonousTimestamps()
                        .withTimestampAssigner((p, timestamp) -> {
                            ZoneId zoneId = ZoneId.systemDefault();
                            long epoch = p.getAsOfDateTime().atZone(zoneId).toEpochSecond();
                            System.out.println(epoch);
                            return epoch;
                        }))
        .keyBy(new KeySelector<Price, String>() {
            @Override
            public String getKey(Price price) throws Exception {
                return price.getPerformanceId();
            }
        })
        .window(TumblingEventTimeWindows.of(Time.seconds(2)))
        .process(new ProcessWindowFunction<Price, Price, String, TimeWindow>() {
            @Override
            public void process(String s, Context context,
                    Iterable<Price> iterable, Collector<Price> collector) throws Exception {
                System.out.println(context.window().getStart()
                        + "Current watermark: " + context.window().getEnd());
                Price p1 = null;
                for (Price p : iterable) {
                    System.out.println(p.toString());
                    p1 = p;
                }
                collector.collect(p1);
            }
        });

priceStream.writeAsText("c:\\ab.txt");
The data I am sending over the socket is:
p1,2019-12-31,1,34,USD,4,2019-12-31T00:00:00
p1,2019-12-31,2,34,USD,4,2019-12-31T00:00:01
p1,2019-12-31,3,34,USD,4,2019-12-31T00:00:02
p1,2019-12-31,4,34,USD,4,2019-12-31T00:00:03
p1,2019-12-31,5,34,USD,4,2019-12-31T00:00:04
p1,2019-12-31,10,34,USD,4,2019-12-31T00:00:01
p1,2021-12-31,15,34,USD,4,2021-12-31T00:00:01
p1,2018-12-31,10,34,USD,4,2018-12-31T00:00:01
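
For reference, a minimal sketch of how the timestamps above would map onto
2-second tumbling windows. It uses plain Java with no Flink dependency, and it
rests on some assumptions: the class name WindowAssignmentSketch and the
helper windowStart are hypothetical, the zone is fixed to UTC rather than
ZoneId.systemDefault() so the numbers are reproducible, and windowStart only
mirrors the usual start-of-window arithmetic for non-negative timestamps with
zero offset. Flink interprets event timestamps as milliseconds since the
epoch, so the sketch compares a value taken from toEpochSecond() with one
taken from toEpochMilli().

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;

class WindowAssignmentSketch {

    // Start of the tumbling window that contains `timestamp`.
    // Assumes zero offset and a non-negative timestamp; both arguments
    // must be in the same unit (milliseconds here).
    static long windowStart(long timestamp, long size) {
        return timestamp - (timestamp % size);
    }

    public static void main(String[] args) {
        long sizeMs = 2_000L; // a 2-second window expressed in milliseconds

        String[] samples = {
            "2019-12-31T00:00:00",
            "2019-12-31T00:00:01",
            "2019-12-31T00:00:02",
            "2019-12-31T00:00:03",
            "2019-12-31T00:00:04",
        };

        for (String s : samples) {
            LocalDateTime t = LocalDateTime.parse(s);
            // Assumption: UTC instead of the system default zone.
            long seconds = t.toEpochSecond(ZoneOffset.UTC);
            long millis = t.toInstant(ZoneOffset.UTC).toEpochMilli();

            System.out.println(s
                    + " | seconds used as ms -> window start " + windowStart(seconds, sizeMs)
                    + " | real ms -> window start " + windowStart(millis, sizeMs));
        }
    }
}
```

Under the UTC assumption, the five 2019 events get second-valued timestamps
1577750400 through 1577750404, which all land in the single window starting
at 1577750000 when read as milliseconds. With millisecond timestamps the same
events spread over three windows, starting at 1577750400000, 1577750402000
and 1577750404000. Whether this accounts for the behaviour described above is
not established here; it only shows how the unit of the returned timestamp
affects window assignment.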
--
---Regards---
Sagar Bandal