Hi Lydian,
there are two parts involved:
a) the expansion service (which you run on port 8097) - this service
expands the ReadFromKafka transform, which is a Java transform
b) the Java SDK environment, which is not the expansion service - it
must be some environment that is able to run the Java ReadFromKafka
transform. In Flink, you can use the PROCESS environment type (e.g. [1]),
but there might be other options (e.g. DOCKER), see [2]
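For example, the expansion service could be launched with a PROCESS default environment instead of pointing the Java environment back at the expansion service itself. This is only a sketch - the boot script path /opt/apache/beam/boot is an assumption and depends on what is available on your task manager hosts:

```shell
# Launch the expansion service with a PROCESS environment, so that
# the expanded Java ReadFromKafka transform is executed by a local
# process rather than by the expansion service itself.
java \
  -cp '/opt/apache/beam/jars/*' \
  org.apache.beam.sdk.expansion.service.ExpansionService \
  8097 \
  --javaClassLookupAllowlistFile='*' \
  --defaultEnvironmentType=PROCESS \
  --defaultEnvironmentConfig='{"command": "/opt/apache/beam/boot"}'
```

The PROCESS environment config is a JSON object whose "command" points at the Java SDK harness boot executable; that binary must exist wherever the Flink task managers run.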
Hope this helps,
Jan
[1]
https://github.com/PacktPublishing/Building-Big-Data-Pipelines-with-Apache-Beam/blob/main/chapter6/src/main/python/beam_utils.py
[2] https://beam.apache.org/documentation/runtime/sdk-harness-config/
On 9/20/22 10:45, Lydian wrote:
Hi,
I am using the portable runner (Flink) with the Python SDK. I am on the
latest version of Beam (2.41).
The job is running on Kubernetes. I launched the job manager with a
sidecar container (using
image apache/beam_flink1.14_job_server:2.41.0) to start the expansion
service with the following command:
```
java \
  -cp /opt/apache/beam/jars/ \
  org.apache.beam.sdk.expansion.service.ExpansionService \
  8097 \
  --javaClassLookupAllowlistFile=* \
  --defaultEnvironmentType=EXTERNAL \
  --defaultEnvironmentConfig=localhost:8097
```
In the code I am doing:
```
ReadFromKafka(
    consumer_config={
        "bootstrap.servers": 'BROKER',
        "security.protocol": "SASL_SSL",
        "sasl.mechanism": "SCRAM-SHA-512",
        "sasl.jaas.config": (
            f'org.apache.kafka.common.security.scram.ScramLoginModule required '
            f'username="{sasl_username}" password="{sasl_password}";'
        ),
    },
    topics=[self.options.topic],
    with_metadata=False,
    expansion_service="localhost:8097",
)
```
But it fails with the following error:
```
2022-09-20 08:36:36,549 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source:
Impulse -> [3]Reading message from
kafka/KafkaIO.Read/KafkaIO.Read.ReadFromKafkaViaSDF/{ParDo(GenerateKafkaSourceDescriptor),
KafkaIO.ReadSourceDescriptors} (1/1)
(da27593d9232b9781fa1db3fd49d228e) switched from INITIALIZING to
FAILED on 10.0.69.250:35101-76f99c @ ip-10-0-69-250.ec2.internal
(dataPort=43553).
org.apache.flink.util.SerializedThrowable:
org.apache.beam.vendor.grpc.v1p43p2.io.grpc.StatusRuntimeException:
UNIMPLEMENTED: Method not found:
org.apache.beam.model.fn_execution.v1.BeamFnExternalWorkerPool/StartWorker
at
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2050)
~[?:?]
at
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.get(LocalCache.java:3952)
~[?:?]
at
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974)
~[?:?]
at
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4958)
~[?:?]
at
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4964)
~[?:?]
at
org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:451)
~[?:?]
at
org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:436)
~[?:?]
at
org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.forStage(DefaultJobBundleFactory.java:303)
~[?:?]
at
org.apache.beam.runners.fnexecution.control.DefaultExecutableStageContext.getStageBundleFactory(DefaultExecutableStageContext.java:38)
~[?:?]
at
org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory$WrappedContext.getStageBundleFactory(ReferenceCountingExecutableStageContextFactory.java:202)
~[?:?]
at
org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator.open(ExecutableStageDoFnOperator.java:249)
~[?:?]
at
org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:110)
~[flink-dist_2.12-1.14.5.jar:1.14.5]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:711)
~[flink-dist_2.12-1.14.5.jar:1.14.5]
at
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100)
~[flink-dist_2.12-1.14.5.jar:1.14.5]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:687)
~[flink-dist_2.12-1.14.5.jar:1.14.5]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:654)
~[flink-dist_2.12-1.14.5.jar:1.14.5]
at
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
~[flink-dist_2.12-1.14.5.jar:1.14.5]
at
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
~[flink-dist_2.12-1.14.5.jar:1.14.5]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
~[flink-dist_2.12-1.14.5.jar:1.14.5]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
~[flink-dist_2.12-1.14.5.jar:1.14.5]
at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_292]
```
Does anyone know how I could fix this issue? Thanks!
Sincerely,
Lydian Lee