[ https://issues.apache.org/jira/browse/FLINK-16334?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17048463#comment-17048463 ]
roncenzhao commented on FLINK-16334:
------------------------------------

[~libenchao] Thank you, and I will close this issue.

> flink-sql kafka-connector support ignoring invalid data when parsing bytes to a json row
> ------------------------------------------------------------------------------------------
>
>                 Key: FLINK-16334
>                 URL: https://issues.apache.org/jira/browse/FLINK-16334
>             Project: Flink
>          Issue Type: Wish
>          Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>    Affects Versions: 1.10.0
>        Environment: flink1.10+kafka+json
>           Reporter: roncenzhao
>           Priority: Major
>
> We found that if we create a table like this:
>
> {code:java}
> CREATE TABLE MyUserTable (
>   id BIGINT,
>   name STRING
> ) WITH (
>   'connector.type' = 'kafka',
>   'connector.version' = 'universal',
>   'connector.topic' = 'test_topic',
>   'connector.properties.bootstrap.servers' = 'xxx',
>   'connector.properties.zookeeper.connect' = 'xxx',
>   'connector.properties.group.id' = 'g_test',
>   --'connector.startup-mode' = 'earliest-offset',
>   --'connector.startup-mode' = 'latest-offset',
>   'connector.startup-mode' = 'group-offsets',
>   'format.type' = 'json',
>   'format.fail-on-missing-field' = 'false'
> );
> {code}
>
> and then execute `select * from MyUserTable`, any row that is not valid JSON
> fails the job, and the offset of the consumer group is reset to the latest
> offset.
>
> I think we should add a configuration option similar to
> 'format.fail-on-missing-field', e.g. 'format.fail-on-invalid-json', to ignore
> the current invalid row.
>
> Looking forward to your reply!

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
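For context, the behavior the reporter asks for — dropping a record that fails JSON parsing instead of failing the whole job — can be sketched in plain Python. This is a minimal illustration only, not Flink's actual `JsonRowDeserializationSchema`; the function and parameter names here are invented for the sketch:

{code:java}
import json

def deserialize_rows(raw_messages, fail_on_invalid_json=True):
    """Parse raw Kafka payloads as JSON rows.

    With fail_on_invalid_json=False, a malformed payload is skipped
    instead of aborting the whole consumption loop, mirroring the
    proposed 'format.fail-on-invalid-json' option.
    """
    rows = []
    for payload in raw_messages:
        try:
            rows.append(json.loads(payload))
        except json.JSONDecodeError:
            if fail_on_invalid_json:
                raise  # current Flink 1.10 behavior: the job fails
            # otherwise ignore the invalid row and keep consuming
    return rows
{code}

For example, with payloads `[b'{"id": 1}', b'not-json', b'{"id": 2}']`, the default setting raises on the second record, while `fail_on_invalid_json=False` returns only the two valid rows.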