Forgot to add that I'm using Beam 2.21.0 and Kafka 2.3.1.
On 2020/06/19 22:41:10, Roger <[email protected]> wrote:
> Hello.
>
> I am having issues with an unbounded streaming pipeline that uses event
> time for processing. I can confirm that every stage of the pipeline runs,
> including the aggregation. The problem is that it only runs for one
> window, which makes me think I've done something wrong with the
> watermarking and/or timestamping. The value of each Kafka record is an
> object whose timestamp is set as an epoch string in milliseconds, for
> example "1592600152163". I have tried everything I can think of to debug
> this, including adding logging statements, but I can only confirm that
> things seem to be set correctly. I can see that the CustomFieldTimePolicy
> values for currentWatermark are being set, and I can print out the
> aggregated values that ultimately get created. For some reason, though,
> the window does not advance and I cannot continuously process the
> pipeline data. Things just stop.
>
> Any suggestions or ideas on what I've done wrong would be very much
> appreciated.
> Thanks for looking!
> Roger
>
>
>
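Since the timestamp arrives as a string, a malformed value would make Long.parseLong throw inside the pipeline. Below is a minimal sketch of a defensive parse, assuming java.time.Instant as a stand-in for the Joda Instant the pipeline actually uses. The class and method names (EpochMillis, tryParse) are hypothetical and not part of Beam or of my code.

```java
import java.time.Instant;
import java.util.Optional;

// Hypothetical helper, not part of Beam: parses an epoch-millisecond string.
final class EpochMillis {
    // Returns an empty Optional for null or non-numeric input instead of throwing.
    static Optional<Instant> tryParse(String raw) {
        if (raw == null) {
            return Optional.empty();
        }
        try {
            return Optional.of(Instant.ofEpochMilli(Long.parseLong(raw.trim())));
        } catch (NumberFormatException e) {
            return Optional.empty();
        }
    }
}
```

With this, EpochMillis.tryParse("1592600152163") yields an Instant, while a bad value yields an empty Optional that can be logged or dropped rather than failing the bundle.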
> *Approach: Use withTimestampPolicyFactory from org.apache.beam.sdk.io.kafka
> (documented here:
> <https://beam.apache.org/releases/javadoc/2.4.0/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#withTimestampPolicyFactory-org.apache.beam.sdk.io.kafka.TimestampPolicyFactory->)
> along with WithTimestamps.of*
>
> pipeline
>     .apply("ReadFromKafka", KafkaIO.<Long, NewRelicRecord>read()
>         .withKeyDeserializer(LongDeserializer.class)
>         .withValueDeserializerAndCoder(
>             KafkaJsonDeserializer.class, SerializableCoder.of(NewRelicRecord.class))
>         .withBootstrapServers(options.getKafkaBrokers())
>         .withTopic(options.getKafkaTopic())
>         .withMaxReadTime(Duration.standardSeconds(
>             Integer.parseInt(options.getKafkaReadTimeout())))
>         .withConsumerConfigUpdates(ImmutableMap.of(
>             "group.id", options.getKafkaGroupId()))
>         .withTimestampPolicyFactory((tp, previousWaterMark) ->
>             new CustomFieldTimePolicy(previousWaterMark))
>         .withoutMetadata()
>     )
>     .apply("ExtractPayload", Values.<NewRelicRecord>create())
>     .apply("Append event time for PCollection records",
>         WithTimestamps.of((NewRelicRecord record) -> {
>
>             long millisecondsFromEpoch = Long.parseLong(record.getTimestamp());
>             DateTime jodaDateTime = new DateTime(millisecondsFromEpoch);
>             Instant instant = jodaDateTime.toInstant();
>
>             return instant;
>         }));
>
> Where CustomFieldTimePolicy looks like this:
>
> public class CustomFieldTimePolicy extends TimestampPolicy<Long, NewRelicRecord> {
>
>     /** current watermark holder. */
>     private Instant currentWatermark;
>
>     /** watermark logic.
>      * @param previousWatermark the watermark used to determine current watermark
>      */
>     public CustomFieldTimePolicy(final Optional<Instant> previousWatermark) {
>         currentWatermark = previousWatermark.orElse(BoundedWindow.TIMESTAMP_MIN_VALUE);
>     }
>
>     @Override
>     public final Instant getTimestampForRecord(
>             final PartitionContext ctx, final KafkaRecord<Long, NewRelicRecord> record) {
>
>         long millisecondsFromEpoch =
>             Long.parseLong(record.getKV().getValue().getTimestamp());
>         DateTime jodaDateTime = new DateTime(millisecondsFromEpoch);
>
>         Instant instant = jodaDateTime.toInstant();
>
>         if (instant.isAfter(currentWatermark)) {
>             currentWatermark = instant;
>         }
>
>         return instant;
>     }
>
>     @Override
>     public final Instant getWatermark(final PartitionContext ctx) {
>         return currentWatermark;
>     }
> }
>
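To make the watermark bookkeeping easier to reason about without a Kafka cluster, here is a minimal sketch of the same "latest event time seen" logic in isolation. It is not the Beam class itself: MaxEventTimeWatermark, observe and current are hypothetical names, java.time.Instant stands in for Joda's Instant, and Instant.MIN stands in for BoundedWindow.TIMESTAMP_MIN_VALUE.

```java
import java.time.Instant;

// Hypothetical stand-in for the policy's bookkeeping, with no Beam or Kafka dependency.
final class MaxEventTimeWatermark {
    // Instant.MIN plays the role of BoundedWindow.TIMESTAMP_MIN_VALUE (an assumption).
    private Instant currentWatermark = Instant.MIN;

    // Mirrors getTimestampForRecord: parse the epoch-millisecond string and
    // advance the watermark only if this record is newer than anything seen so far.
    Instant observe(String epochMillis) {
        Instant eventTime = Instant.ofEpochMilli(Long.parseLong(epochMillis));
        if (eventTime.isAfter(currentWatermark)) {
            currentWatermark = eventTime;
        }
        return eventTime;
    }

    // Mirrors getWatermark: returns the latest event time seen, unchanged between records.
    Instant current() {
        return currentWatermark;
    }
}
```

Two properties follow from this shape: an out-of-order record never moves the watermark backward, and the watermark only moves when a record is observed, so it stays put while the partition is idle.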