sean.miao created FLINK-10119:
---------------------------------

             Summary: 存在数据非json格式,使用KafkaJsonTableSource的话,job无法拉起。
                 Key: FLINK-10119
                 URL: https://issues.apache.org/jira/browse/FLINK-10119
             Project: Flink
          Issue Type: Bug
          Components: Kafka Connector
    Affects Versions: 1.5.1
         Environment: 无
            Reporter: sean.miao


开启checkpoint和savepoint,同时开启了job的自动拉起。

flink从kafka消费数据,使用的是Kafka010JsonTableSource。发现只要有一条数据非json格式,就会导致应用挂掉无法拉起。

当前,这仅是满足了处理语义,但是导致应用不可以用就不太好了吧。能不能改成像spark sql一样,不满足格式的数据,增加到一个专门存储无法解析的数据的列里面。

 

我们目前的做法是

JsonRowDeserializationSchema

@Override
public Row deserialize(byte[] message) throws IOException {
 try {
 final JsonNode root = objectMapper.readTree(message);
 return convertRow(root, (RowTypeInfo) typeInfo);
 } catch (Throwable t) {
 throw new IOException("Failed to deserialize JSON object.", t);
 }
}

catch 里抛异常改成了传入一个 “{}”,会使得所有不能解析数据给所有列返回空值。



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to