yong yang created FLINK-32807: --------------------------------- Summary: when i use emitUpdateWithRetract of udtagg,bug error Key: FLINK-32807 URL: https://issues.apache.org/jira/browse/FLINK-32807 Project: Flink Issue Type: Bug Components: API / Scala Affects Versions: 1.17.1 Environment: <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId> <artifactId>FlinkLocalDemo</artifactId> <version>1.0-SNAPSHOT</version> <packaging>jar</packaging> <name>FlinkLocalDemo</name> <url>http://maven.apache.org</url> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <flink.version>1.17.1</flink.version> <scala.binary.version>2.12</scala.binary.version> <scala.version>2.12.8</scala.version> </properties> <dependencies> <!-- https://mvnrepository.com/artifact/com.chuusai/shapeless --> <dependency> <groupId>com.chuusai</groupId> <artifactId>shapeless_${scala.binary.version}</artifactId> <version>2.3.10</version> </dependency> <!-- https://mvnrepository.com/artifact/joda-time/joda-time --> <dependency> <groupId>joda-time</groupId> <artifactId>joda-time</artifactId> <version>2.12.5</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-avro</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-runtime-web</artifactId> <version>${flink.version}</version> </dependency> <!-- https://mvnrepository.com/artifact/com.alibaba.fastjson2/fastjson2 --> <dependency> <groupId>com.alibaba.fastjson2</groupId> <artifactId>fastjson2</artifactId> <version>2.0.33</version> </dependency> <!-- https://mvnrepository.com/artifact/com.alibaba/fastjson --> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.83</version> <!-- <version>1.2.17</version>--> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>3.8.1</version> <scope>test</scope> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-common --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-common</artifactId> <version>${flink.version}</version> </dependency> <!-- 引入flink1.13.0 scala2.12.12 --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-json</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-scala_${scala.binary.version}</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-csv</artifactId> <version>${flink.version}</version> </dependency> <!-- Either... --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-api-java-bridge</artifactId> <version>${flink.version}</version> </dependency> <!-- or... --> <!-- 下面几个是代码中写sql需要的包 四个中一个都不能少 --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner-loader --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner-loader</artifactId> <version>${flink.version}</version> <!-- <scope>test</scope>--> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-runtime --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-runtime</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-files --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-files</artifactId> <version>${flink.version}</version> </dependency> <!-- 注意: flink-table-planner-loader 不能和 flink-table-planner_${scala.binary.version} 共存--> <!-- <dependency>--> <!-- <groupId>org.apache.flink</groupId>--> <!-- <artifactId>flink-table-planner_${scala.binary.version}</artifactId>--> <!-- <version>${flink.version}</version>--> <!-- <scope>provided</scope>--> <!-- </dependency>--> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-jdbc</artifactId> <version>3.1.0-1.17</version> <scope>provided</scope> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>8.0.11</version> </dependency> </dependencies> <build> <plugins> <!-- 打jar插件 --> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>2.4.3</version> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <filters> <filter> <artifact>*:*</artifact> <excludes> <exclude>META-INF/*.SF</exclude> <exclude>META-INF/*.DSA</exclude> <exclude>META-INF/*.RSA</exclude> </excludes> </filter> </filters> </configuration> </execution> </executions> </plugin> <plugin> <groupId>org.scala-tools</groupId> <artifactId>maven-scala-plugin</artifactId> <version>2.15.2</version> <executions> <execution> <goals> <goal>compile</goal> <goal>testCompile</goal> </goals> </execution> </executions> </plugin> <plugin> <groupId>net.alchim31.maven</groupId> <artifactId>scala-maven-plugin</artifactId> <version>3.2.2</version> <executions> <execution> <id>scala-compile-first</id> <phase>process-resources</phase> <goals> <goal>add-source</goal> <goal>compile</goal> </goals> </execution> </executions> <configuration> <scalaVersion>${scala.version}</scalaVersion> </configuration> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-assembly-plugin</artifactId> <version>2.5.5</version> <configuration> <!--这部分可有可无,加上的话则直接生成可运行jar包--> <!--<archive>--> <!--<manifest>--> <!--<mainClass>${exec.mainClass}</mainClass>--> <!--</manifest>--> <!--</archive>--> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> </configuration> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.1</version> <configuration> <source>11</source> <target>11</target> </configuration> </plugin> </plugins> </build> </project> Reporter: yong yang Attachments: Top2WithRetract.scala, UdtaggDemo3.scala 参考: [https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/table/functions/udfs/#retraction-example] 我的代码: [^Top2WithRetract.scala] bug show error: ``` /Users/thomas990p/Library/Java/JavaVirtualMachines/corretto-11.0.20/Contents/Home/bin/java -javaagent:/Users/thomas990p/Library/Application Support/JetBrains/Toolbox/apps/IDEA-U/ch-0/231.9161.38/IntelliJ IDEA.app/Contents/lib/idea_rt.jar=56941:/Users/thomas990p/Library/Application Support/JetBrains/Toolbox/apps/IDEA-U/ch-0/231.9161.38/IntelliJ IDEA.app/Contents/bin -Dfile.encoding=UTF-8 -classpath /Users/thomas990p/IdeaProjects/FlinkLocalDemo/target/classes:/Users/thomas990p/.m2/repository/com/chuusai/shapeless_2.12/2.3.10/shapeless_2.12-2.3.10.jar:/Users/thomas990p/.m2/repository/org/scala-lang/scala-library/2.12.15/scala-library-2.12.15.jar:/Users/thomas990p/.m2/repository/joda-time/joda-time/2.12.5/joda-time-2.12.5.jar:/Users/thomas990p/.m2/repository/org/apache/flink/flink-avro/1.17.1/flink-avro-1.17.1.jar:/Users/thomas990p/.m2/repository/org/apache/avro/avro/1.11.1/avro-1.11.1.jar:/Users/thomas990p/.m2/repository/com/fasterxml/jackson/core/jackson-core/2.12.7/jackson-core-2.12.7.jar:/Users/thomas990p/.m2/repository/com/fasterxml/jackson/core/jackson-databind/2.12.7/jackson-databind-2.12.7.jar:/Users/thomas990p/.m2/repository/com/fasterxml/jackson/core/jackson-annotations/2.12.7/jackson-annotations-2.12.7.jar:/Users/thomas990p/.m2/repository/org/apache/commons/commons-compress/1.21/commons-compress-1.21.jar:/Users/thomas990p/.m2/repository/com/google/code/findbugs/jsr305/1.3.9/jsr305-1.3.9.jar:/Users/thomas990p/.m2/repository/org/apache/flink/flink-runtime-web/1.17.1/flink-runtime-web-1.17.1.jar:/Users/thomas990p/.m2/repository/org/apache/flink/flink-runtime/1.17.1/flink-runtime-1.17.1.jar:/Users/thomas990p/.m2/repository/org/apache/flink/flink-rpc-core/1.17.1/flink-rpc-core-1.17.1.jar:/Users/thomas990p/.m2/repository/org/apache/flink/flink-rpc-akka-loader/1.17.1/flink-rpc-akka-loader-1.17.1.jar:/Users/thomas990p/.m2/repository/org/apache/flink/flink-queryable-state-client-java/1.17.1/flink-queryable-state-client-java-1.17.1.jar:/Users/thomas990p/.m2/repository/org/apache/flink/flink-hadoop-fs/1.17.1/flink-hadoop-fs-1.17.1.jar:/Users/thomas990p/.m2/repository/commons-io/commons-io/2.11.0/commons-io-2.11.0.jar:/Users/thomas990p/.m2/repository/org/apache/flink/flink-shaded-zookeeper-3/3.7.1-16.1/flink-shaded-zookeeper-3-3.7.1-16.1.jar:/Users/thomas990p/.m2/repository/org/apache/commons/commons-lang3/3.12.0/commons-lang3-3.12.0.jar:/Users/thomas990p/.m2/repository/org/apache/commons/commons-text/1.10.0/commons-text-1.10.0.jar:/Users/thomas990p/.m2/repository/org/javassist/javassist/3.24.0-GA/javassist-3.24.0-GA.jar:/Users/thomas990p/.m2/repository/org/xerial/snappy/snappy-java/1.1.8.3/snappy-java-1.1.8.3.jar:/Users/thomas990p/.m2/repository/org/lz4/lz4-java/1.8.0/lz4-java-1.8.0.jar:/Users/thomas990p/.m2/repository/org/apache/flink/flink-shaded-netty/4.1.82.Final-16.1/flink-shaded-netty-4.1.82.Final-16.1.jar:/Users/thomas990p/.m2/repository/org/apache/flink/flink-shaded-guava/30.1.1-jre-16.1/flink-shaded-guava-30.1.1-jre-16.1.jar:/Users/thomas990p/.m2/repository/org/apache/flink/flink-shaded-jackson/2.13.4-16.1/flink-shaded-jackson-2.13.4-16.1.jar:/Users/thomas990p/.m2/repository/org/slf4j/slf4j-api/1.7.36/slf4j-api-1.7.36.jar:/Users/thomas990p/.m2/repository/com/alibaba/fastjson2/fastjson2/2.0.33/fastjson2-2.0.33.jar:/Users/thomas990p/.m2/repository/com/alibaba/fastjson/1.2.83/fastjson-1.2.83.jar:/Users/thomas990p/.m2/repository/org/apache/flink/flink-table-common/1.17.1/flink-table-common-1.17.1.jar:/Users/thomas990p/.m2/repository/org/apache/flink/flink-core/1.17.1/flink-core-1.17.1.jar:/Users/thomas990p/.m2/repository/org/apache/flink/flink-annotations/1.17.1/flink-annotations-1.17.1.jar:/Users/thomas990p/.m2/repository/org/apache/flink/flink-metrics-core/1.17.1/flink-metrics-core-1.17.1.jar:/Users/thomas990p/.m2/repository/com/esotericsoftware/kryo/kryo/2.24.0/kryo-2.24.0.jar:/Users/thomas990p/.m2/repository/com/esotericsoftware/minlog/minlog/1.2/minlog-1.2.jar:/Users/thomas990p/.m2/repository/org/objenesis/objenesis/2.1/objenesis-2.1.jar:/Users/thomas990p/.m2/repository/commons-collections/commons-collections/3.2.2/commons-collections-3.2.2.jar:/Users/thomas990p/.m2/repository/org/apache/flink/flink-shaded-asm-9/9.3-16.1/flink-shaded-asm-9-9.3-16.1.jar:/Users/thomas990p/.m2/repository/com/ibm/icu/icu4j/67.1/icu4j-67.1.jar:/Users/thomas990p/.m2/repository/org/apache/flink/flink-connector-kafka/1.17.1/flink-connector-kafka-1.17.1.jar:/Users/thomas990p/.m2/repository/org/apache/flink/flink-connector-base/1.17.1/flink-connector-base-1.17.1.jar:/Users/thomas990p/.m2/repository/org/apache/kafka/kafka-clients/3.2.3/kafka-clients-3.2.3.jar:/Users/thomas990p/.m2/repository/com/github/luben/zstd-jni/1.5.2-1/zstd-jni-1.5.2-1.jar:/Users/thomas990p/.m2/repository/org/apache/flink/flink-json/1.17.1/flink-json-1.17.1.jar:/Users/thomas990p/.m2/repository/org/apache/flink/flink-shaded-force-shading/16.1/flink-shaded-force-shading-16.1.jar:/Users/thomas990p/.m2/repository/org/apache/flink/flink-scala_2.12/1.17.1/flink-scala_2.12-1.17.1.jar:/Users/thomas990p/.m2/repository/org/apache/flink/flink-java/1.17.1/flink-java-1.17.1.jar:/Users/thomas990p/.m2/repository/org/apache/commons/commons-math3/3.6.1/commons-math3-3.6.1.jar:/Users/thomas990p/.m2/repository/com/twitter/chill-java/0.7.6/chill-java-0.7.6.jar:/Users/thomas990p/.m2/repository/org/scala-lang/scala-reflect/2.12.7/scala-reflect-2.12.7.jar:/Users/thomas990p/.m2/repository/org/scala-lang/scala-compiler/2.12.7/scala-compiler-2.12.7.jar:/Users/thomas990p/.m2/repository/org/scala-lang/modules/scala-xml_2.12/1.0.6/scala-xml_2.12-1.0.6.jar:/Users/thomas990p/.m2/repository/com/twitter/chill_2.12/0.7.6/chill_2.12-0.7.6.jar:/Users/thomas990p/.m2/repository/org/apache/flink/flink-streaming-scala_2.12/1.17.1/flink-streaming-scala_2.12-1.17.1.jar:/Users/thomas990p/.m2/repository/org/apache/flink/flink-streaming-java/1.17.1/flink-streaming-java-1.17.1.jar:/Users/thomas990p/.m2/repository/org/apache/flink/flink-csv/1.17.1/flink-csv-1.17.1.jar:/Users/thomas990p/.m2/repository/org/apache/flink/flink-table-api-java-bridge/1.17.1/flink-table-api-java-bridge-1.17.1.jar:/Users/thomas990p/.m2/repository/org/apache/flink/flink-table-api-java/1.17.1/flink-table-api-java-1.17.1.jar:/Users/thomas990p/.m2/repository/org/apache/flink/flink-table-api-bridge-base/1.17.1/flink-table-api-bridge-base-1.17.1.jar:/Users/thomas990p/.m2/repository/org/apache/flink/flink-table-api-scala-bridge_2.12/1.17.1/flink-table-api-scala-bridge_2.12-1.17.1.jar:/Users/thomas990p/.m2/repository/org/apache/flink/flink-table-api-scala_2.12/1.17.1/flink-table-api-scala_2.12-1.17.1.jar:/Users/thomas990p/.m2/repository/org/apache/flink/flink-table-runtime/1.17.1/flink-table-runtime-1.17.1.jar:/Users/thomas990p/.m2/repository/org/apache/flink/flink-cep/1.17.1/flink-cep-1.17.1.jar:/Users/thomas990p/.m2/repository/org/apache/flink/flink-connector-files/1.17.1/flink-connector-files-1.17.1.jar:/Users/thomas990p/.m2/repository/org/apache/flink/flink-file-sink-common/1.17.1/flink-file-sink-common-1.17.1.jar:/Users/thomas990p/.m2/repository/org/apache/flink/flink-table-planner_2.12/1.17.1/flink-table-planner_2.12-1.17.1.jar:/Users/thomas990p/.m2/repository/org/immutables/value/2.8.8/value-2.8.8.jar:/Users/thomas990p/.m2/repository/org/immutables/value-annotations/2.8.8/value-annotations-2.8.8.jar:/Users/thomas990p/.m2/repository/org/codehaus/janino/commons-compiler/3.0.11/commons-compiler-3.0.11.jar:/Users/thomas990p/.m2/repository/org/codehaus/janino/janino/3.0.11/janino-3.0.11.jar:/Users/thomas990p/.m2/repository/org/apiguardian/apiguardian-api/1.1.2/apiguardian-api-1.1.2.jar:/Users/thomas990p/.m2/repository/org/apache/flink/flink-clients/1.17.1/flink-clients-1.17.1.jar:/Users/thomas990p/.m2/repository/org/apache/flink/flink-optimizer/1.17.1/flink-optimizer-1.17.1.jar:/Users/thomas990p/.m2/repository/commons-cli/commons-cli/1.5.0/commons-cli-1.5.0.jar:/Users/thomas990p/.m2/repository/org/apache/flink/flink-connector-jdbc/3.1.0-1.17/flink-connector-jdbc-3.1.0-1.17.jar:/Users/thomas990p/.m2/repository/mysql/mysql-connector-java/8.0.11/mysql-connector-java-8.0.11.jar:/Users/thomas990p/.m2/repository/com/google/protobuf/protobuf-java/2.6.0/protobuf-java-2.6.0.jar com.yy.udtaf.UdtaggDemo3 SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". SLF4J: Defaulting to no-operation (NOP) logger implementation SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details. Exception in thread "main" org.apache.flink.table.api.ValidationException: Could not find an implementation method 'emitValue' in class 'com.yy.udtaf.Top2WithRetract' for function '*com.yy.udtaf.Top2WithRetract*' that matches the following signature: void emitValue(com.yy.udtaf.Top2WithRetractAccumulator, org.apache.flink.util.Collector) at org.apache.flink.table.functions.UserDefinedFunctionHelper.validateClassForRuntime(UserDefinedFunctionHelper.java:323) at org.apache.flink.table.planner.codegen.agg.ImperativeAggCodeGen.checkNeededMethods(ImperativeAggCodeGen.scala:496) at org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator.$anonfun$checkNeededMethods$1(AggsHandlerCodeGenerator.scala:1234) at org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator.$anonfun$checkNeededMethods$1$adapted(AggsHandlerCodeGenerator.scala:1234) at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36) at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198) at org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator.checkNeededMethods(AggsHandlerCodeGenerator.scala:1234) at org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator.genEmitValue(AggsHandlerCodeGenerator.scala:1170) at org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator.generateTableAggsHandler(AggsHandlerCodeGenerator.scala:443) at org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecGroupTableAggregate.translateToPlanInternal(StreamExecGroupTableAggregate.java:146) at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:161) at org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:257) at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecCalc.translateToPlanInternal(CommonExecCalc.java:94) at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:161) at org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:257) at org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.java:145) at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:161) at org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:85) at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286) at scala.collection.Iterator.foreach(Iterator.scala:943) at scala.collection.Iterator.foreach$(Iterator.scala:943) at scala.collection.AbstractIterator.foreach(Iterator.scala:1431) at scala.collection.IterableLike.foreach(IterableLike.scala:74) at scala.collection.IterableLike.foreach$(IterableLike.scala:73) at scala.collection.AbstractIterable.foreach(Iterable.scala:56) at scala.collection.TraversableLike.map(TraversableLike.scala:286) at scala.collection.TraversableLike.map$(TraversableLike.scala:279) at scala.collection.AbstractTraversable.map(Traversable.scala:108) at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:84) at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:197) at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1803) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:945) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1422) at org.apache.flink.table.api.internal.TableImpl.execute(TableImpl.java:476) at com.yy.udtaf.UdtaggDemo3$.main(UdtaggDemo3.scala:36) at com.yy.udtaf.UdtaggDemo3.main(UdtaggDemo3.scala) Process finished with exit code 1 ``` -- This message was sent by Atlassian Jira (v8.20.10#820010)