[ https://issues.apache.org/jira/browse/FLINK-19244?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jark Wu updated FLINK-19244:
----------------------------
    Summary: CSV format can't deserialize null ROW field  (was: CsvRowDataDeserializationSchema throws cast exception : Row length mismatch.)

CSV format can't deserialize null ROW field
-------------------------------------------

                 Key: FLINK-19244
                 URL: https://issues.apache.org/jira/browse/FLINK-19244
             Project: Flink
          Issue Type: Bug
          Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
    Affects Versions: 1.11.2
            Reporter: Ying Z
            Assignee: Ying Z
            Priority: Major
              Labels: pull-request-available
             Fix For: 1.12.0, 1.11.3

CREATE TABLE csv_table (
  f0 ROW<f0c0 VARCHAR, f0c1 VARCHAR>,
  f1 ROW<f1c0 INT, f1c1 VARCHAR>
)

If f0 is null and f1c0=123, f1c1=456, the serialized data will be: ,123;456
When deserializing the data, the jsonNode of f0 is [], so deserialization fails with: Row length mismatch. 2 fields expected but was 0.

In the real scenario, I set up two streams:
First, read json_table and sink it to csv_table, which has the schema above.
Then, read csv_table and do something with it.
If the JSON is \{"f0": null, "f1": {"f1c0": 123, "f1c1": 456}}, the second stream fails with this exception.

If this is a bug, I would like to help fix it and add unit tests.
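To make the failure mode concrete, here is a minimal, self-contained Scala sketch that models the encoding described above without Flink: top-level fields are joined by `,`, nested ROW fields by `;`, and a NULL nested row is written as an empty column. All names (`NullRowFieldSketch`, `serialize`, `deserializeStrict`, `deserializeNullTolerant`) are hypothetical. This is not Flink's `CsvRowDataDeserializationSchema` code, and the null-tolerant variant is only one possible approach, not necessarily the fix that was merged.

```scala
// Hypothetical model of the CSV encoding from FLINK-19244; not Flink code.
object NullRowFieldSketch {
  // A nested ROW is modelled as a sequence of string fields; None stands for SQL NULL.
  type NestedRow = Option[Seq[String]]

  // Top-level fields are joined by ',', nested fields by ';'.
  // A NULL nested row becomes an empty column, giving ",123;456" for the issue's example.
  def serialize(row: Seq[NestedRow]): String =
    row.map {
      case None         => ""
      case Some(fields) => fields.mkString(";")
    }.mkString(",")

  // Split one column into nested elements. An empty column yields an empty
  // sequence, analogous to the empty array node ([]) reported for f0.
  private def elements(column: String): Seq[String] =
    if (column.isEmpty) Seq.empty else column.split(";", -1).toSeq

  // Limit -1 keeps leading/trailing empty columns.
  private def columns(line: String): Seq[String] = line.split(",", -1).toSeq

  private def mismatch(expected: Int, actual: Int): RuntimeException =
    new RuntimeException(s"Row length mismatch. $expected fields expected but was $actual.")

  // Strict conversion: always checks arity, so an empty (NULL) column fails.
  def deserializeStrict(line: String, arities: Seq[Int]): Seq[NestedRow] =
    columns(line).zip(arities).map { case (col, arity) =>
      val elems = elements(col)
      if (elems.size != arity) throw mismatch(arity, elems.size)
      Some(elems)
    }

  // Null-tolerant conversion: an empty column is read back as NULL (None).
  def deserializeNullTolerant(line: String, arities: Seq[Int]): Seq[NestedRow] =
    columns(line).zip(arities).map { case (col, arity) =>
      val elems = elements(col)
      if (elems.isEmpty) None
      else if (elems.size != arity) throw mismatch(arity, elems.size)
      else Some(elems)
    }

  def main(args: Array[String]): Unit = {
    val row: Seq[NestedRow] = Seq(None, Some(Seq("123", "456")))
    val line = serialize(row)
    println(line) // ,123;456
    println(deserializeNullTolerant(line, Seq(2, 2)))
    try deserializeStrict(line, Seq(2, 2))
    catch { case e: RuntimeException => println(e.getMessage) }
  }
}
```

One design caveat: with this encoding, a nested ROW whose only field is an empty string also serializes to an empty column, so reading an empty column back as NULL cannot distinguish those two cases.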
Here is the test code:
{code:scala}
// Imports assume Flink 1.11.x (flink-csv and the blink table runtime on the classpath).
import org.apache.flink.formats.csv.{CsvRowDataDeserializationSchema, CsvRowDataSerializationSchema}
import org.apache.flink.table.api.DataTypes._
import org.apache.flink.table.data.{GenericRowData, RowData}
import org.apache.flink.table.data.binary.BinaryStringData
import org.apache.flink.table.runtime.typeutils.RowDataTypeInfo
import org.apache.flink.table.types.logical.RowType

// Two nested ROW types, f0 and f1, wrapped in a top-level ROW.
val subDataType0 = ROW(
  FIELD("f0c0", STRING()),
  FIELD("f0c1", STRING())
)
val subDataType1 = ROW(
  FIELD("f1c0", INT()),
  FIELD("f1c1", INT())
)
val datatype = ROW(
  FIELD("f0", subDataType0),
  FIELD("f1", subDataType1))
val rowType = datatype.getLogicalType.asInstanceOf[RowType]

val serSchema = new CsvRowDataSerializationSchema.Builder(rowType).build()
val deserSchema = new CsvRowDataDeserializationSchema.Builder(rowType, new RowDataTypeInfo(rowType)).build()

// Serializes the row to CSV, prints it, then deserializes it back and prints the result.
def foo(r: RowData): Unit = {
  val serData = new String(serSchema.serialize(r))
  print(s"${serData}")
  val deserRow = deserSchema.deserialize(serData.getBytes)
  println(s"${deserRow}")
}

val normalRowData = GenericRowData.of(
  GenericRowData.of(BinaryStringData.fromString("hello"), BinaryStringData.fromString("world")),
  GenericRowData.of(Integer.valueOf(123), Integer.valueOf(456))
)
// correct.
foo(normalRowData)

val nullRowData = GenericRowData.of(
  null,
  GenericRowData.of(Integer.valueOf(123), Integer.valueOf(456))
)
/*
Exception in thread "main" java.io.IOException: Failed to deserialize CSV row ',123;456
...
Caused by: java.lang.RuntimeException: Row length mismatch. 2 fields expected but was 0.
*/
foo(nullRowData)
{code}

-- 
This message was sent by Atlassian Jira
(v8.3.4#803005)