Hi Beam users,

I'm facing a problem with a Beam Python pipeline. It runs on Flink, reads
from Kafka as an unbounded source, and has the use_deprecated_read flag
set. The pipeline has two beam.Map() calls, followed by a beam.WindowInto()
and then a write to a file system.

When I send a batch of 1000 small messages (20 bytes each), I have no
problems. However, when I send a batch of 1000 large messages (1 kilobyte
each), the pipeline freezes after some time. The exact point varies, but
there is always a gap of 10-12 records between the Kafka records sent and
the records received by the Python step. This is remarkably consistent.
Also, when I cancel the Flink job, I see a set of stack traces on the
console. [1]

A similar Java pipeline works fine. The Python pipeline also works fine if
I have only one beam.Map() call. Adding a Reshuffle() to prevent fusion
makes no difference.

It seems like we have a problem passing messages from stage to stage within
Python. I'm wondering if there's a buffer in the Python SDK of about 10-12
KB that fills up and then blocks the pipeline from making progress?
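
To make the question concrete, here is a minimal sketch in plain Python
(standard library only, not Beam code) of the behaviour I am guessing at.
The 12 KB limit and the function name are my own assumptions for
illustration; I have not confirmed that such a buffer exists in the SDK.

```python
import queue


def records_accepted_before_block(record_size, num_records, buffer_bytes):
    """Count how many records fit into a byte-bounded buffer when nothing
    downstream is draining it.

    Hypothetical model only: the buffer is approximated as a queue whose
    slot count is buffer_bytes // record_size.
    """
    capacity = max(1, buffer_bytes // record_size)
    buf = queue.Queue(maxsize=capacity)
    accepted = 0
    for _ in range(num_records):
        try:
            # A blocking put() would hang here once the buffer is full;
            # put_nowait() raises queue.Full instead so the sketch terminates.
            buf.put_nowait(b"x" * record_size)
        except queue.Full:
            break
        accepted += 1
    return accepted


# Assumed buffer size of 12 KB, chosen to match the gap I observe.
ASSUMED_BUFFER_BYTES = 12 * 1024

small = records_accepted_before_block(20, 1000, ASSUMED_BUFFER_BYTES)
large = records_accepted_before_block(1024, 1000, ASSUMED_BUFFER_BYTES)
print(small, large)
```

In this model, a stalled consumer would hold back several hundred 20-byte
records but only about a dozen 1 KB records, which is the kind of gap I am
seeing with the large messages.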

Thanks,
Deepak

[1] Sample Python stack trace. This is printed automatically when I cancel
the Beam pipeline job in the Flink UI.

INFO:__main__:Stopping worker 1-1
Fatal Python error: could not acquire lock for <_io.BufferedWriter
name='<stdout>'> at interpreter shutdown, possibly due to daemon threads
Python runtime state: finalizing (tstate=0x7f8849f048e0)

Thread 0x000070000e577000 (most recent call first):
  File
"/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py",
line 870 in run
  File
"/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py",
line 932 in _bootstrap_inner
  File
"/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py",
line 890 in _bootstrap

Thread 0x000070000f57a000 (most recent call first):
  File "/Users/deepaknagaraj/dev/utils/kafka_to_fs/worker.py", line 27 in
read_kafka_record
  File
"/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/site-packages/apache_beam/transforms/core.py",
line 1639 in <lambda>
  File
"/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py",
line 228 in process_encoded
  File
"/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py",
line 1002 in process_bundle
  File
"/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py",
line 625 in process_bundle
  File
"/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py",
line 587 in do_instruction
  File
"/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py",
line 347 in <lambda>
  File
"/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py",
line 274 in _execute
  File
"/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py",
line 346 in task
  File
"/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/site-packages/apache_beam/utils/thread_pool_executor.py",
line 37 in run
  File
"/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/site-packages/apache_beam/utils/thread_pool_executor.py",
line 53 in run
  File
"/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py",
line 932 in _bootstrap_inner
  File
"/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py",
line 890 in _bootstrap

Thread 0x000070001a61e000 (most recent call first):
  File
"/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py",
line 302 in wait
  File
"/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py",
line 433 in acquire
  File
"/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/site-packages/apache_beam/utils/thread_pool_executor.py",
line 57 in run
  File
"/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py",
line 932 in _bootstrap_inner
  File
"/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py",
line 890 in _bootstrap

Thread 0x0000700018618000 (most recent call first):
  File
"/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py",
line 1202 in invoke_excepthook
  File
"/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py",
line 934 in _bootstrap_inner
  File
"/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py",
line 890 in _bootstrap

Thread 0x0000700017615000 (most recent call first):
  File
"/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py",
line 302 in wait
  File
"/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/queue.py", line
170 in get
  File
"/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/site-packages/apache_beam/runners/worker/data_plane.py",
line 581 in _write_outputs
  File
"/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/site-packages/grpc/_channel.py",
line 203 in consume_request_iterator
  File
"/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py",
line 870 in run
  File
"/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py",
line 932 in _bootstrap_inner
  File
"/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py",
line 890 in _bootstrap

Thread 0x000070001460c000 (most recent call first):
  File
"/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py",
line 302 in wait
  File
"/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/queue.py", line
170 in get
  File
"/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py",
line 987 in request_iter
  File
"/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/site-packages/grpc/_channel.py",
line 203 in consume_request_iterator
  File
"/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py",
line 870 in run
  File
"/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py",
line 932 in _bootstrap_inner
  File
"/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py",
line 890 in _bootstrap

Thread 0x000070000c571000 (most recent call first):
  File
"/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py",
line 302 in wait
  File
"/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py",
line 433 in acquire
  File
"/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/site-packages/apache_beam/utils/thread_pool_executor.py",
line 57 in run
  File
"/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py",
line 932 in _bootstrap_inner
  File
"/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py",
line 890 in _bootstrap

Thread 0x0000700013586000 (most recent call first):
  File
"/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py",
line 302 in wait
  File
"/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/queue.py", line
170 in get
  File
"/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py",
line 233 in get_responses
  File
"/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/site-packages/grpc/_channel.py",
line 203 in consume_request_iterator
  File
"/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py",
line 870 in run
  File
"/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py",
line 932 in _bootstrap_inner
  File
"/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py",
line 890 in _bootstrap

Thread 0x0000700011580000 (most recent call first):
  File
"/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py",
line 306 in wait
  File
"/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py",
line 558 in wait
  File
"/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/site-packages/apache_beam/runners/worker/data_plane.py",
line 214 in run
  File
"/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py",
line 932 in _bootstrap_inner
  File
"/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py",
line 890 in _bootstrap

Current thread 0x000000010b173600 (most recent call first):
<no Python frame>
^C
