Hi all,
This error came as a bit of a surprise.
Here’s a snippet of the traceback (full traceback below).
File "apache_beam/runners/common.py", line 751, in
apache_beam.runners.common.DoFnRunner.process
return self.do_fn_invoker.invoke_process(windowed_value)
File "apache_beam/runners/common.py", line 423, in
apache_beam.runners.common.SimpleInvoker.invoke_process
windowed_value, self.process_method(windowed_value.value))
File
"/Users/chad/dev/beam-tests/.venv/lib/python2.7/site-packages/apache_beam/io/iobase.py",
line 860, in split_source
AttributeError: '_PubSubSource' object has no attribute
'estimate_size' [while running 'PubSubInflow/Read/Split']
Flink is using _PubSubSource which is, as far as I can tell, a stub that
should be replaced at runtime by an actual streaming source. Is this error
a bug or a known limitation? If the latter, is there a Jira issue and any
momentum to solve this?
I’m pretty confused by this because the Apache Beam Portability Support
Matrix [1] makes it pretty clear that Flink supports streaming, and the
docs for “Built-in I/O Transforms” [2] list Google PubSub and BigQuery as the
only I/O transforms that support streaming, so if streaming works with
Flink, PubSub should presumably be one of the sources it works with.
I'm using Beam 2.13.0 and Flink 1.8.
thanks,
chad
[1]
https://docs.google.com/spreadsheets/d/1KDa_FGn1ShjomGd-UUDOhuh2q73de2tPz6BqHpzqvNI/edit#gid=0
[2] https://beam.apache.org/documentation/io/built-in/
Full traceback:
Caused by: java.util.concurrent.ExecutionException:
java.lang.RuntimeException: Error received from SDK harness for
instruction 5: Traceback (most recent call last):
File
"/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
line 157, in _execute
response = task()
File
"/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
line 190, in <lambda>
self._execute(lambda: worker.do_instruction(work), work)
File
"/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
line 333, in do_instruction
request.instruction_id)
File
"/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
line 359, in process_bundle
bundle_processor.process_bundle(instruction_id))
File
"/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
line 589, in process_bundle
].process_encoded(data.data)
File
"/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
line 143, in process_encoded
self.output(decoded_value)
File "apache_beam/runners/worker/operations.py", line 246, in
apache_beam.runners.worker.operations.Operation.output
def output(self, windowed_value, output_index=0):
File "apache_beam/runners/worker/operations.py", line 247, in
apache_beam.runners.worker.operations.Operation.output
cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
File "apache_beam/runners/worker/operations.py", line 143, in
apache_beam.runners.worker.operations.SingletonConsumerSet.receive
self.consumer.process(windowed_value)
File "apache_beam/runners/worker/operations.py", line 583, in
apache_beam.runners.worker.operations.DoOperation.process
with self.scoped_process_state:
File "apache_beam/runners/worker/operations.py", line 584, in
apache_beam.runners.worker.operations.DoOperation.process
delayed_application = self.dofn_receiver.receive(o)
File "apache_beam/runners/common.py", line 747, in
apache_beam.runners.common.DoFnRunner.receive
self.process(windowed_value)
File "apache_beam/runners/common.py", line 753, in
apache_beam.runners.common.DoFnRunner.process
self._reraise_augmented(exn)
File "apache_beam/runners/common.py", line 807, in
apache_beam.runners.common.DoFnRunner._reraise_augmented
raise_with_traceback(new_exn)
File "apache_beam/runners/common.py", line 751, in
apache_beam.runners.common.DoFnRunner.process
return self.do_fn_invoker.invoke_process(windowed_value)
File "apache_beam/runners/common.py", line 423, in
apache_beam.runners.common.SimpleInvoker.invoke_process
windowed_value, self.process_method(windowed_value.value))
File
"/Users/chad/dev/beam-tests/.venv/lib/python2.7/site-packages/apache_beam/io/iobase.py",
line 860, in split_source
AttributeError: '_PubSubSource' object has no attribute
'estimate_size' [while running 'PubSubInflow/Read/Split']