For me this use-case worked with the following window definition, which took a bit of trial and error, and I can't claim I have a 100% understanding of the windowing logic.
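
In beam-python terms I would expect the window definition to look roughly like the sketch below. The trigger, accumulation mode and allowed lateness here are my assumptions about what my Java pipeline does, so please treat the linked Java code below as the authoritative version:

    import apache_beam as beam
    from apache_beam import window
    from apache_beam.transforms import trigger

    # Assumed settings: fixed 1-minute windows, fire when the watermark
    # passes the end of the window, discard fired panes, no allowed lateness.
    windowed_events = (
        events  # an unbounded PCollection with event timestamps already attached
        | "window" >> beam.WindowInto(
            window.FixedWindows(60),
            trigger=trigger.AfterWatermark(),
            accumulation_mode=trigger.AccumulationMode.DISCARDING,
            allowed_lateness=0,
        )
    )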
Here's my Java code for Kinesis -> Parquet files which worked:
https://github.com/psolomin/beam-playground/blob/4968d8f43082113e3e643d7fc3418a7738a67c9a/kinesis-io-with-enhanced-fan-out/src/main/java/com/psolomin/consumer/KinesisToFilePipeline.java#L56

I hope it's not hard to derive the beam-python window config from it.

Best Regards,
Pavel Solomin

Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin
<https://www.linkedin.com/in/pavelsolomin>


On Fri, 17 Feb 2023 at 08:49, Wiśniowski Piotr <
contact.wisniowskipi...@gmail.com> wrote:

> Hi,
>
> This sounds like exactly the problem I described a few emails back -
> https://lists.apache.org/thread/q929lbwp8ylchbn8ngypfqlbvrwpfzph
>
> Does this mean that Parquet IO does not support partitioning and we need
> a workaround, like explicitly mapping each window to a separate Parquet
> file? This could be a solution in your case if it works (just an idea
> worth trying - I have not tested it and do not have enough experience
> with Beam), but I am limited to pure SQL and I am not sure how I could do
> it there.
>
> Hope this helps with your problem, and that Beam support can find a
> solution for my case too.
>
> Best
>
> Wiśniowski Piotr
>
> On 17.02.2023 02:00, Lydian wrote:
>
> I want to make a simple Beam pipeline which will store the events from
> Kafka to S3 in Parquet format every minute.
>
> Here's a simplified version of my pipeline:
>
> def add_timestamp(event: Any) -> Any:
>     from datetime import datetime
>     from apache_beam import window
>
>     return window.TimestampedValue(
>         event, datetime.timestamp(event[1].timestamp))
>
> # Actual Pipeline
> (
>     pipeline
>     | "Read from Kafka" >> ReadFromKafka(
>         consumer_config, topics, with_metadata=False)
>     | "Transformed" >> beam.Map(my_transform)
>     | "Add timestamp" >> beam.Map(add_timestamp)
>     | "window" >> beam.WindowInto(window.FixedWindows(60))  # 1 min
>     | "writing to parquet" >> beam.io.WriteToParquet(
>         's3://test-bucket/', pyarrow_schema)
> )
>
> However, the pipeline failed with:
>
> GroupByKey cannot be applied to an unbounded PCollection with global
> windowing and a default trigger
>
> This seems to be coming from
> https://github.com/apache/beam/blob/v2.41.0/sdks/python/apache_beam/io/iobase.py#L1145-L1146
> which always adds GlobalWindows and thus causes this error. I am
> wondering what I should do to correctly back up the events from Kafka
> (unbounded) to S3. Thanks!
>
> By the way, I am running with the portableRunner with Flink. The Beam
> version is 2.41.0 (the latest version seems to have the same code) and
> the Flink version is 1.14.5.
>
>
> Sincerely,
> Lydian Lee
>
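
P.S. Regarding the idea above of explicitly mapping each window to a
separate Parquet file: a rough, untested beam-python sketch of that
workaround could use fileio.WriteToFiles with a small pyarrow-based sink.
The ParquetSink class below is hypothetical (not an existing Beam API) and
the WriteToFiles settings are assumptions only:

    import pyarrow as pa
    import pyarrow.parquet as pq
    from apache_beam.io import fileio


    class ParquetSink(fileio.FileSink):
        # Hypothetical sink: buffers the records routed to one file and
        # writes them out as a single Parquet file when Beam flushes it,
        # so each window/pane ends up in its own file.
        def __init__(self, schema):
            self._schema = schema
            self._records = []

        def open(self, fh):
            self._fh = fh
            self._records = []

        def write(self, record):
            self._records.append(record)

        def flush(self):
            table = pa.Table.from_pylist(self._records, schema=self._schema)
            pq.write_table(table, self._fh)


    # The WriteToParquet step in the pipeline would then be replaced with
    # something like (untested):
    #   | "write windowed files" >> fileio.WriteToFiles(
    #         path="s3://test-bucket/",
    #         sink=lambda destination: ParquetSink(pyarrow_schema),
    #         shards=1,
    #     )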