On Tue, Nov 6, 2018 at 6:06 PM Lukasz Cwik <lc...@google.com> wrote:

> Since you're using FILE_LOADS as the BigQueryIO write method, did you see a
> file being created for each partitioned table based upon the requested
> triggering frequency?
>
> Figuring out whether the problem was upstream from creating the file or
> downstream after the file was created would help debug this issue.
>
>
That is a very good point! I have to admit that it's not easy for me to
figure out postmortem whether this happened for each table. Since every load
job I can see in the logs was successful, I would need to dig through the
temporary file structure in the GCS bucket used by BigQueryIO before the
tables are loaded, and that is quite a challenge.
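To make that digging a bit more tractable, I am planning to list the staged
files with a small sketch like the one below (google-cloud-storage Java
client). The bucket name and the "BigQueryWriteTemp/" prefix are assumptions
on my side, not verified:

import com.google.cloud.storage.Blob;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;

public class ListBigQueryTempFiles {
  public static void main(String[] args) {
    Storage storage = StorageOptions.getDefaultInstance().getService();
    // Placeholders: our temp bucket and the prefix under which BigQueryIO
    // seems to stage its load files (an assumption worth double-checking).
    String bucket = "my-temp-bucket";
    String prefix = "BigQueryWriteTemp/";
    for (Blob blob :
        storage.list(bucket, Storage.BlobListOption.prefix(prefix)).iterateAll()) {
      System.out.printf(
          "%s\t%d bytes\tcreated %s%n",
          blob.getName(),
          blob.getSize(),
          java.time.Instant.ofEpochMilli(blob.getCreateTime()));
    }
  }
}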



> On Tue, Nov 6, 2018 at 3:44 AM Kaymak, Tobias <tobias.kay...@ricardo.ch>
> wrote:
>
>> Hi,
>>
>> I am sharing my experience with you after trying to use the following
>> pipeline
>> logic (with Beam 2.6.0 - running on Flink 1.5):
>>
>> 1. Reading from KafkaIO, attaching a timestamp to each parsed element
>> 2. Filtering bad records
>> 3. Writing to a partitioned table in BigQuery with FILE_LOADS (batch jobs)
>>    every 15 minutes
>>
>> I had a working pipeline that did not write to BigQuery directly, but to
>> Cloud Storage, so its third step was:
>>
>> 3. Writing files to GCS in daily "subdirectories"
>>
>> I tried to rewrite the pipeline to reduce complexity: resetting its state
>> should no longer be tied to thinking about what to delete on GCS, and I
>> was also looking for refresh times that are configurable directly from
>> within the pipeline. Only the output at the end needed to change, so I
>> knew my parsing logic would stay the same, which should reduce the risk.
>>
>> I tested the pipeline within our test cluster and it looked promising.
>> When I deployed it last week everything seemed to go smoothly. On Friday
>> I noticed holes in the data: whole days were missing from the BigQuery
>> tables (the tricky part was that the most recent days looked fine). To be
>> sure, I reset the pipeline and read each Kafka topic from the very
>> beginning; across different runs, different days were missing. I spent
>> the weekend rolling back the changes and trying to figure out what was
>> going on.
>>
>> I didn't see any errors in the logs (the log level was set to WARNING for
>> the most part), but I thought that maybe there were too many partitions,
>> since BigQuery has a limit of 1000 partition operations per day. So I
>> started reading from just 90 days in the past, but I still had holes
>> (whole days missing).
>>
>> I had a windowing step that I needed for the GCS pipeline. Since I became
>> aware that I wouldn't need it anymore with BigQueryIO, I commented it out
>> and tested again, without luck.
>>
>> What struck me was that the Flink cluster didn't do any checkpoints for
>> the pipeline that was using BigQueryIO - it does so when writing to GCS,
>> and I tested its failure logic there. Additionally, the graph in Flink
>> with BigQueryIO becomes very complex, but this is something I expected.
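>>
>> For reference, this is a minimal sketch of how checkpointing gets enabled
>> on the Flink runner via pipeline options. The helper class name is made
>> up, and the 60-second interval is just an example value:
>>
>> import org.apache.beam.runners.flink.FlinkPipelineOptions;
>> import org.apache.beam.runners.flink.FlinkRunner;
>> import org.apache.beam.sdk.options.PipelineOptionsFactory;
>>
>> public class CheckpointedOptions {
>>   public static FlinkPipelineOptions create(String[] args) {
>>     FlinkPipelineOptions options =
>>         PipelineOptionsFactory.fromArgs(args).as(FlinkPipelineOptions.class);
>>     options.setRunner(FlinkRunner.class);
>>     // The default of -1 disables checkpointing entirely.
>>     options.setCheckpointingInterval(60_000L); // checkpoint every 60s
>>     return options;
>>   }
>> }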
>>
>> Here is the pipeline code with the commented-out windowing part:
>>
>>   // 1. Read from Kafka, attaching an event timestamp to each element.
>>   pipeline
>>       .apply(
>>           KafkaIO.<String, String>read()
>>               .withBootstrapServers(bootstrap)
>>               .withTopics(topics)
>>               .withKeyDeserializer(StringDeserializer.class)
>>               .withValueDeserializer(ConfigurableDeserializer.class)
>>               .updateConsumerProperties(
>>                   ImmutableMap.of(
>>                       InputMessagesConfig.CONFIG_PROPERTY_NAME, inputMessagesConfig))
>>               .updateConsumerProperties(ImmutableMap.of("auto.offset.reset", "earliest"))
>>               .updateConsumerProperties(ImmutableMap.of("group.id", "di-beam-consumers"))
>>               .updateConsumerProperties(ImmutableMap.of("enable.auto.commit", "true"))
>>               .withTimestampPolicyFactory(
>>                   TimestampPolicyFactory.withTimestampFn(
>>                       new MessageTimestampExtractor(inputMessagesConfig)))
>>               .withReadCommitted()
>>               .commitOffsetsInFinalize())
>>       // 2. Parse the messages and filter out bad records.
>>       .apply(ParDo.of(new ToEventFn()))
>>       // Windowing step left over from the GCS pipeline, commented out:
>>       // .apply(
>>       //     Window.into(new ZurichTimePartitioningWindowFn())
>>       //         .triggering(
>>       //             Repeatedly.forever(
>>       //                 AfterFirst.of(
>>       //                     AfterPane.elementCountAtLeast(bundleSize),
>>       //                     AfterProcessingTime.pastFirstElementInPane()
>>       //                         .plusDelayOf(refreshFrequency))))
>>       //         .withAllowedLateness(Duration.standardDays(1))
>>       //         .discardingFiredPanes())
>>       // 3. Load into partitioned BigQuery tables every refreshFrequency.
>>       .apply(
>>           BigQueryIO.<Event>write()
>>               .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
>>               .withTriggeringFrequency(refreshFrequency)
>>               .withNumFileShards(1)
>>               .to(partitionedTableDynamicDestinations)
>>               .withFormatFunction(
>>                   (SerializableFunction<Event, TableRow>)
>>                       KafkaToBigQuery::convertUserEventToTableRow)
>>               .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>>               .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));
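>>
>> As a side note, here is a rough sketch (google-cloud-bigquery Java
>> client, not verified as written) of how one can list the recent load jobs
>> and their destination tables, to see which day partitions actually
>> received a load:
>>
>> import com.google.cloud.bigquery.BigQuery;
>> import com.google.cloud.bigquery.BigQueryOptions;
>> import com.google.cloud.bigquery.Job;
>> import com.google.cloud.bigquery.JobConfiguration;
>> import com.google.cloud.bigquery.LoadJobConfiguration;
>>
>> public class ListRecentLoadJobs {
>>   public static void main(String[] args) {
>>     BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
>>     for (Job job :
>>         bigquery.listJobs(BigQuery.JobListOption.pageSize(100)).iterateAll()) {
>>       JobConfiguration config = job.getConfiguration();
>>       if (config.getType() == JobConfiguration.Type.LOAD) {
>>         // Each load job targets one destination table/partition, e.g.
>>         // dataset.table$20181106 - a day with no load job is a missing day.
>>         LoadJobConfiguration load = (LoadJobConfiguration) config;
>>         System.out.printf("%s -> %s (%s)%n",
>>             job.getJobId().getJob(),
>>             load.getDestinationTable(),
>>             job.getStatus().getState());
>>       }
>>     }
>>   }
>> }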
>>
>>
>> I have the feeling that I must be making some serious and dumb mistakes,
>> as I know the Beam framework is very robust. Thanks for taking the time
>> to read this.
>>
>> Tobi
>>
>
