Hello. I am currently trying to get a Beam application to work on Flink version 1.8 that currently works on version 1.10. I realized that AWS kinesis does not support flink version 1.10 so I must downgrade. With that said when I downgrade I get a stacktrace that I don't really understand. Any help is appreciated! Thanks. Roger
*java.lang.VerifyError: Bad type on operand stackException Details: Location: org/apache/beam/runners/flink/FlinkStreamingTransformTranslators$CombinePerKeyTranslator.translateNode(Lorg/apache/beam/sdk/transforms/PTransform;Lorg/apache/beam/runners/flink/FlinkStreamingTranslationContext;)V @467: invokespecial Reason: Type 'org/apache/flink/streaming/api/transformations/TwoInputTransformation' (current frame, stack[4]) is not assignable to 'org/apache/flink/api/dag/Transformation' Current Frame:* *Application Details:* Beam Version: 2.21.0 Flink Version: 1.8.0 Full Stack Trace: gist <https://gist.github.com/roger-link/1ae9c0bc06ecfd4101775479670dcbcc> Java Application: gist <https://gist.github.com/roger-link/3a6c8c6c51181c00932dcee8c86b8ae2> Pom.xml: gist <https://gist.github.com/roger-link/821db196291063ec2b734c1567e37817> *Stacktrace:* /usr/local/Cellar/apache-flink/1.8.0/bin/flink run ./processing/beam/errorrate/target/forseti-errorrate-1.0-SNAPSHOT.jar --runner=FlinkRunner --awsRegion=us-west-2 --kafkaTopic=Forseti-Topic --kafkaBrokers=" b-2.forseti-processing-rog.nrpwiy.c2.kafka.us-west-2.amazonaws.com:9092, b-1.forseti-processing-rog.nrpwiy.c2.kafka.us-west-2.amazonaws.com:9092, b-4.forseti-processing-rog.nrpwiy.c2.kafka.us-west-2.amazonaws.com:9092" --kinesisStream=forseti-processing Starting execution of program java.lang.VerifyError: Bad type on operand stack Exception Details: Location: org/apache/beam/runners/flink/FlinkStreamingTransformTranslators$CombinePerKeyTranslator.translateNode(Lorg/apache/beam/sdk/transforms/PTransform;Lorg/apache/beam/runners/flink/FlinkStreamingTranslationContext;)V @467: invokespecial Reason: Type 'org/apache/flink/streaming/api/transformations/TwoInputTransformation' (current frame, stack[4]) is not assignable to 'org/apache/flink/api/dag/Transformation' Current Frame: bci: @467 flags: { } locals: { 'org/apache/beam/runners/flink/FlinkStreamingTransformTranslators$CombinePerKeyTranslator', 'org/apache/beam/sdk/transforms/PTransform', 'org/apache/beam/runners/flink/FlinkStreamingTranslationContext', 'java/lang/String', 'org/apache/beam/sdk/values/PCollection', 'org/apache/beam/sdk/values/WindowingStrategy', 'org/apache/beam/sdk/coders/KvCoder', 'org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder', 'org/apache/flink/streaming/api/datastream/DataStream', 'org/apache/beam/sdk/util/WindowedValue$FullWindowedValueCoder', 'org/apache/beam/runners/flink/translation/types/CoderTypeInformation', 'org/apache/flink/streaming/api/datastream/DataStream', 'org/apache/beam/runners/flink/translation/wrappers/streaming/WorkItemKeySelector', 'org/apache/flink/streaming/api/datastream/KeyedStream', 'org/apache/beam/sdk/transforms/CombineFnBase$GlobalCombineFn', 'org/apache/beam/runners/core/SystemReduceFn', 'org/apache/beam/sdk/coders/Coder', 'org/apache/flink/api/common/typeinfo/TypeInformation', 'java/util/List', 'org/apache/flink/api/java/tuple/Tuple2', 'org/apache/beam/sdk/values/TupleTag', 'org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator', 'org/apache/flink/streaming/api/transformations/TwoInputTransformation' } stack: { uninitialized 455, uninitialized 455, 'org/apache/beam/runners/flink/FlinkStreamingTransformTranslators$CombinePerKeyTranslator', 'org/apache/flink/streaming/api/environment/StreamExecutionEnvironment', 'org/apache/flink/streaming/api/transformations/TwoInputTransformation' } Bytecode: 0x0000000: 2cb8 0065 4e2c 2bb6 003b c000 3d3a 0419 0x0000010: 04b6 0041 3a05 1904 b600 69c0 006b 3a06 0x0000020: 1906 b600 6e19 06b6 0071 1904 b600 41b6 0x0000030: 0047 b600 74b8 007a 3a07 2c19 04b6 007e 0x0000040: 3a08 1907 1904 b600 41b6 0047 b600 74b8 0x0000050: 0082 3a09 bb00 8459 1909 b700 873a 0a19 0x0000060: 08bb 0021 592c b600 8bb7 008e b600 9419 0x0000070: 0ab6 009a 129b b600 9f3a 0bbb 00a1 5919 0x0000080: 06b6 006e b700 a23a 0c19 0b19 0cb6 00a6 0x0000090: 3a0d 2bc0 001c b600 aa3a 0e19 06b6 006e 0x00000a0: 190e 1904 b600 aeb6 00b4 1906 b800 bab8 0x00000b0: 00c0 3a0f 2c2c 2bb6 00c3 c000 3db6 00c7 0x00000c0: 3a10 2c2c 2bb6 00c3 c000 3db6 00cb 3a11 0x00000d0: 2bc0 001c b600 513a 1219 12b9 0056 0100 0x00000e0: 9900 61bb 00cd 5912 cfb7 00d2 3a13 bb00 0x00000f0: d459 190f 2d19 0919 13b8 00d9 bb00 2459 0x0000100: 1913 1910 b700 dc19 05bb 00de 59b7 00df 0x0000110: b800 d92c b600 8b19 06b6 006e 190c b700 0x0000120: e23a 1419 0d2d 1911 1914 b600 e72d b600 0x0000130: ea3a 152c 2c2b b600 c319 15b6 00ee a700 0x0000140: af19 122c b800 fa3a 13bb 00cd 5912 cfb7 0x0000150: 00d2 3a14 bb00 d459 190f 2d19 0919 14b8 0x0000160: 00d9 bb00 2459 1914 1910 b700 dc19 0519 0x0000170: 13b4 0100 c001 0219 122c b600 8b19 06b6 0x0000180: 006e 190c b700 e23a 15bb 0104 5919 0db6 0x0000190: 0108 1913 b401 0bc0 0090 b601 0fb6 0110 0x00001a0: 2bb6 0114 1915 1911 190d b601 18b7 011b 0x00001b0: 3a16 1916 190d b601 1fb6 0123 1916 190d 0x00001c0: b601 2701 b601 2bbb 000b 592a 190d b601 0x00001d0: 2f19 16b7 0132 3a17 190d b601 2f19 16b6 0x00001e0: 0138 2c2c 2bb6 00c3 1917 b600 eeb1 Stackmap Table: full_frame(@321,{Object[#2],Object[#240],Object[#55],Object[#242],Object[#61],Object[#67],Object[#107],Object[#118],Object[#144],Object[#13],Object[#132],Object[#144],Object[#161],Object[#228],Object[#18],Object[#188],Object[#244],Object[#246],Object[#83]},{}) same_frame_extended(@493) at org.apache.beam.runners.flink.FlinkStreamingTransformTranslators.<clinit>(FlinkStreamingTransformTranslators.java:156) at org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator.enterCompositeTransform(FlinkStreamingPipelineTranslator.java:103) at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:653) at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657) at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600(TransformHierarchy.java:317) at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:251) at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:463) at org.apache.beam.runners.flink.FlinkPipelineTranslator.translate(FlinkPipelineTranslator.java:38) at org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator.translate(FlinkStreamingPipelineTranslator.java:88) at org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.translate(FlinkPipelineExecutionEnvironment.java:116) at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:107) at org.apache.beam.sdk.Pipeline.run(Pipeline.java:317) at org.apache.beam.sdk.Pipeline.run(Pipeline.java:303) at com.cerner.forseti.processing.beam.errorrate.ErrorRate.runErrorRate(ErrorRate.java:113) at com.cerner.forseti.processing.beam.errorrate.ErrorRate.main(ErrorRate.java:54) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529) at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421) at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:423) at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813) at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287) at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213) at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050) at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126) at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30) at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)