On Wed, Nov 7, 2018 at 5:04 AM Kaymak, Tobias <tobias.kay...@ricardo.ch> wrote:
> On Tue, Nov 6, 2018 at 6:58 PM Raghu Angadi <rang...@google.com> wrote:
>
>> You seem to be reading from multiple topics and your timestamp policy is
>> too simplistic (withTimestampFn() was never meant to be public API, I am
>> sending a PR to deprecate it first and then will make it package private).
>> So if you have two topics of different sizes, the smaller topic might be
>> consumed really fast, pushing your watermark way ahead. This might or might
>> not be happening, but it is one of the dangers of using the record timestamp
>> for the watermark (we should never do that).
>
> To clarify: the test was done consuming from a single topic. I am using a
> field inside the element's JSON to get the element's timestamp. Data in a
> topic can go way back to, let's say, 2017, but that data was pushed to Kafka
> in one go and the timestamp when it arrived is, for example, Wednesday last
> week. Sometimes the producing side does not set the element's timestamp for
> Kafka (since it is using a library that does not support that yet), so it
> has to be parsed.

That is fine. We can ignore the timestamp as a possible suspect for debugging
this. Using custom timestamps from records is normal.

Raghu.

> I could also not fiddle with the timestamp at all, let the system decide,
> and then parse it in the BigQueryIO partitioning step and assign it to a
> partition. Is this better?
>
>> On Tue, Nov 6, 2018 at 3:44 AM Kaymak, Tobias <tobias.kay...@ricardo.ch>
>> wrote:
>>
>>> Hi,
>>>
>>> I am sharing my experience with you after trying to use the following
>>> pipeline logic (with Beam 2.6.0, running on Flink 1.5):
>>>
>>> 1. Reading from KafkaIO, attaching a timestamp from each parsed element
>>> 2. Filtering bad records
>>> 3. Writing to a partitioned table in BigQuery with FILE_LOADS (batch jobs)
>>>    every 15 minutes
>>>
>>> I had a working pipeline that does not write to BigQuery directly but to
>>> Cloud Storage, so its 3rd step was
>>>
>>> 3. Writing files to GCS in daily "subdirectories"
>>>
>>> I tried to rewrite the pipeline to reduce complexity: resetting its state
>>> should no longer be tied to thinking about what to delete on GCS, and
>>> configurable refresh times directly from within the pipeline were
>>> something I was looking for. The only thing I needed to change was the
>>> output at the end, so I knew my parsing logic would not change, and that
>>> should reduce the risk.
>>>
>>> I tested the pipeline in our test cluster and it looked promising. When I
>>> deployed it last week everything seemed to go smoothly. On Friday I
>>> noticed that I had holes in the data: in the BigQuery tables there were
>>> missing days (tricky was that the recent days looked fine). (To be sure,
>>> I reset the pipeline and read from the very beginning of each topic from
>>> Kafka. Within different runs, different days were missing.) I spent the
>>> weekend rolling back the changes and trying to figure out what was going
>>> on.
>>>
>>> I didn't see any errors in the logs (the log level was on WARNING for
>>> most parts), but I thought, well, maybe it's because there are too many
>>> partitions and BigQuery has a limit of 1000 partition operations per day.
>>> So I started reading from just 90 days in the past, but I still had holes
>>> (whole days).
>>>
>>> I had a windowing step that I needed for the GCS pipeline; I became aware
>>> that I wouldn't need it anymore with BigQueryIO, so I commented it out
>>> and tested again, without luck.
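Coming back to the timestamp policy mentioned at the top: once withTimestampFn()
goes away, one replacement is a policy that bounds how far the watermark can lag
behind the record timestamps, for example KafkaIO's
CustomTimestampPolicyWithLimitedDelay. A rough sketch of how that could be wired
in is below; JsonTimestampExtractor, its placeholder body, and the one-hour max
delay are made up for illustration, not taken from your pipeline.

import org.apache.beam.sdk.io.kafka.CustomTimestampPolicyWithLimitedDelay;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.joda.time.Duration;
import org.joda.time.Instant;

public class TimestampPolicySketch {

  // Placeholder extractor: stands in for the JSON parsing that
  // MessageTimestampExtractor already does. Returning "now" here only keeps
  // the sketch compilable.
  static class JsonTimestampExtractor
      implements SerializableFunction<KafkaRecord<String, String>, Instant> {
    @Override
    public Instant apply(KafkaRecord<String, String> record) {
      // Parse record.getKV().getValue() and return the embedded event time.
      return Instant.now();
    }
  }

  static KafkaIO.Read<String, String> withBoundedDelayPolicy(
      KafkaIO.Read<String, String> read) {
    return read.withTimestampPolicyFactory(
        (tp, previousWatermark) ->
            new CustomTimestampPolicyWithLimitedDelay<>(
                new JsonTimestampExtractor(),
                Duration.standardHours(1), // max delay: an assumption, tune as needed
                previousWatermark));
  }
}

The factory also receives the previous watermark, so the watermark can resume
from where it left off when a reader is recreated after a restart.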
>>> What struck me was that the Flink cluster didn't do any checkpoints for
>>> the pipeline that was using BigQueryIO - it does so when writing to GCS,
>>> and I tested its failure logic there. Additionally, the graph in Flink
>>> with BigQueryIO becomes very complex, but this is something I expected.
>>>
>>> Here is the pipeline code with the commented-out windowing part:
>>>
>>> pipeline
>>>     .apply(
>>>         KafkaIO.<String, String>read()
>>>             .withBootstrapServers(bootstrap)
>>>             .withTopics(topics)
>>>             .withKeyDeserializer(StringDeserializer.class)
>>>             .withValueDeserializer(ConfigurableDeserializer.class)
>>>             .updateConsumerProperties(
>>>                 ImmutableMap.of(
>>>                     InputMessagesConfig.CONFIG_PROPERTY_NAME, inputMessagesConfig))
>>>             .updateConsumerProperties(ImmutableMap.of("auto.offset.reset", "earliest"))
>>>             .updateConsumerProperties(ImmutableMap.of("group.id", "di-beam-consumers"))
>>>             .updateConsumerProperties(ImmutableMap.of("enable.auto.commit", "true"))
>>>             .withTimestampPolicyFactory(
>>>                 TimestampPolicyFactory.withTimestampFn(
>>>                     new MessageTimestampExtractor(inputMessagesConfig)))
>>>             .withReadCommitted()
>>>             .commitOffsetsInFinalize())
>>>     .apply(ParDo.of(new ToEventFn()))
>>>     // .apply(
>>>     //     Window.into(new ZurichTimePartitioningWindowFn())
>>>     //         .triggering(
>>>     //             Repeatedly.forever(
>>>     //                 AfterFirst.of(
>>>     //                     AfterPane.elementCountAtLeast(bundleSize),
>>>     //                     AfterProcessingTime.pastFirstElementInPane()
>>>     //                         .plusDelayOf(refreshFrequency))))
>>>     //         .withAllowedLateness(Duration.standardDays(1))
>>>     //         .discardingFiredPanes())
>>>     .apply(
>>>         BigQueryIO.<Event>write()
>>>             .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
>>>             .withTriggeringFrequency(refreshFrequency)
>>>             .withNumFileShards(1)
>>>             .to(partitionedTableDynamicDestinations)
>>>             .withFormatFunction(
>>>                 (SerializableFunction<Event, TableRow>)
>>>                     KafkaToBigQuery::convertUserEventToTableRow)
>>>             .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>>>             .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));
>>>
>>> I have the feeling that I must be making some serious and dumb mistakes,
>>> as I know the Beam framework is very robust. Thanks for taking the time
>>> to read this.
>>>
>>> Tobi
>>>
>
> --
> Tobias Kaymak
> Data Engineer
>
> tobias.kay...@ricardo.ch
> www.ricardo.ch
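Since partitionedTableDynamicDestinations is not shown in the snippet, here is a
rough sketch of the general shape such a DynamicDestinations can take when
routing elements to BigQuery day partitions ("table$yyyyMMdd"). The
project/dataset/table names, the Europe/Zurich zone, and the two schema fields
are placeholders, not your actual implementation.

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableSchema;
import java.util.Arrays;
import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations;
import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.joda.time.DateTimeZone;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;

// Routes each element to the day partition of its Beam event timestamp.
// T is the element type (Event in the pipeline above).
public class DayPartitionDestinations<T> extends DynamicDestinations<T, String> {

  private static final DateTimeFormatter DAY =
      DateTimeFormat.forPattern("yyyyMMdd").withZone(DateTimeZone.forID("Europe/Zurich"));

  @Override
  public String getDestination(ValueInSingleWindow<T> element) {
    // element.getTimestamp() is whatever the Kafka timestamp policy assigned upstream.
    return "my-project:my_dataset.events$" + DAY.print(element.getTimestamp());
  }

  @Override
  public TableDestination getTable(String tableSpec) {
    return new TableDestination(tableSpec, "events, partitioned by day");
  }

  @Override
  public TableSchema getSchema(String tableSpec) {
    // Placeholder schema; the real pipeline would return the Event schema here.
    return new TableSchema()
        .setFields(
            Arrays.asList(
                new TableFieldSchema().setName("event_time").setType("TIMESTAMP"),
                new TableFieldSchema().setName("payload").setType("STRING")));
  }
}

An instance of it would be passed to .to(...) in place of
partitionedTableDynamicDestinations.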