I've tested the same window, trigger, and allowed_lateness settings; nothing seems to
work.
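
The kind of configuration I was experimenting with looks roughly like this sketch
(the trigger and lateness values here are just illustrative, and "events" stands in
for the transformed, timestamped PCollection); every combination I tried hit the
same error:

import apache_beam as beam
from apache_beam import window
from apache_beam.transforms import trigger

windowed = (
    events  # placeholder for the transformed, timestamped PCollection
    | "window" >> beam.WindowInto(
        window.FixedWindows(60),                  # 1-minute windows
        trigger=trigger.AfterWatermark(),         # fire when the watermark passes the window
        accumulation_mode=trigger.AccumulationMode.DISCARDING,
        allowed_lateness=60,                      # tolerate 1 minute of late data
    )
)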

I think the main issue is in the WriteImpl that I linked earlier:
https://github.com/apache/beam/blob/v2.41.0/sdks/python/apache_beam/io/iobase.py#L1033
According to the doc:
> Currently only batch workflows support custom sinks.
This makes me believe it probably only supports writes from batch (bounded) data
sources. I'm not sure if there's any plan to fix that, so I filed a feature request:
https://github.com/apache/beam/issues/25598  If anyone else is also hitting this
issue, please comment there to bring more attention to it.


In the meantime, I also found an example that could probably serve as a
workaround for Python pipelines:
https://www.programcreek.com/python/?code=GoogleCloudPlatform/python-docs-samples/python-docs-samples-master/pubsub/streaming-analytics/PubSubToGCS.py
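
From a quick read, the trick there is to window the stream, group everything in each
window under a single key with GroupByKey, and then have a DoFn write each group out
as one file. A rough, untested sketch of how that might look for Parquet on S3
(WriteWindowToParquet and the output path layout are names I made up; it assumes each
record is a dict matching pyarrow_schema and that the S3 filesystem dependencies are
installed):

import apache_beam as beam
import pyarrow as pa
import pyarrow.parquet as pq
from apache_beam import window
from apache_beam.io.filesystems import FileSystems


class WriteWindowToParquet(beam.DoFn):
    """Writes all records of one window into a single Parquet file."""

    def __init__(self, output_prefix, schema):
        self.output_prefix = output_prefix
        self.schema = schema

    def process(self, keyed_records, win=beam.DoFn.WindowParam):
        _, records = keyed_records
        # One file per window, named after the window start time.
        path = f"{self.output_prefix}/{win.start.to_utc_datetime():%Y%m%d-%H%M%S}.parquet"
        table = pa.Table.from_pylist(list(records), schema=self.schema)
        writer = FileSystems.create(path)
        pq.write_table(table, writer)
        writer.close()


(
    events  # placeholder for the transformed, timestamped PCollection
    | "window" >> beam.WindowInto(window.FixedWindows(60))
    | "single key" >> beam.Map(lambda record: (None, record))  # one group per window
    | "group per window" >> beam.GroupByKey()
    | "write parquet" >> beam.ParDo(
        WriteWindowToParquet("s3://test-bucket/output", pyarrow_schema))
)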

Sincerely,
Lydian Lee



On Fri, Feb 17, 2023 at 2:12 AM Pavel Solomin <p.o.solo...@gmail.com> wrote:

> For me this use-case worked with the following window definition, which
> was a bit of trial and error, and I can't claim I have a 100% understanding of
> the windowing logic.
>
> Here's my Java code for Kinesis -> Parquet files which worked:
> https://github.com/psolomin/beam-playground/blob/4968d8f43082113e3e643d7fc3418a7738a67c9a/kinesis-io-with-enhanced-fan-out/src/main/java/com/psolomin/consumer/KinesisToFilePipeline.java#L56
>
> I hope it's not hard to derive beam-python window config from it.
>
> Best Regards,
> Pavel Solomin
>
> Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin
> <https://www.linkedin.com/in/pavelsolomin>
>
>
>
>
>
> On Fri, 17 Feb 2023 at 08:49, Wiśniowski Piotr <
> contact.wisniowskipi...@gmail.com> wrote:
>
>> Hi,
>>
>> Sounds like the exact problem that I described a few emails ago -
>> https://lists.apache.org/thread/q929lbwp8ylchbn8ngypfqlbvrwpfzph
>>
>> Does this mean that Parquet IO does not support partitioning, and we need
>> some workaround, like explicitly mapping each window to a separate Parquet
>> file? That could be a solution in your case, if it works (just an idea worth
>> trying - I did not test it and do not have enough experience with Beam), but
>> I am limited to pure SQL and am not sure how I could do it there.
>>
>> Hope this helps with your problem, and that Beam support can find some
>> solution for my case too.
>>
>> Best
>>
>> Wiśniowski Piotr
>> On 17.02.2023 02:00, Lydian wrote:
>>
>> I want to make a simple Beam pipeline which stores the events from
>> Kafka to S3 in Parquet format every minute.
>>
>> Here's a simplified version of my pipeline:
>>
>> from typing import Any
>>
>> import apache_beam as beam
>> from apache_beam import window
>> from apache_beam.io.kafka import ReadFromKafka
>>
>> def add_timestamp(event: Any) -> Any:
>>     from datetime import datetime
>>     from apache_beam import window
>>
>>     return window.TimestampedValue(
>>         event, datetime.timestamp(event[1].timestamp))
>>
>> # Actual Pipeline
>> (
>>   pipeline
>>   | "Read from Kafka" >> ReadFromKafka(consumer_config, topics, with_metadata=False)
>>   | "Transformed" >> beam.Map(my_transform)
>>   | "Add timestamp" >> beam.Map(add_timestamp)
>>   | "window" >> beam.WindowInto(window.FixedWindows(60))  # 1 min
>>   | "writing to parquet" >> beam.io.WriteToParquet('s3://test-bucket/', pyarrow_schema)
>> )
>>
>> However, the pipeline failed with:
>>
>> GroupByKey cannot be applied to an unbounded PCollection with global windowing and a default trigger
>>
>> This seems to be coming from
>> https://github.com/apache/beam/blob/v2.41.0/sdks/python/apache_beam/io/iobase.py#L1145-L1146
>> which always adds a GlobalWindows and thus causes this error. Wondering what I
>> should do to correctly back up the events from Kafka (unbounded) to S3.
>> Thanks!
>>
>> By the way, I am running with the PortableRunner on Flink. The Beam version is 2.41.0
>> (the latest version seems to have the same code) and the Flink version is 1.14.5.
>>
>>
>>
>> Sincerely,
>> Lydian Lee
>>
>>
