I'm sorry, I was wrong about that.
Using the `continuously` method on FileIO does not produce a watermark, so in the end it doesn't work as I intended.
To fix this, I switched to KafkaIO, and this time it seems to work as I intended.
I apologize for any confusion my earlier, incorrect information may have caused.
Thank you.
The modified pipeline looks like this. This time it works just as I intended!
// Reads Forex records from Kafka, stamps each element with the Forex payload timestamp,
// windows them into 1-hour fixed windows keyed by Forex::getPid, and, per key and window,
// buffers values until an event-time timer fires, then emits the average of getLastNumeric().
pipeline.apply(
        KafkaIO.<String, Forex>read()
            .withBootstrapServers("node03.ming.com:6667")
            .withTopic("forex")
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializerAndCoder(ForexDeserializer.class, ForexCoder.of())
            // Per-partition policy: the element timestamp is the Forex payload timestamp and
            // the watermark trails the largest timestamp seen so far by 100 ms.
            .withTimestampPolicyFactory(
                (tp, previousWatermark) ->
                    new TimestampPolicy<String, Forex>() {
                      // Largest payload timestamp observed on this partition so far.
                      Instant maxTimestamp = BoundedWindow.TIMESTAMP_MIN_VALUE;

                      @NotNull
                      @Override
                      public Instant getTimestampForRecord(
                          PartitionContext ctx, KafkaRecord<String, Forex> record) {
                        final Instant stamp = record.getKV().getValue().getTimestamp();
                        if (stamp.isAfter(maxTimestamp)) {
                          maxTimestamp = stamp;
                        }
                        return stamp;
                      }

                      @NotNull
                      @Override
                      public Instant getWatermark(PartitionContext ctx) {
                        final Instant candidate = this.maxTimestamp.minus(100);
                        // Before any record is seen, maxTimestamp is TIMESTAMP_MIN_VALUE;
                        // never report a watermark below Beam's minimum timestamp.
                        return candidate.isBefore(BoundedWindow.TIMESTAMP_MIN_VALUE)
                            ? BoundedWindow.TIMESTAMP_MIN_VALUE
                            : candidate;
                      }
                    }))
    .apply(
        MapElements.into(TypeDescriptor.of(Forex.class)).via(e -> e.getKV().getValue()))
    // The Kafka timestamp policy above already assigns Forex::getTimestamp as the element
    // timestamp, so this re-assigns the same value.
    .apply(WithTimestamps.of(Forex::getTimestamp))
    .apply(WithKeys.of(Forex::getPid))
    .setCoder(KvCoder.of(StringUtf8Coder.of(), ForexCoder.of()))
    // NOTE(review): triggers take effect at a downstream GroupByKey/Combine; the stateful
    // ParDo below is bounded by the window and allowed lateness, not by this trigger —
    // confirm the trigger settings are still needed.
    .apply(
        Window.<KV<String, Forex>>into(FixedWindows.of(Duration.standardHours(1L)))
            .triggering(
                AfterWatermark.pastEndOfWindow()
                    .withEarlyFirings(
                        AfterProcessingTime.pastFirstElementInPane()
                            .plusDelayOf(Duration.standardSeconds(30L)))
                    .withLateFirings(AfterPane.elementCountAtLeast(1)))
            .withAllowedLateness(Duration.standardSeconds(10L))
            .accumulatingFiredPanes())
    .apply(
        ParDo.of(
            new DoFn<KV<String, Forex>, KV<String, Double>>() {
              // Forex values buffered for the current key and window since the last flush.
              @StateId("buffer")
              private final StateSpec<@NonNull BagState<Forex>> bufferSpec =
                  StateSpecs.bag(ForexCoder.of());

              // Smallest element timestamp (epoch millis) buffered since the last flush.
              @StateId("minTimestamp")
              private final StateSpec<@NonNull CombiningState<Long, long[], Long>>
                  minTimestampSpec = StateSpecs.combining(Min.ofLongs());

              // Event-time timer that triggers the flush in onTimer.
              @TimerId("timer")
              private final TimerSpec timerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

              /**
               * Buffers the element and arms the flush timer for one minute after the earliest
               * buffered timestamp, capped at the last timestamp of the element's window.
               */
              @ProcessElement
              @RequiresTimeSortedInput
              public void process(
                  @Timestamp Instant timestamp,
                  @Element KV<String, Forex> element,
                  BoundedWindow window,
                  @StateId("buffer") BagState<Forex> bufferState,
                  @AlwaysFetched @StateId("minTimestamp")
                      CombiningState<Long, long[], Long> minTimestampState,
                  @TimerId("timer") Timer timer) {
                log.info("[process] at {}", timestamp);
                final long minTimestamp =
                    Math.min(minTimestampState.read(), timestamp.getMillis());
                final Instant flushAt =
                    Instant.ofEpochMilli(minTimestamp).plus(Duration.standardMinutes(1L));
                // An event-time timer may not fire after the window expires (end of window plus
                // allowed lateness). For elements near the end of an hourly window,
                // min + 1 minute would exceed that, so flush at the end of the window instead.
                final Instant windowEnd = window.maxTimestamp();
                timer.set(flushAt.isAfter(windowEnd) ? windowEnd : flushAt);
                minTimestampState.add(timestamp.getMillis());
                bufferState.add(element.getValue());
              }

              /**
               * Emits (key, average of getLastNumeric()) over the buffered values, then clears
               * the buffer, the minimum-timestamp state and the timer. Emits nothing when the
               * buffer is empty.
               */
              @OnTimer("timer")
              public void onTimer(
                  @Key String key,
                  @AlwaysFetched @StateId("buffer") BagState<Forex> bufferState,
                  @AlwaysFetched @StateId("minTimestamp")
                      CombiningState<Long, long[], Long> minTimestampState,
                  @Timestamp Instant timestamp,
                  OutputReceiver<KV<String, Double>> output,
                  @TimerId("timer") Timer timer) {
                log.info("[onTimer] [{}] at {},", key, timestamp);
                final Iterable<Forex> buffered = bufferState.read();
                if (buffered.iterator().hasNext()) {
                  // The buffer is non-empty here, so min/max are always present.
                  log.info(
                      "[onTimer] key = {}, minTimestamp = {}, maxTimestamp = {}",
                      key,
                      Streams.stream(buffered)
                          .min(Comparator.comparing(Forex::getTimestamp))
                          .map(Forex::getTimestamp)
                          .get(),
                      Streams.stream(buffered)
                          .max(Comparator.comparing(Forex::getTimestamp))
                          .map(Forex::getTimestamp)
                          .get());
                  final double average =
                      Streams.stream(buffered)
                          .mapToDouble(Forex::getLastNumeric)
                          .average()
                          .orElse(.0);
                  output.output(KV.of(key, average));
                }
                minTimestampState.clear();
                bufferState.clear();
                timer.clear();
              }
            }))
    .apply(Reify.windows())
    .apply(LogUtils.of());
정보: [onTimer] key = 8830, minTimestamp = 2024-03-20T03:58:20.000Z, maxTimestamp = 2024-03-20T03:59:15.000Z
정보: [onTimer] key = 2095, minTimestamp = 2024-03-20T03:58:20.000Z, maxTimestamp = 2024-03-20T03:58:39.000Z
정보: [onTimer] key = 54, minTimestamp = 2024-03-20T03:58:20.000Z, maxTimestamp = 2024-03-20T03:59:02.000Z
정보: [onTimer] key = 7, minTimestamp = 2024-03-20T03:58:20.000Z, maxTimestamp = 2024-03-20T03:59:12.000Z
정보: [onTimer] key = 2055, minTimestamp = 2024-03-20T03:58:20.000Z, maxTimestamp = 2024-03-20T03:58:39.000Z
정보: [onTimer] key = 2096, minTimestamp = 2024-03-20T03:58:20.000Z, maxTimestamp = 2024-03-20T03:58:39.000Z