sean.miao created FLINK-10119: --------------------------------- Summary: 存在数据非json格式,使用KafkaJsonTableSource的话,job无法拉起。 Key: FLINK-10119 URL: https://issues.apache.org/jira/browse/FLINK-10119 Project: Flink Issue Type: Bug Components: Kafka Connector Affects Versions: 1.5.1 Environment: 无 Reporter: sean.miao
开启checkpoint和savepoint,同时开启了job的自动拉起。 flink从kafka消费数据,使用的是Kafka010JsonTableSource。发现只要有一条数据非json格式,就会导致应用挂掉无法拉起。 当前,这仅是满足了处理语义,但是导致应用不可以用就不太好了吧。能不能改成像spark sql一样,不满足格式的数据,增加到一个专门存储无法解析的数据的列里面。 我们目前的做法是 JsonRowDeserializationSchema @Override public Row deserialize(byte[] message) throws IOException { try { final JsonNode root = objectMapper.readTree(message); return convertRow(root, (RowTypeInfo) typeInfo); } catch (Throwable t) { throw new IOException("Failed to deserialize JSON object.", t); } } catch 里抛异常改成了传入一个 “{}”,会使得所有不能解析数据给所有列返回空值。 -- This message was sent by Atlassian JIRA (v7.6.3#76005)