OK, I think I now have some understanding of what happens.
Flink switched its RocksDB fork in the 1.8 release, which is why the
dependency must now be added explicitly to a project. [0]
I actually did both: I added this dependency to my project's pom (resulting
in beam_pipelines.jar) and to the lib directory of the Flink Docker image
that executes the pipeline [1]:

FROM flink:1.8.0-scala_2.11
ADD --chown=flink:flink \
  http://central.maven.org/maven2/org/apache/flink/flink-statebackend-rocksdb_2.11/1.8.0/flink-statebackend-rocksdb_2.11-1.8.0.jar \
  /opt/flink/lib/flink-statebackend-rocksdb_2.11-1.8.0.jar
ADD --chown=flink:flink target/di-beam-bundled.jar \
  /opt/flink/lib/beam_pipelines.jar

Now everything works up to the point where I hit the "Stop" button in the Flink
web interface. I think the dependency that the Beam Flink Runner has is
wrong, as Flink switched to FRocksDB in 1.8 [2]. I guess that's why the
runner then hits:
java.lang.NoSuchMethodError: org.rocksdb.ColumnFamilyHandle.getDescriptor()Lorg/rocksdb/ColumnFamilyDescriptor;

But I might also be wrong, I am still investigating.
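
One way to test the version-mismatch theory would be a small reflection
probe that reports which jar org.rocksdb.ColumnFamilyHandle is loaded from
and whether it exposes getDescriptor(). This is only a minimal sketch: the
class name RocksDbClasspathCheck and its helper methods are made up for
illustration, and it assumes the probe runs with the same classpath as the
TaskManager (for example with the jars from /opt/flink/lib on it).

```java
import java.security.CodeSource;

// Hypothetical diagnostic helper, not part of Flink or Beam.
public class RocksDbClasspathCheck {

    /** Returns the jar or directory a class was loaded from, or a short note if unknown. */
    static String locationOf(String className) {
        try {
            // initialize=false: only locate the class, do not run its static initializers
            Class<?> clazz = Class.forName(
                    className, false, RocksDbClasspathCheck.class.getClassLoader());
            CodeSource source = clazz.getProtectionDomain().getCodeSource();
            return source == null ? "(bootstrap or unknown)" : String.valueOf(source.getLocation());
        } catch (ClassNotFoundException | LinkageError e) {
            return "(not on classpath)";
        }
    }

    /** Returns true if the class is loadable and has a public no-arg method with this name. */
    static boolean hasNoArgMethod(String className, String methodName) {
        try {
            Class.forName(className, false, RocksDbClasspathCheck.class.getClassLoader())
                    .getMethod(methodName);
            return true;
        } catch (ClassNotFoundException | NoSuchMethodException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Class and method taken from the NoSuchMethodError above
        String handle = "org.rocksdb.ColumnFamilyHandle";
        System.out.println(handle + " loaded from: " + locationOf(handle));
        System.out.println("has getDescriptor(): " + hasNoArgMethod(handle, "getDescriptor"));
    }
}
```

If the reported location is not the jar one expects, or getDescriptor() is
reported as missing, that would point to a different RocksDB build
shadowing the one the state backend was compiled against.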

Best,
Tobi

[0] https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/state_backends.html#setting-the-per-job-state-backend
[1] https://hub.docker.com/_/flink
[2] https://ci.apache.org/projects/flink/flink-docs-stable/release-notes/flink-1.8.html#rocksdb-version-bump-and-switch-to-frocksdb-flink-10471

On Tue, Aug 13, 2019 at 2:50 PM Kaymak, Tobias <[email protected]>
wrote:

> This is a major issue for us, as we are no longer able to do a clean
> shutdown of the pipelines right now; only cancelling them hard is
> possible.
>
> On Tue, Aug 13, 2019 at 2:46 PM Kaymak, Tobias <[email protected]>
> wrote:
>
>> I just rolled out the upgraded and working 1.8.0/2.14.0 combination to
>> production and noticed that when I try to cleanly shut down a pipeline via
>> the stop button in the web interface of Flink 1.8.0, I get exactly the same
>> error:
>>
>> java.lang.NoSuchMethodError: org.rocksdb.ColumnFamilyHandle.getDescriptor()Lorg/rocksdb/ColumnFamilyDescriptor;
>>   at org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.addColumnFamilyOptionsToCloseLater(RocksDBOperationUtils.java:160)
>>   at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.dispose(RocksDBKeyedStateBackend.java:331)
>>   at org.apache.flink.streaming.api.operators.AbstractStreamOperator.dispose(AbstractStreamOperator.java:362)
>>   at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.dispose(DoFnOperator.java:470)
>>   at org.apache.flink.streaming.runtime.tasks.StreamTask.tryDisposeAllOperators(StreamTask.java:454)
>>   at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:337)
>>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>>   at java.lang.Thread.run(Thread.java:748)
>>
>>
>> The pipeline then restores from the last snapshot and continues to run;
>> it does not shut down as expected.
>>
>> Any idea why this could happen?
>>
>> On Mon, Aug 12, 2019 at 9:49 PM Kaymak, Tobias <[email protected]>
>> wrote:
>>
>>> * each time :)
>>>
>>> On Mon, Aug 12, 2019 at 9:48 PM Kaymak, Tobias <[email protected]>
>>> wrote:
>>>
>>>> I've checked multiple times now and it breaks as with the 1.8.1 image -
>>>> I've completely rebuilt the Docker image and torn down the testing
>>>> cluster.
>>>>
>>>> Best,
>>>> Tobi
>>>>
>>>> On Mon, Aug 12, 2019 at 1:45 PM Maximilian Michels <[email protected]>
>>>> wrote:
>>>>
>>>>> Hi Tobias!
>>>>>
>>>>> I've checked if there were any relevant changes to the RocksDB state
>>>>> backend in 1.8.1, but I couldn't spot anything. Could it be that an old
>>>>> version of RocksDB is still in the Flink cluster path?
>>>>>
>>>>> Cheers,
>>>>> Max
>>>>>
>>>>> On 06.08.19 16:43, Kaymak, Tobias wrote:
>>>>> > And of course the moment I click "send" I find that: 😂
>>>>> >
>>>>> > If you use Scala 2.11 and dependency version 1.8.0 in your Beam project's
>>>>> > pom.xml it *does* work:
>>>>> >
>>>>> >         <dependency>
>>>>> >             <groupId>org.apache.flink</groupId>
>>>>> >             <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
>>>>> >             <version>1.8.0</version>
>>>>> >         </dependency>
>>>>> >
>>>>> > However, if you want to use 1.8.1 - it *does not*.
>>>>> >
>>>>> > I still found it confusing, as I am using the official Flink Docker
>>>>> > images, which are currently at version 1.8.1. It would have helped me if
>>>>> > Beam bundled the state backend dependency (as already mentioned, Beam
>>>>> > allows the user to set a state backend via parameters of the Flink Runner).
>>>>> >
>>>>> > On Tue, Aug 6, 2019 at 4:35 PM Kaymak, Tobias <
>>>>> [email protected]
>>>>> > <mailto:[email protected]>> wrote:
>>>>> >
>>>>> >     Hello,
>>>>> >
>>>>> >     Flink 1.8 requires that, if one wants to use RocksDB as a state
>>>>> >     backend, the dependency be added to the pom.xml file. [0]
>>>>> >
>>>>> >     My cluster stopped working with RocksDB, so I added this
>>>>> >     dependency to the pom.xml of my Beam project (I've tried 1.8.1 and
>>>>> >     1.8.0):
>>>>> >
>>>>> >           <dependency>
>>>>> >               <groupId>org.apache.flink</groupId>
>>>>> >               <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
>>>>> >               <version>1.8.0</version>
>>>>> >           </dependency>
>>>>> >
>>>>> >     I also tried adding the flink-statebackend-rocksdb_2.11-1.8.0.jar
>>>>> >     to the lib directory of the Flink cluster instead (TaskManagers and
>>>>> >     JobManager). In all cases I get this error:
>>>>> >
>>>>> >     2019-08-06 14:14:15,670 ERROR org.apache.flink.streaming.runtime.tasks.StreamTask - Error during disposal of stream operator
>>>>> >     java.lang.NoSuchMethodError: org.rocksdb.ColumnFamilyHandle.getDescriptor()Lorg/rocksdb/ColumnFamilyDescriptor;
>>>>> >       at org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.addColumnFamilyOptionsToCloseLater(RocksDBOperationUtils.java:160)
>>>>> >       at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.dispose(RocksDBKeyedStateBackend.java:331)
>>>>> >       at org.apache.flink.streaming.api.operators.AbstractStreamOperator.dispose(AbstractStreamOperator.java:362)
>>>>> >       at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.dispose(DoFnOperator.java:470)
>>>>> >       at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:477)
>>>>> >       at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:378)
>>>>> >       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>>>>> >       at java.lang.Thread.run(Thread.java:748)
>>>>> >
>>>>> >     This looks like a version mismatch to me, but I don't know how to
>>>>> >     solve it. Could Beam maybe include the dependency for the RocksDB
>>>>> >     backend for Flink 1.8 or higher, since it allows setting the state
>>>>> >     backend via parameters for the Flink Runner? [1]
>>>>> >
>>>>> >
>>>>> >     [0] https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/state/state_backends.html#setting-the-per-job-state-backend
>>>>> >     [1] https://beam.apache.org/documentation/runners/flink/#pipeline-options-for-the-flink-runner
>>>>> >
>>>>>
>>>>>
