The Python SDK's ReadFromKafka is an external transform implemented in Java, similar to SqlTransform, and InteractiveRunner doesn't support it. As your trace shows, the interactive runner rebuilds the pipeline from its proto representation (pipeline_instrument.py:71), and the Java transform's URN has no constructor registered on the Python side, which is exactly the KeyError you're seeing (see the snippet below the quoted trace).
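For comparison, the same read should work when the pipeline is submitted straight to the Flink runner, without InteractiveRunner in between, since the cross-language expansion happens on the normal portable path there. A rough, untested sketch (broker address and topic are placeholders):

    import apache_beam as beam
    from apache_beam.io import kafka
    from apache_beam.options.pipeline_options import PipelineOptions

    # Plain FlinkRunner submission: no interactive features (ib.show etc.),
    # but the Java Kafka transform expands and runs normally.
    options = PipelineOptions(
        ["--runner=FlinkRunner", "--environment_type=LOOPBACK", "--streaming"])
    # The with-block waits for the (streaming) pipeline to terminate.
    with beam.Pipeline(options=options) as p:
        _ = (
            p
            | kafka.ReadFromKafka(
                consumer_config={"bootstrap.servers": "localhost:9092"},  # placeholder
                topics=["my-topic"])  # placeholder
            | beam.Map(print))  # values arrive as bytes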
That being said, if you want to add interactive support for external transforms, you may follow the workaround used for SQL (https://cloud.google.com/dataflow/docs/guides/notebook-advanced#beam-sql). The source code is https://github.com/apache/beam/tree/master/sdks/python/apache_beam/runners/interactive/sql .

Ning.

On Wed, Mar 6, 2024 at 9:50 PM Jaehyeon Kim <dott...@gmail.com> wrote:
> Hello,
>
> I'm playing with the interactive runner on a notebook, with the Flink
> runner as the underlying runner. I wonder if it can read messages from
> Kafka. I checked the example notebook
> <https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/interactive/examples/Interactive%20Beam%20Running%20on%20Flink.ipynb>
> and it works. However, I cannot read Kafka messages; I get the following
> error.
>
> KeyError: 'beam:transform:org.apache.beam:kafka_read_with_metadata:v2'
>
> Cheers,
> Jaehyeon
>
> *Here is the source.*
>
> pipeline_opts = {
>     "job_name": "kafka-io",
>     "environment_type": "LOOPBACK",
>     "streaming": True,
>     "parallelism": 3,
>     "experiments": [
>         "use_deprecated_read"
>     ],  # https://github.com/apache/beam/issues/20979
>     "checkpointing_interval": "60000",
> }
> options = PipelineOptions([], **pipeline_opts)
> # Required, else it complains when importing worker functions
> options.view_as(SetupOptions).save_main_session = True
>
> p = beam.Pipeline(
>     interactive_runner.InteractiveRunner(underlying_runner=flink_runner.FlinkRunner()),
>     options=options,
> )
> events = (
>     p
>     | "Read from Kafka"
>     >> kafka.ReadFromKafka(
>         consumer_config={
>             "bootstrap.servers": os.getenv(
>                 "BOOTSTRAP_SERVERS",
>                 "host.docker.internal:29092",
>             ),
>             "auto.offset.reset": "earliest",
>             # "enable.auto.commit": "true",
>             "group.id": "kafka-io",
>         },
>         topics=["website-visit"],
>     )
>     | "Decode messages" >> beam.Map(decode_message)
>     | "Parse elements" >> beam.Map(parse_json).with_output_types(EventLog)
> )
> results = p.run()
> results.wait_until_finish()
>
> *And here is the full error message.*
>
> WARNING:apache_beam.options.pipeline_options:Discarding invalid overrides: {'checkpointing_interval': '60000'}
>
> ---------------------------------------------------------------------------
> KeyError                                  Traceback (most recent call last)
> Cell In[17], line 36
>      15 p = beam.Pipeline(
>      16     interactive_runner.InteractiveRunner(underlying_runner=flink_runner.FlinkRunner()), options=options
>      17 )
>      18 events = (
>      19     p
>      20     | "Read from Kafka"
>    (...)
34 | "Parse elements" >> > beam.Map(parse_json).with_output_types(EventLog) 35 )---> 36 results = > p.run() 37 result.wait_until_finish() 38 # > ib.options.recording_duration = "120s" 39 # ib.show(events) > > File > ~/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/pipeline.py:586 > > <http://localhost:8888/home/jaehyeon/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/pipeline.py#line=585>, > in Pipeline.run(self, test_runner_api) 584 finally: 585 > shutil.rmtree(tmpdir)--> 586 return self.runner.run_pipeline(self, > self._options) 587 finally: 588 if not is_in_ipython(): > > File > ~/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/runners/interactive/interactive_runner.py:148 > > <http://localhost:8888/home/jaehyeon/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/runners/interactive/interactive_runner.py#line=147>, > in InteractiveRunner.run_pipeline(self, pipeline, options) 145 if > isinstance(self._underlying_runner, FlinkRunner): 146 > self.configure_for_flink(user_pipeline, options)--> 148 pipeline_instrument = > inst.build_pipeline_instrument(pipeline, options) 150 # The user_pipeline > analyzed might be None if the pipeline given has nothing 151 # to be > cached and tracing back to the user defined pipeline is impossible. 152 # > When it's None, there is no need to cache including the background 153 # > caching job and no result to track since no background caching job is 154 > # started at all. 155 if user_pipeline: 156 # Should use the > underlying runner and run asynchronously. > > File > ~/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/runners/interactive/pipeline_instrument.py:756 > > <http://localhost:8888/home/jaehyeon/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/runners/interactive/pipeline_instrument.py#line=755>, > in build_pipeline_instrument(pipeline, options) 742 def > build_pipeline_instrument(pipeline, options=None): 743 """Creates > PipelineInstrument for a pipeline and its options with cache. 744 745 > Throughout the process, the returned PipelineInstrument snapshots the given > (...) 754 runner pipeline to apply interactivity. 755 """--> 756 > pi = PipelineInstrument(pipeline, options) 757 pi.preprocess() 758 > pi.instrument() # Instruments the pipeline only once. > > File > ~/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/runners/interactive/pipeline_instrument.py:71 > > <http://localhost:8888/home/jaehyeon/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/runners/interactive/pipeline_instrument.py#line=70>, > in PipelineInstrument.__init__(self, pipeline, options) 67 if > background_caching_job.has_source_to_cache(self._user_pipeline): 68 > self._cache_manager = ie.current_env().get_cache_manager( 69 > self._user_pipeline)---> 71 self._background_caching_pipeline = > beam.pipeline.Pipeline.from_runner_api( 72 pipeline.to_runner_api(), > pipeline.runner, options) 73 ie.current_env().add_derived_pipeline( > 74 self._pipeline, self._background_caching_pipeline) 76 # Snapshot > of original pipeline information. 
> > File > ~/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/pipeline.py:1020 > > <http://localhost:8888/home/jaehyeon/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/pipeline.py#line=1019>, > in Pipeline.from_runner_api(proto, runner, options, return_context) 1018 > if proto.root_transform_ids: 1019 root_transform_id, = > proto.root_transform_ids-> 1020 p.transforms_stack = > [context.transforms.get_by_id(root_transform_id)] 1021 else: 1022 > p.transforms_stack = [AppliedPTransform(None, None, '', None)] > > File > ~/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/runners/pipeline_context.py:114 > > <http://localhost:8888/home/jaehyeon/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/runners/pipeline_context.py#line=113>, > in _PipelineContextMap.get_by_id(self, id) 111 def get_by_id(self, id): > 112 # type: (str) -> PortableObjectT 113 if id not in > self._id_to_obj:--> 114 self._id_to_obj[id] = > self._obj_type.from_runner_api( 115 self._id_to_proto[id], > self._pipeline_context) 116 return self._id_to_obj[id] > > File > ~/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/pipeline.py:1456 > > <http://localhost:8888/home/jaehyeon/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/pipeline.py#line=1455>, > in AppliedPTransform.from_runner_api(proto, context) 1454 result.parts = > [] 1455 for transform_id in proto.subtransforms:-> 1456 part = > context.transforms.get_by_id(transform_id) 1457 part.parent = result > 1458 result.add_part(part) > > File > ~/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/runners/pipeline_context.py:114 > > <http://localhost:8888/home/jaehyeon/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/runners/pipeline_context.py#line=113>, > in _PipelineContextMap.get_by_id(self, id) 111 def get_by_id(self, id): > 112 # type: (str) -> PortableObjectT 113 if id not in > self._id_to_obj:--> 114 self._id_to_obj[id] = > self._obj_type.from_runner_api( 115 self._id_to_proto[id], > self._pipeline_context) 116 return self._id_to_obj[id] > > File > ~/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/pipeline.py:1456 > > <http://localhost:8888/home/jaehyeon/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/pipeline.py#line=1455>, > in AppliedPTransform.from_runner_api(proto, context) 1454 result.parts = > [] 1455 for transform_id in proto.subtransforms:-> 1456 part = > context.transforms.get_by_id(transform_id) 1457 part.parent = result > 1458 result.add_part(part) > > File > ~/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/runners/pipeline_context.py:114 > > <http://localhost:8888/home/jaehyeon/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/runners/pipeline_context.py#line=113>, > in _PipelineContextMap.get_by_id(self, id) 111 def get_by_id(self, id): > 112 # type: (str) -> PortableObjectT 113 if id not in > self._id_to_obj:--> 114 self._id_to_obj[id] = > self._obj_type.from_runner_api( 115 self._id_to_proto[id], > self._pipeline_context) 116 return self._id_to_obj[id] > > File > ~/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/pipeline.py:1426 > > <http://localhost:8888/home/jaehyeon/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/pipeline.py#line=1425>, > in AppliedPTransform.from_runner_api(proto, context) 1419 > side_input_tags = [] 1421 main_inputs = { 1422 tag: > 
context.pcollections.get_by_id(id) 1423 for (tag, id) in > proto.inputs.items() if tag not in side_input_tags 1424 }-> 1426 transform > = ptransform.PTransform.from_runner_api(proto, context) 1427 if transform > and proto.environment_id: 1428 resource_hints = > context.environments.get_by_id( 1429 > proto.environment_id).resource_hints() > > File > ~/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/transforms/ptransform.py:769 > > <http://localhost:8888/home/jaehyeon/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/transforms/ptransform.py#line=768>, > in PTransform.from_runner_api(cls, proto, context) 767 if proto is None > or proto.spec is None or not proto.spec.urn: 768 return None--> 769 > parameter_type, constructor = cls._known_urns[proto.spec.urn] 771 return > constructor( 772 proto, 773 > proto_utils.parse_Bytes(proto.spec.payload, parameter_type), 774 > context) > KeyError: 'beam:transform:org.apache.beam:kafka_read_with_metadata:v2' > > >
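P.S. To see the mismatch from the trace directly, you can list the transform URNs the Python SDK has registered constructors for; the Kafka URN from your error won't be among them. A quick debugging snippet (it reads a private attribute, so treat it as a debugging aid only):

    from apache_beam.transforms.ptransform import PTransform

    # _known_urns maps a transform URN to (payload type, constructor); the
    # failing lookup at ptransform.py:769 is this same dict, which has no
    # entry for 'beam:transform:org.apache.beam:kafka_read_with_metadata:v2'.
    for urn in sorted(PTransform._known_urns):
        if "kafka" in urn:
            print(urn)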