Hi Lydia,

The LOOPBACK environment is essentially an EXTERNAL environment whose SDK
Harness worker pool is set up automatically during pipeline submission. That
worker pool server is started on a random port on the submitting host, which
is most likely what you are hitting at the moment.
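As a sketch of the alternative, the worker pool could be run as its own
docker-compose service on a fixed, well-known port instead of a random one.
Everything here is illustrative (service name, image, and port 5000 are my
own placeholders, and the flag name should be double-checked against your
Beam SDK version):

```
beam-worker-pool:
  # Hypothetical image that has the Beam Python SDK and your pipeline
  # dependencies installed.
  image: my-beam-python-image
  # Start the SDK worker pool on a fixed port instead of letting
  # LOOPBACK pick a random one.
  command: python -m apache_beam.runners.worker.worker_pool_main --service_port=5000
  ports:
    - "5000:5000"
```

The pipeline would then point at this service with
`--environment_type=EXTERNAL` and an `--environment_config` of
`beam-worker-pool:5000` (a name resolvable on the docker-compose network).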
You can work around this particular issue by using the EXTERNAL environment
directly and manually starting the worker pool [1] on a fixed port. However,
I think the next problem you will hit is the ports the actual SDK Harness
process uses to communicate with the Flink tasks (Runner Harness). My
recommendation would be to start the EXTERNAL environment worker pool from
[1] inside the Flink taskmanager docker container, even though this doesn't
fit the single-process-per-container model.

[1] https://github.com/apache/beam/blob/e439f4120ef4c25aa36e5b03756dc7391bdbd211/sdks/python/apache_beam/runners/worker/worker_pool_main.py#L201

Thanks,
Ankur

On Mon, 14 Nov 2022 at 02:30, Lydian <lydia...@gmail.com> wrote:

> Hi,
>
> I am working on implementing a local Beam Flink runner for faster
> development. I have built a docker image that contains the required Flink
> and Beam dependencies, and then launched separate containers (job manager,
> task manager, and Beam job server) via docker-compose. I am using bridge
> mode (because Docker doesn't support the "host" network on Mac) and have
> exposed all the related ports to localhost.
>
> The test pipeline is written in Python and runs on the portable runner,
> with `--environment_type` set to `LOOPBACK` so that it uses my local
> Python code to run the changes. (Our pipeline is written in Python, but we
> need cross-language transforms to read data from Kafka.)
>
> Here's my understanding of what happens:
>
> 1. Start my short Python script with the following args:
> ```
> '--streaming',
> '--runner=portableRunner',
> '--environment_type=LOOPBACK',
> '--job_endpoint=localhost:8099',
> '--artifact_endpoint=localhost:8098',
> '--defaultEnvironmentType=EXTERNAL',
> '--defaultEnvironmentConfig=host.docker.internal:5000',
> ```
> 2. The job launches the Beam Java expansion service in process mode,
> because I am using this function:
> ```
> ReadFromKafka(
>     consumer_config={"bootstrap.servers": "kafka:9092",
>                      'auto.offset.reset': 'earliest'},
>     topics=["test.topic"],
>     with_metadata=False,
>     expansion_service=default_io_expansion_service(
>         append_args=[
>             '--defaultEnvironmentType=PROCESS',
>             "--defaultEnvironmentConfig={\"command\":\"/opt/apache/beam/java_boot\"}",
>             '--experiments=use_deprecated_read',
>         ]
>     )
> )
> ```
> 3. The job is then submitted to the Beam job server.
> 4. The job server submits the actual job to the Flink job manager.
> 5. The Flink job manager distributes the work to the task manager.
> 6. The task manager launches a Java worker.
> 7. Once the Java worker is done, it returns the processed content back to
> the original Python process (because we are running in LOOPBACK).
>
> However, the very last step fails, because LOOPBACK opened a random port
> on the localhost and I have no idea how to make the Java worker talk to
> the host on that random port.
>
> I know the problem could be easily fixed by setting network_mode to host.
> However, we are developing on Mac, and the host network is not supported
> by Docker on Mac. Has anyone tried the same thing before, and is there any
> suggested workaround for Mac users? Thanks!
>
> I also have my script and infra in this gist [1]; hopefully that makes it
> easier to understand. Thanks!
>
> [1] https://gist.github.com/lydian/0db7614652c2ccdc733884134bf67f9b
>
> Sincerely,
> Lydian Lee
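For readers who land on this thread later: the "random port" behavior
discussed above comes from the standard ephemeral-port mechanism. Binding to
port 0 lets the OS pick any free port, so the LOOPBACK worker's address
cannot be known in advance from inside another container. A minimal
stdlib-only illustration (not Beam code):

```python
import socket

# Bind to port 0: the OS assigns a free ephemeral port, which is
# effectively what the LOOPBACK environment does on the submitting host.
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.bind(("127.0.0.1", 0))
port = sock.getsockname()[1]
sock.close()
print(port)  # an OS-chosen port, different on each run
```

This is also why a fixed `--service_port` (as in the EXTERNAL worker pool
suggestion above) sidesteps the problem: the port becomes something you can
publish in docker-compose ahead of time.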