On Tue, Nov 6, 2018 at 6:06 PM Lukasz Cwik <lc...@google.com> wrote:

> Since you're using FILE_LOADS as the BigQueryIO write method, did you see a
> file being created for each partitioned table based upon the requested
> triggering frequency?
>
> Figuring out whether the problem was upstream from creating the file or
> downstream after the file was created would help debug this issue.

That is a very good point! I have to admit that it's not so easy for me to
figure out postmortem whether this happened for each partition. As every load
job I can see in the logs was successful, I need to dig through the temporary
file structure in the GCS bucket that BigQueryIO uses before loading the
tables, but that is quite a challenge.
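Something like the following is what I plan to use to walk that temporary file
structure (a minimal sketch; the bucket name is a placeholder for the one in
our --tempLocation, and as far as I can tell FILE_LOADS stages its files under
a BigQueryWriteTemp/ directory there, which I still need to verify for our
Beam version):

    import com.google.api.gax.paging.Page;
    import com.google.cloud.storage.Blob;
    import com.google.cloud.storage.Storage;
    import com.google.cloud.storage.StorageOptions;

    public class ListBigQueryTempFiles {
      public static void main(String[] args) {
        // Placeholder bucket: substitute the bucket from --tempLocation.
        String bucket = "my-beam-temp-bucket";
        // Assumed staging prefix used by FILE_LOADS; verify for your version.
        String prefix = "BigQueryWriteTemp/";

        Storage storage = StorageOptions.getDefaultInstance().getService();
        Page<Blob> blobs =
            storage.list(bucket, Storage.BlobListOption.prefix(prefix));
        for (Blob blob : blobs.iterateAll()) {
          // One or more files should show up per destination table and
          // triggering-frequency window; the create time reveals the window.
          System.out.printf(
              "%s\t%d bytes\tcreated=%d%n",
              blob.getName(), blob.getSize(), blob.getCreateTime());
        }
      }
    }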
> On Tue, Nov 6, 2018 at 3:44 AM Kaymak, Tobias <tobias.kay...@ricardo.ch>
> wrote:
>
>> Hi,
>>
>> I am sharing my experience with you after trying to use the following
>> pipeline logic (with Beam 2.6.0, running on Flink 1.5):
>>
>> 1. Reading from KafkaIO, attaching a timestamp to each parsed element
>> 2. Filtering bad records
>> 3. Writing to a partitioned table in BigQuery with FILE_LOADS (batch jobs)
>>    every 15 minutes
>>
>> I had a working pipeline that did not write to BigQuery directly but to
>> Cloud Storage, so its third step was:
>>
>> 3. Writing files to GCS in daily "subdirectories"
>>
>> I tried to rewrite the pipeline to reduce complexity: resetting its state
>> should no longer be tied to thinking about what to delete on GCS, and
>> configurable refresh times directly within the pipeline were something I
>> was looking for. The only thing I needed to change was the output at the
>> end, so I knew my parsing logic would not change, which should reduce the
>> risk.
>>
>> I tested the pipeline in our test cluster and it looked promising. When I
>> deployed it last week everything seemed to go smoothly. On Friday I
>> noticed that I had holes in the data: days were missing from the BigQuery
>> tables (the tricky part was that the recent days looked fine). To be sure,
>> I reset the pipeline and read each Kafka topic from the very beginning;
>> across different runs, different days were missing. I spent the weekend
>> rolling back the changes and trying to figure out what was going on.
>>
>> I didn't see any errors in the logs (the log level was at WARNING for most
>> parts), but I thought: well, maybe there are too many partitions, since
>> BigQuery has a limit of 1,000 partition operations per day. So I started
>> reading from just 90 days in the past, but I still had holes (whole days).
>>
>> I had a windowing step that I needed for the GCS pipeline; since I became
>> aware that I wouldn't need it anymore with BigQueryIO, I commented it out
>> and tested again, without luck.
>>
>> What struck me was that the Flink cluster didn't do any checkpoints for
>> the pipeline that was using BigQueryIO - it does so when writing to GCS,
>> and I tested its failure logic there. Additionally, the graph in Flink
>> with BigQueryIO becomes very complex, but this is something I expected.
>>
>> Here is the pipeline code with the commented-out windowing part:
>>
>> pipeline
>>     .apply(
>>         KafkaIO.<String, String>read()
>>             .withBootstrapServers(bootstrap)
>>             .withTopics(topics)
>>             .withKeyDeserializer(StringDeserializer.class)
>>             .withValueDeserializer(ConfigurableDeserializer.class)
>>             .updateConsumerProperties(
>>                 ImmutableMap.of(
>>                     InputMessagesConfig.CONFIG_PROPERTY_NAME, inputMessagesConfig))
>>             .updateConsumerProperties(ImmutableMap.of("auto.offset.reset", "earliest"))
>>             .updateConsumerProperties(ImmutableMap.of("group.id", "di-beam-consumers"))
>>             .updateConsumerProperties(ImmutableMap.of("enable.auto.commit", "true"))
>>             .withTimestampPolicyFactory(
>>                 TimestampPolicyFactory.withTimestampFn(
>>                     new MessageTimestampExtractor(inputMessagesConfig)))
>>             .withReadCommitted()
>>             .commitOffsetsInFinalize())
>>     .apply(ParDo.of(new ToEventFn()))
>>     // .apply(
>>     //     Window.into(new ZurichTimePartitioningWindowFn())
>>     //         .triggering(
>>     //             Repeatedly.forever(
>>     //                 AfterFirst.of(
>>     //                     AfterPane.elementCountAtLeast(bundleSize),
>>     //                     AfterProcessingTime.pastFirstElementInPane()
>>     //                         .plusDelayOf(refreshFrequency))))
>>     //         .withAllowedLateness(Duration.standardDays(1))
>>     //         .discardingFiredPanes())
>>     .apply(
>>         BigQueryIO.<Event>write()
>>             .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
>>             .withTriggeringFrequency(refreshFrequency)
>>             .withNumFileShards(1)
>>             .to(partitionedTableDynamicDestinations)
>>             .withFormatFunction(
>>                 (SerializableFunction<Event, TableRow>)
>>                     KafkaToBigQuery::convertUserEventToTableRow)
>>             .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>>             .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));
>>
>> I have the feeling that I must be making some serious and dumb mistakes,
>> as I know the Beam framework is very robust. Thanks for taking the time
>> to read this.
>>
>> Tobi
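Two sketches of things I want to check next, since I am not certain about
either of them.

First, regarding the missing checkpoints: as far as I can tell, the Beam Flink
runner leaves checkpointing disabled unless it is switched on through
FlinkPipelineOptions, and with commitOffsetsInFinalize() the Kafka offsets
should only be committed once a checkpoint completes. A minimal sketch of
enabling it (the 60-second interval is an arbitrary example value):

    import org.apache.beam.runners.flink.FlinkPipelineOptions;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class CreatePipelineWithCheckpointing {
      public static Pipeline create(String[] args) {
        FlinkPipelineOptions options =
            PipelineOptionsFactory.fromArgs(args)
                .withValidation()
                .as(FlinkPipelineOptions.class);
        // Checkpointing is disabled by default (-1) on the Flink runner.
        options.setCheckpointingInterval(60_000L); // milliseconds
        return Pipeline.create(options);
      }
    }

Second, to pin down exactly which days are missing, a per-day row count over
one of the partitioned tables (the table name here is a placeholder):

    import com.google.cloud.bigquery.BigQuery;
    import com.google.cloud.bigquery.BigQueryOptions;
    import com.google.cloud.bigquery.FieldValueList;
    import com.google.cloud.bigquery.QueryJobConfiguration;
    import com.google.cloud.bigquery.TableResult;

    public class CountRowsPerPartitionDay {
      public static void main(String[] args) throws InterruptedException {
        // Placeholder table name -- replace with one of the affected tables.
        String sql =
            "SELECT DATE(_PARTITIONTIME) AS day, COUNT(*) AS row_count "
                + "FROM `my-project.my_dataset.events` "
                + "WHERE _PARTITIONTIME >= "
                + "  TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 90 DAY) "
                + "GROUP BY day ORDER BY day";

        BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
        TableResult result =
            bigquery.query(
                QueryJobConfiguration.newBuilder(sql).setUseLegacySql(false).build());
        for (FieldValueList row : result.iterateAll()) {
          // Days absent from this output are the holes.
          System.out.printf(
              "%s\t%d%n",
              row.get("day").getStringValue(), row.get("row_count").getLongValue());
        }
      }
    }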