Hi Beam users,

I'm facing a problem with a Beam Python pipeline. It runs on Flink, reads from Kafka as an unbounded source, and has the use_deprecated_read flag set. The read is followed by two beam.Map() calls, then a beam.WindowInto(), and finally a write to a file system.
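For reference, a minimal sketch of the pipeline shape described above. Only read_kafka_record is a name taken from the actual job (it appears in the stack trace). Everything else here is an assumption made for illustration: the second Map function, the 60-second fixed window, the use of fileio.WriteToFiles as the file-system sink, and the broker, topic, and output-path parameters.

```python
def read_kafka_record(record):
    """First Map step: unpack the (key, value) byte pair emitted by ReadFromKafka."""
    _key, value = record
    return value.decode("utf-8")


def format_record(text):
    """Second Map step (hypothetical): normalise each record to one clean line."""
    return text.strip()


def build_pipeline(pipeline, bootstrap_servers, topic, output_dir):
    """Attach Kafka read -> Map -> Map -> WindowInto -> file write to `pipeline`."""
    # Imported here so the two helpers above can be exercised without Beam installed.
    import apache_beam as beam
    from apache_beam.io import fileio
    from apache_beam.io.kafka import ReadFromKafka
    from apache_beam.transforms import window

    return (
        pipeline
        | "ReadKafka" >> ReadFromKafka(
            consumer_config={"bootstrap.servers": bootstrap_servers},
            topics=[topic])
        | "Decode" >> beam.Map(read_kafka_record)
        | "Format" >> beam.Map(format_record)
        # Window size is an assumed value, not the one used in the real job.
        | "Window" >> beam.WindowInto(window.FixedWindows(60))
        # Assumed sink; the real job may use a different file-system write.
        | "Write" >> fileio.WriteToFiles(path=output_dir)
    )
```

A job of this shape would typically be launched with pipeline options along the lines of --runner=FlinkRunner --streaming --experiments=use_deprecated_read, with the remaining options depending on the cluster setup.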
When I send a batch of 1000 small messages (20 bytes each), I have no problems. However, when I send a batch of 1000 large messages (1 kilobyte each), the pipeline freezes after some time. The exact point varies, but there is always a gap of 10-12 records between the number of records sent to Kafka and the number received by the Python step. This is remarkably consistent. Also, when I cancel the Flink job, I see a set of stack traces on the console. [1]

A similar Java pipeline works fine, and the Python pipeline also works fine if I have only one beam.Map() call. Adding a Reshuffle() to prevent fusion makes no difference.

It seems like we have a problem passing messages from stage to stage within Python. I'm wondering if there's a buffer in the Python SDK of about 10-12 KB that fills up and then blocks the pipeline from making progress?

Thanks,
Deepak

[1] Sample Python stack trace. This is printed automatically when I cancel the Beam pipeline job in the Flink UI.

INFO:__main__:Stopping worker 1-1
Fatal Python error: could not acquire lock for <_io.BufferedWriter name='<stdout>'> at interpreter shutdown, possibly due to daemon threads
Python runtime state: finalizing (tstate=0x7f8849f048e0)

Thread 0x000070000e577000 (most recent call first):
  File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py", line 870 in run
  File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py", line 932 in _bootstrap_inner
  File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py", line 890 in _bootstrap

Thread 0x000070000f57a000 (most recent call first):
  File "/Users/deepaknagaraj/dev/utils/kafka_to_fs/worker.py", line 27 in read_kafka_record
  File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/site-packages/apache_beam/transforms/core.py", line 1639 in <lambda>
  File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 228 in process_encoded
  File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1002 in process_bundle
  File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 625 in process_bundle
  File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 587 in do_instruction
  File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 347 in <lambda>
  File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 274 in _execute
  File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 346 in task
  File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/site-packages/apache_beam/utils/thread_pool_executor.py", line 37 in run
  File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/site-packages/apache_beam/utils/thread_pool_executor.py", line 53 in run
  File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py", line 932 in _bootstrap_inner
  File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py", line 890 in _bootstrap

Thread 0x000070001a61e000 (most recent call first):
  File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py", line 302 in wait
  File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py", line 433 in acquire
  File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/site-packages/apache_beam/utils/thread_pool_executor.py", line 57 in run
  File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py", line 932 in _bootstrap_inner
  File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py", line 890 in _bootstrap

Thread 0x0000700018618000 (most recent call first):
  File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py", line 1202 in invoke_excepthook
  File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py", line 934 in _bootstrap_inner
  File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py", line 890 in _bootstrap

Thread 0x0000700017615000 (most recent call first):
  File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py", line 302 in wait
  File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/queue.py", line 170 in get
  File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/site-packages/apache_beam/runners/worker/data_plane.py", line 581 in _write_outputs
  File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/site-packages/grpc/_channel.py", line 203 in consume_request_iterator
  File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py", line 870 in run
  File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py", line 932 in _bootstrap_inner
  File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py", line 890 in _bootstrap

Thread 0x000070001460c000 (most recent call first):
  File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py", line 302 in wait
  File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/queue.py", line 170 in get
  File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 987 in request_iter
  File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/site-packages/grpc/_channel.py", line 203 in consume_request_iterator
  File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py", line 870 in run
  File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py", line 932 in _bootstrap_inner
  File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py", line 890 in _bootstrap

Thread 0x000070000c571000 (most recent call first):
  File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py", line 302 in wait
  File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py", line 433 in acquire
  File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/site-packages/apache_beam/utils/thread_pool_executor.py", line 57 in run
  File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py", line 932 in _bootstrap_inner
  File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py", line 890 in _bootstrap

Thread 0x0000700013586000 (most recent call first):
  File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py", line 302 in wait
  File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/queue.py", line 170 in get
  File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 233 in get_responses
  File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/site-packages/grpc/_channel.py", line 203 in consume_request_iterator
  File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py", line 870 in run
  File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py", line 932 in _bootstrap_inner
  File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py", line 890 in _bootstrap

Thread 0x0000700011580000 (most recent call first):
  File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py", line 306 in wait
  File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py", line 558 in wait
  File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/site-packages/apache_beam/runners/worker/data_plane.py", line 214 in run
  File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py", line 932 in _bootstrap_inner
  File "/Users/deepaknagaraj/.pyenv/versions/3.8.12/lib/python3.8/threading.py", line 890 in _bootstrap

Current thread 0x000000010b173600 (most recent call first):
<no Python frame>
^C