[ https://issues.apache.org/jira/browse/FLINK-10119?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
buptljy reassigned FLINK-10119:
-------------------------------

    Assignee: buptljy

> JsonRowDeserializationSchema deserialize kafka message
> ------------------------------------------------------
>
>                 Key: FLINK-10119
>                 URL: https://issues.apache.org/jira/browse/FLINK-10119
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>    Affects Versions: 1.5.1
>         Environment: None
>            Reporter: sean.miao
>            Assignee: buptljy
>            Priority: Major
>
> We recently started using Kafka010JsonTableSource to process JSON messages
> from Kafka, with checkpointing and an automatic restart strategy enabled.
> We found that a single message that is not valid JSON makes the job fail,
> and the job cannot be brought back up: after each restart it resumes from
> the last checkpoint, reads the same message again, and fails again. We
> understand that this behavior preserves exactly-once or at-least-once
> processing guarantees, but it leaves the application unavailable, which
> has a serious impact on us.
>
> The relevant code is in JsonRowDeserializationSchema#deserialize:
>
> @Override
> public Row deserialize(byte[] message) throws IOException {
>     try {
>         final JsonNode root = objectMapper.readTree(message);
>         return convertRow(root, (RowTypeInfo) typeInfo);
>     } catch (Throwable t) {
>         throw new IOException("Failed to deserialize JSON object.", t);
>     }
> }
>
> For now, I have changed it to:
>
> @Override
> public Row deserialize(byte[] message) throws IOException {
>     try {
>         final JsonNode root = objectMapper.readTree(message);
>         return convertRow(root, (RowTypeInfo) typeInfo);
>     } catch (Throwable t) {
>         // Fall back to an empty JSON object instead of failing the job.
>         final JsonNode root = objectMapper.readTree("{}");
>         return convertRow(root, (RowTypeInfo) typeInfo);
>     }
> }
>
> I think malformed data is unavoidable in practice (for example, because of
> errors during network transmission). Could we add a new column to the
> table that captures records with an invalid format, as Spark SQL does?
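A minimal sketch of the proposed corrupt-record column, written in plain Java so it runs without Flink or Jackson. It is not Flink's API: StrictDeserializer, CorruptRecordTolerantDeserializer, and the toy "name,age" parser in CorruptRecordDemo are hypothetical stand-ins, and a row is modeled as an Object[] rather than org.apache.flink.types.Row. The wrapper calls a strict deserializer; if that throws, the regular columns stay null and an extra last column keeps the raw payload, similar in spirit to Spark SQL's `_corrupt_record` column in PERMISSIVE mode.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/** Hypothetical stand-in for a strict deserializer such as JsonRowDeserializationSchema. */
@FunctionalInterface
interface StrictDeserializer {
    Object[] deserialize(byte[] message) throws IOException;
}

/**
 * Hypothetical wrapper: malformed messages do not fail the job. They produce a row whose
 * regular fields are null and whose extra last column holds the raw payload.
 */
final class CorruptRecordTolerantDeserializer {
    private final StrictDeserializer inner;
    private final int fieldCount; // regular columns, excluding the corrupt-record column

    CorruptRecordTolerantDeserializer(StrictDeserializer inner, int fieldCount) {
        this.inner = inner;
        this.fieldCount = fieldCount;
    }

    /** Returns fieldCount + 1 values; the last one is non-null only for malformed input. */
    Object[] deserialize(byte[] message) {
        Object[] row = new Object[fieldCount + 1];
        try {
            Object[] fields = inner.deserialize(message);
            if (fields.length != fieldCount) {
                throw new IOException("Expected " + fieldCount + " fields, got " + fields.length);
            }
            // Only reached for well-formed input; the corrupt-record column stays null.
            System.arraycopy(fields, 0, row, 0, fieldCount);
        } catch (IOException | RuntimeException e) {
            // Regular columns stay null; keep the undecodable bytes for later inspection.
            row[fieldCount] = new String(message, StandardCharsets.UTF_8);
        }
        return row;
    }
}

final class CorruptRecordDemo {
    /** Toy strict parser for "name,age" messages, standing in for a real JSON parser. */
    static final StrictDeserializer NAME_AGE = message -> {
        String[] parts = new String(message, StandardCharsets.UTF_8).split(",", -1);
        if (parts.length != 2) {
            throw new IOException("Expected 2 comma-separated fields");
        }
        // Integer.parseInt may throw NumberFormatException, which the wrapper also catches.
        return new Object[] {parts[0], Integer.parseInt(parts[1].trim())};
    };

    public static void main(String[] args) {
        CorruptRecordTolerantDeserializer d = new CorruptRecordTolerantDeserializer(NAME_AGE, 2);
        for (String msg : new String[] {"alice,30", "{not json", "bob,abc"}) {
            System.out.println(Arrays.toString(d.deserialize(msg.getBytes(StandardCharsets.UTF_8))));
        }
    }
}
```

Running CorruptRecordDemo prints [alice, 30, null], [null, null, {not json] and [null, null, bob,abc]. Compared with substituting an empty JSON object, keeping the raw bytes lets bad records be counted, logged, or routed elsewhere instead of becoming indistinguishable from records whose fields are genuinely missing. The sketch catches IOException and RuntimeException rather than Throwable (as the original code does), so JVM errors are not swallowed.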