Hi Theodore,

Thanks for your reply. This is just a simple example I put together to
understand how event time works in Beam. In practice I could have more
fields, with an event time for each record, so I am trying to tell Beam
which field holds the event time to use for later windowing and computation.

I think the probable reason you mentioned sounds reasonable. I am still
trying to figure out where "current input (2019-08-16T12:39:06.887Z)" in
the error message is coming from, if you have any insight on it.

Thanks a lot for your help.

-- Chengzhi

On Fri, Aug 16, 2019 at 9:57 AM Theodore Siu <[email protected]> wrote:

> Hi Chengzhi,
>
> Are you simply trying to emit the timestamp onward? Why not just use
> `out.output` with a PCollection<Instant>?
>
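> import org.apache.beam.sdk.transforms.DoFn;
> import org.joda.time.Instant;
>
> static class ReadWithEventTime extends DoFn<String, Instant> {
>     @DoFn.ProcessElement
>     public void processElement(@Element String line, OutputReceiver<Instant> out) {
>         // Emit the parsed event time as the element itself; without an explicit
>         // timestamp, the output keeps the input element's timestamp.
>         out.output(new Instant(Long.parseLong(line)));
>     }
> }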
>
> You can also output the line itself as a PCollection<String>. If your line
> has additional information to parse, consider a key-value pair (KV,
> https://beam.apache.org/releases/javadoc/2.2.0/index.html?org/apache/beam/sdk/values/KV.html),
> where you can emit both some parsed context of the string and the timestamp.
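A minimal plain-Java sketch of the parsing half of that idea. It assumes a hypothetical comma-separated record layout `key,payload,epochMillis` (not from this thread) and uses `java.time.Instant` and `AbstractMap.SimpleImmutableEntry` as stand-ins for Beam's `org.joda.time.Instant` and `KV`, so it runs without Beam on the classpath. Inside a DoFn, the same pair would presumably be emitted with `KV.of(key, eventTime)`.

```java
import java.time.Instant;
import java.util.AbstractMap;
import java.util.Map;

class EventLineParser {
    // Hypothetical record layout: "<key>,<payload>,<epochMillis>".
    // Returns (key, event time) as a stand-in for Beam's KV<String, Instant>.
    static Map.Entry<String, Instant> parse(String line) {
        String[] fields = line.split(",", -1);
        if (fields.length != 3) {
            throw new IllegalArgumentException("expected 3 fields: " + line);
        }
        String key = fields[0].trim();
        // The last field carries the event time as epoch milliseconds.
        Instant eventTime = Instant.ofEpochMilli(Long.parseLong(fields[2].trim()));
        return new AbstractMap.SimpleImmutableEntry<>(key, eventTime);
    }

    public static void main(String[] args) {
        Map.Entry<String, Instant> kv = parse("user42,click,1565958615120");
        // prints "user42 @ 2019-08-16T12:30:15.120Z"
        System.out.println(kv.getKey() + " @ " + kv.getValue());
    }
}
```

Keeping the key separate from the timestamp leaves the door open to grouping or aggregating per key once the records are windowed.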
>
> The probable reason why outputWithTimestamp doesn't work with older times
> is that the emitted timestamp is used specifically for windowing: in
> streaming data pipelines it determines which window each record belongs to
> for aggregations.
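To make the windowing point concrete: with fixed windows, an element's timestamp alone decides which window it lands in. A plain-Java sketch, assuming one-minute windows aligned to the epoch (zero offset); it follows the same arithmetic idea as Beam's `FixedWindows` but is not Beam's code, and `windowStart` is a hypothetical helper using `java.time` rather than Joda-Time.

```java
import java.time.Duration;
import java.time.Instant;

class FixedWindowSketch {
    // Start of the epoch-aligned fixed window of the given size that contains ts.
    static Instant windowStart(Instant ts, Duration size) {
        long millis = ts.toEpochMilli();
        long sizeMillis = size.toMillis();
        // floorMod keeps the result correct for timestamps before the epoch too.
        return Instant.ofEpochMilli(millis - Math.floorMod(millis, sizeMillis));
    }

    public static void main(String[] args) {
        Duration oneMinute = Duration.ofMinutes(1);
        for (long ms : new long[] {1565958615120L, 1565958615121L}) {
            Instant ts = Instant.ofEpochMilli(ms);
            Instant start = windowStart(ts, oneMinute);
            // Both example timestamps fall into [12:30:00Z, 12:31:00Z).
            System.out.println(ts + " -> [" + start + ", " + start.plus(oneMinute) + ")");
        }
    }
}
```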
>
> -Theo
>
>
> On Fri, Aug 16, 2019 at 8:52 AM Chengzhi Zhao <[email protected]> wrote:
>
>> Hi folks,
>>
>> I am new to Beam and am trying to play with some examples. I am running
>> Beam 2.14 with the Direct runner to read some files (which I continuously
>> generate).
>>
>> I am facing this error: Cannot output with timestamp
>> 2019-08-16T12:30:15.120Z. Output timestamps must be no earlier than the
>> timestamp of the current input (2019-08-16T12:39:06.887Z) minus the allowed
>> skew (0 milliseconds). I searched online but still don't quite understand
>> it, so I am asking here for some help.
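The rule in that message can be restated as a one-line predicate: an output timestamp is accepted only if it is no earlier than the input timestamp minus the allowed skew. A plain-Java sketch using `java.time` (Beam itself uses Joda-Time); `isAllowed` is a hypothetical helper that only restates the check described in the error text, not a Beam API.

```java
import java.time.Duration;
import java.time.Instant;

class SkewCheck {
    // True if output >= input - allowedSkew, i.e. the condition the error message states.
    static boolean isAllowed(Instant output, Instant input, Duration allowedSkew) {
        return !output.isBefore(input.minus(allowedSkew));
    }

    public static void main(String[] args) {
        // Timestamps copied from the error message above.
        Instant input = Instant.parse("2019-08-16T12:39:06.887Z");
        Instant output = Instant.parse("2019-08-16T12:30:15.120Z");
        System.out.println(isAllowed(output, input, Duration.ZERO)); // false
        System.out.println(isAllowed(input, input, Duration.ZERO));  // true
    }
}
```

With a skew of 0 milliseconds, any event time earlier than the input element's own timestamp is rejected.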
>>
>> A file has some past timestamps (epoch milliseconds) in it:
>> 1565958615120
>> 1565958615120
>> 1565958615121
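Read as epoch milliseconds, the first line is exactly the output timestamp in the error message. A small plain-Java check, using `java.time.Instant.ofEpochMilli` in place of the Joda `new Instant(long)` from the DoFn; the helper name `toInstant` is hypothetical.

```java
import java.time.Instant;
import java.util.List;

class FileTimestamps {
    // Each line is an epoch-milliseconds value, as parsed by Long.parseLong in the DoFn.
    static Instant toInstant(String line) {
        return Instant.ofEpochMilli(Long.parseLong(line.trim()));
    }

    public static void main(String[] args) {
        for (String line : List.of("1565958615120", "1565958615120", "1565958615121")) {
            // e.g. "1565958615120 -> 2019-08-16T12:30:15.120Z"
            System.out.println(line + " -> " + toInstant(line));
        }
    }
}
```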
>>
>> My code looks something like this:
>>
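>> import java.io.File;
>>
>> import org.apache.beam.sdk.Pipeline;
>> import org.apache.beam.sdk.io.TextIO;
>> import org.apache.beam.sdk.options.PipelineOptions;
>> import org.apache.beam.sdk.options.PipelineOptionsFactory;
>> import org.apache.beam.sdk.transforms.DoFn;
>> import org.apache.beam.sdk.transforms.ParDo;
>> import org.apache.beam.sdk.transforms.Watch;
>> import org.apache.beam.sdk.values.PCollection;
>> import org.joda.time.Duration;
>> import org.joda.time.Instant;
>>
>>     static class ReadWithEventTime extends DoFn<String, String> {
>>         @ProcessElement
>>         public void processElement(@Element String line, OutputReceiver<String> out) {
>>             // Re-emit the line stamped with the event time parsed from it (epoch
>>             // millis). This is the call that fails when that time is earlier than
>>             // the input element's timestamp minus the allowed skew.
>>             out.outputWithTimestamp(line, new Instant(Long.parseLong(line)));
>>         }
>>     }
>>
>>     public static void main(String[] args) {
>>         PipelineOptions options = PipelineOptionsFactory.create();
>>         Pipeline pipeline = Pipeline.create(options);
>>
>>         String sourcePath = new File("files/").getPath();
>>
>>         // Poll for new files matching files/test* every 5 seconds, never stopping.
>>         PCollection<String> data = pipeline.apply("ReadData",
>>                 TextIO.read().from(sourcePath + "/test*")
>>                         .watchForNewFiles(Duration.standardSeconds(5),
>>                                 Watch.Growth.<String>never()));
>>
>>         data.apply("ReadWithEventTime", ParDo.of(new ReadWithEventTime()));
>>
>>         pipeline.run().waitUntilFinish();
>>     }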
>>
>>
>> I am trying to understand where "current input (2019-08-16T12:39:06.887Z)"
>> in the error message is coming from. Is it the lowest watermark when I
>> start my application? If so, is there a way to change the initial
>> watermark?
>>
>> Also, I could set up `withAllowedTimestampSkew`, but it looks like it has
>> been deprecated.
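For a sense of scale, the smallest skew that would have let every line of the example file through is the largest gap between the input timestamp and an event timestamp. A plain-Java sketch, assuming the single input timestamp from the error message applies to all three lines (in a real pipeline each input element carries its own timestamp); `minimumSkew` is a hypothetical helper. It only sizes the gap and says nothing about whether widening the skew is advisable, given that the setting is deprecated.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.List;

class RequiredSkew {
    // Smallest skew such that every event satisfies event >= input - skew.
    // Returns zero if no event is earlier than the input timestamp.
    static Duration minimumSkew(Instant input, List<Instant> events) {
        Duration max = Duration.ZERO;
        for (Instant event : events) {
            Duration gap = Duration.between(event, input); // positive when event is earlier
            if (gap.compareTo(max) > 0) {
                max = gap;
            }
        }
        return max;
    }

    public static void main(String[] args) {
        Instant input = Instant.parse("2019-08-16T12:39:06.887Z");
        List<Instant> events = List.of(
                Instant.ofEpochMilli(1565958615120L),
                Instant.ofEpochMilli(1565958615120L),
                Instant.ofEpochMilli(1565958615121L));
        System.out.println(minimumSkew(input, events)); // PT8M51.767S
    }
}
```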
>>
>> Any suggestion would be appreciated. Thank you!
>>
>> Best,
>> Chengzhi
>>
>>
>
