Hi,
This sounds like exactly the problem I reported a few emails earlier:
https://lists.apache.org/thread/q929lbwp8ylchbn8ngypfqlbvrwpfzph
Does this mean that Parquet IO does not support partitioning and we need
a workaround, such as explicitly mapping each window to a separate
Parquet file? That could be a solution in your case, if it works (just an
idea worth trying - I did not test it and do not have enough experience
with Beam), but I am limited to pure SQL and am not sure how I could do
it there.
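If it helps, below is a rough, untested sketch of what I mean by mapping
each window to its own Parquet file. It assumes the records are dicts
matching your pyarrow_schema and uses apache_beam.io.fileio.WriteToFiles
(which, as far as I understand, supports windowed writes on unbounded
input) together with a hand-rolled Parquet sink - again, just an idea
from reading the docs, I have not run it:

    import pyarrow as pa
    import pyarrow.parquet as pq

    from apache_beam.io import fileio


    class ParquetSink(fileio.FileSink):
        # Buffers the records routed to one output file and writes them
        # as a single Parquet file when the sink is flushed.
        def __init__(self, schema):
            self._schema = schema
            self._buffer = []

        def open(self, fh):
            # fh is the file handle WriteToFiles opens per window/destination.
            self._fh = fh
            self._buffer = []

        def write(self, record):
            # Assumes each record is a dict matching the pyarrow schema.
            self._buffer.append(record)

        def flush(self):
            table = pa.Table.from_pylist(self._buffer, schema=self._schema)
            pq.write_table(table, self._fh)


    # In your pipeline, after beam.WindowInto(window.FixedWindows(60)),
    # instead of WriteToParquet:
    #   | "writing to parquet" >> fileio.WriteToFiles(
    #         path='s3://test-bucket/',
    #         sink=lambda dest: ParquetSink(pyarrow_schema))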
Hope this helps with your problem, and that Beam support can find a
solution for my case too.
Best
Wiśniowski Piotr
On 17.02.2023 02:00, Lydian wrote:
I want to make a simple Beam pipeline which will store the events from
Kafka to S3 in Parquet format every minute.
Here's a simplified version of my pipeline:
    def add_timestamp(event: Any) -> Any:
        from datetime import datetime
        from apache_beam import window
        return window.TimestampedValue(
            event, datetime.timestamp(event[1].timestamp))

    # Actual Pipeline
    (
        pipeline
        | "Read from Kafka" >> ReadFromKafka(consumer_config, topics, with_metadata=False)
        | "Transformed" >> beam.Map(my_transform)
        | "Add timestamp" >> beam.Map(add_timestamp)
        | "window" >> beam.WindowInto(window.FixedWindows(60))  # 1 min
        | "writing to parquet" >> beam.io.WriteToParquet('s3://test-bucket/', pyarrow_schema)
    )
However, the pipeline failed with:

    GroupByKey cannot be applied to an unbounded PCollection with global
    windowing and a default trigger
This seems to come from
https://github.com/apache/beam/blob/v2.41.0/sdks/python/apache_beam/io/iobase.py#L1145-L1146,
which always applies GlobalWindows and thus causes this error. I am
wondering what I should do to correctly back up the events from Kafka
(unbounded) to S3. Thanks!
BTW, I am running the portableRunner with Flink. The Beam version is
2.41.0 (the latest version seems to have the same code) and the Flink
version is 1.14.5.
Sincerely,
Lydian Lee