Hello Maximilian,

I followed your guide with the wordcount.py example up to the step where I connect to a remote Flink cluster from my laptop, and got stuck. On the Flink server side, I can see an established connection from my laptop to port 8081 of the server (shown in *netstat*), but nothing shows up in the Flink GUI console. On my Python console, I can see that my job state changed to *RUNNING*. On the JobService side, the last log entry is "*Submitting job to...*". Are there any logs that could help me debug? Is there any chance that the Flink job jar file is too big to be sent to my Flink cluster?
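In case it is useful for debugging: the Flink REST API answers on the same port 8081 as the web UI, so querying its `/jobs/overview` endpoint shows whether the job ever actually reached the cluster. A minimal sketch (the address 10.10.64.121:8081 is just from my setup, and the helper names are made up):

```python
import json
import urllib.request


def parse_job_overview(payload):
    """Extract (job id, name, state) triples from a /jobs/overview JSON response."""
    data = json.loads(payload)
    return [(j["jid"], j["name"], j["state"]) for j in data.get("jobs", [])]


def list_cluster_jobs(base_url="http://10.10.64.121:8081"):
    """Query the Flink cluster's REST API; an empty list means no job arrived."""
    with urllib.request.urlopen(base_url + "/jobs/overview") as resp:
        return parse_job_overview(resp.read().decode("utf-8"))
```

If `list_cluster_jobs()` returns nothing while the Python console reports *RUNNING*, the submission is stalling between the JobService and the cluster rather than inside Flink.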
More interestingly, if I change the port in the JobService start command to an invalid one (e.g. from 8081 to 8082), the output on my Python console as well as on the JobService console stays the same.

I also have another question, about writing my stream into Parquet files. As mentioned on https://beam.apache.org/documentation/io/built-in/, there are no file-based connectors for Python streaming yet. Does that mean I also need to use Java for the IO?

Thanks and best regards,
Averell

Here's the output on my Python console - it got stuck there:

(beam_env) Averell-Macbook:wordcount Averell$ python wordcount.py
INFO:root:Using latest locally built Python SDK docker image.
INFO:root:==================== <function lift_combiners at 0x119935e60> ====================
INFO:root:==================== <function expand_sdf at 0x119935ed8> ====================
INFO:root:Job state changed to RUNNING

And here is the output on the JobService:

(beam_env) Averell-Macbook:beam Averell$ ./gradlew :beam-runners-flink-1.7-job-server:runShadow -PflinkMasterUrl=10.10.64.121:8081
Configuration on demand is an incubating feature.
> Task :beam-runners-flink-1.7-job-server:runShadow
Listening for transport dt_socket at address: 5005
[main] INFO org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver - ArtifactStagingService started on localhost:8098
[main] INFO org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver - Java ExpansionService started on localhost:8097
[main] INFO org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver - JobService started on localhost:8099
[grpc-default-executor-0] ERROR org.apache.beam.runners.fnexecution.jobsubmission.InMemoryJobService - Encountered Unexpected Exception for Invocation job_28626ac7-5339-4694-84c5-7e85f3b51a0
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.StatusException: NOT_FOUND
        at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.Status.asException(Status.java:534)
        at org.apache.beam.runners.fnexecution.jobsubmission.InMemoryJobService.getInvocation(InMemoryJobService.java:341)
        at org.apache.beam.runners.fnexecution.jobsubmission.InMemoryJobService.getStateStream(InMemoryJobService.java:262)
        at org.apache.beam.model.jobmanagement.v1.JobServiceGrpc$MethodHandlers.invoke(JobServiceGrpc.java:770)
        at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:171)
        at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.PartialForwardingServerCallListener.onHalfClose(PartialForwardingServerCallListener.java:35)
        at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.ForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:23)
        at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:40)
        at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.Contexts$ContextualizedServerCallListener.onHalfClose(Contexts.java:86)
        at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:283)
        at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:707)
        at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
        at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
[grpc-default-executor-0] INFO org.apache.beam.runners.flink.FlinkJobInvoker - Invoking job your-wordcount-job_de5850b5-e92f-4179-bcff-19169554aaef
[grpc-default-executor-0] INFO org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation - Starting job invocation your-wordcount-job_de5850b5-e92f-4179-bcff-19169554aaef
[flink-runner-job-invoker] INFO org.apache.beam.runners.flink.FlinkPipelineRunner - Translating pipeline to Flink program.
[flink-runner-job-invoker] INFO org.apache.beam.runners.flink.FlinkExecutionEnvironments - Creating a Batch Execution Environment.
[flink-runner-job-invoker] INFO org.apache.beam.runners.flink.FlinkExecutionEnvironments - Using Flink Master URL 10.10.64.126:36215.
[flink-runner-job-invoker] WARN org.apache.beam.runners.flink.FlinkExecutionEnvironments - No default parallelism could be found. Defaulting to parallelism 1. Please set an explicit parallelism with --parallelism
[flink-runner-job-invoker] INFO org.apache.flink.api.java.ExecutionEnvironment - The job has 0 registered types and 0 default Kryo serializers
[flink-runner-job-invoker] WARN org.apache.flink.configuration.Configuration - Config uses deprecated configuration key 'jobmanager.rpc.address' instead of proper key 'rest.address'
[flink-runner-job-invoker] INFO org.apache.flink.runtime.rest.RestClient - Rest client endpoint started.
[flink-runner-job-invoker] INFO org.apache.flink.client.program.rest.RestClusterClient - Submitting job 2ee00e434677116a298351eb77cdfaa4 (detached: false).
<============-> 98% EXECUTING [23m 3s]
> IDLE
> :beam-runners-flink-1.7-job-server:runShadow
> IDLE
> IDLE

On Sat, May 11, 2019 at 9:03 AM Averell Huyen Levan <lvhu...@gmail.com> wrote:

> Hello Maximilian,
>
> Thanks for your help.
> The other part of my question was about running a (Python) pipeline on the
> Flink-cluster runner. I read the page
> https://beam.apache.org/documentation/runners/flink/ but felt confused.
> I will try one more time and come back if I am still stuck.
>
> Again, thanks a lot for your help.
>
> Regards,
> Averell
>
> On Sat, 11 May 2019, 12:35 am Maximilian Michels, <m...@apache.org> wrote:
>
>> Hi Averell,
>>
>> What you want to do is possible today, but at this point it is an early
>> experimental feature. The reason is that Kafka is a cross-language Java
>> transform in a Python pipeline, and we only recently enabled
>> cross-language pipelines.
>>
>> 1. First of all, until 2.13.0 is released you will have to use the
>> latest master version, e.g.
>> $ git clone https://github.com/apache/beam
>>
>> 2. Set up and activate a Python virtual environment:
>> $ virtualenv ~/beam_environment
>> $ source ~/beam_environment/bin/activate
>>
>> 3. Build the Python SDK:
>> $ cd beam
>> $ ./gradlew :beam-sdks-python:buildSnapshot
>> $ cd sdks/python/build/
>> $ unzip apache-beam-2.13.0.dev0.zip
>> $ cd apache-beam-2.13.0.dev0
>> $ python setup.py install
>>
>> 4. Start the Flink JobServer / ExpansionService:
>> $ ./gradlew :beam-runners-flink-1.7-job-server:runShadow
>>
>> 5. Create your Python pipeline and use the ReadFromKafka transform, e.g.
>>
>> options = ["--runner=PortableRunner",
>>            "--job_endpoint=localhost:8099"]
>> p = Pipeline(options)
>> (p
>>  | ReadFromKafka(consumer_config={'bootstrap.servers': 'kafka_broker:port'},
>>                  topics=['myTopic'])
>>  | Map(lambda kv: ...))
>> p.run()
>>
>> 6. Run your file with the python command :)
>>
>> Note: If you do not set key_deserializer or value_deserializer for
>> ReadFromKafka, you will receive the read data as KV[bytes, bytes]. That
>> means you have to perform decoding inside Python. If you set a Kafka
>> Deserializer, you can also receive the Kafka data already decoded.
>> However, you are limited to the coders in ModelCoders. For example, Int,
>> Long, KV, and Iterable are supported; we also added String recently.
>>
>> Hope that makes sense. Curious to see how your experiments go.
>>
>> Cheers,
>> Max
>>
>> PS: The best resources are in the doc comments of kafka.py:
>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/external/kafka.py
>> or here:
>> https://beam.apache.org/documentation/runners/flink/
>> https://beam.apache.org/roadmap/portability/
>>
>> On 10.05.19 14:33, lvhu...@gmail.com wrote:
>> > Hi everyone,
>> >
>> > I am trying to get started with Python on the Flink-cluster runner, to
>> > build a pipeline that reads data from Kafka and writes to S3 in Parquet
>> > format.
>> > I tried to search the Beam website but could not find any example (even
>> > for the basic word count). E.g., on the page
>> > https://beam.apache.org/get-started/wordcount-example/, in all Python -
>> > Flink-cluster sections there is no content except "This runner is not yet
>> > available for the Python SDK."
>> >
>> > At this point in time, is it possible to create such a pipeline? From
>> > all the slides / videos it seems feasible. But could you please point me
>> > to a step-by-step guide?
>> >
>> > Thanks and best regards,
>> > Averell
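PS: For anyone following the thread, here is how the pipeline steps from the quoted email fit together in one file. This is only a sketch: the broker address, topic name, and job endpoint are the placeholders from this thread, and since no deserializers are set, the records arrive as KV[bytes, bytes] and get decoded in Python.

```python
def pipeline_options():
    """Arguments pointing the Python SDK at the local Flink JobService."""
    return [
        "--runner=PortableRunner",
        "--job_endpoint=localhost:8099",
    ]


def decode_record(kv):
    """ReadFromKafka without deserializers yields KV[bytes, bytes];
    decode both halves here in Python (the key may be absent)."""
    key, value = kv
    return (key.decode("utf-8") if key is not None else None,
            value.decode("utf-8"))


def main():
    # Beam imports live inside main() so the helpers above can be used
    # without a Beam installation; call main() to actually submit the job.
    import apache_beam as beam
    from apache_beam import Pipeline
    from apache_beam.io.external.kafka import ReadFromKafka
    from apache_beam.options.pipeline_options import PipelineOptions

    p = Pipeline(options=PipelineOptions(pipeline_options()))
    (p
     | ReadFromKafka(
           consumer_config={"bootstrap.servers": "kafka_broker:port"},
           topics=["myTopic"])
     | beam.Map(decode_record))
    p.run()
```

The JobService from step 4 must be running before calling main(), otherwise the submission will hang exactly as shown in the logs above.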