Hi folks,
I am new to Beam and am experimenting with some examples. I am running Beam 2.14
with the Direct runner to read some files (which I keep generating continuously).
I am facing this error: Cannot output with timestamp
2019-08-16T12:30:15.120Z. Output timestamps must be no earlier than the
timestamp of the current input (2019-08-16T12:39:06.887Z) minus the allowed
skew (0 milliseconds). I searched online but still don't quite understand
it so I am asking here for some help.
Each file contains past timestamps (epoch milliseconds), one per line:
1565958615120
1565958615120
1565958615121
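To make the comparison in the error message concrete, here is a small standalone sketch. It uses plain java.time rather than Beam or Joda-Time, and the class name TimestampCheck and the helper isAllowed are made up for illustration. It only mirrors the rule as worded in the error message and is not Beam's actual implementation:

```java
import java.time.Duration;
import java.time.Instant;

class TimestampCheck {
    // Rule as worded in the error message: an output timestamp is accepted only
    // if it is no earlier than (input timestamp - allowed skew).
    static boolean isAllowed(Instant output, Instant input, Duration allowedSkew) {
        return !output.isBefore(input.minus(allowedSkew));
    }

    public static void main(String[] args) {
        // First value from the file, interpreted as epoch milliseconds.
        Instant output = Instant.ofEpochMilli(1565958615120L);
        // "Timestamp of the current input" copied from the error message.
        Instant input = Instant.parse("2019-08-16T12:39:06.887Z");

        System.out.println(output); // prints 2019-08-16T12:30:15.120Z
        // How far the output timestamp lies behind the input timestamp.
        System.out.println(Duration.between(output, input).toMillis() + " ms behind");
        // With zero allowed skew the check fails, matching the error.
        System.out.println(isAllowed(output, input, Duration.ZERO));
    }
}
```

So the value in the file is exactly the output timestamp reported in the error, and it is almost nine minutes earlier than the input timestamp.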
My code looks something like this:
static class ReadWithEventTime extends DoFn<String, String> {
    @ProcessElement
    public void processElement(@Element String line,
                               OutputReceiver<String> out) {
        out.outputWithTimestamp(line, new Instant(Long.parseLong(line)));
    }
}

public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.create();
    Pipeline pipeline = Pipeline.create(options);
    String sourcePath = new File("files/").getPath();
    PCollection<String> data = pipeline.apply("ReadData",
        TextIO.read().from(sourcePath + "/test*")
            .watchForNewFiles(Duration.standardSeconds(5),
                Watch.Growth.<String>never()));
    data.apply("ReadWithEventTime", ParDo.of(new ReadWithEventTime()));
    pipeline.run().waitUntilFinish();
}
I am trying to understand where the "current input
(2019-08-16T12:39:06.887Z)" in the error message is coming from. Is it the
lowest watermark at the time I start my application? If that's the case, is
there a way to change the initial watermark?
Also, I could set `withAllowedTimestampSkew`, but it looks like it has
been deprecated.
Any suggestions would be appreciated. Thank you!
Best,
Chengzhi