Hi,

Sounds like exactly the problem I described a few emails back - https://lists.apache.org/thread/q929lbwp8ylchbn8ngypfqlbvrwpfzph

Does this mean that Parquet IO does not support partitioning and we need some workaround, like explicitly mapping each window to a separate Parquet file? This could be a solution in your case, if it works (just an idea worth trying - I did not test it and do not have enough experience with Beam), but I am limited to pure SQL and am not sure how I could do it.
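For what it is worth, here is a rough, untested sketch of that idea in the Python SDK. The class name, the output prefix, and the assumption that records are dicts matching the pyarrow schema are all mine - the point is only that with a non-global window a GroupByKey per window is allowed, and each group can then be written as its own Parquet file:

import apache_beam as beam
import pyarrow as pa
import pyarrow.parquet as pq
from apache_beam.io.filesystems import FileSystems


class WriteWindowToParquet(beam.DoFn):
    """Writes all records of one fixed window into a single Parquet file."""

    def __init__(self, output_prefix, schema):
        self.output_prefix = output_prefix  # e.g. 's3://test-bucket/output' (placeholder)
        self.schema = schema                # pyarrow schema for the records

    def process(self, keyed_records, window=beam.DoFn.WindowParam):
        # After GroupByKey the element is (key, iterable of records);
        # records are assumed to be dicts matching the schema.
        _, records = keyed_records
        table = pa.Table.from_pylist(list(records), schema=self.schema)
        # Name the file after the window boundaries (seconds since epoch).
        start = window.start.micros // 1_000_000
        end = window.end.micros // 1_000_000
        path = f'{self.output_prefix}/window-{start}-{end}.parquet'
        fh = FileSystems.create(path)
        try:
            pq.write_table(table, fh)
        finally:
            fh.close()


# Replacing the WriteToParquet step in the pipeline below with something like:
#   | "window" >> beam.WindowInto(window.FixedWindows(60))
#   | "key by window" >> beam.Map(lambda e: (None, e))
#   | "group per window" >> beam.GroupByKey()
#   | "write per-window file" >> beam.ParDo(
#         WriteWindowToParquet('s3://test-bucket/output', pyarrow_schema))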

Hope this helps with your problem, and that Beam support can find some solution to my case too.

Best

Wiśniowski Piotr

On 17.02.2023 02:00, Lydian wrote:
I want to make a simple Beam pipeline which will store the events from Kafka to S3 in Parquet format every minute.

Here's a simplified version of my pipeline:

def add_timestamp(event: Any) -> Any:
    from datetime import datetime
    from apache_beam import window
    return window.TimestampedValue(event, datetime.timestamp(event[1].timestamp))

# Actual Pipeline
(
    pipeline
    | "Read from Kafka" >> ReadFromKafka(consumer_config, topics, with_metadata=False)
    | "Transformed" >> beam.Map(my_transform)
    | "Add timestamp" >> beam.Map(add_timestamp)
    | "window" >> beam.WindowInto(window.FixedWindows(60))  # 1 min
    | "writing to parquet" >> beam.io.WriteToParquet('s3://test-bucket/', pyarrow_schema)
)

However, the pipeline failed with

|GroupByKey cannot be applied to an unbounded PCollection with global windowing and a default trigger |

This seems to be coming from https://github.com/apache/beam/blob/v2.41.0/sdks/python/apache_beam/io/iobase.py#L1145-L1146 which always adds a |GlobalWindows| and thus causes this error. Wondering what I should do to correctly back up the events from Kafka (unbounded) to S3. Thanks!

BTW, I am running the |portableRunner| with Flink. The Beam version is 2.41.0 (the latest version seems to have the same code) and the Flink version is 1.14.5.



Sincerely,
Lydian Lee
