Hi Lydian,

there are two parts involved.

 a) the expansion service (which you run on port 8097) - this service expands ReadFromKafka, which is a Java transform

 b) the Java SDK environment - this is not the expansion service; it must be an environment that is able to run the Java ReadFromKafka transform. In Flink, you can use the PROCESS environment type (e.g. [1]), but there might be other options (e.g. DOCKER), see [2]
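As a minimal sketch of (b): instead of EXTERNAL, the expansion service can be started with a PROCESS default environment, so the runner launches the Java SDK harness itself and no separate worker-pool service needs to be listening. The flags follow the pattern documented in [2]; the boot-script path below is an assumption (it matches the standard Beam container layout, but verify it on your task managers):

```shell
# Sketch only - assumes the Java SDK harness boot script is available
# at /opt/apache/beam/boot on every Flink task manager.
java \
  -cp /opt/apache/beam/jars/ \
  org.apache.beam.sdk.expansion.service.ExpansionService \
  8097 \
  --javaClassLookupAllowlistFile='*' \
  --defaultEnvironmentType=PROCESS \
  --defaultEnvironmentConfig='{"command": "/opt/apache/beam/boot"}'
```

Note the difference from your current setup: with `--defaultEnvironmentType=EXTERNAL`, `--defaultEnvironmentConfig` must point at a running worker-pool service, not at the expansion service port itself.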

Hope this helps,

 Jan

[1] https://github.com/PacktPublishing/Building-Big-Data-Pipelines-with-Apache-Beam/blob/main/chapter6/src/main/python/beam_utils.py

[2] https://beam.apache.org/documentation/runtime/sdk-harness-config/

On 9/20/22 10:45, Lydian wrote:
Hi,
I am using the portable runner (Flink) with the Python SDK. I am on the latest version of Beam (2.41). The job runs on Kubernetes. I launched the job manager with a sidecar container (image: apache/beam_flink1.14_job_server:2.41.0) that starts the expansion service with the following command:
```
java \
  -cp /opt/apache/beam/jars/ \
  org.apache.beam.sdk.expansion.service.ExpansionService \
  8097 \
  --javaClassLookupAllowlistFile=* \
  --defaultEnvironmentType=EXTERNAL \
  --defaultEnvironmentConfig=localhost:8097
```
In the code I am doing:
```
ReadFromKafka(
    consumer_config={
        "bootstrap.servers": 'BROKER',
        "security.protocol": "SASL_SSL",
        "sasl.mechanism": "SCRAM-SHA-512",
        "sasl.jaas.config": f'org.apache.kafka.common.security.scram.ScramLoginModule required username="{sasl_username}" password="{sasl_password}";',
    },
    topics=[self.options.topic],
    with_metadata=False,
    expansion_service="localhost:8097",
)
```
But it fails with this error:
```
2022-09-20 08:36:36,549 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Impulse -> [3]Reading message from kafka/KafkaIO.Read/KafkaIO.Read.ReadFromKafkaViaSDF/{ParDo(GenerateKafkaSourceDescriptor), KafkaIO.ReadSourceDescriptors} (1/1) (da27593d9232b9781fa1db3fd49d228e) switched from INITIALIZING to FAILED on 10.0.69.250:35101-76f99c @ ip-10-0-69-250.ec2.internal (dataPort=43553).
org.apache.flink.util.SerializedThrowable: org.apache.beam.vendor.grpc.v1p43p2.io.grpc.StatusRuntimeException: UNIMPLEMENTED: Method not found: org.apache.beam.model.fn_execution.v1.BeamFnExternalWorkerPool/StartWorker
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2050) ~[?:?]
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.get(LocalCache.java:3952) ~[?:?]
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974) ~[?:?]
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4958) ~[?:?]
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4964) ~[?:?]
    at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:451) ~[?:?]
    at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:436) ~[?:?]
    at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.forStage(DefaultJobBundleFactory.java:303) ~[?:?]
    at org.apache.beam.runners.fnexecution.control.DefaultExecutableStageContext.getStageBundleFactory(DefaultExecutableStageContext.java:38) ~[?:?]
    at org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory$WrappedContext.getStageBundleFactory(ReferenceCountingExecutableStageContextFactory.java:202) ~[?:?]
    at org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator.open(ExecutableStageDoFnOperator.java:249) ~[?:?]
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:110) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:711) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:687) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:654) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
    at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_292]
```
Does anyone know how I could fix this issue? Thanks!
Sincerely,
Lydian Lee
