[ https://issues.apache.org/jira/browse/FLINK-10119?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16579779#comment-16579779 ]
buptljy edited comment on FLINK-10119 at 8/16/18 9:55 AM:
----------------------------------------------------------

[~twalthr] So I am going to add two optional properties.
 1. ignore-when-error: skip the malformed line when this is set to true.
 2. additional-error-field: write the error message into an additional field and set all other fields to null.

was (Author: wind_ljy):
[~twalthr] I don't agree with setting all fields to null; it is too weird, and it doesn't make sense to pass on an empty row. I think we should ignore this line. So I am going to add two optional properties.
 1. ignore-when-error: skip the malformed line when this is set to true.
 2. additional-error-field: write the error message into an additional field and set all other fields to null.

> JsonRowDeserializationSchema deserialize kafka message
> ------------------------------------------------------
>
>                 Key: FLINK-10119
>                 URL: https://issues.apache.org/jira/browse/FLINK-10119
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>   Affects Versions: 1.5.1
>         Environment: None
>            Reporter: sean.miao
>            Assignee: buptljy
>            Priority: Major
>
> Recently, we have been using Kafka010JsonTableSource to process JSON
> messages from Kafka. We turned on checkpointing and an auto-restart strategy.
> We found that whenever a message is not valid JSON, the job cannot be
> brought back up. Of course, this is to guarantee exactly-once or
> at-least-once processing, but the resulting application is unavailable,
> which has a large impact on us.
> The code is:
> class: JsonRowDeserializationSchema
> function:
> @Override
> public Row deserialize(byte[] message) throws IOException {
>     try {
>         final JsonNode root = objectMapper.readTree(message);
>         return convertRow(root, (RowTypeInfo) typeInfo);
>     } catch (Throwable t) {
>         throw new IOException("Failed to deserialize JSON object.", t);
>     }
> }
> Now I have changed it to:
> public Row deserialize(byte[] message) throws IOException {
>     try {
>         JsonNode root = this.objectMapper.readTree(message);
>         return this.convertRow(root, (RowTypeInfo) this.typeInfo);
>     } catch (Throwable var4) {
>         message = this.objectMapper.writeValueAsBytes("{}");
>         JsonNode root = this.objectMapper.readTree(message);
>         return this.convertRow(root, (RowTypeInfo) this.typeInfo);
>     }
> }
>
> I think that data format errors are inevitable during network transmission,
> so can we add a new column to the table for data in the wrong format, like
> Spark SQL does?
>

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
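
For illustration only, the two optional properties proposed in the comment could behave roughly as sketched below. This is a minimal sketch, not Flink's JsonRowDeserializationSchema and not the change that was eventually made. The class TolerantDeserializer, the RowParser interface, and the flags ignoreWhenError and addErrorField are all hypothetical names. A plain Object[] stands in for Row. The sketch also assumes that the caller skips a record when deserialize returns null, and that the ignore option wins when both flags are set.

```java
import java.io.IOException;
import java.util.Arrays;

// Hypothetical sketch of the two proposed options; none of these names exist in Flink.
class TolerantDeserializer {

    /** Stand-in for the real JSON-to-row conversion; may throw on malformed input. */
    interface RowParser {
        Object[] parse(byte[] message) throws Exception;
    }

    private final RowParser parser;
    private final int fieldCount;          // number of regular fields in a row
    private final boolean ignoreWhenError; // option 1: drop the malformed record
    private final boolean addErrorField;   // option 2: append an error-message field

    TolerantDeserializer(RowParser parser, int fieldCount,
                         boolean ignoreWhenError, boolean addErrorField) {
        this.parser = parser;
        this.fieldCount = fieldCount;
        this.ignoreWhenError = ignoreWhenError;
        this.addErrorField = addErrorField;
    }

    /** Returns the row, or null when the record should be skipped (assumed convention). */
    Object[] deserialize(byte[] message) throws IOException {
        try {
            Object[] row = parser.parse(message);
            // With the error field enabled, every row gets one extra slot, left null on success.
            return addErrorField ? Arrays.copyOf(row, fieldCount + 1) : row;
        } catch (Exception e) {
            if (ignoreWhenError) {
                return null; // assumption: ignoring takes precedence over the error field
            }
            if (addErrorField) {
                Object[] errorRow = new Object[fieldCount + 1]; // regular fields stay null
                errorRow[fieldCount] = String.valueOf(e.getMessage());
                return errorRow;
            }
            // Neither option set: keep the original fail-fast behavior.
            throw new IOException("Failed to deserialize JSON object.", e);
        }
    }
}
```

With both flags false the sketch behaves like the original code in the issue and fails the job on the first malformed message, so either option would have to be enabled explicitly.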