Hi! Sorry for misleading. I mean DataStream#process, see https://nightlies.apache.org/flink/flink-docs-release-1.14/api/java/org/apache/flink/streaming/api/datastream/DataStream.html#process-org.apache.flink.streaming.api.functions.ProcessFunction-
Peter Schrott <peter.schrot...@googlemail.com> 于2021年10月19日周二 下午3:10写道: > Hi & thanks! > > DataStreamSource does not provide a method processRecord: > > https://nightlies.apache.org/flink/flink-docs-release-1.14/api/java/org/apache/flink/streaming/api/datastream/DataStreamSource.html > > Can you point me to the docs for that? > > Thanks, Peter > > On Tue, Oct 19, 2021 at 4:47 AM Caizhi Weng <tsreape...@gmail.com> wrote: > >> Hi! >> >> You can call streamSource.processRecord to change the CharSequence to a >> String, then change the stream to a table. >> >> Peter Schrott <peter.schrot...@googlemail.com> 于2021年10月18日周一 下午8:40写道: >> >>> Hi there, >>> >>> I have a Kafka topic where the schema of its values is defined by the >>> "MyRecord" record in the following Avro IDL and registered to the >>> Confluent >>> Schema Registry: >>> >>> @namespace("org.example") >>> protocol MyProtocol { >>> record MyRecord { >>> string text; >>> } >>> } >>> >>> The topic is consumed with a KafkaSource and then then passed into >>> StreamTableEnvironment. On the temporary view I want to run SQL queries. >>> >>> But the following exception is thrown on startup of the job: >>> >>> Exception in thread "main" org.apache.flink.table.api.ValidationException: >>> SQL validation failed. From line 0, column 0 to line 1, column 58: Cannot >>> apply 'LIKE' to arguments of type 'LIKE(<RAW('ORG.APACHE.AVRO.UTIL.UTF8', >>> '...')>, <CHAR(6)>)'. Supported form(s): 'LIKE(<STRING>, <STRING>, >>> <STRING>)' >>> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org >>> <http://org.apache.flink.table.planner.calcite.flinkplannerimpl.org/>$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:156) >>> at >>> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:107) >>> at >>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:205) >>> at >>> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101) >>> at >>> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:704) >>> at >>> com.webtrekk.flink_avro_enum.kafka.ExampleFromKafkaAndSRWithStreams.main(ExampleFromKafkaAndSRWithStreams.java:27) >>> Caused by: org.apache.calcite.runtime.CalciteContextException: From >>> line 0, column 0 to line 1, column 58: Cannot apply 'LIKE' to arguments of >>> type 'LIKE(<RAW('ORG.APACHE.AVRO.UTIL.UTF8', '...')>, <CHAR(6)>)'. >>> Supported form(s): 'LIKE(<STRING>, <STRING>, <STRING>)' >>> at >>> java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native >>> Method) >>> at >>> java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) >>> at >>> java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) >>> at >>> java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490) >>> at >>> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:467) >>> at >>> org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:883) >>> at >>> org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:868) >>> at >>> org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4861) >>> at >>> org.apache.calcite.sql.SqlCallBinding.newValidationSignatureError(SqlCallBinding.java:389) >>> at >>> org.apache.calcite.sql.type.CompositeOperandTypeChecker.checkOperandTypes(CompositeOperandTypeChecker.java:262) >>> at >>> org.apache.calcite.sql.fun.SqlLikeOperator.checkOperandTypes(SqlLikeOperator.java:104) >>> at >>> org.apache.calcite.sql.SqlOperator.validateOperands(SqlOperator.java:444) >>> at org.apache.calcite.sql.SqlOperator.deriveType(SqlOperator.java:531) >>> at >>> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5710) >>> at >>> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5697) >>> at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139) >>> at >>> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1736) >>> at >>> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1727) >>> at >>> org.apache.calcite.sql.validate.SqlValidatorImpl.validateWhereOrOn(SqlValidatorImpl.java:4006) >>> at >>> org.apache.calcite.sql.validate.SqlValidatorImpl.validateWhereClause(SqlValidatorImpl.java:3998) >>> at >>> org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3338) >>> at >>> org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60) >>> at >>> org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84) >>> at >>> org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:997) >>> at >>> org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:975) >>> at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232) >>> at >>> org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:952) >>> at >>> org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:704) >>> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org >>> <http://org.apache.flink.table.planner.calcite.flinkplannerimpl.org/>$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:151) >>> ... 5 more >>> Caused by: org.apache.calcite.sql.validate.SqlValidatorException: >>> Cannot apply 'LIKE' to arguments of type >>> 'LIKE(<RAW('ORG.APACHE.AVRO.UTIL.UTF8', '...')>, <CHAR(6)>)'. Supported >>> form(s): 'LIKE(<STRING>, <STRING>, <STRING>)' >>> at >>> java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native >>> Method) >>> at >>> java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) >>> at >>> java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) >>> at >>> java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490) >>> at >>> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:467) >>> at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:560) >>> ... 29 more >>> >>> It can be seen, that my attribute "text" is not treated as "String" >>> rather as "RAW / UTF8" in the flink table environment. By default avro uses >>> its own implementation of `CharSequence`, namely `UTF8` on deserializing >>> records (also using Flink DataStream API). Is there a way to deserialize / >>> convert that avro specific type into a real "String" when passing data from >>> DataStreams API to Table API? >>> >>> Just for completion, one workaround would be to annotate the avro schema >>> such that avro uses type "String" on deserialization under the hood. But >>> for my case: The schema is already fixed and can not be changed easily. >>> >>> Please find a minimum on my github account: >>> https://github.com/peterschrott/flinkStreamTableAvro >>> >>> Thanks & best, Peter >>> >>