Welp, I think I just solved my own problem. When I downgraded Flink, my pom.xml was still pulling in the Beam flink-runner built for Flink 1.10. I changed that to the 1.8 runner and it seems to have worked!
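
For anyone who lands here with the same VerifyError, here is a minimal sketch of the kind of pom.xml change involved. It assumes the runner is declared as a regular Maven dependency using Beam's per-Flink-version artifacts (beam-runners-flink-1.8, beam-runners-flink-1.10); the beam.version property name is only illustrative, and the actual pom.xml is in the gist linked in the quoted message, not reproduced here.

```xml
<!-- Sketch only: the runner artifact suffix should match the Flink version of the cluster. -->
<properties>
  <!-- Hypothetical property name; 2.21.0 is the Beam version from the original report. -->
  <beam.version>2.21.0</beam.version>
</properties>

<dependencies>
  <dependency>
    <groupId>org.apache.beam</groupId>
    <!-- Previously the 1.10 runner (beam-runners-flink-1.10, assumed);
         the 1.8 runner matches the Flink 1.8.0 installation. -->
    <artifactId>beam-runners-flink-1.8</artifactId>
    <version>${beam.version}</version>
  </dependency>
</dependencies>
```

The error itself is consistent with a runner compiled against a newer Flink API than the one installed: the verifier complains about 'org/apache/flink/api/dag/Transformation', a type the 1.10 runner expects and the Flink 1.8.0 installation evidently does not provide in that form.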
On Mon, Jun 15, 2020 at 12:41 PM Roger <roger.l...@gmail.com> wrote:

> Hello.
> I am currently trying to get a Beam application to work on Flink
> version 1.8 that currently works on version 1.10. I realized that AWS
> kinesis does not support flink version 1.10 so I must downgrade. With that
> said when I downgrade I get a stacktrace that I don't really understand.
> Any help is appreciated! Thanks. Roger
>
> *java.lang.VerifyError: Bad type on operand stack
> Exception Details:
> Location:
> org/apache/beam/runners/flink/FlinkStreamingTransformTranslators$CombinePerKeyTranslator.translateNode(Lorg/apache/beam/sdk/transforms/PTransform;Lorg/apache/beam/runners/flink/FlinkStreamingTranslationContext;)V
> @467: invokespecial
> Reason: Type
> 'org/apache/flink/streaming/api/transformations/TwoInputTransformation'
> (current frame, stack[4]) is not assignable to
> 'org/apache/flink/api/dag/Transformation'
> Current Frame:*
>
> *Application Details:*
> Beam Version: 2.21.0
> Flink Version: 1.8.0
> Full Stack Trace: gist
> <https://gist.github.com/roger-link/1ae9c0bc06ecfd4101775479670dcbcc>
> Java Application: gist
> <https://gist.github.com/roger-link/3a6c8c6c51181c00932dcee8c86b8ae2>
> Pom.xml: gist
> <https://gist.github.com/roger-link/821db196291063ec2b734c1567e37817>
>
> *Stacktrace:*
> /usr/local/Cellar/apache-flink/1.8.0/bin/flink run
> ./processing/beam/errorrate/target/forseti-errorrate-1.0-SNAPSHOT.jar
> --runner=FlinkRunner --awsRegion=us-west-2 --kafkaTopic=Forseti-Topic
> --kafkaBrokers="
> b-2.forseti-processing-rog.nrpwiy.c2.kafka.us-west-2.amazonaws.com:9092,
> b-1.forseti-processing-rog.nrpwiy.c2.kafka.us-west-2.amazonaws.com:9092,
> b-4.forseti-processing-rog.nrpwiy.c2.kafka.us-west-2.amazonaws.com:9092"
> --kinesisStream=forseti-processing
> Starting execution of program
> java.lang.VerifyError: Bad type on operand stack
> Exception Details:
>   Location:
>     org/apache/beam/runners/flink/FlinkStreamingTransformTranslators$CombinePerKeyTranslator.translateNode(Lorg/apache/beam/sdk/transforms/PTransform;Lorg/apache/beam/runners/flink/FlinkStreamingTranslationContext;)V
>     @467: invokespecial
>   Reason:
>     Type
>     'org/apache/flink/streaming/api/transformations/TwoInputTransformation'
>     (current frame, stack[4]) is not assignable to
>     'org/apache/flink/api/dag/Transformation'
>   Current Frame:
>     bci: @467
>     flags: { }
>     locals: {
>       'org/apache/beam/runners/flink/FlinkStreamingTransformTranslators$CombinePerKeyTranslator',
>       'org/apache/beam/sdk/transforms/PTransform',
>       'org/apache/beam/runners/flink/FlinkStreamingTranslationContext',
>       'java/lang/String', 'org/apache/beam/sdk/values/PCollection',
>       'org/apache/beam/sdk/values/WindowingStrategy',
>       'org/apache/beam/sdk/coders/KvCoder',
>       'org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder',
>       'org/apache/flink/streaming/api/datastream/DataStream',
>       'org/apache/beam/sdk/util/WindowedValue$FullWindowedValueCoder',
>       'org/apache/beam/runners/flink/translation/types/CoderTypeInformation',
>       'org/apache/flink/streaming/api/datastream/DataStream',
>       'org/apache/beam/runners/flink/translation/wrappers/streaming/WorkItemKeySelector',
>       'org/apache/flink/streaming/api/datastream/KeyedStream',
>       'org/apache/beam/sdk/transforms/CombineFnBase$GlobalCombineFn',
>       'org/apache/beam/runners/core/SystemReduceFn',
>       'org/apache/beam/sdk/coders/Coder',
>       'org/apache/flink/api/common/typeinfo/TypeInformation', 'java/util/List',
>       'org/apache/flink/api/java/tuple/Tuple2',
>       'org/apache/beam/sdk/values/TupleTag',
>       'org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator',
>       'org/apache/flink/streaming/api/transformations/TwoInputTransformation' }
>     stack: { uninitialized 455, uninitialized 455,
>       'org/apache/beam/runners/flink/FlinkStreamingTransformTranslators$CombinePerKeyTranslator',
>       'org/apache/flink/streaming/api/environment/StreamExecutionEnvironment',
>       'org/apache/flink/streaming/api/transformations/TwoInputTransformation' }
>   Bytecode:
>     0x0000000: 2cb8 0065 4e2c 2bb6 003b c000 3d3a 0419
>     0x0000010: 04b6 0041 3a05 1904 b600 69c0 006b 3a06
>     0x0000020: 1906 b600 6e19 06b6 0071 1904 b600 41b6
>     0x0000030: 0047 b600 74b8 007a 3a07 2c19 04b6 007e
>     0x0000040: 3a08 1907 1904 b600 41b6 0047 b600 74b8
>     0x0000050: 0082 3a09 bb00 8459 1909 b700 873a 0a19
>     0x0000060: 08bb 0021 592c b600 8bb7 008e b600 9419
>     0x0000070: 0ab6 009a 129b b600 9f3a 0bbb 00a1 5919
>     0x0000080: 06b6 006e b700 a23a 0c19 0b19 0cb6 00a6
>     0x0000090: 3a0d 2bc0 001c b600 aa3a 0e19 06b6 006e
>     0x00000a0: 190e 1904 b600 aeb6 00b4 1906 b800 bab8
>     0x00000b0: 00c0 3a0f 2c2c 2bb6 00c3 c000 3db6 00c7
>     0x00000c0: 3a10 2c2c 2bb6 00c3 c000 3db6 00cb 3a11
>     0x00000d0: 2bc0 001c b600 513a 1219 12b9 0056 0100
>     0x00000e0: 9900 61bb 00cd 5912 cfb7 00d2 3a13 bb00
>     0x00000f0: d459 190f 2d19 0919 13b8 00d9 bb00 2459
>     0x0000100: 1913 1910 b700 dc19 05bb 00de 59b7 00df
>     0x0000110: b800 d92c b600 8b19 06b6 006e 190c b700
>     0x0000120: e23a 1419 0d2d 1911 1914 b600 e72d b600
>     0x0000130: ea3a 152c 2c2b b600 c319 15b6 00ee a700
>     0x0000140: af19 122c b800 fa3a 13bb 00cd 5912 cfb7
>     0x0000150: 00d2 3a14 bb00 d459 190f 2d19 0919 14b8
>     0x0000160: 00d9 bb00 2459 1914 1910 b700 dc19 0519
>     0x0000170: 13b4 0100 c001 0219 122c b600 8b19 06b6
>     0x0000180: 006e 190c b700 e23a 15bb 0104 5919 0db6
>     0x0000190: 0108 1913 b401 0bc0 0090 b601 0fb6 0110
>     0x00001a0: 2bb6 0114 1915 1911 190d b601 18b7 011b
>     0x00001b0: 3a16 1916 190d b601 1fb6 0123 1916 190d
>     0x00001c0: b601 2701 b601 2bbb 000b 592a 190d b601
>     0x00001d0: 2f19 16b7 0132 3a17 190d b601 2f19 16b6
>     0x00001e0: 0138 2c2c 2bb6 00c3 1917 b600 eeb1
>   Stackmap Table:
>     full_frame(@321,{Object[#2],Object[#240],Object[#55],Object[#242],Object[#61],Object[#67],Object[#107],Object[#118],Object[#144],Object[#13],Object[#132],Object[#144],Object[#161],Object[#228],Object[#18],Object[#188],Object[#244],Object[#246],Object[#83]},{})
>     same_frame_extended(@493)
>
> at org.apache.beam.runners.flink.FlinkStreamingTransformTranslators.<clinit>(FlinkStreamingTransformTranslators.java:156)
> at org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator.enterCompositeTransform(FlinkStreamingPipelineTranslator.java:103)
> at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:653)
> at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
> at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600(TransformHierarchy.java:317)
> at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:251)
> at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:463)
> at org.apache.beam.runners.flink.FlinkPipelineTranslator.translate(FlinkPipelineTranslator.java:38)
> at org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator.translate(FlinkStreamingPipelineTranslator.java:88)
> at org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.translate(FlinkPipelineExecutionEnvironment.java:116)
> at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:107)
> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:317)
> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:303)
> at com.cerner.forseti.processing.beam.errorrate.ErrorRate.runErrorRate(ErrorRate.java:113)
> at com.cerner.forseti.processing.beam.errorrate.ErrorRate.main(ErrorRate.java:54)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
> at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
> at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:423)
> at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813)
> at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287)
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
> at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
> at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
> at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)