Hi there,

I have a Kafka topic whose value schema is defined by the "MyRecord" record in the following Avro IDL and registered in the Confluent Schema Registry:

    @namespace("org.example")
    protocol MyProtocol {
      record MyRecord {
        string text;
      }
    }

The topic is consumed with a KafkaSource and then passed into a StreamTableEnvironment. I want to run SQL queries on the resulting temporary view, but the following exception is thrown on startup of the job:

Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed. From line 0, column 0 to line 1, column 58: Cannot apply 'LIKE' to arguments of type 'LIKE(<RAW('ORG.APACHE.AVRO.UTIL.UTF8', '...')>, <CHAR(6)>)'. Supported form(s): 'LIKE(<STRING>, <STRING>, <STRING>)'
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:156)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:107)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:205)
    at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:704)
    at com.webtrekk.flink_avro_enum.kafka.ExampleFromKafkaAndSRWithStreams.main(ExampleFromKafkaAndSRWithStreams.java:27)
Caused by: org.apache.calcite.runtime.CalciteContextException: From line 0, column 0 to line 1, column 58: Cannot apply 'LIKE' to arguments of type 'LIKE(<RAW('ORG.APACHE.AVRO.UTIL.UTF8', '...')>, <CHAR(6)>)'. Supported form(s): 'LIKE(<STRING>, <STRING>, <STRING>)'
    at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
    at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:467)
    at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:883)
    at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:868)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4861)
    at org.apache.calcite.sql.SqlCallBinding.newValidationSignatureError(SqlCallBinding.java:389)
    at org.apache.calcite.sql.type.CompositeOperandTypeChecker.checkOperandTypes(CompositeOperandTypeChecker.java:262)
    at org.apache.calcite.sql.fun.SqlLikeOperator.checkOperandTypes(SqlLikeOperator.java:104)
    at org.apache.calcite.sql.SqlOperator.validateOperands(SqlOperator.java:444)
    at org.apache.calcite.sql.SqlOperator.deriveType(SqlOperator.java:531)
    at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5710)
    at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5697)
    at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1736)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1727)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateWhereOrOn(SqlValidatorImpl.java:4006)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateWhereClause(SqlValidatorImpl.java:3998)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3338)
    at org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
    at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:997)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:975)
    at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:952)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:704)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:151)
    ... 5 more
Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Cannot apply 'LIKE' to arguments of type 'LIKE(<RAW('ORG.APACHE.AVRO.UTIL.UTF8', '...')>, <CHAR(6)>)'. Supported form(s): 'LIKE(<STRING>, <STRING>, <STRING>)'
    at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
    at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:467)
    at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:560)
    ... 29 more

As the message shows, my attribute "text" is not treated as a "String" but as "RAW / UTF8" in the Flink table environment. By default, Avro deserializes strings into its own implementation of `CharSequence`, namely `Utf8` (this also applies when using the Flink DataStream API).

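
To make the type mismatch concrete, here is a rough sketch of the kind of conversion I have in mind. It is not code from my job: `Utf8Like` is a hypothetical stand-in for `org.apache.avro.util.Utf8` so that the snippet runs without Avro or Flink on the classpath, and `toPlainString` is a hypothetical helper name. The idea would be to apply such a conversion to each field (for example in a map step on the DataStream) before the view is created, assuming that is acceptable for the pipeline.

```java
import java.nio.charset.StandardCharsets;

class Utf8ToStringSketch {

    // Hypothetical stand-in for org.apache.avro.util.Utf8: a CharSequence
    // backed by UTF-8 bytes that is NOT a java.lang.String.
    static final class Utf8Like implements CharSequence {
        private final byte[] bytes;

        Utf8Like(String value) {
            this.bytes = value.getBytes(StandardCharsets.UTF_8);
        }

        @Override
        public int length() {
            return toString().length();
        }

        @Override
        public char charAt(int index) {
            return toString().charAt(index);
        }

        @Override
        public CharSequence subSequence(int start, int end) {
            return toString().subSequence(start, end);
        }

        @Override
        public String toString() {
            // Decode the bytes into a regular java.lang.String.
            return new String(bytes, StandardCharsets.UTF_8);
        }
    }

    // Hypothetical helper: turns any CharSequence into a plain String,
    // passing null through unchanged.
    static String toPlainString(CharSequence value) {
        return value == null ? null : value.toString();
    }

    public static void main(String[] args) {
        CharSequence text = new Utf8Like("flink");
        System.out.println(text instanceof String);               // prints "false"
        System.out.println(toPlainString(text).equals("flink"));  // prints "true"
    }
}
```

A field converted this way is a real `java.lang.String`, which is the type I would expect the table environment to map to a SQL string rather than to RAW.
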
Is there a way to deserialize or convert that Avro-specific type into a real "String" when passing data from the DataStream API to the Table API?

Just for completeness: one workaround would be to annotate the Avro schema so that Avro uses the type "String" for deserialization under the hood. In my case, however, the schema is already fixed and cannot be changed easily.

Please find a minimal example on my GitHub account: https://github.com/peterschrott/flinkStreamTableAvro

Thanks & best,
Peter