[ 
https://issues.apache.org/jira/browse/FLINK-39401?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18083241#comment-18083241
 ] 

Yuepeng Pan commented on FLINK-39401:
-------------------------------------

BP-2.1 Merged into 2.1 via: 93be407de41ac4e9f9ba54d0e3f091ef74fac91b

> Extend raw format to support line-delimiter option
> --------------------------------------------------
>
>                 Key: FLINK-39401
>                 URL: https://issues.apache.org/jira/browse/FLINK-39401
>             Project: Flink
>          Issue Type: Improvement
>          Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>    Affects Versions: 2.4.0
>            Reporter: featzhang
>            Assignee: featzhang
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 2.4.0
>
>
> Add a new optional configuration option {{raw.line-delimiter}} to the Raw 
> format, enabling deserialization of line-delimited messages into multiple 
> records and appending delimiters during serialization.
> ----
> h2. Motivation
> The Raw format currently treats each incoming message as a single record. 
> This makes it difficult to handle common scenarios where messages contain 
> multiple lines (e.g., newline-separated text) or use a custom delimiter to 
> separate logical records within a single message payload.
> A concrete use case: a Kafka topic where each message contains multiple 
> records separated by {{\\n}}, and users want Flink to split each message into 
> individual rows automatically, without writing custom UDFs or pre-processing 
> logic.
> This feature adds a {{raw.line-delimiter}} option that enables:
> # *Deserialization*: Split incoming byte messages by the configured 
> delimiter, emitting one {{RowData}} per segment.
> # *Serialization*: Append the configured delimiter bytes after each 
> serialized record.
> ----
> h2. Public Interfaces
> A new optional config option is added to {{RawFormatOptions}}:
> {code:java}
> public static final ConfigOption<String> LINE_DELIMITER =
>     ConfigOptions.key("raw.line-delimiter")
>         .stringType()
>         .noDefaultValue()
>         .withDescription(
>             "Optional line delimiter for the raw format. "
>                 + "When set, deserialization splits the input by this delimiter "
>                 + "and emits one record per segment. "
>                 + "Serialization appends the delimiter after each record. "
>                 + "Supports Java escape sequences such as '\\n' and '\\r\\n'.");
> {code}
> *Example DDL usage:*
> {code:sql}
> CREATE TABLE source_table (
>   data STRING
> ) WITH (
>   'connector' = 'kafka',
>   'topic' = 'my-topic',
>   'properties.bootstrap.servers' = 'localhost:9092',
>   'format' = 'raw',
>   'raw.line-delimiter' = '\n'
> );
> {code}
> ----
> h2. Proposed Changes
> h3. RawFormatOptions.java
> Add a new {{ConfigOption}}:
> {code:java}
> public static final ConfigOption<String> LINE_DELIMITER =
>     ConfigOptions.key("raw.line-delimiter")
>         .stringType()
>         .noDefaultValue()
>         .withDescription("...");
> {code}
> h3. RawFormatDeserializationSchema.java
> * Add a new constructor parameter {{@Nullable String lineDelimiter}}.
> * Override {{deserialize(byte[] message, Collector<RowData> out)}} to split 
> by delimiter.
> * When {{lineDelimiter == null}}: fall back to original single-record 
> behavior.
> * When {{lineDelimiter}} is set: decode message with the configured charset, 
> split using {{Pattern.quote(lineDelimiter)}}, and collect one {{RowData}} per 
> segment.
> {code:java}
> @Override
> public void deserialize(byte[] message, Collector<RowData> out) throws IOException {
>     if (message == null) {
>         return;
>     }
>     if (lineDelimiter == null) {
>         out.collect(deserialize(message));
>         return;
>     }
>     String decoded = new String(message, Charset.forName(charsetName));
>     String[] parts = decoded.split(Pattern.quote(lineDelimiter), -1);
>     for (String part : parts) {
>         out.collect(converter.convert(part.getBytes(Charset.forName(charsetName))));
>     }
> }
> {code}
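The decode, split, and re-encode steps described above can be tried outside Flink with a small standalone sketch. The class and method names below are hypothetical and are not part of the Flink code base; the sketch only assumes that the delimiter is matched literally, as {{Pattern.quote}} implies, and that segments are re-encoded with the same charset.

```java
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Standalone illustration; DelimiterSplitSketch is a hypothetical name, not a Flink class.
final class DelimiterSplitSketch {

    /** Decodes the payload, splits on the literal delimiter, and re-encodes each segment. */
    static List<byte[]> split(byte[] message, String lineDelimiter, String charsetName) {
        Charset charset = Charset.forName(charsetName);
        String decoded = new String(message, charset);
        List<byte[]> segments = new ArrayList<>();
        // Pattern.quote makes regex metacharacters such as '|' match literally.
        // The limit of -1 keeps trailing empty segments.
        for (String part : decoded.split(Pattern.quote(lineDelimiter), -1)) {
            segments.add(part.getBytes(charset));
        }
        return segments;
    }

    public static void main(String[] args) {
        Charset utf8 = Charset.forName("UTF-8");
        byte[] payload = "a||b||c".getBytes(utf8);
        for (byte[] segment : split(payload, "||", "UTF-8")) {
            System.out.println(new String(segment, utf8));
        }
    }
}
```

Without {{Pattern.quote}}, a delimiter such as {{||}} would be interpreted as a regular expression (an alternation of empty patterns) rather than as two literal pipe characters, which is presumably why the proposal quotes it.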
> h3. RawFormatSerializationSchema.java
> * Add a new constructor parameter {{@Nullable String lineDelimiter}}.
> * In {{serialize()}}, append the delimiter bytes after the serialized value 
> when {{lineDelimiter != null}}.
> {code:java}
> @Override
> public byte[] serialize(RowData row) {
>     byte[] valueBytes = converter.convert(row);
>     if (lineDelimiter == null || valueBytes == null) {
>         return valueBytes;
>     }
>     byte[] delimiterBytes = lineDelimiter.getBytes(Charset.forName(charsetName));
>     byte[] result = Arrays.copyOf(valueBytes, valueBytes.length + delimiterBytes.length);
>     System.arraycopy(delimiterBytes, 0, result, valueBytes.length, delimiterBytes.length);
>     return result;
> }
> {code}
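The append step on the serialization side can be exercised in isolation as well. This is a minimal sketch under the assumption that the already-serialized value is available as a byte array; {{DelimiterAppendSketch}} and {{appendDelimiter}} are hypothetical names used only for illustration.

```java
import java.nio.charset.Charset;
import java.util.Arrays;

// Standalone illustration; DelimiterAppendSketch is a hypothetical name, not a Flink class.
final class DelimiterAppendSketch {

    /** Returns valueBytes followed by the encoded delimiter, or valueBytes unchanged if either is null. */
    static byte[] appendDelimiter(byte[] valueBytes, String lineDelimiter, String charsetName) {
        if (lineDelimiter == null || valueBytes == null) {
            return valueBytes;
        }
        byte[] delimiterBytes = lineDelimiter.getBytes(Charset.forName(charsetName));
        // copyOf allocates the larger array and copies the value; arraycopy fills in the tail.
        byte[] result = Arrays.copyOf(valueBytes, valueBytes.length + delimiterBytes.length);
        System.arraycopy(delimiterBytes, 0, result, valueBytes.length, delimiterBytes.length);
        return result;
    }
}
```

Because the delimiter is encoded with the configured charset, its byte length can differ between charsets (for example, {{\n}} is one byte in UTF-8 but two bytes in UTF-16BE), so the result length is computed from the encoded bytes rather than from the string length.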
> h3. RawFormatFactory.java
> * Register {{LINE_DELIMITER}} in {{optionalOptions()}}.
> * Read the option and pass it to both schema constructors.
> ----
> h2. Compatibility, Deprecation, and Migration Plan
> * *Fully backward compatible*: {{raw.line-delimiter}} is optional with no 
> default value. Existing jobs without this option continue to behave 
> identically.
> * No deprecation or migration required.
> * The change follows the existing pattern of optional format options in the 
> Raw format (e.g., {{raw.charset}}, {{raw.endianness}}).
> ----
> h2. Test Plan
> A new test class {{RawFormatLineDelimiterTest}} covers the following 
> scenarios:
> || Test Case || Description ||
> | {{testDeserializeWithoutDelimiter_singleRow}} | Backward compatibility: no delimiter, single record emitted |
> | {{testDeserializeWithNewlineDelimiter_multipleRows}} | Split on {{\\n}}, multiple records emitted |
> | {{testDeserializeWithCustomMultiCharDelimiter}} | Split on custom multi-char delimiter {{||}} |
> | {{testDeserializeWithNullMessage_noOutput}} | Null message is handled gracefully (no output) |
> | {{testDeserializeWithGbkCharset}} | Multi-byte charset (GBK) support |
> | {{testSerializeWithoutDelimiter_noAppend}} | Backward compatibility: no delimiter appended |
> | {{testSerializeWithNewlineDelimiter_appendsDelimiter}} | Delimiter {{\\n}} appended after serialized value |
> | {{testSerializeWithCustomDelimiter_appendsDelimiter}} | Custom delimiter {{||}} appended |
> | {{testSerializeNullRow_returnsNull}} | Null row returns null (no delimiter appended) |
> Factory wiring is validated in 
> {{RawFormatFactoryTest#testLineDelimiterOption}}.
> ----
> h2. Rejected Alternatives
> * *Handling trailing empty strings from split*: Java's {{String.split(regex, 
> -1)}} with a negative limit retains trailing empty strings, so an input that 
> ends with the delimiter produces a trailing empty record. The current 
> implementation uses {{-1}} for correctness; filtering trailing empty segments 
> could be considered as a follow-up improvement.
> * *Defaulting to {{\\n}}*: Making {{\\n}} the default would break backward 
> compatibility for existing users who rely on raw format treating the entire 
> message as a single record.
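The effect of the split limit mentioned in the first rejected alternative can be compared directly. The sketch below is only an illustration with hypothetical names; it contrasts a limit of {{-1}}, which the description says the implementation uses, with the default limit of {{0}}, which discards trailing empty strings.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

// Standalone illustration; TrailingSegmentSketch is a hypothetical name, not a Flink class.
final class TrailingSegmentSketch {

    /** Negative limit: every segment is kept, including trailing empty ones. */
    static List<String> splitKeepingTrailing(String input, String delimiter) {
        return Arrays.asList(input.split(Pattern.quote(delimiter), -1));
    }

    /** Default limit of 0: trailing empty segments are removed, interior ones are kept. */
    static List<String> splitDroppingTrailing(String input, String delimiter) {
        return Arrays.asList(input.split(Pattern.quote(delimiter)));
    }

    public static void main(String[] args) {
        String input = "a\nb\n";
        System.out.println(splitKeepingTrailing(input, "\n").size());  // 3: "a", "b", ""
        System.out.println(splitDroppingTrailing(input, "\n").size()); // 2: "a", "b"
    }
}
```

With a limit of {{-1}}, a message produced by the serializer described above (value followed by the delimiter) would split into the value plus one empty segment when read back, which is the case the follow-up filtering would address.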



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
