Hello Maximilian,

I followed your guide with the wordcount.py example up to the step where I
connect to a remote Flink cluster from my laptop, and got stuck there.
On the Flink server side, I can see a connection established from my laptop
to port 8081 of the server (shown in *netstat*), but nothing shows up in
the Flink GUI console.
On my Python console, I can see that my job state changed to *RUNNING*.
On the JobService side, the last log entry is "*Submitting job
to...*"
Are there any logs that could help me debug this?
Is there any chance that the Flink job jar file is too big to be sent to my
Flink cluster?

More interestingly, if I change the port in the command that starts the
JobService to an invalid one (e.g. from 8081 to 8082), the output on my
Python console and on the JobService console stays the same.
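
A quick way to rule out basic network problems, as a minimal sketch: try
opening a TCP connection from the laptop to the address the JobService is
supposed to submit to. The helper name `can_connect` and the timeout are
illustrative assumptions, and the example address is the -PflinkMasterUrl
value from the gradlew command shown later in this message. A successful
connection only shows that something is listening on that port; it does not
prove that it is the Flink REST endpoint.

```python
import socket


def can_connect(address, timeout=3.0):
    """Return True if a TCP connection to 'host:port' can be opened.

    Hypothetical helper for illustration; not part of Beam or Flink.
    """
    host, _, port = address.rpartition(":")
    try:
        # int() raises ValueError for a malformed port; connection
        # failures (refused, timed out, unresolvable host) raise OSError.
        with socket.create_connection((host, int(port)), timeout=timeout):
            return True
    except (OSError, ValueError):
        return False


# Example (address taken from the -PflinkMasterUrl value in this message):
# print(can_connect("10.10.64.121:8081"))
```

Running the same check against the deliberately wrong port (8082) should
return False if nothing is listening there.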

I also have another question, about writing my stream into Parquet files.
According to https://beam.apache.org/documentation/io/built-in/, there are
no file-based connectors for Python streaming yet. Does that mean I also
need to use Java for the IO?

Thanks and best regards,
Averell

Here's the output on my Python console (it is stuck there):

(beam_env) Averell-Macbook:wordcount Averell$ python wordcount.py
INFO:root:Using latest locally built Python SDK docker image.
INFO:root:==================== <function lift_combiners at 0x119935e60> ====================
INFO:root:==================== <function expand_sdf at 0x119935ed8> ====================
INFO:root:Job state changed to RUNNING


And here is the output on the JobService:

(beam_env) Averell-Macbook:beam Averell$ ./gradlew :beam-runners-flink-1.7-job-server:runShadow -PflinkMasterUrl=10.10.64.121:8081
Configuration on demand is an incubating feature.

> Task :beam-runners-flink-1.7-job-server:runShadow
Listening for transport dt_socket at address: 5005
[main] INFO org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver - ArtifactStagingService started on localhost:8098
[main] INFO org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver - Java ExpansionService started on localhost:8097
[main] INFO org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver - JobService started on localhost:8099
[grpc-default-executor-0] ERROR org.apache.beam.runners.fnexecution.jobsubmission.InMemoryJobService - Encountered Unexpected Exception for Invocation job_28626ac7-5339-4694-84c5-7e85f3b51a0
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.StatusException: NOT_FOUND
        at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.Status.asException(Status.java:534)
        at org.apache.beam.runners.fnexecution.jobsubmission.InMemoryJobService.getInvocation(InMemoryJobService.java:341)
        at org.apache.beam.runners.fnexecution.jobsubmission.InMemoryJobService.getStateStream(InMemoryJobService.java:262)
        at org.apache.beam.model.jobmanagement.v1.JobServiceGrpc$MethodHandlers.invoke(JobServiceGrpc.java:770)
        at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:171)
        at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.PartialForwardingServerCallListener.onHalfClose(PartialForwardingServerCallListener.java:35)
        at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.ForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:23)
        at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:40)
        at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.Contexts$ContextualizedServerCallListener.onHalfClose(Contexts.java:86)
        at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:283)
        at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:707)
        at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
        at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
[grpc-default-executor-0] INFO org.apache.beam.runners.flink.FlinkJobInvoker - Invoking job your-wordcount-job_de5850b5-e92f-4179-bcff-19169554aaef
[grpc-default-executor-0] INFO org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation - Starting job invocation your-wordcount-job_de5850b5-e92f-4179-bcff-19169554aaef
[flink-runner-job-invoker] INFO org.apache.beam.runners.flink.FlinkPipelineRunner - Translating pipeline to Flink program.
[flink-runner-job-invoker] INFO org.apache.beam.runners.flink.FlinkExecutionEnvironments - Creating a Batch Execution Environment.
[flink-runner-job-invoker] INFO org.apache.beam.runners.flink.FlinkExecutionEnvironments - Using Flink Master URL 10.10.64.126:36215.
[flink-runner-job-invoker] WARN org.apache.beam.runners.flink.FlinkExecutionEnvironments - No default parallelism could be found. Defaulting to parallelism 1. Please set an explicit parallelism with --parallelism
[flink-runner-job-invoker] INFO org.apache.flink.api.java.ExecutionEnvironment - The job has 0 registered types and 0 default Kryo serializers
[flink-runner-job-invoker] WARN org.apache.flink.configuration.Configuration - Config uses deprecated configuration key 'jobmanager.rpc.address' instead of proper key 'rest.address'
[flink-runner-job-invoker] INFO org.apache.flink.runtime.rest.RestClient - Rest client endpoint started.
[flink-runner-job-invoker] INFO org.apache.flink.client.program.rest.RestClusterClient - Submitting job 2ee00e434677116a298351eb77cdfaa4 (detached: false).
<============-> 98% EXECUTING [23m 3s]
> IDLE
> :beam-runners-flink-1.7-job-server:runShadow
> IDLE
> IDLE
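
For what it is worth, the JobService output above reports "Using Flink
Master URL 10.10.64.126:36215", while the command passed
-PflinkMasterUrl=10.10.64.121:8081. Whether that mismatch is related to the
hang is not established here. Below is a small sketch for pulling the
reported address out of a saved log so the two can be compared. The function
name and the regular expression are illustrative assumptions, and the
pattern only matches the message wording shown above.

```python
import re

# Matches the FlinkExecutionEnvironments message as it appears in the log.
# \s+ also matches when the message is wrapped across lines, and the
# trailing period of the sentence is excluded from the captured address.
_MASTER_URL = re.compile(r"Using\s+Flink\s+Master\s+URL\s+(\S+?)\.?(?=\s|$)")


def reported_flink_master(log_text):
    """Return the 'host:port' the job server says it is using, or None."""
    match = _MASTER_URL.search(log_text)
    return match.group(1) if match else None


log_text = """[flink-runner-job-invoker] INFO
org.apache.beam.runners.flink.FlinkExecutionEnvironments - Using Flink
Master URL 10.10.64.126:36215.
"""
configured = "10.10.64.121:8081"  # value passed via -PflinkMasterUrl
reported = reported_flink_master(log_text)
print("configured:", configured)
print("reported:  ", reported)
print("same:      ", configured == reported)
```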


On Sat, May 11, 2019 at 9:03 AM Averell Huyen Levan <lvhu...@gmail.com>
wrote:

> Hello Maximilian,
>
> Thanks for your help.
> The other part of my question was about running a (Python) pipeline on
> the Flink cluster runner. I read
> https://beam.apache.org/documentation/runners/flink/ but was still confused.
> I will try one more time and come back if I am still stuck.
>
> Again, thanks a lot for your help.
>
> Regards,
> Averell
>
> On Sat, 11 May 2019, 12:35 am Maximilian Michels, <m...@apache.org> wrote:
>
>> Hi Averell,
>>
>> What you want to do is possible today but at this point is an early
>> experimental feature. The reason for that is that Kafka is a
>> cross-language Java transform in a Python pipeline. We just recently
>> enabled cross-language pipelines.
>>
>> 1. First of all, until 2.13.0 is released you will have to use the
>>     latest master version, e.g.
>>     $ git clone https://github.com/apache/beam
>>
>> 2. Setup and activate a Python virtual environment:
>>     $ virtualenv ~/beam_environment
>>     $ source ~/beam_environment/bin/activate
>>
>> 3. Build the Python SDK:
>>     $ cd beam
>>     $ ./gradlew :beam-sdks-python:buildSnapshot
>>     $ cd sdks/python/build/
>>     $ unzip apache-beam-2.13.0.dev0.zip
>>     $ cd apache-beam-2.13.0.dev0
>>     $ python setup.py install
>>
>> 4. Start the Flink JobServer / ExpansionServer (from the beam checkout root):
>>     $ ./gradlew :beam-runners-flink-1.7-job-server:runShadow
>>
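>> 5. Create your Python pipeline and use the ReadFromKafka transform, e.g.
>>
>>     from apache_beam import Map, Pipeline
>>     from apache_beam.io.external.kafka import ReadFromKafka
>>     from apache_beam.options.pipeline_options import PipelineOptions
>>
>>     options = PipelineOptions(["--runner=PortableRunner",
>>                                "--job_endpoint=localhost:8099"])
>>     p = Pipeline(options=options)
>>     (p
>>      | ReadFromKafka(consumer_config={'bootstrap.servers':
>>                                       'kafka_broker:port'},
>>                      topics=['myTopic'])
>>      # Each element is a single (key, value) pair.
>>      | Map(lambda kv: ...))
>>     p.run()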
>>
>> 6. Run your file with the python command :)
>>
>> Note: If you do not set key_deserializer or value_deserializer for
>> ReadFromKafka, you will receive the read data as KV[bytes, bytes]. That
>> means you have to perform decoding inside Python. If you set a Kafka
>> Deserializer, you can also receive the Kafka data already decoded.
>> However, you are limited to the coders in ModelCoders. For example, Int,
>> Long, KV, Iterable are supported; we also added String recently.
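
As a minimal sketch of the "decoding inside Python" case described in the
note above: with the default deserializers each element arrives as a
(key, value) pair of bytes, so a plain function can decode it. The function
name `decode_kv` and the UTF-8 encoding are illustrative assumptions; the
actual encoding depends on what the producer wrote to Kafka.

```python
def decode_kv(kv, encoding="utf-8"):
    """Decode a (key_bytes, value_bytes) pair into a (str, str) pair.

    Hypothetical helper for illustration. None is passed through
    unchanged as a precaution, in case a key or value is absent.
    """
    key, value = kv

    def decode(raw):
        return raw.decode(encoding) if raw is not None else None

    return decode(key), decode(value)


# In a pipeline this could be applied as, for example: ... | Map(decode_kv)
print(decode_kv((b"user-1", b"hello")))
```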
>>
>> Hope that makes sense. Curious to see how your experiments go.
>>
>> Cheers,
>> Max
>>
>> PS: The best resources are in the doc comments of kafka.py:
>>
>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/external/kafka.py
>> or here:
>> https://beam.apache.org/documentation/runners/flink/
>> https://beam.apache.org/roadmap/portability/
>>
>> On 10.05.19 14:33, lvhu...@gmail.com wrote:
>> > Hi everyone,
>> >
>> > I am trying to get started with Python on the Flink cluster runner, to
>> > build a pipeline that reads data from Kafka and writes to S3 in Parquet
>> > format.
>> > I searched the Beam website but could not find any example (even for the
>> > basic word count). E.g., on this page,
>> > https://beam.apache.org/get-started/wordcount-example/, all the Python
>> > Flink-cluster sections have no content other than "This runner is not yet
>> > available for the Python SDK."
>> >
>> > At this point in time, is it possible to create such a pipeline? From all
>> > the slides and videos, it seems feasible. If so, could you please point me
>> > to a step-by-step guide?
>> >
>> > Thanks and best regards,
>> > Averell
>> >
>>
>
