For me this use-case worked with the following window definition, which was
a bit of try-and-fail, and I can't claim I got 100% understanding of
windowing logic.

Here's my java code for Kinesis -> Parquet files which worked:
https://github.com/psolomin/beam-playground/blob/4968d8f43082113e3e643d7fc3418a7738a67c9a/kinesis-io-with-enhanced-fan-out/src/main/java/com/psolomin/consumer/KinesisToFilePipeline.java#L56

I hope it's not hard to derive beam-python window config from it.

Best Regards,
Pavel Solomin

Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin
<https://www.linkedin.com/in/pavelsolomin>





On Fri, 17 Feb 2023 at 08:49, Wiśniowski Piotr <
contact.wisniowskipi...@gmail.com> wrote:

> Hi,
>
> Sounds like exact problem that I have few emails before -
> https://lists.apache.org/thread/q929lbwp8ylchbn8ngypfqlbvrwpfzph
>
> Does this mean that Parquet IO does not support partitioning, and we need
> to do some workarounds? Like explicitly mapping each window to a separate
> Parquet file? This could be a solution in Your case, if it works (just idea
> worth trying but did not test it and do not have enough experience with
> Beam), but I am limited only to pure SQL and not sure how I can do it.
>
> Hope This helps with Your problem and Beam support could find some
> solution to my case too.
>
> Best
>
> Wiśniowski Piotr
> On 17.02.2023 02:00, Lydian wrote:
>
> I want to make a simple Beam pipeline which will store the events from
> kafka to S3 in parquet format every minute.
>
> Here's a simplified version of my pipeline:
>
> def add_timestamp(event: Any) -> Any:
>     from datetime import datetime
>     from apache_beam import window
>
>     return window.TimestampedValue(event,  
> datetime.timestamp(event[1].timestamp))
> # Actual Pipeline
> (
>   pipeline
>   | "Read from Kafka" >> ReadFromKafka(consumer_config, topics, 
> with_metadata=False)
>   | "Transformed" >> beam.Map(my_transform)
>   | "Add timestamp" >> beam.Map(add_timestamp)
>   | "window" >> beam.WindowInto(window.FixedWindows(60))  # 1 mins
>   | "writing to parquet" >> beam.io.WriteToParquet('s3://test-bucket/', 
> pyarrow_schema)
> )
>
> However, the pipeline failed with
>
> GroupByKey cannot be applied to an unbounded PCollection with global 
> windowing and a default trigger
>
> This seems to be coming from
> https://github.com/apache/beam/blob/v2.41.0/sdks/python/apache_beam/io/iobase.py#L1145-L1146
>  which
> always add a GlobalWindows and thus causing this error. Wondering what I
> should do to correctly backup the event from Kafka (Unbounded) to S3.
> Thanks!
>
> btw, I am running with portableRunner with Flink. Beam Version is 2.41.0
> (the latest version seems to have the same code) and Flink version is 1.14.5
>
>
>
> Sincerely,
> Lydian Lee
>
>

Reply via email to