I want to build a simple Beam pipeline that stores events from Kafka to S3 in
Parquet format every minute.

Here's a simplified version of my pipeline:

from typing import Any

import apache_beam as beam
from apache_beam import window
from apache_beam.io.kafka import ReadFromKafka


def add_timestamp(event: Any) -> Any:
    # Local imports so the function pickles cleanly when shipped to workers.
    from datetime import datetime
    from apache_beam import window

    # event[1].timestamp is a datetime; re-stamp the element with it so
    # windowing is done on event time.
    return window.TimestampedValue(
        event, datetime.timestamp(event[1].timestamp))


# Actual Pipeline (consumer_config, topics, my_transform and pyarrow_schema
# are defined elsewhere)
(
  pipeline
  | "Read from Kafka" >> ReadFromKafka(
      consumer_config, topics, with_metadata=False)
  | "Transformed" >> beam.Map(my_transform)
  | "Add timestamp" >> beam.Map(add_timestamp)
  | "window" >> beam.WindowInto(window.FixedWindows(60))  # 1-minute windows
  | "writing to parquet" >> beam.io.WriteToParquet(
      's3://test-bucket/', pyarrow_schema)
)
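
Here pyarrow_schema is just an ordinary pyarrow schema for the transformed
records, something along these lines (the field names below are only
placeholders, not my real columns):

import pyarrow as pa

# Placeholder fields; the real schema matches the records from my_transform.
pyarrow_schema = pa.schema([
    ('key', pa.string()),
    ('value', pa.string()),
    ('event_time', pa.timestamp('ms')),
])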

However, the pipeline failed with:

GroupByKey cannot be applied to an unbounded PCollection with global windowing and a default trigger

This seems to come from
https://github.com/apache/beam/blob/v2.41.0/sdks/python/apache_beam/io/iobase.py#L1145-L1146
which always applies GlobalWindows and thus causes this error. What should I
do to correctly back up the events from Kafka (unbounded) to S3?
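
Is the right direction to replace WriteToParquet with fileio.WriteToFiles and
a custom FileSink? A rough sketch of what I have in mind (ParquetSink is just
my own placeholder class here, and it assumes a pyarrow version that has
Table.from_pylist):

import pyarrow as pa
import pyarrow.parquet as pq
from apache_beam.io import fileio


class ParquetSink(fileio.FileSink):
    """Buffers dict records and writes them as one Parquet file on flush."""

    def __init__(self, schema):
        self._schema = schema

    def open(self, fh):
        self._fh = fh
        self._records = []

    def write(self, record):
        # Records are expected to be dicts matching the schema.
        self._records.append(record)

    def flush(self):
        table = pa.Table.from_pylist(self._records, schema=self._schema)
        pq.write_table(table, self._fh)


# The last step of the pipeline above would then become:
#   | "writing to parquet" >> fileio.WriteToFiles(
#       path='s3://test-bucket/',
#       sink=lambda dest: ParquetSink(pyarrow_schema),
#       shards=1)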
Thanks!

By the way, I am running with the PortableRunner on Flink. The Beam version is
2.41.0 (the latest version seems to have the same code) and the Flink version
is 1.14.5.
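
For completeness, I start the pipeline with options roughly like the following
(the job endpoint and environment type here are only example values, not my
exact setup):

from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions([
    '--runner=PortableRunner',
    '--job_endpoint=localhost:8099',   # Flink job server endpoint (example)
    '--environment_type=LOOPBACK',     # example environment
    '--streaming',
])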



Sincerely,
Lydian Lee
