The Python SDK's ReadFromKafka is an external transform implemented in Java,
similar to SqlTransform. The InteractiveRunner doesn't support it.

That said, if you want to use external transforms interactively, you can
follow the workaround used for SQL
(https://cloud.google.com/dataflow/docs/guides/notebook-advanced#beam-sql).
The source code is at
https://github.com/apache/beam/tree/master/sdks/python/apache_beam/runners/interactive/sql


Ning.

On Wed, Mar 6, 2024 at 9:50 PM Jaehyeon Kim <dott...@gmail.com> wrote:

> Hello,
>
> I'm playing with the interactive runner on a notebook and the flink runner
> is used as the underlying runner. I wonder if it can read messages from
> Kafka. I checked the example notebook
> <https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/interactive/examples/Interactive%20Beam%20Running%20on%20Flink.ipynb>
>  and
> it works. However I cannot read Kafka messages with the following error.
>
>  KeyError: 'beam:transform:org.apache.beam:kafka_read_with_metadata:v2'
>
> Cheers,
> Jaehyeon
>
> *Here is the source.*
>
> pipeline_opts = {
>     "job_name": "kafka-io",
>     "environment_type": "LOOPBACK",
>     "streaming": True,
>     "parallelism": 3,
>     "experiments": [
>         "use_deprecated_read"
>     ],  ## https://github.com/apache/beam/issues/20979
>     "checkpointing_interval": "60000",
> }
> options = PipelineOptions([], **pipeline_opts)
> # Required, else it will complain when importing worker functions
> options.view_as(SetupOptions).save_main_session = True
>
> p = beam.Pipeline(
>     
> interactive_runner.InteractiveRunner(underlying_runner=flink_runner.FlinkRunner()),
> options=options
> )
> events = (
>     p
>     | "Read from Kafka"
>     >> kafka.ReadFromKafka(
>         consumer_config={
>             "bootstrap.servers": os.getenv(
>                 "BOOTSTRAP_SERVERS",
>                 "host.docker.internal:29092",
>             ),
>             "auto.offset.reset": "earliest",
>             # "enable.auto.commit": "true",
>             "group.id": "kafka-io",
>         },
>         topics=["website-visit"],
>     )
>     | "Decode messages" >> beam.Map(decode_message)
>     | "Parse elements" >> beam.Map(parse_json).with_output_types(EventLog)
> )
> results = p.run()
> result.wait_until_finish()
>
> *And here is the full error message.*
>
> WARNING:apache_beam.options.pipeline_options:Discarding invalid overrides: 
> {'checkpointing_interval': '60000'}
>
> ---------------------------------------------------------------------------KeyError
>                                   Traceback (most recent call last)
> Cell In[17], line 36     15 p = beam.Pipeline(     16     
> interactive_runner.InteractiveRunner(underlying_runner=flink_runner.FlinkRunner()),
>  options=options     17 )     18 events = (     19     p     20     | "Read 
> from Kafka"   (...)     34     | "Parse elements" >> 
> beam.Map(parse_json).with_output_types(EventLog)     35 )---> 36 results = 
> p.run()     37 result.wait_until_finish()     38 # 
> ib.options.recording_duration = "120s"     39 # ib.show(events)
>
> File 
> ~/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/pipeline.py:586
>  
> <http://localhost:8888/home/jaehyeon/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/pipeline.py#line=585>,
>  in Pipeline.run(self, test_runner_api)    584     finally:    585       
> shutil.rmtree(tmpdir)--> 586   return self.runner.run_pipeline(self, 
> self._options)    587 finally:    588   if not is_in_ipython():
>
> File 
> ~/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/runners/interactive/interactive_runner.py:148
>  
> <http://localhost:8888/home/jaehyeon/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/runners/interactive/interactive_runner.py#line=147>,
>  in InteractiveRunner.run_pipeline(self, pipeline, options)    145 if 
> isinstance(self._underlying_runner, FlinkRunner):    146   
> self.configure_for_flink(user_pipeline, options)--> 148 pipeline_instrument = 
> inst.build_pipeline_instrument(pipeline, options)    150 # The user_pipeline 
> analyzed might be None if the pipeline given has nothing    151 # to be 
> cached and tracing back to the user defined pipeline is impossible.    152 # 
> When it's None, there is no need to cache including the background    153 # 
> caching job and no result to track since no background caching job is    154 
> # started at all.    155 if user_pipeline:    156   # Should use the 
> underlying runner and run asynchronously.
>
> File 
> ~/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/runners/interactive/pipeline_instrument.py:756
>  
> <http://localhost:8888/home/jaehyeon/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/runners/interactive/pipeline_instrument.py#line=755>,
>  in build_pipeline_instrument(pipeline, options)    742 def 
> build_pipeline_instrument(pipeline, options=None):    743   """Creates 
> PipelineInstrument for a pipeline and its options with cache.    744     745  
>  Throughout the process, the returned PipelineInstrument snapshots the given  
>  (...)    754   runner pipeline to apply interactivity.    755   """--> 756   
> pi = PipelineInstrument(pipeline, options)    757   pi.preprocess()    758   
> pi.instrument()  # Instruments the pipeline only once.
>
> File 
> ~/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/runners/interactive/pipeline_instrument.py:71
>  
> <http://localhost:8888/home/jaehyeon/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/runners/interactive/pipeline_instrument.py#line=70>,
>  in PipelineInstrument.__init__(self, pipeline, options)     67 if 
> background_caching_job.has_source_to_cache(self._user_pipeline):     68   
> self._cache_manager = ie.current_env().get_cache_manager(     69       
> self._user_pipeline)---> 71 self._background_caching_pipeline = 
> beam.pipeline.Pipeline.from_runner_api(     72     pipeline.to_runner_api(), 
> pipeline.runner, options)     73 ie.current_env().add_derived_pipeline(     
> 74     self._pipeline, self._background_caching_pipeline)     76 # Snapshot 
> of original pipeline information.
>
> File 
> ~/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/pipeline.py:1020
>  
> <http://localhost:8888/home/jaehyeon/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/pipeline.py#line=1019>,
>  in Pipeline.from_runner_api(proto, runner, options, return_context)   1018 
> if proto.root_transform_ids:   1019   root_transform_id, = 
> proto.root_transform_ids-> 1020   p.transforms_stack = 
> [context.transforms.get_by_id(root_transform_id)]   1021 else:   1022   
> p.transforms_stack = [AppliedPTransform(None, None, '', None)]
>
> File 
> ~/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/runners/pipeline_context.py:114
>  
> <http://localhost:8888/home/jaehyeon/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/runners/pipeline_context.py#line=113>,
>  in _PipelineContextMap.get_by_id(self, id)    111 def get_by_id(self, id):   
>  112   # type: (str) -> PortableObjectT    113   if id not in 
> self._id_to_obj:--> 114     self._id_to_obj[id] = 
> self._obj_type.from_runner_api(    115         self._id_to_proto[id], 
> self._pipeline_context)    116   return self._id_to_obj[id]
>
> File 
> ~/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/pipeline.py:1456
>  
> <http://localhost:8888/home/jaehyeon/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/pipeline.py#line=1455>,
>  in AppliedPTransform.from_runner_api(proto, context)   1454 result.parts = 
> []   1455 for transform_id in proto.subtransforms:-> 1456   part = 
> context.transforms.get_by_id(transform_id)   1457   part.parent = result   
> 1458   result.add_part(part)
>
> File 
> ~/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/runners/pipeline_context.py:114
>  
> <http://localhost:8888/home/jaehyeon/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/runners/pipeline_context.py#line=113>,
>  in _PipelineContextMap.get_by_id(self, id)    111 def get_by_id(self, id):   
>  112   # type: (str) -> PortableObjectT    113   if id not in 
> self._id_to_obj:--> 114     self._id_to_obj[id] = 
> self._obj_type.from_runner_api(    115         self._id_to_proto[id], 
> self._pipeline_context)    116   return self._id_to_obj[id]
>
> File 
> ~/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/pipeline.py:1456
>  
> <http://localhost:8888/home/jaehyeon/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/pipeline.py#line=1455>,
>  in AppliedPTransform.from_runner_api(proto, context)   1454 result.parts = 
> []   1455 for transform_id in proto.subtransforms:-> 1456   part = 
> context.transforms.get_by_id(transform_id)   1457   part.parent = result   
> 1458   result.add_part(part)
>
> File 
> ~/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/runners/pipeline_context.py:114
>  
> <http://localhost:8888/home/jaehyeon/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/runners/pipeline_context.py#line=113>,
>  in _PipelineContextMap.get_by_id(self, id)    111 def get_by_id(self, id):   
>  112   # type: (str) -> PortableObjectT    113   if id not in 
> self._id_to_obj:--> 114     self._id_to_obj[id] = 
> self._obj_type.from_runner_api(    115         self._id_to_proto[id], 
> self._pipeline_context)    116   return self._id_to_obj[id]
>
> File 
> ~/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/pipeline.py:1426
>  
> <http://localhost:8888/home/jaehyeon/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/pipeline.py#line=1425>,
>  in AppliedPTransform.from_runner_api(proto, context)   1419   
> side_input_tags = []   1421 main_inputs = {   1422     tag: 
> context.pcollections.get_by_id(id)   1423     for (tag, id) in 
> proto.inputs.items() if tag not in side_input_tags   1424 }-> 1426 transform 
> = ptransform.PTransform.from_runner_api(proto, context)   1427 if transform 
> and proto.environment_id:   1428   resource_hints = 
> context.environments.get_by_id(   1429       
> proto.environment_id).resource_hints()
>
> File 
> ~/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/transforms/ptransform.py:769
>  
> <http://localhost:8888/home/jaehyeon/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/transforms/ptransform.py#line=768>,
>  in PTransform.from_runner_api(cls, proto, context)    767 if proto is None 
> or proto.spec is None or not proto.spec.urn:    768   return None--> 769 
> parameter_type, constructor = cls._known_urns[proto.spec.urn]    771 return 
> constructor(    772     proto,    773     
> proto_utils.parse_Bytes(proto.spec.payload, parameter_type),    774     
> context)
> KeyError: 'beam:transform:org.apache.beam:kafka_read_with_metadata:v2'
>
>
>

Reply via email to