On Wed, Nov 7, 2018 at 5:04 AM Kaymak, Tobias <tobias.kay...@ricardo.ch>
wrote:

>
> On Tue, Nov 6, 2018 at 6:58 PM Raghu Angadi <rang...@google.com> wrote:
>
>> You seem to be reading from multiple topics and your timestamp policy is
>> too simplistic (withTimestampFn() was never meant to be public API, I am
>> sending a PR to deprecate it first and then will make it package private).
>> So if you have two topics of different sizes, the smaller topic might be
>> consumed really fast, pushing your watermark way ahead. This might or might
>> not be happening, but this is one of the dangers of using record timestamps
>> for the watermark (we should never do that).
>>
>>
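A simplified model of the hazard described above: if the watermark simply
tracks the newest record timestamp seen so far, older records that arrive
afterwards fall behind it and can be treated as late. The sketch below is plain
Java, not Beam or KafkaIO code; the class WatermarkSketch, the method
droppedAsLate, and the one-day lateness bound are assumptions made only for
illustration.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model: the watermark is the maximum record
// timestamp seen so far. A real runner decides lateness per window, so this
// only illustrates the general effect, not Beam's actual logic.
class WatermarkSketch {

    /** Returns the records that fall more than allowedLateness behind the watermark. */
    static List<Instant> droppedAsLate(List<Instant> arrivalOrder, Duration allowedLateness) {
        List<Instant> dropped = new ArrayList<>();
        Instant watermark = null; // no record seen yet
        for (Instant ts : arrivalOrder) {
            // A record is "too late" if even ts + allowedLateness is behind the watermark.
            if (watermark != null && ts.plus(allowedLateness).isBefore(watermark)) {
                dropped.add(ts);
            }
            // Advance the watermark to the newest timestamp observed.
            if (watermark == null || ts.isAfter(watermark)) {
                watermark = ts;
            }
        }
        return dropped;
    }

    public static void main(String[] args) {
        List<Instant> arrivalOrder = List.of(
            Instant.parse("2018-11-01T00:00:00Z"),  // recent record read first
            Instant.parse("2017-03-01T00:00:00Z"),  // old record read afterwards
            Instant.parse("2018-10-31T12:00:00Z")); // slightly older, within the bound
        System.out.println(droppedAsLate(arrivalOrder, Duration.ofDays(1)));
    }
}
```

In this model a recent record read early pushes the watermark forward, and any
record more than the lateness bound older than it is flagged afterwards.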
> To clarify: The test was done consuming from a single topic. I am using a
> field inside the element's JSON to get the element's timestamp. Data in a
> topic can go way back to, let's say, 2017, but that data was pushed to Kafka
> in one go, and the timestamp when it arrived is, for example, Wednesday last
> week. Sometimes the producing side does not set the element's timestamp for
> Kafka (since it's using a library that does not support that yet), so it has
> to be parsed.
>

That is fine. We can rule out the timestamp as a possible suspect while
debugging this. Using custom timestamps from records is normal.

Raghu.


> I could also not fiddle with the timestamp at all and let the system decide,
> and then in the BigQueryIO partitioning step parse it and assign it to a
> partition. Is this better?
>
>
>
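For reference, a minimal sketch of what assigning an element to a daily
partition from its parsed timestamp could look like. It is plain Java and does
not use BigQueryIO; the class PartitionSketch, the method partitionFor, and the
table name "events" are hypothetical, and the Europe/Zurich day boundary is an
assumption suggested by the ZurichTimePartitioningWindowFn name in the
pipeline. The table$yyyyMMdd form is BigQuery's partition decorator syntax.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

// Hypothetical helper: maps an event timestamp to a daily partition decorator.
class PartitionSketch {

    // Assumption: partitions follow calendar days in Zurich local time.
    private static final ZoneId ZURICH = ZoneId.of("Europe/Zurich");

    private static final DateTimeFormatter DAY =
        DateTimeFormatter.ofPattern("yyyyMMdd").withZone(ZURICH);

    /** Returns e.g. "events$20181107" for an event on that Zurich calendar day. */
    static String partitionFor(String table, Instant eventTime) {
        return table + "$" + DAY.format(eventTime);
    }

    public static void main(String[] args) {
        // 23:30 UTC is already past midnight in Zurich (UTC+1 in November).
        System.out.println(partitionFor("events", Instant.parse("2018-11-06T23:30:00Z")));
    }
}
```

Whether this is preferable to setting the element timestamp at the source is
the open question above; the sketch only shows the day computation itself.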
>> On Tue, Nov 6, 2018 at 3:44 AM Kaymak, Tobias <tobias.kay...@ricardo.ch>
>> wrote:
>>
>>> Hi,
>>>
>>> I am sharing my experience with you after trying to use the following
>>> pipeline logic (with Beam 2.6.0, running on Flink 1.5):
>>>
>>> 1. Reading from KafkaIO, attaching a timestamp from each parsed element
>>> 2. Filtering bad records
>>> 3. Writing to a partitioned table in BigQuery with FILE_LOADS (batch jobs)
>>>    every 15 minutes
>>>
>>> I had a working pipeline that does not write to BigQuery directly, but to
>>> Cloud Storage, so its third step was
>>>
>>> 3. Writing files to GCS in daily "subdirectories"
>>>
>>> I tried to rewrite the pipeline to reduce complexity: resetting its state
>>> should no longer be tied to thinking about what to delete on GCS, and
>>> configurable refresh times directly from within the pipeline were something
>>> I was looking for. The only thing I needed to change was the output at the
>>> end, so I knew my parsing logic would not change, and that should reduce the
>>> risk.
>>>
>>> I tested the pipeline within our test cluster and it looked promising. When
>>> I deployed it last week everything seemed to go smoothly. On Friday I
>>> noticed that I had holes in the data: in the BigQuery tables there were
>>> missing days (the tricky part was that the recent days looked fine). To be
>>> sure, I reset the pipeline and read from the very beginning of each topic
>>> from Kafka. Across different runs, different days were missing. I spent the
>>> weekend rolling back the changes and trying to figure out what was going on.
>>>
>>> I didn't see any errors in the logs (the log level was on WARNING for the
>>> most part), but I thought, well, maybe it's because there are too many
>>> partitions and BigQuery has a limit of 1000 partition operations per day.
>>> So I started reading from just 90 days in the past, but I still had holes
>>> (whole days).
>>>
>>> I had a windowing step that I needed for the GCS pipeline. I became aware
>>> that I wouldn't need this anymore with BigQueryIO, so I commented it out and
>>> tested again, without luck.
>>>
>>> What struck me was that the Flink cluster didn't do any checkpoints for the
>>> pipeline that was using BigQueryIO. It does so when writing to GCS, and I
>>> tested its failure logic there. Additionally, the graph in Flink with
>>> BigQueryIO becomes very complex, but this is something I expected.
>>>
>>> Here is the Pipeline code with the commented out windowing part:
>>>
>>>   pipeline
>>>         .apply(
>>>             KafkaIO.<String, String>read()
>>>                 .withBootstrapServers(bootstrap)
>>>                 .withTopics(topics)
>>>                 .withKeyDeserializer(StringDeserializer.class)
>>>                 .withValueDeserializer(ConfigurableDeserializer.class)
>>>                 .updateConsumerProperties(
>>>                     ImmutableMap.of(
>>>                         InputMessagesConfig.CONFIG_PROPERTY_NAME, inputMessagesConfig))
>>>                 .updateConsumerProperties(
>>>                     ImmutableMap.of("auto.offset.reset", "earliest"))
>>>                 .updateConsumerProperties(
>>>                     ImmutableMap.of("group.id", "di-beam-consumers"))
>>>                 .updateConsumerProperties(
>>>                     ImmutableMap.of("enable.auto.commit", "true"))
>>>                 .withTimestampPolicyFactory(
>>>                     TimestampPolicyFactory.withTimestampFn(
>>>                         new MessageTimestampExtractor(inputMessagesConfig)))
>>>                 .withReadCommitted()
>>>                 .commitOffsetsInFinalize())
>>>         .apply(ParDo.of(new ToEventFn()))
>>>         //        .apply(
>>>         //            Window.into(new ZurichTimePartitioningWindowFn())
>>>         //                .triggering(
>>>         //                    Repeatedly.forever(
>>>         //                        AfterFirst.of(
>>>         //                            AfterPane.elementCountAtLeast(bundleSize),
>>>         //                            AfterProcessingTime.pastFirstElementInPane()
>>>         //                                .plusDelayOf(refreshFrequency))))
>>>         //                .withAllowedLateness(Duration.standardDays(1))
>>>         //                .discardingFiredPanes())
>>>         .apply(
>>>             BigQueryIO.<Event>write()
>>>                 .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
>>>                 .withTriggeringFrequency(refreshFrequency)
>>>                 .withNumFileShards(1)
>>>                 .to(partitionedTableDynamicDestinations)
>>>                 .withFormatFunction(
>>>                     (SerializableFunction<Event, TableRow>)
>>>                         KafkaToBigQuery::convertUserEventToTableRow)
>>>                 .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>>>                 .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));
>>>
>>>
>>> I have the feeling that I must be making some serious and dumb mistakes, as
>>> I know the Beam framework is very robust. Thanks for taking the time to read
>>> this.
>>>
>>> Tobi
>>>
>>
>
> --
> Tobias Kaymak
> Data Engineer
>
> tobias.kay...@ricardo.ch
> www.ricardo.ch
>
