I'm sorry, I was wrong about that.

Using the continuously method on FileIO does not create a watermark, so that
approach does not work as I intended.

To fix this, I switched to KafkaIO, and this time it seems to work as I
intended.

I apologize if I confused you with the wrong information.

Thank you.



The modified pipeline looks like this, and this time it works just as I intended:

pipeline.apply(
                        KafkaIO.<String, Forex>read()
                                .withBootstrapServers("node03.ming.com:6667")
                                .withTopic("forex")
                                .withKeyDeserializer(StringDeserializer.class)
                                .withValueDeserializerAndCoder(ForexDeserializer.class, ForexCoder.of())
                                .withTimestampPolicyFactory((tp, previousWatermark) -> new TimestampPolicy<String, Forex>() {
                                    Instant maxTimestamp = BoundedWindow.TIMESTAMP_MIN_VALUE;

                                    @NotNull
                                    @Override
                                    public Instant getTimestampForRecord(PartitionContext ctx, KafkaRecord<String, Forex> record) {
                                        final Instant stamp = record.getKV().getValue().getTimestamp();
                                        if (stamp.isAfter(maxTimestamp)) {
                                            maxTimestamp = stamp;
                                        }

                                        return stamp;
                                    }

                                    @NotNull
                                    @Override
                                    public Instant getWatermark(PartitionContext ctx) {
//                                        log.info("maxTimestamp = {}", this.maxTimestamp);
                                        // The watermark trails the largest timestamp seen so far by 100 ms.
                                        return this.maxTimestamp.minus(100);
                                    }
                                })
                )
                .apply(MapElements.into(TypeDescriptor.of(Forex.class))
                        .via(e -> e.getKV().getValue()))
                .apply(WithTimestamps.of(Forex::getTimestamp))
                .apply(WithKeys.of(Forex::getPid))
                .setCoder(KvCoder.of(StringUtf8Coder.of(), ForexCoder.of()))
                .apply(
                        Window.<KV<String, Forex>>into(FixedWindows.of(Duration.standardHours(1L)))
                                .triggering(
                                        AfterWatermark.pastEndOfWindow()
                                                .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(30L)))
                                                .withLateFirings(AfterPane.elementCountAtLeast(1))
                                )
                                .withAllowedLateness(Duration.standardSeconds(10L))
                                .accumulatingFiredPanes()
                )
                .apply(ParDo.of(new DoFn<KV<String, Forex>, KV<String, Double>>() {

                    @StateId("buffer")
                    private final StateSpec<@NonNull BagState<Forex>> bufferSpec =
                            StateSpecs.bag(ForexCoder.of());

                    @StateId("minTimestamp")
                    private final StateSpec<@NonNull CombiningState<Long, long[], Long>> minTimestampSpec =
                            StateSpecs.combining(Min.ofLongs());

                    @TimerId("timer")
                    private final TimerSpec timerSpec =
                            TimerSpecs.timer(TimeDomain.EVENT_TIME);

                    @ProcessElement
                    @RequiresTimeSortedInput
                    public void process(
                            @Timestamp Instant timestamp,
                            @Element KV<String, Forex> element,
                            @StateId("buffer") BagState<Forex> bufferState,
                            @AlwaysFetched @StateId("minTimestamp") CombiningState<Long, long[], Long> minTimestampState,
                            @TimerId("timer") Timer timer
                    ) {
                        log.info("[process] at {}", timestamp);
                        final long minTimestamp =
                                Math.min(minTimestampState.read(), timestamp.getMillis());

                        // Fire one minute after the earliest timestamp buffered for this key.
                        timer.set(Instant.ofEpochMilli(minTimestamp).plus(Duration.standardMinutes(1L)));

                        minTimestampState.add(timestamp.getMillis());
                        bufferState.add(element.getValue());
                    }

                    @OnTimer("timer")
                    public void onTimer(
                            @Key String key,
                            @AlwaysFetched @StateId("buffer") BagState<Forex> bufferState,
                            @AlwaysFetched @StateId("minTimestamp") CombiningState<Long, long[], Long> minTimestampState,
                            @Timestamp Instant timestamp,
                            OutputReceiver<KV<String, Double>> output,
                            @TimerId("timer") Timer timer
                    ) {
                        log.info("[onTimer] [{}] at {},", key, timestamp);
                        log.info(
                                "[onTimer] key = {}, minTimestamp = {}, maxTimestamp = {}",
                                key,
                                Streams.stream(bufferState.read()).min(Comparator.comparing(Forex::getTimestamp))
                                        .map(Forex::getTimestamp).get(),
                                Streams.stream(bufferState.read()).max(Comparator.comparing(Forex::getTimestamp))
                                        .map(Forex::getTimestamp).get()
                        );
                        final double average = Streams.stream(bufferState.read())
                                .mapToDouble(Forex::getLastNumeric)
                                .average()
                                .orElse(.0);

                        output.output(KV.of(key, average));
                        minTimestampState.clear();
                        bufferState.clear();
                        timer.clear();
                    }

                }))
                .apply(Reify.windows())
                .apply(LogUtils.of());
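
For reference, the watermark logic inside the timestamp policy above can be
looked at in isolation. The following is only a minimal sketch in plain Java
with no Beam dependency: MaxTimestampWatermark and WatermarkDemo are
hypothetical names made up for illustration, java.time stands in for the
Joda-Time types that Beam uses, and an empty Optional stands in for
BoundedWindow.TIMESTAMP_MIN_VALUE when no record has been seen yet.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Optional;

/** Plain-Java model of the per-partition watermark logic; not a Beam class. */
final class MaxTimestampWatermark {
    private final Duration slack;
    private Instant maxTimestamp = null; // no record observed yet

    MaxTimestampWatermark(Duration slack) {
        this.slack = slack;
    }

    /** Mirrors getTimestampForRecord: remember the largest timestamp seen. */
    Instant observe(Instant recordTimestamp) {
        if (maxTimestamp == null || recordTimestamp.isAfter(maxTimestamp)) {
            maxTimestamp = recordTimestamp;
        }
        return recordTimestamp;
    }

    /** Mirrors getWatermark: the largest timestamp seen, minus the slack. */
    Optional<Instant> watermark() {
        return Optional.ofNullable(maxTimestamp).map(t -> t.minus(slack));
    }
}

final class WatermarkDemo {
    public static void main(String[] args) {
        MaxTimestampWatermark wm = new MaxTimestampWatermark(Duration.ofMillis(100));
        wm.observe(Instant.parse("2024-03-20T03:58:20Z"));
        wm.observe(Instant.parse("2024-03-20T03:59:15Z"));
        // An out-of-order record does not move the watermark backwards.
        wm.observe(Instant.parse("2024-03-20T03:58:39Z"));
        System.out.println(wm.watermark().get()); // prints 2024-03-20T03:59:14.900Z
    }
}
```

The point of the sketch is that the watermark only advances when a record with
a larger event timestamp arrives, which is what lets the event-time timer in
the DoFn fire.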

Log output from onTimer:

INFO: [onTimer] key = 8830, minTimestamp = 2024-03-20T03:58:20.000Z, maxTimestamp = 2024-03-20T03:59:15.000Z
INFO: [onTimer] key = 2095, minTimestamp = 2024-03-20T03:58:20.000Z, maxTimestamp = 2024-03-20T03:58:39.000Z
INFO: [onTimer] key = 54, minTimestamp = 2024-03-20T03:58:20.000Z, maxTimestamp = 2024-03-20T03:59:02.000Z
INFO: [onTimer] key = 7, minTimestamp = 2024-03-20T03:58:20.000Z, maxTimestamp = 2024-03-20T03:59:12.000Z
INFO: [onTimer] key = 2055, minTimestamp = 2024-03-20T03:58:20.000Z, maxTimestamp = 2024-03-20T03:58:39.000Z
INFO: [onTimer] key = 2096, minTimestamp = 2024-03-20T03:58:20.000Z, maxTimestamp = 2024-03-20T03:59:39.000Z
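
In every log line above, maxTimestamp is less than one minute after
minTimestamp, which matches a timer set to the earliest buffered timestamp
plus one minute. The following is a rough plain-Java model of that buffer and
timer logic, not Beam code: MinuteBufferModel, MinuteBufferDemo and
advanceWatermark are hypothetical names, and the sketch assumes the timer
fires as soon as the watermark reaches the deadline.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java model of the per-key buffer and event-time timer; not Beam code. */
final class MinuteBufferModel {
    private static final Duration FLUSH_AFTER = Duration.ofMinutes(1);

    private final Map<String, List<Double>> buffers = new HashMap<>();
    private final Map<String, Instant> minTimestamps = new HashMap<>();

    /** Mirrors process(): buffer the value and keep the smallest timestamp per key. */
    void add(String key, Instant timestamp, double value) {
        buffers.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
        minTimestamps.merge(key, timestamp, (a, b) -> a.isBefore(b) ? a : b);
    }

    /** Mirrors onTimer(): flush each key whose deadline the watermark has reached. */
    Map<String, Double> advanceWatermark(Instant watermark) {
        Map<String, Double> averages = new TreeMap<>();
        for (String key : new ArrayList<>(minTimestamps.keySet())) {
            Instant deadline = minTimestamps.get(key).plus(FLUSH_AFTER);
            if (!watermark.isBefore(deadline)) {
                double average = buffers.remove(key).stream()
                        .mapToDouble(Double::doubleValue)
                        .average()
                        .orElse(0.0);
                averages.put(key, average);
                minTimestamps.remove(key); // state is cleared after the flush
            }
        }
        return averages;
    }
}

final class MinuteBufferDemo {
    public static void main(String[] args) {
        MinuteBufferModel model = new MinuteBufferModel();
        Instant start = Instant.parse("2024-03-20T03:58:20Z");
        model.add("7", start, 1.0);
        model.add("7", start.plusSeconds(52), 3.0);
        System.out.println(model.advanceWatermark(start.plusSeconds(59))); // prints {}
        System.out.println(model.advanceWatermark(start.plusSeconds(60))); // prints {7=2.0}
    }
}
```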





