We are cutting the release branch for 2.20.0 next Wednesday, so I'm not sure
whether these tickets will make it, but hopefully they will.

For reference:
BEAM-9295: Add Flink 1.10 build target and Make FlinkRunner compatible with Flink 1.10
BEAM-9299: Upgrade Flink Runner to 1.8.3 and 1.9.2

In any case, if you have cycles to help test any of the related tickets/PRs,
that would help too.
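
For anyone skimming the trace quoted below: the exception is raised while the
FlinkRunner constructor asks for an execution environment. Here is a minimal
Java sketch of that kind of guard, assuming it simply remembers whether an
environment was already handed out. The names Env, EnvFactory and
createEnvironment are hypothetical; this is a toy model, not Flink's source.

```java
// Toy model only: class and method names here are hypothetical, not Flink's API.
final class DetachedModeSketch {

    /** Stand-in for an execution environment; carries no behaviour. */
    static final class Env {}

    /** Hands out environments, refusing a second one when detached. */
    static final class EnvFactory {
        private final boolean detached;
        private Env lastCreated; // null until the first environment is handed out

        EnvFactory(boolean detached) {
            this.detached = detached;
        }

        Env createEnvironment() {
            // The guard: in detached mode, only one environment may ever be created.
            if (detached && lastCreated != null) {
                throw new IllegalStateException(
                    "Multiple environments cannot be created in detached mode");
            }
            lastCreated = new Env();
            return lastCreated;
        }
    }

    public static void main(String[] args) {
        EnvFactory factory = new EnvFactory(true);
        factory.createEnvironment(); // first request succeeds
        try {
            factory.createEnvironment(); // any later request in the same program fails
        } catch (IllegalStateException e) {
            System.out.println("second request rejected: " + e.getMessage());
        }
    }
}
```

With a guard like this, any code path that requests an environment a second
time fails in detached mode, even if it only wanted to inspect it.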


On Mon, Feb 24, 2020 at 8:47 AM Kaymak, Tobias <tobias.kay...@ricardo.ch>
wrote:

> Hi Kyle,
>
> thank you for creating the JIRA ticket, I think my best option right now
> is to wait for a Beam version that is running on Flink 1.10 then - unless
> there is a new Beam release around the corner :)
>
> Best,
> Tobi
>
> On Thu, Feb 20, 2020 at 11:52 PM Kyle Weaver <kcwea...@google.com> wrote:
>
>> Hi Tobi,
>>
>> This seems like a bug with Beam 2.19. I filed
>> https://issues.apache.org/jira/browse/BEAM-9345 to track the issue.
>>
>> > What puzzles me is that the session cluster should be allowed to have
>> > multiple environments in detached mode - or am I wrong?
>>
>> It looks like that check is removed in Flink 1.10:
>> https://issues.apache.org/jira/browse/FLINK-15201
>>
>> Thanks for reporting.
>> Kyle
>>
>> On Thu, Feb 20, 2020 at 4:10 AM Kaymak, Tobias <tobias.kay...@ricardo.ch>
>> wrote:
>>
>>> Hello,
>>>
>>> I am trying to upgrade from a Flink session cluster 1.8 to 1.9 and from
>>> Beam 2.16.0 to 2.19.0.
>>> Everything went quite smoothly; the local runner and the local Flink
>>> runner work flawlessly.
>>>
>>> However, after I:
>>>   1. Generate a Beam jar for the FlinkRunner via Maven
>>>      (mvn package -PFlinkRunner)
>>>   2. Glue that into a Flink 1.9 Docker image
>>>   3. Start the image as a Standalone Session Cluster
>>>
>>> and try to launch the first pipeline, I get the following exception:
>>>
>>> org.apache.flink.client.program.ProgramInvocationException: The main
>>> method caused an error: Failed to construct instance from factory method
>>> FlinkRunner#fromOptions(interface
>>> org.apache.beam.sdk.options.PipelineOptions)
>>>         at
>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:593)
>>>         at
>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
>>>         at
>>> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:274)
>>>         at
>>> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:746)
>>>         at
>>> org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:273)
>>>         at
>>> org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
>>>         at
>>> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010)
>>>         at
>>> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083)
>>>         at
>>> org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
>>>         at
>>> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083)
>>> Caused by: java.lang.RuntimeException: Failed to construct instance from
>>> factory method FlinkRunner#fromOptions(interface
>>> org.apache.beam.sdk.options.PipelineOptions)
>>>         at
>>> org.apache.beam.sdk.util.InstanceBuilder.buildFromMethod(InstanceBuilder.java:224)
>>>         at
>>> org.apache.beam.sdk.util.InstanceBuilder.build(InstanceBuilder.java:155)
>>>         at
>>> org.apache.beam.sdk.PipelineRunner.fromOptions(PipelineRunner.java:55)
>>>         at org.apache.beam.sdk.Pipeline.run(Pipeline.java:309)
>>>         at org.apache.beam.sdk.Pipeline.run(Pipeline.java:301)
>>>         at
>>> ch.ricardo.di.beam.KafkaToBigQuery.main(KafkaToBigQuery.java:180)
>>>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>         at
>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>         at
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>         at java.lang.reflect.Method.invoke(Method.java:498)
>>>         at
>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
>>>         ... 9 more
>>> Caused by: java.lang.reflect.InvocationTargetException
>>>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>         at
>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>         at
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>         at java.lang.reflect.Method.invoke(Method.java:498)
>>>         at
>>> org.apache.beam.sdk.util.InstanceBuilder.buildFromMethod(InstanceBuilder.java:214)
>>>         ... 19 more
>>> Caused by: org.apache.flink.api.common.InvalidProgramException: Multiple
>>> environments cannot be created in detached mode
>>>         at
>>> org.apache.flink.client.program.ContextEnvironmentFactory.createExecutionEnvironment(ContextEnvironmentFactory.java:67)
>>>         at java.util.Optional.map(Optional.java:215)
>>>         at
>>> org.apache.flink.api.java.ExecutionEnvironment.getExecutionEnvironment(ExecutionEnvironment.java:1068)
>>>         at
>>> org.apache.beam.runners.flink.translation.utils.Workarounds.restoreOriginalStdOutAndStdErrIfApplicable(Workarounds.java:43)
>>>         at
>>> org.apache.beam.runners.flink.FlinkRunner.<init>(FlinkRunner.java:96)
>>>         at
>>> org.apache.beam.runners.flink.FlinkRunner.fromOptions(FlinkRunner.java:90)
>>>         ... 24 more
>>>
>>> I've checked the release notes and the issues and couldn't find anything
>>> that relates to this. What puzzles me is that the session cluster should be
>>> allowed to have multiple environments in detached mode - or am I wrong?
>>>
>>> Best,
>>> Tobi
>>>
>>
>
> --
>
> Tobias Kaymak
> Data Engineer
> Data Intelligence
>
> tobias.kay...@ricardo.ch
> www.ricardo.ch
> Theilerstrasse 1a, 6300 Zug
>
