[ https://issues.apache.org/jira/browse/FLINK-39401?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18083241#comment-18083241 ]
Yuepeng Pan commented on FLINK-39401:
-------------------------------------
BP-2.1 Merged into 2.1 via: 93be407de41ac4e9f9ba54d0e3f091ef74fac91b
> Extend raw format to support line-delimiter option
> --------------------------------------------------
>
> Key: FLINK-39401
> URL: https://issues.apache.org/jira/browse/FLINK-39401
> Project: Flink
> Issue Type: Improvement
> Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
> Affects Versions: 2.4.0
> Reporter: featzhang
> Assignee: featzhang
> Priority: Major
> Labels: pull-request-available
> Fix For: 2.4.0
>
>
> Add a new optional configuration option {{raw.line-delimiter}} to the Raw
> format. When it is set, deserialization splits each message on the delimiter
> and emits one record per segment, and serialization appends the delimiter
> after each record.
> ----
> h2. Motivation
> The Raw format currently treats each incoming message as a single record.
> This makes it difficult to handle common scenarios where messages contain
> multiple lines (e.g., newline-separated text) or use a custom delimiter to
> separate logical records within a single message payload.
> A concrete use case: a Kafka topic where each message contains multiple
> records separated by {{\\n}}, and users want Flink to split each message into
> individual rows automatically, without writing custom UDFs or pre-processing
> logic.
> This feature adds a {{raw.line-delimiter}} option that enables:
> # *Deserialization*: Split incoming byte messages by the configured
> delimiter, emitting one {{RowData}} per segment.
> # *Serialization*: Append the configured delimiter bytes after each
> serialized record.
> ----
> h2. Public Interfaces
> A new optional config option is added to {{RawFormatOptions}}:
> {code:java}
> public static final ConfigOption<String> LINE_DELIMITER =
>         ConfigOptions.key("raw.line-delimiter")
>                 .stringType()
>                 .noDefaultValue()
>                 .withDescription(
>                         "Optional line delimiter for the raw format. "
>                                 + "When set, deserialization splits the input by this delimiter "
>                                 + "and emits one record per segment. "
>                                 + "Serialization appends the delimiter after each record. "
>                                 + "Supports Java escape sequences such as '\\n' and '\\r\\n'.");
> {code}
> *Example DDL usage:*
> {code:sql}
> CREATE TABLE source_table (
> data STRING
> ) WITH (
> 'connector' = 'kafka',
> 'topic' = 'my-topic',
> 'properties.bootstrap.servers' = 'localhost:9092',
> 'format' = 'raw',
> 'raw.line-delimiter' = '\n'
> );
> {code}
> ----
> h2. Proposed Changes
> h3. RawFormatOptions.java
> Add a new {{ConfigOption}}:
> {code:java}
> public static final ConfigOption<String> LINE_DELIMITER =
>         ConfigOptions.key("raw.line-delimiter")
>                 .stringType()
>                 .noDefaultValue()
>                 .withDescription("...");
> {code}
> h3. RawFormatDeserializationSchema.java
> * Add a new constructor parameter {{@Nullable String lineDelimiter}}.
> * Override {{deserialize(byte[] message, Collector<RowData> out)}} to split
> by delimiter.
> * When {{lineDelimiter == null}}: fall back to original single-record
> behavior.
> * When {{lineDelimiter}} is set: decode the message with the configured
> charset, split it using {{Pattern.quote(lineDelimiter)}}, and collect one
> {{RowData}} per segment.
> {code:java}
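> @Override
> public void deserialize(byte[] message, Collector<RowData> out) throws IOException {
>     if (message == null) {
>         return;
>     }
>     if (lineDelimiter == null) {
>         // Original behavior: the whole message is a single record.
>         out.collect(deserialize(message));
>         return;
>     }
>     Charset charset = Charset.forName(charsetName);
>     String decoded = new String(message, charset);
>     // Pattern.quote treats the delimiter literally; limit -1 keeps trailing empty segments.
>     String[] parts = decoded.split(Pattern.quote(lineDelimiter), -1);
>     for (String part : parts) {
>         // converter.convert returns the field value, so wrap it in a new row per segment
>         // instead of sharing one reused row across all emitted records.
>         out.collect(GenericRowData.of(converter.convert(part.getBytes(charset))));
>     }
> }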
> {code}
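The excerpt above depends on Flink's {{Collector}}, converter, and {{charsetName}} field. The sketch below isolates only the byte-level splitting in plain Java so the behavior can be checked on its own. It is not the actual implementation: {{RawLineSplitSketch}} and {{split}} are hypothetical names, and a {{List<byte[]>}} stands in for the {{Collector}}, each element being the payload that would be handed to the single-record converter.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical, Flink-free sketch of the splitting step in deserialize(byte[], Collector).
// A List<byte[]> stands in for the Collector; each element is one segment's payload.
final class RawLineSplitSketch {

    static List<byte[]> split(byte[] message, String lineDelimiter, Charset charset) {
        List<byte[]> segments = new ArrayList<>();
        if (message == null) {
            return segments; // null message: nothing is emitted
        }
        if (lineDelimiter == null) {
            segments.add(message); // option not set: the whole message is one record
            return segments;
        }
        String decoded = new String(message, charset);
        // Pattern.quote makes the delimiter a literal, so '|' is not read as regex alternation.
        // The negative limit keeps trailing empty segments.
        for (String part : decoded.split(Pattern.quote(lineDelimiter), -1)) {
            segments.add(part.getBytes(charset));
        }
        return segments;
    }

    public static void main(String[] args) {
        byte[] message = "a||b||c".getBytes(StandardCharsets.UTF_8);
        for (byte[] segment : split(message, "||", StandardCharsets.UTF_8)) {
            System.out.println(new String(segment, StandardCharsets.UTF_8));
        }
    }
}
```

The {{Pattern.quote}} call matters for delimiters such as {{||}}: unquoted, the regex {{||}} matches the empty string at every position, so {{"a||b".split("||")}} returns the single characters {{[a, |, |, b]}} instead of {{[a, b]}}.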
> h3. RawFormatSerializationSchema.java
> * Add a new constructor parameter {{@Nullable String lineDelimiter}}.
> * In {{serialize()}}, append the delimiter bytes after the serialized value
> when {{lineDelimiter != null}}.
> {code:java}
> @Override
> public byte[] serialize(RowData row) {
>     byte[] valueBytes = converter.convert(row);
>     if (lineDelimiter == null || valueBytes == null) {
>         // No delimiter configured, or a null value: keep the original behavior.
>         return valueBytes;
>     }
>     byte[] delimiterBytes = lineDelimiter.getBytes(Charset.forName(charsetName));
>     byte[] result = Arrays.copyOf(valueBytes, valueBytes.length + delimiterBytes.length);
>     System.arraycopy(delimiterBytes, 0, result, valueBytes.length, delimiterBytes.length);
>     return result;
> }
> {code}
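Because the delimiter is encoded with the configured charset, its byte length is not fixed: {{\n}} is one byte in UTF-8 but two in UTF-16BE. The following plain-Java sketch mirrors the append logic above to make that visible. {{RawDelimiterAppendSketch}} and {{append}} are hypothetical names, and {{valueBytes}} stands in for {{converter.convert(row)}}.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical, Flink-free sketch of serialize(RowData); valueBytes stands in for converter.convert(row).
final class RawDelimiterAppendSketch {

    static byte[] append(byte[] valueBytes, String lineDelimiter, Charset charset) {
        if (lineDelimiter == null || valueBytes == null) {
            return valueBytes; // option not set, or null value: returned unchanged
        }
        // The delimiter is encoded with the same charset as the value.
        byte[] delimiterBytes = lineDelimiter.getBytes(charset);
        byte[] result = Arrays.copyOf(valueBytes, valueBytes.length + delimiterBytes.length);
        System.arraycopy(delimiterBytes, 0, result, valueBytes.length, delimiterBytes.length);
        return result;
    }

    public static void main(String[] args) {
        byte[] utf8 = append("a".getBytes(StandardCharsets.UTF_8), "\n", StandardCharsets.UTF_8);
        byte[] utf16 = append("a".getBytes(StandardCharsets.UTF_16BE), "\n", StandardCharsets.UTF_16BE);
        System.out.println(Arrays.toString(utf8));  // [97, 10]
        System.out.println(Arrays.toString(utf16)); // [0, 97, 0, 10]
    }
}
```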
> h3. RawFormatFactory.java
> * Register {{LINE_DELIMITER}} in {{optionalOptions()}}.
> * Read the option and pass it to both schema constructors.
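The option description promises Java escape sequences, while the deserialize and serialize snippets use {{lineDelimiter}} verbatim, so the translation presumably has to happen where the factory reads the option. To our understanding, a plain Flink SQL string literal such as {{'\n'}} does not interpret backslashes and reaches the factory as the two characters backslash and {{n}}. The sketch below is one possible helper under that assumption; {{LineDelimiterOptionSketch}} and {{unescape}} are hypothetical, only {{\n}}, {{\r}}, {{\t}}, and {{\\}} are handled, and rejecting an empty value is an extra assumption. A library routine such as Apache Commons Text's {{StringEscapeUtils.unescapeJava}} would be an alternative.

```java
// Hypothetical factory-side helper (not part of the proposal): translates escape sequences
// in the raw option value before it is passed to the schema constructors.
final class LineDelimiterOptionSketch {

    static String unescape(String raw) {
        if (raw == null) {
            return null; // option not set: keep the single-record behavior
        }
        StringBuilder sb = new StringBuilder(raw.length());
        for (int i = 0; i < raw.length(); i++) {
            char c = raw.charAt(i);
            if (c != '\\' || i + 1 == raw.length()) {
                sb.append(c); // ordinary character, or a trailing lone backslash
                continue;
            }
            char next = raw.charAt(++i);
            switch (next) {
                case 'n':
                    sb.append('\n');
                    break;
                case 'r':
                    sb.append('\r');
                    break;
                case 't':
                    sb.append('\t');
                    break;
                case '\\':
                    sb.append('\\');
                    break;
                default:
                    sb.append('\\').append(next); // unsupported escape: kept verbatim
            }
        }
        if (sb.length() == 0) {
            // Assumption: reject an empty delimiter, which would split the input into single characters.
            throw new IllegalArgumentException("'raw.line-delimiter' must not be empty");
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        String fromDdl = "\\r\\n"; // four characters, as assumed to arrive from the DDL value '\r\n'
        System.out.println(unescape(fromDdl).length()); // 2
    }
}
```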
> ----
> h2. Compatibility, Deprecation, and Migration Plan
> * *Fully backward compatible*: {{raw.line-delimiter}} is optional with no
> default value. Existing jobs without this option continue to behave
> identically.
> * No deprecation or migration required.
> * The change follows the existing pattern of optional format options in the
> Raw format (e.g., {{raw.charset}}, {{raw.endianness}}).
> ----
> h2. Test Plan
> A new test class {{RawFormatLineDelimiterTest}} covers the following
> scenarios:
> || Test Case || Description ||
> | {{testDeserializeWithoutDelimiter_singleRow}} | Backward compatibility: no delimiter, single record emitted |
> | {{testDeserializeWithNewlineDelimiter_multipleRows}} | Split on {{\\n}}, multiple records emitted |
> | {{testDeserializeWithCustomMultiCharDelimiter}} | Split on custom multi-char delimiter {{\|\|}} |
> | {{testDeserializeWithNullMessage_noOutput}} | Null message is handled gracefully (no output) |
> | {{testDeserializeWithGbkCharset}} | Multi-byte charset (GBK) support |
> | {{testSerializeWithoutDelimiter_noAppend}} | Backward compatibility: no delimiter appended |
> | {{testSerializeWithNewlineDelimiter_appendsDelimiter}} | Delimiter {{\\n}} appended after serialized value |
> | {{testSerializeWithCustomDelimiter_appendsDelimiter}} | Custom delimiter {{\|\|}} appended |
> | {{testSerializeNullRow_returnsNull}} | Null row returns null (no delimiter appended) |
> Factory wiring is validated in
> {{RawFormatFactoryTest#testLineDelimiterOption}}.
> ----
> h2. Rejected Alternatives
> * *Dropping trailing empty segments*: {{String.split(regex)}} (limit 0)
> discards trailing empty strings, whereas {{String.split(regex, -1)}} retains
> them. The proposal passes {{-1}} so that no segment is silently dropped; the
> trade-off is that an input ending with the delimiter produces a trailing empty
> record. Because serialization appends the delimiter after every record, a
> payload written by this format (e.g., {{a\\n}}) is read back as two rows:
> {{a}} and an empty row. Filtering trailing empty segments could be considered
> as a follow-up improvement.
> * *Defaulting to {{\\n}}*: Making {{\\n}} the default would break backward
> compatibility for existing users who rely on the Raw format treating the
> entire message as a single record.
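The trailing-empty-segment trade-off can be checked in plain Java by splitting a payload shaped like serializer output (every record followed by the delimiter) in both modes. {{TrailingSegmentSketch}} and its methods are hypothetical names; {{dropTrailingEmpty}} only illustrates the possible follow-up and is not part of the proposal.

```java
import java.util.Arrays;
import java.util.regex.Pattern;

// Hypothetical comparison of the two split modes on serializer-shaped input.
final class TrailingSegmentSketch {

    // Proposal as written: a negative limit keeps every segment, including trailing empty ones.
    static String[] keepAll(String decoded, String delimiter) {
        return decoded.split(Pattern.quote(delimiter), -1);
    }

    // Possible follow-up: limit 0 drops trailing empty segments (interior empty ones remain).
    static String[] dropTrailingEmpty(String decoded, String delimiter) {
        return decoded.split(Pattern.quote(delimiter));
    }

    public static void main(String[] args) {
        String written = "a\n" + "b\n"; // two records, each followed by '\n'
        System.out.println(Arrays.toString(keepAll(written, "\n")));           // [a, b, ]
        System.out.println(Arrays.toString(dropTrailingEmpty(written, "\n"))); // [a, b]
    }
}
```

One caveat of the limit-0 variant: a message consisting only of delimiters (e.g. a single {{\n}}) yields a zero-length array, so it would emit no rows at all rather than one empty row.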