[ https://issues.apache.org/jira/browse/FLINK-10119?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16582317#comment-16582317 ]
Timo Walther edited comment on FLINK-10119 at 8/16/18 10:15 AM:
----------------------------------------------------------------

[~wind_ljy] I think the problem with your option 1 is that the deserialization schema interface does not allow ignoring lines, right?

Update: Sorry, I didn't know that deserialization schemas can return null. Sounds good to me.

Regarding the properties: what about just one property: {{failure-handler: fail, ignore, error-field}}

> JsonRowDeserializationSchema deserialize kafka message
> ------------------------------------------------------
>
> Key: FLINK-10119
> URL: https://issues.apache.org/jira/browse/FLINK-10119
> Project: Flink
> Issue Type: Bug
> Components: Kafka Connector
> Affects Versions: 1.5.1
> Environment: None
> Reporter: sean.miao
> Assignee: buptljy
> Priority: Major
>
> Recently, we have been using Kafka010JsonTableSource to process JSON messages from Kafka. We turned on checkpointing and an auto-restart strategy.
> We found that a single message that is not valid JSON makes the job fail, and the job cannot be brought back up, because every restart fails again on the same message. Of course, this behavior preserves exactly-once or at-least-once processing, but the application becomes unavailable, which has a serious impact on us.
> The code is:
> class: JsonRowDeserializationSchema
> function:
>
>   @Override
>   public Row deserialize(byte[] message) throws IOException {
>       try {
>           final JsonNode root = objectMapper.readTree(message);
>           return convertRow(root, (RowTypeInfo) typeInfo);
>       } catch (Throwable t) {
>           throw new IOException("Failed to deserialize JSON object.", t);
>       }
>   }
>
> Now, I changed it to:
>
>   @Override
>   public Row deserialize(byte[] message) throws IOException {
>       try {
>           JsonNode root = this.objectMapper.readTree(message);
>           return this.convertRow(root, (RowTypeInfo) this.typeInfo);
>       } catch (Throwable t) {
>           // Fall back to an empty JSON object so every field is treated as missing.
>           // (writeValueAsBytes("{}") would serialize the JSON string "\"{}\"",
>           // not an empty object.)
>           JsonNode root = this.objectMapper.readTree("{}");
>           return this.convertRow(root, (RowTypeInfo) this.typeInfo);
>       }
>   }
>
> I think data format errors are inevitable during network transmission, so could we add a new column to the table for records with a wrong data format, as Spark SQL does?
>

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
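The single {{failure-handler: fail, ignore, error-field}} property proposed in the comment can be sketched in Java as follows. Everything here is hypothetical and not Flink API: the class FailureAwareDeserializer, its FailureHandler enum, and the use of a plain Object[] instead of Flink's Row are assumptions made for illustration. The actual JSON parsing (readTree plus convertRow in JsonRowDeserializationSchema) is abstracted as a caller-supplied function, so the example runs without Flink or Jackson on the classpath. IGNORE relies on the point from the comment that a deserialization schema may return null to skip a record; ERROR_FIELD follows the reporter's idea of an extra column that holds the malformed message.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.function.Function;

/**
 * Hypothetical sketch of a deserializer governed by a single
 * "failure-handler" setting (fail, ignore, error-field).
 * Rows are modeled as Object[] instead of Flink's Row.
 */
class FailureAwareDeserializer {

    /** Hypothetical counterpart of failure-handler: fail | ignore | error-field. */
    enum FailureHandler { FAIL, IGNORE, ERROR_FIELD }

    private final FailureHandler handler;
    private final int arity; // number of regular (non-error) fields
    // Stand-in for "readTree + convertRow"; must throw an unchecked exception on bad input.
    private final Function<byte[], Object[]> parser;

    FailureAwareDeserializer(FailureHandler handler, int arity, Function<byte[], Object[]> parser) {
        this.handler = handler;
        this.arity = arity;
        this.parser = parser;
    }

    /**
     * Returns the parsed fields. In ERROR_FIELD mode an extra trailing field holds the
     * raw message of a malformed record (null for valid records). In IGNORE mode a
     * malformed record yields null so the caller can skip it.
     */
    Object[] deserialize(byte[] message) throws IOException {
        try {
            Object[] fields = parser.apply(message);
            if (handler == FailureHandler.ERROR_FIELD) {
                // Valid record: append an empty (null) error column.
                return Arrays.copyOf(fields, fields.length + 1);
            }
            return fields;
        } catch (RuntimeException e) {
            switch (handler) {
                case IGNORE:
                    return null; // skip the malformed record
                case ERROR_FIELD: {
                    Object[] row = new Object[arity + 1]; // regular fields stay null
                    row[arity] = new String(message, StandardCharsets.UTF_8);
                    return row;
                }
                default: // FAIL: same behavior as the current code
                    throw new IOException("Failed to deserialize JSON object.", e);
            }
        }
    }
}
```

Catching RuntimeException rather than Throwable (as the original code does) keeps JVM errors such as OutOfMemoryError from being silently turned into skipped or error-column records.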