[ https://issues.apache.org/jira/browse/BEAM-12586?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17475210#comment-17475210 ]
Eugene Moolman commented on BEAM-12586:
---------------------------------------

Not sure how this bot works, but there have been comments recently?

> Python Direct Runner doesn't support both streaming & non streaming sources
> ---------------------------------------------------------------------------
>
>                 Key: BEAM-12586
>                 URL: https://issues.apache.org/jira/browse/BEAM-12586
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-direct, sdk-py-core
>    Affects Versions: 2.30.0
>            Reporter: Christophe Rodriguez
>            Priority: P2
>              Labels: stale-P2
>
> Please see the Stack Overflow discussion:
> [https://stackoverflow.com/questions/68125864/transform-node-appliedptransform-was-not-replaced-as-expected-error-with-the-dir]
> When I create a GCS source and a Pub/Sub source and try to flatten both, the
> pipeline fails with an error raised while the direct runner rewrites
> (overrides) the pipeline's transforms.
> Code example:
> {code:python}
> import json
>
> import apache_beam as beam
> from apache_beam.options.pipeline_options import PipelineOptions
>
> # Assumption: the pipeline runs in streaming mode because it reads from Pub/Sub.
> options = PipelineOptions(streaming=True)
>
> with beam.Pipeline(options=options) as p:  # the pipeline runs on exit
>     gcsEventsColl = (p
>         | "Read from GCS" >> beam.io.ReadFromText("gs://sample_events_for_beam/*.log")
>         | "convert to dict" >> beam.Map(json.loads))
>     liveEventsColl = (p
>         | "Read from Pubsub" >> beam.io.ReadFromPubSub(topic="projects/axxxx/topics/input_topic")
>         | "convert to dict2" >> beam.Map(json.loads))
>     input_rec = (gcsEventsColl, liveEventsColl) | "flatten" >> beam.Flatten()
> {code}
> Error:
> {code}
>   File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 564, in run
>     return self.runner.run_pipeline(self, self._options)
>   File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/runners/direct/direct_runner.py", line 131, in run_pipeline
>     return runner.run_pipeline(pipeline, options)
>   File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/runners/direct/direct_runner.py", line 529, in run_pipeline
>     pipeline.replace_all(_get_transform_overrides(options))
>   File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 504, in replace_all
>     self._check_replacement(override)
>   File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 478, in _check_replacement
>     self.visit(ReplacementValidator())
>   File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 611, in visit
>     self._root_transform().visit(visitor, self, visited)
>   File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 1195, in visit
>     part.visit(visitor, pipeline, visited)
>   File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 1195, in visit
>     part.visit(visitor, pipeline, visited)
>   File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 1195, in visit
>     part.visit(visitor, pipeline, visited)
>   [Previous line repeated 4 more times]
>   File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 1198, in visit
>     visitor.visit_transform(self)
>   File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 476, in visit_transform
>     transform_node)
> RuntimeError: Transform node AppliedPTransform(Read from GCS/Read/SDFBoundedSourceReader/ParDo(SDFBoundedSourceDoFn)/ProcessKeyedElements/GroupByKey/GroupByKey, _GroupByKeyOnly) was not replaced as expected.
> {code}
> The direct runner leaves the pipeline in an invalid state when it applies its
> transform overrides: the _GroupByKeyOnly node inside the GCS read is not
> replaced, so the post-replacement check in _check_replacement fails.

--
This message was sent by Atlassian Jira
(v8.20.1#820001)
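The traceback shows the failure path: the direct runner calls pipeline.replace_all with its transform overrides, and replace_all then runs _check_replacement, which visits every transform with a ReplacementValidator and appears to raise RuntimeError for any node that still matches an override. The following is a simplified, hypothetical model of that replace-then-validate pattern in plain Python. It is not Beam's code: Node, visit, apply_override, the can_replace predicate and the replacement kind name are illustrative assumptions. In particular, can_replace only simulates a node being skipped; it does not explain why the real runner leaves the GroupByKey inside the GCS read unreplaced.

```python
# Simplified, HYPOTHETICAL model of a "replace, then validate" pass, loosely
# mirroring replace_all -> _check_replacement -> ReplacementValidator from the
# traceback. This is NOT Beam's implementation; all names are illustrative.
from dataclasses import dataclass, field
from typing import Callable, List


@dataclass
class Node:
    label: str  # full transform label
    kind: str   # transform type name
    parts: List["Node"] = field(default_factory=list)


def visit(node: Node, fn: Callable[[Node], None]) -> None:
    """Depth-first traversal: children first, then the node itself."""
    for part in node.parts:
        visit(part, fn)
    fn(node)


def apply_override(root: Node, target_kind: str, replacement_kind: str,
                   can_replace: Callable[[Node], bool]) -> None:
    """Rewrite nodes of target_kind, then verify that none are left over."""

    def rewrite(node: Node) -> None:
        # Hypothetical: can_replace stands in for whatever makes the real
        # runner skip a node; the actual cause is not modeled here.
        if node.kind == target_kind and can_replace(node):
            node.kind = replacement_kind

    def check(node: Node) -> None:
        # Any node still of target_kind was "not replaced as expected".
        if node.kind == target_kind:
            raise RuntimeError(
                f"Transform node {node.label} was not replaced as expected.")

    visit(root, rewrite)
    visit(root, check)


# Usage: a bounded read (with a nested GroupByKey) and a Pub/Sub read feeding
# a Flatten. The rewrite pass is made to skip the nested GroupByKey.
gcs_gbk = Node("Read from GCS/Read/SDFBoundedSourceReader/ParDo(SDFBoundedSourceDoFn)"
               "/ProcessKeyedElements/GroupByKey/GroupByKey", "_GroupByKeyOnly")
root = Node("", "Root", [
    Node("Read from GCS", "Composite", [gcs_gbk]),
    Node("Read from Pubsub", "Read"),
    Node("flatten", "Flatten"),
])
try:
    apply_override(root, "_GroupByKeyOnly", "HypotheticalStreamingGBK",
                   can_replace=lambda n: not n.label.startswith("Read from GCS"))
except RuntimeError as err:
    print(err)
```

Separating the rewrite and the check into two passes is what turns a silently skipped node into a hard error at pipeline construction time, which matches the shape of the reported failure.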