Hi Ondřej,

I'll start with a disclaimer: I'm not exactly an expert on either the Python SDK or ParquetIO, so please take these just as suggestions off the top of my head.

First, it seems that the current implementation of WriteToParquet really does not play well with streaming pipelines. There are several options that could be used to overcome this limitation:

 a) you can try fixing the sink; maybe adding an AfterWatermark.pastEndOfWindow() trigger might be enough to make it work (needs to be tested) - there is a minimal sketch of this right after the list

 b) if the Java implementation of ParquetIO works for streaming pipelines (and I would suppose it does), you can use a cross-language transform to run ParquetIO from Python, see [1] for a quick start

 c) generally speaking, using a full-blown streaming engine for tasks like "buffer this and store it in bulk after a timeout" is inefficient. An alternative approach would be to just use a plain KafkaConsumer, create Parquet files on local disk, push them to GCS, and commit offsets afterwards. Streaming engines buffer data in replicated distributed state, which adds unneeded complexity. A rough outline of this approach is the second sketch below

 d) if there is some non-trivial processing between consuming elements from Kafka and writing outputs, then an alternative might be to process the data in a streaming pipeline, write the outputs back to Kafka, and then use approach (c) to get them to GCS
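
To illustrate (a): a minimal, untested sketch of what I have in mind - fixed windows plus an AfterWatermark trigger in front of WriteToParquet. The broker address, topic, bucket, pyarrow schema and the Avro decoding step are all placeholders, and whether the sink then behaves correctly in streaming is exactly the thing that needs testing:

    import apache_beam as beam
    import pyarrow
    from apache_beam.io.kafka import ReadFromKafka
    from apache_beam.io.parquetio import WriteToParquet
    from apache_beam.options.pipeline_options import PipelineOptions
    from apache_beam.transforms import trigger, window

    # placeholder schema for the records written to Parquet
    schema = pyarrow.schema([("id", pyarrow.int64()), ("payload", pyarrow.string())])

    with beam.Pipeline(options=PipelineOptions(streaming=True)) as p:
        (
            p
            | "ReadKafka" >> ReadFromKafka(
                consumer_config={"bootstrap.servers": "broker:9092"},
                topics=["cdc-topic"])
            # placeholder: a real pipeline would deserialize the Avro value here
            | "Decode" >> beam.Map(lambda kv: {"id": 0, "payload": kv[1].decode("utf-8")})
            # 5-minute fixed windows, emitting when the watermark passes the window end
            | "Window" >> beam.WindowInto(
                window.FixedWindows(5 * 60),
                trigger=trigger.AfterWatermark(),
                accumulation_mode=trigger.AccumulationMode.DISCARDING)
            | "WriteParquet" >> WriteToParquet(
                "gs://my-bucket/output/part",
                schema,
                file_name_suffix=".parquet")
        )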

The specific solution depends on the actual requirements of your customers.
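
And for (c), just to show what I mean by skipping the streaming engine entirely, a very rough outline. The library choices (confluent-kafka, pyarrow, google-cloud-storage), the names and the flush interval are my assumptions, Avro decoding and error handling are omitted, and none of this is tested:

    import time
    import pyarrow as pa
    import pyarrow.parquet as pq
    from confluent_kafka import Consumer
    from google.cloud import storage

    consumer = Consumer({
        "bootstrap.servers": "broker:9092",
        "group.id": "parquet-writer",
        "enable.auto.commit": False,  # commit manually, only after the upload
    })
    consumer.subscribe(["cdc-topic"])
    bucket = storage.Client().bucket("my-bucket")

    buffer, deadline = [], time.time() + 300  # flush roughly every 5 minutes
    while True:
        msg = consumer.poll(1.0)
        if msg is not None and msg.error() is None:
            # placeholder: a real consumer would deserialize the Avro value here
            buffer.append({"payload": msg.value()})
        if buffer and time.time() > deadline:
            local_path = "/tmp/batch-%d.parquet" % int(time.time())
            pq.write_table(pa.Table.from_pylist(buffer), local_path)
            bucket.blob("output/" + local_path.rsplit("/", 1)[-1]).upload_from_filename(local_path)
            # offsets are committed only after the file is safely in GCS
            consumer.commit(asynchronous=False)
            buffer, deadline = [], time.time() + 300

The delivery guarantee here is simply "commit after upload", so you can get duplicates after a restart, which is usually acceptable for this kind of bulk export.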

Best,

 Jan

[1] https://beam.apache.org/documentation/sdks/python-multi-language-pipelines/

On 3/14/24 09:34, Ondřej Pánek wrote:

Basically, this is the error we receive when trying to use Avro or Parquet sinks (attached image).

Also, check the sample pipeline that triggers this error (when deploying with DataflowRunner). So obviously, there is no global window or default trigger. That’s, I believe, what’s described in the issue: https://github.com/apache/beam/issues/25598

*From: *Ondřej Pánek <[email protected]>
*Date: *Thursday, March 14, 2024 at 07:57
*To: *[email protected] <[email protected]>
*Subject: *Re: Specific use-case question - Kafka-to-GCS-avro-Python

Hello, thanks for the reply!

Please, refer to these:

  * https://www.googlecloudcommunity.com/gc/Data-Analytics/kafka-to-parquet/m-p/646836
  * https://github.com/apache/beam/issues/25598

Best,

Ondrej

*From: *XQ Hu via user <[email protected]>
*Date: *Thursday, March 14, 2024 at 02:32
*To: *[email protected] <[email protected]>
*Cc: *XQ Hu <[email protected]>
*Subject: *Re: Specific use-case question - Kafka-to-GCS-avro-Python

Can you explain more about "that current sinks for Avro and Parquet with the destination of GCS are not supported"?

We do have AvroIO and ParquetIO (https://beam.apache.org/documentation/io/connectors/) in Python.

On Wed, Mar 13, 2024 at 5:35 PM Ondřej Pánek <[email protected]> wrote:

    Hello Beam team!

    We’re currently onboarding a customer’s infrastructure to the Google
    Cloud Platform. The decision was made that one of the technologies
    they will use is Dataflow. Let me briefly describe the use-case specification:

    They have a Kafka cluster where data from a CDC data source is stored.
    The data in the topics is stored in Avro format. Their other
    requirement is a streaming solution reading from these Kafka topics
    and writing to Google Cloud Storage, again in Avro. What’s more,
    the component should be written in Python, since their Data
    Engineers strongly prefer Python over Java.

    We’ve been struggling with the design of the solution for a couple
    of weeks now, and we’re in quite an unfortunate situation,
    not really finding any solution that would fit these requirements.

    So the question is: Is there any existing Dataflow
    template/solution with the following specifications:

      * Streaming connector
      * Written in Python
      * Consumes from Kafka topics
      * Reads Avro with Schema Registry
      * Writes Avro to GCS

    We found out that the current sinks for Avro and Parquet with
    GCS as the destination are not supported for Python at the moment,
    which is basically the main blocker now.

    Any recommendations/suggestions would be really highly appreciated!

    Maybe the solution really does not exist and we need to create our
    own custom connector for it. The question in this case would be
    whether that’s even theoretically possible, since we would really
    need to avoid another dead end.

    Thanks a lot for any help!

    Kind regards,

    Ondrej
