Hi folks,

I am new to Beam and am trying out some examples. I am running Beam 2.14
with the Direct runner to read some files (which I generate continuously).

I am facing this error: Cannot output with timestamp
2019-08-16T12:30:15.120Z. Output timestamps must be no earlier than the
timestamp of the current input (2019-08-16T12:39:06.887Z) minus the allowed
skew (0 milliseconds). I searched online but still don't quite understand
it so I am asking here for some help.
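
To make the rule in the error message concrete, here is a minimal sketch of the comparison it describes, using the two timestamps from the message. This is only an illustration of the wording of the error, not Beam's actual implementation. The class and method names are hypothetical, and it uses java.time rather than the Joda-Time types that Beam uses, so it runs without any dependencies.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical illustration of the rule quoted in the error message.
public class TimestampSkewCheck {

    // An output timestamp is acceptable if it is no earlier than
    // (input timestamp - allowed skew).
    static boolean isOutputAllowed(Instant input, Instant output, Duration allowedSkew) {
        return !output.isBefore(input.minus(allowedSkew));
    }

    public static void main(String[] args) {
        Instant input = Instant.parse("2019-08-16T12:39:06.887Z");  // "current input" in the error
        Instant output = Instant.parse("2019-08-16T12:30:15.120Z"); // timestamp I tried to emit

        // With the default skew of 0 ms the output is rejected.
        System.out.println(isOutputAllowed(input, output, Duration.ZERO));

        // The output lags the input by this much, so the skew would have to be
        // at least this large for the check to pass.
        Duration gap = Duration.between(output, input);
        System.out.println(gap.toMillis() + " ms");
        System.out.println(isOutputAllowed(input, output, gap));
    }
}
```

In other words, the timestamp I am trying to emit is about 8 minutes and 52 seconds earlier than the timestamp already attached to the input element.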

Each file contains past timestamps (epoch milliseconds), one per line:
1565958615120
1565958615120
1565958615121

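As a quick sanity check that the first value in the file is the same instant as the rejected output timestamp in the error, here is a small sketch that converts it. The class and method names are hypothetical, and it uses java.time instead of Joda-Time purely to stay dependency-free.

```java
import java.time.Instant;

// Hypothetical helper: render an epoch-millisecond value as an ISO-8601 UTC instant.
public class EpochMillisDemo {

    static String toIso(long epochMillis) {
        return Instant.ofEpochMilli(epochMillis).toString();
    }

    public static void main(String[] args) {
        // First line of the sample file above.
        System.out.println(toIso(1565958615120L)); // prints 2019-08-16T12:30:15.120Z
    }
}
```

So the rejected output timestamp is exactly the value parsed from the file, and the "current input" timestamp is something else.
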
My code looks something like this:

    static class ReadWithEventTime extends DoFn<String, String> {
        @ProcessElement
        public void processElement(@Element String line, OutputReceiver<String> out) {
            out.outputWithTimestamp(line, new Instant(Long.parseLong(line)));
        }
    }

    public static void main(String[] args) {
        PipelineOptions options = PipelineOptionsFactory.create();
        Pipeline pipeline = Pipeline.create(options);

        String sourcePath = new File("files/").getPath();

        PCollection<String> data = pipeline.apply("ReadData",
                TextIO.read().from(sourcePath + "/test*")
                        .watchForNewFiles(Duration.standardSeconds(5),
                                Watch.Growth.<String>never()));

        data.apply("ReadWithEventTime", ParDo.of(new ReadWithEventTime()));

        pipeline.run().waitUntilFinish();

    }


I am trying to understand where the "current input
(2019-08-16T12:39:06.887Z)" in the error message is coming from. Is it the
lowest watermark at the time I start my application? If so, is there a way
to change the initial watermark?

Also, I could set `withAllowedTimestampSkew`, but it looks like it has
been deprecated.

Any suggestions would be appreciated. Thank you!

Best,
Chengzhi
