Hi there,

I have a Kafka topic where the schema of its values is defined by the
"MyRecord" record in the following Avro IDL and registered to the Confluent
Schema Registry:

@namespace("org.example")
protocol MyProtocol {
   record MyRecord {
      string text;
   }
}

The topic is consumed with a KafkaSource and then then passed into
StreamTableEnvironment. On the temporary view I want to run SQL queries.

But the following exception is thrown on startup of the job:

Exception in thread "main"
org.apache.flink.table.api.ValidationException: SQL validation failed.
>From line 0, column 0 to line 1, column 58: Cannot apply 'LIKE' to
arguments of type 'LIKE(<RAW('ORG.APACHE.AVRO.UTIL.UTF8', '...')>,
<CHAR(6)>)'. Supported form(s): 'LIKE(<STRING>, <STRING>, <STRING>)'
      at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
<http://org.apache.flink.table.planner.calcite.flinkplannerimpl.org/>$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:156)
      at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:107)
      at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:205)
      at 
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101)
      at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:704)
      at 
com.webtrekk.flink_avro_enum.kafka.ExampleFromKafkaAndSRWithStreams.main(ExampleFromKafkaAndSRWithStreams.java:27)
      Caused by: org.apache.calcite.runtime.CalciteContextException:
>From line 0, column 0 to line 1, column 58: Cannot apply 'LIKE' to
arguments of type 'LIKE(<RAW('ORG.APACHE.AVRO.UTIL.UTF8', '...')>,
<CHAR(6)>)'. Supported form(s): 'LIKE(<STRING>, <STRING>, <STRING>)'
      at 
java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native
Method)
      at 
java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
      at 
java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
      at 
java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
      at 
org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:467)
      at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:883)
      at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:868)
      at 
org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4861)
      at 
org.apache.calcite.sql.SqlCallBinding.newValidationSignatureError(SqlCallBinding.java:389)
      at 
org.apache.calcite.sql.type.CompositeOperandTypeChecker.checkOperandTypes(CompositeOperandTypeChecker.java:262)
      at 
org.apache.calcite.sql.fun.SqlLikeOperator.checkOperandTypes(SqlLikeOperator.java:104)
      at 
org.apache.calcite.sql.SqlOperator.validateOperands(SqlOperator.java:444)
      at org.apache.calcite.sql.SqlOperator.deriveType(SqlOperator.java:531)
      at 
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5710)
      at 
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5697)
      at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
      at 
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1736)
      at 
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1727)
      at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateWhereOrOn(SqlValidatorImpl.java:4006)
      at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateWhereClause(SqlValidatorImpl.java:3998)
      at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3338)
      at 
org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
      at 
org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
      at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:997)
      at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:975)
      at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232)
      at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:952)
      at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:704)
      at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
<http://org.apache.flink.table.planner.calcite.flinkplannerimpl.org/>$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:151)
      ... 5 more
      Caused by:
org.apache.calcite.sql.validate.SqlValidatorException: Cannot apply
'LIKE' to arguments of type 'LIKE(<RAW('ORG.APACHE.AVRO.UTIL.UTF8',
'...')>, <CHAR(6)>)'. Supported form(s): 'LIKE(<STRING>, <STRING>,
<STRING>)'
      at 
java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native
Method)
      at 
java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
      at 
java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
      at 
java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
      at 
org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:467)
      at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:560)
      ... 29 more

It can be seen, that my attribute "text" is not treated as "String" rather
as "RAW / UTF8" in the flink table environment. By default avro uses its
own implementation of `CharSequence`, namely `UTF8` on deserializing
records (also using Flink DataStream API). Is there a way to deserialize /
convert that avro specific type into a real "String" when passing data from
DataStreams API to Table API?

Just for completion, one workaround would be to annotate the avro schema
such that avro uses type "String" on deserialization under the hood. But
for my case: The schema is already fixed and can not be changed easily.

Please find a minimum on my github account:
https://github.com/peterschrott/flinkStreamTableAvro

Thanks & best, Peter

Reply via email to