On Fri, Nov 9, 2018 at 1:23 AM Raghu Angadi <rang...@google.com> wrote:
>>> That is fine. We can ignore the timestamp as a possible suspect for
>>> debugging this. Using custom timestamps from records is normal.
>>>
>> Could you clarify what you meant with "withTimestampFn() was never meant
>> to be public"? I am using it to attach the right timestamp to an element
>> so that I am able to window into days with the local time zone in the
>> windowing function. If this is not the correct way to use it, could you
>> tell me how I can do it better?
>>
> The problem is with the watermark part of the policy. A source needs to
> provide both a timestamp for each record and a watermark for the stream. A
> TimestampPolicy in KafkaIO is responsible for both of these for each Kafka
> partition.
>
> `withTimestampFn()` lets the user provide a function to extract the
> timestamp. But for the watermark, it just uses the most recent record's
> timestamp. Say record A has timestamp 9:00:01 and arrives at 9:00:05, and
> B has a timestamp of 8:59:58 and arrives at 9:00:15. Once A is processed
> at 9:00:05, your pipeline's watermark could jump to 9:00:01, which means
> an hourly window for [8:00, 9:00) will close. When B arrives 10 seconds
> later, it is considered late. The problem is not just that such a
> watermark policy is too brittle, it is that users have no idea this is
> happening.
>
> The deprecation documentation for this API [1] suggests using
> `CustomTimestampPolicyWithLimitedDelay()` [2] instead.
>
> [1]:
> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/TimestampPolicyFactory.java#L100
> [2]:
> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/CustomTimestampPolicyWithLimitedDelay.java#L29
>

Thank you so much, that explanation was very helpful! I will adapt to the
new approach and continue digging into the missing-partitions problem
afterwards.
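Concretely, I am planning to replace the deprecated policy with something
along these lines - an untested sketch on my side: the two-minute maximum
delay is a guess, and I am assuming I can reuse my existing
MessageTimestampExtractor (a SerializableFunction<KafkaRecord<String,
String>, Instant>) as the timestamp function, replacing the
.withTimestampPolicyFactory(...) part of the pipeline code quoted below:

    // imports: org.apache.kafka.common.TopicPartition, java.util.Optional,
    // org.joda.time.Duration, org.joda.time.Instant,
    // org.apache.beam.sdk.io.kafka.CustomTimestampPolicyWithLimitedDelay
    .withTimestampPolicyFactory(
        (TopicPartition tp, Optional<Instant> previousWatermark) ->
            new CustomTimestampPolicyWithLimitedDelay<String, String>(
                // same extractor I already pass to withTimestampFn() today
                new MessageTimestampExtractor(inputMessagesConfig),
                // how far the watermark may trail the largest observed
                // record timestamp (value is a guess on my side)
                Duration.standardMinutes(2),
                previousWatermark))

As far as I understand, the watermark then becomes min(now, max event
timestamp) minus that delay, instead of simply jumping to the latest
record's timestamp. If two minutes turns out to be too tight for our
producers, I would just increase the delay.

Tobi.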
> Raghu.
>
>> After the rollback I am busy making the existing pipeline that writes to
>> GCS so robust that it never fails to deliver all files, so that there is
>> always a backup. As I am under a lot of pressure right now I don't want
>> to fuck it up with easy-to-avoid mistakes, and the GCS pipeline has the
>> same logic, just a different sink that uses FileIO to write out different
>> days to different "folders".
>>
>> Thank you,
>>
>> Tobi
>>
>>> Raghu.
>>>
>>>> I could also not fiddle with the timestamp at all, let the system
>>>> decide, and then parse it and assign it to a partition in the
>>>> BigQueryIO partitioning step. Is this better?
>>>>
>>>>> On Tue, Nov 6, 2018 at 3:44 AM Kaymak, Tobias <tobias.kay...@ricardo.ch> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I am sharing my experience with you after trying to use the following
>>>>>> pipeline logic (with Beam 2.6.0, running on Flink 1.5):
>>>>>>
>>>>>> 1. Reading from KafkaIO, attaching a timestamp extracted from each
>>>>>> parsed element
>>>>>> 2. Filtering bad records
>>>>>> 3. Writing to a partitioned table in BigQuery with FILE_LOADS (batch
>>>>>> jobs) every 15 minutes
>>>>>>
>>>>>> I had a working pipeline that does not write to BigQuery directly,
>>>>>> but to Cloud Storage, so its 3rd step was
>>>>>>
>>>>>> 3. Writing files to GCS in daily "subdirectories"
>>>>>>
>>>>>> I tried to rewrite the pipeline to reduce complexity: resetting its
>>>>>> state should no longer be tied to thinking about what to delete on
>>>>>> GCS, and configurable refresh times directly from within the pipeline
>>>>>> were something I was looking for. The only thing I needed to change
>>>>>> was the output at the end, so I knew my parsing logic would not
>>>>>> change, which should reduce the risk.
>>>>>>
>>>>>> I tested the pipeline on our test cluster and it looked promising.
>>>>>> When I deployed it last week everything seemed to go smoothly. On
>>>>>> Friday I noticed that I had holes in the data: in the BigQuery tables
>>>>>> there were missing days (tricky was that the recent days looked
>>>>>> fine). (To be sure, I reset the pipeline and read from the very
>>>>>> beginning of each topic from Kafka. In different runs, different days
>>>>>> were missing.) I spent the weekend rolling back the changes and
>>>>>> trying to figure out what was going on.
>>>>>>
>>>>>> I didn't see any errors in the logs (the log level was at WARNING for
>>>>>> most parts), but I thought, well, maybe it's because there are too
>>>>>> many partitions and BigQuery has a limit of 1000 partition operations
>>>>>> per day. So I started reading from just 90 days in the past, but I
>>>>>> still had holes (whole days).
>>>>>>
>>>>>> I had a windowing step that I needed for the GCS pipeline; I became
>>>>>> aware that I wouldn't need this anymore with BigQueryIO, so I
>>>>>> commented it out and tested again, without luck.
>>>>>>
>>>>>> What struck me was that the Flink cluster didn't do any checkpoints
>>>>>> for the pipeline that was using BigQueryIO - it does so when writing
>>>>>> to GCS, and I tested its failure logic there. Additionally, the graph
>>>>>> in Flink with BigQueryIO becomes very complex, but this is something
>>>>>> I expected.
>>>>>> Here is the pipeline code with the commented-out windowing part:
>>>>>>
>>>>>> pipeline
>>>>>>     .apply(
>>>>>>         KafkaIO.<String, String>read()
>>>>>>             .withBootstrapServers(bootstrap)
>>>>>>             .withTopics(topics)
>>>>>>             .withKeyDeserializer(StringDeserializer.class)
>>>>>>             .withValueDeserializer(ConfigurableDeserializer.class)
>>>>>>             .updateConsumerProperties(
>>>>>>                 ImmutableMap.of(
>>>>>>                     InputMessagesConfig.CONFIG_PROPERTY_NAME, inputMessagesConfig))
>>>>>>             .updateConsumerProperties(ImmutableMap.of("auto.offset.reset", "earliest"))
>>>>>>             .updateConsumerProperties(ImmutableMap.of("group.id", "di-beam-consumers"))
>>>>>>             .updateConsumerProperties(ImmutableMap.of("enable.auto.commit", "true"))
>>>>>>             .withTimestampPolicyFactory(
>>>>>>                 TimestampPolicyFactory.withTimestampFn(
>>>>>>                     new MessageTimestampExtractor(inputMessagesConfig)))
>>>>>>             .withReadCommitted()
>>>>>>             .commitOffsetsInFinalize())
>>>>>>     .apply(ParDo.of(new ToEventFn()))
>>>>>>     // .apply(
>>>>>>     //     Window.into(new ZurichTimePartitioningWindowFn())
>>>>>>     //         .triggering(
>>>>>>     //             Repeatedly.forever(
>>>>>>     //                 AfterFirst.of(
>>>>>>     //                     AfterPane.elementCountAtLeast(bundleSize),
>>>>>>     //                     AfterProcessingTime.pastFirstElementInPane()
>>>>>>     //                         .plusDelayOf(refreshFrequency))))
>>>>>>     //         .withAllowedLateness(Duration.standardDays(1))
>>>>>>     //         .discardingFiredPanes())
>>>>>>     .apply(
>>>>>>         BigQueryIO.<Event>write()
>>>>>>             .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
>>>>>>             .withTriggeringFrequency(refreshFrequency)
>>>>>>             .withNumFileShards(1)
>>>>>>             .to(partitionedTableDynamicDestinations)
>>>>>>             .withFormatFunction(
>>>>>>                 (SerializableFunction<Event, TableRow>)
>>>>>>                     KafkaToBigQuery::convertUserEventToTableRow)
>>>>>>             .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>>>>>>             .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));
>>>>>>
>>>>>> I have the feeling that I must be making some serious and dumb
>>>>>> mistakes, as I know the Beam framework is very robust. Thanks for
>>>>>> taking the time to read this.
>>>>>>
>>>>>> Tobi
>>>>>
>>>> --
>>>> Tobias Kaymak
>>>> Data Engineer
>>>>
>>>> tobias.kay...@ricardo.ch
>>>> www.ricardo.ch
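PS, for anyone else following this thread: the
`partitionedTableDynamicDestinations` referenced above is my own class and
not shown here. As a rough illustration of the general DynamicDestinations
pattern for routing events into daily partitions via the "table$yyyyMMdd"
decorator, a simplified sketch could look like this - the dataset/table
names, the schema, and the Zurich-time day extraction below are
placeholders, not my actual implementation:

    // Illustrative sketch only -- placeholder names and schema.
    import com.google.api.services.bigquery.model.TableFieldSchema;
    import com.google.api.services.bigquery.model.TableSchema;
    import com.google.common.collect.ImmutableList;
    import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations;
    import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
    import org.apache.beam.sdk.values.ValueInSingleWindow;
    import org.joda.time.DateTimeZone;
    import org.joda.time.format.DateTimeFormat;
    import org.joda.time.format.DateTimeFormatter;

    /** Routes each element into the day partition matching its event timestamp. */
    class DailyPartitionDestinations extends DynamicDestinations<Event, String> {

      private static final DateTimeFormatter DAY =
          DateTimeFormat.forPattern("yyyyMMdd").withZone(DateTimeZone.forID("Europe/Zurich"));

      @Override
      public String getDestination(ValueInSingleWindow<Event> element) {
        // element.getTimestamp() is the timestamp attached by the Kafka timestamp policy.
        return "my_dataset.events$" + DAY.print(element.getTimestamp());
      }

      @Override
      public TableDestination getTable(String destination) {
        return new TableDestination(destination, "daily partition of events");
      }

      @Override
      public TableSchema getSchema(String destination) {
        // Placeholder schema; the real table would list all event fields.
        return new TableSchema()
            .setFields(
                ImmutableList.of(
                    new TableFieldSchema().setName("event_id").setType("STRING"),
                    new TableFieldSchema().setName("payload").setType("STRING")));
      }
    }

The idea is that getDestination() keys each element by its target partition
and getTable() turns that key into the destination the FILE_LOADS job
writes to.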