hello
我在使用flinksql1.11写出数据到jdbc是遇到了field type类型不匹配的问题,是我类型设置有问题吗?
下面是我的异常日志以及sql文件
SET stream.enableCheckpointing=1000*60;
SET stream.setParallelism=3;
-- Kafka cdbp zdao source 表
create TABLE cloud_behavior_source(
operation STRING,
operation_channel STRING,
`time` STRING,
ip STRING,
lat STRING,
lng STRING,
user_id STRING,
device_id STRING,
imei STRING,
targets ARRAY<ROW<`type` STRING,`value` STRING>>,
product_name STRING,
product_version STRING,
product_vendor STRING,
platform STRING,
platform_version STRING,
`languaage` STRING,
locale STRING,
other_para MAP<STRING, STRING NULL>
) with (
'connector'='kafka',
'topic'='cloud_behavior',
'properties.bootstrap.servers'='',
'properties.group.id'='testGroup',
'format'='avro',
'scan.startup.mode'='earliest-offset'
);
-- Hbase zdao uv 统计 Sink 表
create TABLE cloud_behavior_sink(
operation STRING,
operation_channel STRING,
ip STRING,
lat STRING,
lng STRING,
user_id STRING,
device_id STRING
) with (
'connector'='jdbc',
'url'='jdbc:mysql://hosts:3306/d_bigdata',
'table-name'='flink_sql_test',
'username'='',
'password'='',
'sink.buffer-flush.max-rows'='100'
);
-- 业务过程
insert into cloud_behavior_sink
select
*
from cloud_behavior_source;
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in
[jar:file:/data1/flink/flink-1.11.1-log/lib/log4j-slf4j-impl-2.12.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in
[jar:file:/usr/lib/zookeeper/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
explanation.
SLF4J: Actual binding is of type
[org.apache.logging.slf4j.Log4jLoggerFactory]
------------------------------------------------------------
The program finished with the following exception:
org.apache.flink.client.program.ProgramInvocationException: The main method
caused an error: Field types of query result and registered TableSink
default_catalog.default_database.cloud_behavior_sink do not match.
Query schema: [operation: VARCHAR(2147483647), operation_channel:
VARCHAR(2147483647), time: VARCHAR(2147483647), ip: VARCHAR(2147483647),
lat: VARCHAR(2147483647), lng: VARCHAR(2147483647), user_id:
VARCHAR(2147483647), device_id: VARCHAR(2147483647), imei:
VARCHAR(2147483647), targets: ARRAY<ROW<`type` VARCHAR(2147483647),
`value` VARCHAR(2147483647)>>, product_name: VARCHAR(2147483647),
product_version: VARCHAR(2147483647), product_vendor: VARCHAR(2147483647),
platform: VARCHAR(2147483647), platform_version: VARCHAR(2147483647),
languaage: VARCHAR(2147483647), locale: VARCHAR(2147483647), other_para:
MAP<VARCHAR(2147483647), VARCHAR(2147483647)>]
Sink schema: [operation: VARCHAR(2147483647), operation_channel:
VARCHAR(2147483647), ip: VARCHAR(2147483647), lat: VARCHAR(2147483647), lng:
VARCHAR(2147483647), user_id: VARCHAR(2147483647), device_id:
VARCHAR(2147483647)]
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
at
org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
at
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
at
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
at
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1917)
at
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
Caused by: org.apache.flink.table.api.ValidationException: Field types of
query result and registered TableSink
default_catalog.default_database.cloud_behavior_sink do not match.
Query schema: [operation: VARCHAR(2147483647), operation_channel:
VARCHAR(2147483647), time: VARCHAR(2147483647), ip: VARCHAR(2147483647),
lat: VARCHAR(2147483647), lng: VARCHAR(2147483647), user_id:
VARCHAR(2147483647), device_id: VARCHAR(2147483647), imei:
VARCHAR(2147483647), targets: ARRAY<ROW<`type` VARCHAR(2147483647),
`value` VARCHAR(2147483647)>>, product_name: VARCHAR(2147483647),
product_version: VARCHAR(2147483647), product_vendor: VARCHAR(2147483647),
platform: VARCHAR(2147483647), platform_version: VARCHAR(2147483647),
languaage: VARCHAR(2147483647), locale: VARCHAR(2147483647), other_para:
MAP<VARCHAR(2147483647), VARCHAR(2147483647)>]
Sink schema: [operation: VARCHAR(2147483647), operation_channel:
VARCHAR(2147483647), ip: VARCHAR(2147483647), lat: VARCHAR(2147483647), lng:
VARCHAR(2147483647), user_id: VARCHAR(2147483647), device_id:
VARCHAR(2147483647)]
at
org.apache.flink.table.planner.sinks.TableSinkUtils$.validateSchemaAndApplyImplicitCast(TableSinkUtils.scala:100)
at
org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:229)
at
org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:204)
at scala.Option.map(Option.scala:146)
at
org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204)
at
org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
at
org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163)
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1264)
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:700)
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:787)
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:690)
at
com.intsig.flink.streaming.streaming_project.common.sql_submit.SqlSubmit.callDML(SqlSubmit.java:97)
at
com.intsig.flink.streaming.streaming_project.common.sql_submit.SqlSubmit.callCommand(SqlSubmit.java:72)
at
com.intsig.flink.streaming.streaming_project.common.sql_submit.SqlSubmit.run(SqlSubmit.java:53)
at
com.intsig.flink.streaming.streaming_project.common.sql_submit.SqlSubmit.main(SqlSubmit.java:24)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
... 11 more
--
Sent from: http://apache-flink.147419.n8.nabble.com/