twalthr commented on a change in pull request #13909: URL: https://github.com/apache/flink/pull/13909#discussion_r518007361
########## File path: docs/dev/table/connectors/formats/index.md ##########
@@ -68,5 +68,10 @@ Flink supports the following formats:
       <td><a href="{% link dev/table/connectors/formats/orc.md %}">Apache ORC</a></td>
       <td><a href="{% link dev/table/connectors/filesystem.md %}">Filesystem</a></td>
     </tr>
+    <tr>
+      <td><a href="{% link dev/table/connectors/formats/singleValue.md %}">Single Value</a></td>

Review comment:
   According to the offline discussion, call the format `raw` instead.

########## File path: docs/dev/table/connectors/formats/single-field.md ##########
@@ -0,0 +1,151 @@
+Note: this format encodes `null` values as `null` `byte[]`. This may have limitation when used in `upsert-kafka`, because `upsert-kafka` treats `null` values as a tombstone message (DELETE on the key). Therefore, we recommend avoiding using `upsert-kafka` connector and `single-field` format if the field can have a `null` value.

Review comment:
   ```
   we recommend avoiding using `upsert-kafka` connector and the `raw` format as a `value.format`
   ```
   We should emphasize that a key format is fine.

########## File path: docs/dev/table/connectors/formats/single-field.md ##########
@@ -0,0 +1,151 @@
+---
+title: "Single Field Format"
+nav-title: SingleField
+nav-parent_id: sql-formats
+nav-pos: 7
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<span class="label label-info">Format: Serialization Schema</span>
+<span class="label label-info">Format: Deserialization Schema</span>
+
+* This will be replaced by the TOC
+{:toc}
+
+The SingleField format allows to read and write data where the data contains only a single field, and that field is not wrapped within a JSON object, or an Avro record.
+
+Currently, the SingleField format supports `String`, `byte[]` and primitive type.
+
+Note: this format encodes `null` values as `null` `byte[]`. This may have limitation when used in `upsert-kafka`, because `upsert-kafka` treats `null` values as a tombstone message (DELETE on the key). Therefore, we recommend avoiding using `upsert-kafka` connector and `single-field` format if the field can have a `null` value.
+
+Example
+----------------
+
+For example, you may have following raw log data in Kafka and want to read and analyse such data using Flink SQL.
+
+```
+47.29.201.179 - - [28/Feb/2019:13:17:10 +0000] "GET /?p=1 HTTP/2.0" 200 5316 "https://domain.com/?p=1" "Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.119 Safari/537.36" "2.75"
+```
+
+The following creates a table where it reads from (and writes to) the underlying Kafka topic as an anonymous string value by using `single-field` format:
+
+<div class="codetabs" markdown="1">
+<div data-lang="SQL" markdown="1">
+{% highlight sql %}
+CREATE TABLE nginx_log (
+  log STRING
+) WITH (
+  'connector' = 'kafka',
+  'topic' = 'nginx_log',
+  'properties.bootstrap.servers' = 'localhost:9092',
+  'properties.group.id' = 'testGroup',
+  'format' = 'single-field'
+)
+{% endhighlight %}
+</div>
+</div>
+
+Then you can read out the raw data as a pure string, and split it into multiple fields using user-defined-function for further analysing, e.g. `my_split` in the example.
+
+<div class="codetabs" markdown="1">
+<div data-lang="SQL" markdown="1">
+{% highlight sql %}
+SELECT t.hostname, t.datetime, t.url, t.browser, ...
+FROM(
+  SELECT my_split(log) as t FROM nginx_log
+);
+{% endhighlight %}
+</div>
+</div>
+
+In contrast, you can also write a single field of STRING type into Kafka topic as an anonymous string value.
+
+Format Options
+----------------
+
+<table class="table table-bordered">
+    <thead>
+      <tr>
+        <th class="text-left" style="width: 25%">Option</th>
+        <th class="text-center" style="width: 8%">Required</th>
+        <th class="text-center" style="width: 7%">Default</th>
+        <th class="text-center" style="width: 10%">Type</th>
+        <th class="text-center" style="width: 50%">Description</th>
+      </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td><h5>format</h5></td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Specify what format to use, here should be 'single-field'.</td>
+    </tr>
+    </tbody>
+</table>
+
+Data Type Mapping
+----------------
+
+The table below details the SQL types the format supports, including details of the serializer and deserializer class for encoding and decoding.
+
+<table class="table table-bordered">
+    <thead>
+      <tr>
+        <th class="text-left">Flink SQL type</th>
+        <th class="text-left">Value</th>
+      </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td><code>CHAR / VARCHAR / STRING</code></td>
+      <td>A UTF-8 encoded text string</td>
+    </tr>
+    <tr>
+      <td><code>BOOLEAN</code></td>
+      <td>A single byte to indicate boolean value, 0 means false, 1 means true.</td>
+    </tr>
+    <tr>
+      <td><code>TINYINT</code></td>
+      <td>A 8-bit signed number</td>
+    </tr>
+    <tr>
+      <td><code>SMALLINT</code></td>
+      <td>A 16-bit signed number</td>
+    </tr>
+    <tr>
+      <td><code>INT</code></td>
+      <td>A 32-bit signed integer</td>
+    </tr>
+    <tr>
+      <td><code>BIGINT</code></td>
+      <td>A 64-bit signed integer</td>
+    </tr>
+    <tr>
+      <td><code>FLOAT</code></td>
+      <td>A 32-bit floating point number</td>
+    </tr>
+    <tr>

Review comment:
   let's also support BINARY/VARBINARY/BYTES and the RAW type itself to finalize this story

########## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/formats/singlefield/SingleFieldSerializationSchema.java ##########
@@ -0,0 +1,166 @@
+package org.apache.flink.table.formats.singlefield;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.BooleanSerializer;
+import org.apache.flink.api.common.typeutils.base.ByteSerializer;
+import org.apache.flink.api.common.typeutils.base.DoubleSerializer;
+import org.apache.flink.api.common.typeutils.base.FloatSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.ShortSerializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.RowData.FieldGetter;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * Serialization schema that serializes an {@link RowData} object into a single field bytes.
+ */
+@Internal
+public class SingleFieldSerializationSchema implements SerializationSchema<RowData> {
+
+    private static final long serialVersionUID = 1L;
+
+    private final LogicalType fieldType;
+
+    private final SerializationRuntimeConverter converter;
+
+    private final FieldGetter fieldGetter;
+
+    public SingleFieldSerializationSchema(LogicalType fieldType) {
+        this.fieldType = fieldType;
+        this.fieldGetter = RowData.createFieldGetter(fieldType, 0);
+        this.converter = createConverter(fieldType);
+    }
+
+    @Override
+    public byte[] serialize(RowData row) {
+        try {
+            return converter.convert(fieldGetter.getFieldOrNull(row));
+        } catch (IOException e) {
+            throw new RuntimeException("Could not serialize row '" + row + "'. ", e);
+        }
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        SingleFieldSerializationSchema that = (SingleFieldSerializationSchema) o;
+        return fieldType.equals(that.fieldType);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(fieldType);
+    }
+
+    // ------------------------------------------------------------------------
+
+    /**
+     * Runtime converter that convert an object of internal data structure to byte[].
+     */
+    @FunctionalInterface
+    private interface SerializationRuntimeConverter extends Serializable {
+        byte[] convert(Object value) throws IOException;
+    }
+
+    /**
+     * Creates a runtime converter.
+     */
+    private SerializationRuntimeConverter createConverter(LogicalType type) {
+        switch (type.getTypeRoot()) {
+            case CHAR:
+            case VARCHAR:
+                return value -> {
+                    // put null check logic in the lambda instead of wrapping outside
+                    // to avoid virtual method invoking.
+                    if (value == null) {
+                        return null;
+                    }
+                    return ((StringData) value).toBytes();
+                };
+            case VARBINARY:
+            case BINARY:
+                return value -> {
+                    if (value == null) {
+                        return null;
+                    }
+                    return (byte[]) value;
+                };
+            case TINYINT:
+                return createConverterUsingSerializer(ByteSerializer.INSTANCE);
+            case SMALLINT:
+                return createConverterUsingSerializer(ShortSerializer.INSTANCE);
+            case INTEGER:
+                return createConverterUsingSerializer(IntSerializer.INSTANCE);
+            case BIGINT:
+                return createConverterUsingSerializer(LongSerializer.INSTANCE);
+            case FLOAT:
+                return createConverterUsingSerializer(FloatSerializer.INSTANCE);
+            case DOUBLE:
+                return createConverterUsingSerializer(DoubleSerializer.INSTANCE);
+            case BOOLEAN:
+                return createConverterUsingSerializer(BooleanSerializer.INSTANCE);
+            default:
+                throw new UnsupportedOperationException("'single-format' currently doesn't support type: " + type);
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    private static SerializationRuntimeConverter createConverterUsingSerializer(
+            TypeSerializer<?> serializer) {
+        return new DelegatingSerializationConverter((TypeSerializer<Object>) serializer);
+    }
+
+    private static final class DelegatingSerializationConverter
+            implements SerializationRuntimeConverter {
+        private static final long serialVersionUID = 1L;
+        private final DataOutputSerializer dos = new DataOutputSerializer(16);
+        private final TypeSerializer<Object> delegatingSerializer;
+
+        protected DelegatingSerializationConverter(TypeSerializer<Object> delegatingSerializer) {
+            this.delegatingSerializer = delegatingSerializer;
+        }
+
+        @Override
+        public byte[] convert(Object value) throws IOException {
+            if (value == null) {
+                return null;
+            }
+            delegatingSerializer.serialize(value, dos);
+            byte[] ret = dos.getCopyOfBuffer();

Review comment:
   Same comment as before, this looks overly complicated to just convert a couple of data types to bytes. We should think about doing it manually.
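Below is a minimal sketch of what "doing it manually" could look like for the primitive branches, assuming big-endian (network) byte order and the format's null-in/null-out contract. `createManualConverter` is an illustrative name reusing the `SerializationRuntimeConverter` interface quoted above; it is not code from this PR.

```java
import java.nio.ByteBuffer;

// Hypothetical replacement for the TypeSerializer-based branches of
// createConverter(...): each primitive is encoded by hand. ByteBuffer's
// default byte order is big-endian, matching the assumption above.
private SerializationRuntimeConverter createManualConverter(LogicalType type) {
    switch (type.getTypeRoot()) {
        case BOOLEAN:
            // a single byte: 0 means false, 1 means true, as in the documented mapping
            return value -> value == null ? null : new byte[] {(byte) (((boolean) value) ? 1 : 0)};
        case TINYINT:
            return value -> value == null ? null : new byte[] {(byte) value};
        case SMALLINT:
            return value -> value == null ? null : ByteBuffer.allocate(2).putShort((short) value).array();
        case INTEGER:
            return value -> value == null ? null : ByteBuffer.allocate(4).putInt((int) value).array();
        case BIGINT:
            return value -> value == null ? null : ByteBuffer.allocate(8).putLong((long) value).array();
        case FLOAT:
            return value -> value == null ? null : ByteBuffer.allocate(4).putFloat((float) value).array();
        case DOUBLE:
            return value -> value == null ? null : ByteBuffer.allocate(8).putDouble((double) value).array();
        default:
            throw new UnsupportedOperationException("Unsupported type: " + type);
    }
}
```

Allocation behavior is likely similar (`getCopyOfBuffer()` also copies into a fresh array), but this drops the `TypeSerializer`/`DataOutputSerializer` indirection entirely.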
########## File path: flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/formats/singlefield/SingleFieldSerDeSchemaTest.java ##########
@@ -0,0 +1,170 @@
+package org.apache.flink.table.formats.singlefield;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.conversion.DataStructureConverter;
+import org.apache.flink.table.data.conversion.DataStructureConverters;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.types.Row;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.flink.table.api.DataTypes.BIGINT;
+import static org.apache.flink.table.api.DataTypes.BINARY;
+import static org.apache.flink.table.api.DataTypes.BOOLEAN;
+import static org.apache.flink.table.api.DataTypes.BYTES;
+import static org.apache.flink.table.api.DataTypes.DOUBLE;
+import static org.apache.flink.table.api.DataTypes.FIELD;
+import static org.apache.flink.table.api.DataTypes.FLOAT;
+import static org.apache.flink.table.api.DataTypes.INT;
+import static org.apache.flink.table.api.DataTypes.ROW;
+import static org.apache.flink.table.api.DataTypes.SMALLINT;
+import static org.apache.flink.table.api.DataTypes.STRING;
+import static org.apache.flink.table.api.DataTypes.TINYINT;
+import static org.apache.flink.table.api.DataTypes.VARCHAR;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests for {@link SingleFieldDeserializationSchema} {@link SingleFieldSerializationSchema}.
+ */
+public class SingleFieldSerDeSchemaTest {
+
+    @Test
+    public void testSerializationAndDeserialization() throws Exception {
+        for (TestSpec testSpec : testData) {
+            runTest(testSpec);
+        }
+    }
+
+    private void runTest(TestSpec testSpec) throws Exception {
+        SingleFieldDeserializationSchema deserializationSchema = new SingleFieldDeserializationSchema(
+            testSpec.type.getLogicalType(), TypeInformation.of(RowData.class));
+        SingleFieldSerializationSchema serializationSchema = new SingleFieldSerializationSchema(
+            testSpec.type.getLogicalType());
+        deserializationSchema.open(mock(DeserializationSchema.InitializationContext.class));
+        serializationSchema.open(mock(SerializationSchema.InitializationContext.class));
+
+        Row row = Row.of(testSpec.value);
+        DataStructureConverter<Object, Object> converter = DataStructureConverters.getConverter(
+            ROW(FIELD("single", testSpec.type)));
+        RowData originalRowData = (RowData) converter.toInternal(row);
+
+        byte[] serializedBytes = serializationSchema.serialize(originalRowData);
+        RowData deserializeRowData = deserializationSchema.deserialize(serializedBytes);
+
+        Row actual = (Row) converter.toExternal(deserializeRowData);
+        assertEquals(row, actual);
+    }
+
+    private static List<TestSpec> testData = Arrays.asList(

Review comment:
   Use a JUnit `@Parameterized` instead?
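A sketch of the suggested JUnit 4 `@Parameterized` wiring, reusing the `testData` list and `runTest(...)` helper quoted above; it assumes `TestSpec` gets a readable `toString()` so the generated test names are meaningful.

```java
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import java.util.List;

// Each TestSpec becomes its own named test case, so a failing type shows up
// individually in the test report instead of aborting a shared loop.
@RunWith(Parameterized.class)
public class SingleFieldSerDeSchemaTest {

    @Parameterized.Parameters(name = "{index}: {0}")
    public static List<TestSpec> parameters() {
        return testData; // the existing Arrays.asList(...) of TestSpec values
    }

    @Parameterized.Parameter
    public TestSpec testSpec; // injected by the Parameterized runner

    @Test
    public void testSerializationAndDeserialization() throws Exception {
        runTest(testSpec); // same runTest(...) helper as in the quoted test
    }
}
```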
########## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/formats/singlefield/SingleFieldDeserializationSchema.java ##########
@@ -0,0 +1,175 @@
+    private static final long serialVersionUID = 1L;
+
+    private final LogicalType fieldType;
+    private final DeserializationRuntimeConverter converter;
+    private TypeInformation<RowData> typeInfo;

Review comment:
   nit: call this `producedTypeInfo` for consistency, and make it final?

########## File path: docs/dev/table/connectors/formats/single-field.md ##########
@@ -0,0 +1,151 @@
+    <tr>
+      <td><code>CHAR / VARCHAR / STRING</code></td>
+      <td>A UTF-8 encoded text string</td>

Review comment:
   We should also allow specifying a different charset. This should be configurable.
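As a self-contained illustration of what a configurable charset could look like: the option key and class name below are hypothetical, and in the PR the logic would sit in the CHAR/VARCHAR converter branches around `StringData` rather than in a standalone class.

```java
import java.nio.charset.Charset;

// Minimal charset-aware string codec. The charset name would come from a new
// format option (e.g. a hypothetical 'single-field.charset', defaulting to UTF-8).
final class CharsetStringCodec {

    private final Charset charset;

    CharsetStringCodec(String charsetName) {
        // fails fast at configuration time on unknown charset names
        this.charset = Charset.forName(charsetName);
    }

    byte[] serialize(String value) {
        return value == null ? null : value.getBytes(charset);
    }

    String deserialize(byte[] data) {
        return data == null ? null : new String(data, charset);
    }
}
```

For example, `new CharsetStringCodec("ISO-8859-1")` would round-trip Latin-1 payloads that are not valid UTF-8.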
########## File path: docs/dev/table/connectors/formats/single-field.md ##########
@@ -0,0 +1,151 @@
+    <tr>
+      <td><h5>format</h5></td>

Review comment:
   We should also allow configuring the endianness. Every format that encodes to bytes requires this information.
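A sketch of how such options could be declared with Flink's `ConfigOptions` builder; the keys, defaults, and holder class are hypothetical, not from this PR.

```java
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

// Hypothetical options holder for the format; option names are illustrative.
public class SingleFieldFormatOptions {

    public static final ConfigOption<String> BYTE_ORDER = ConfigOptions
        .key("single-field.byte-order")
        .stringType()
        .defaultValue("big-endian") // network order; "little-endian" as the alternative
        .withDescription("Byte order used when encoding/decoding numeric fields to bytes.");

    public static final ConfigOption<String> CHARSET = ConfigOptions
        .key("single-field.charset")
        .stringType()
        .defaultValue("UTF-8")
        .withDescription("Charset used when encoding/decoding CHAR/VARCHAR/STRING fields.");
}
```

The runtime converters would then map the configured value to `java.nio.ByteOrder.BIG_ENDIAN` or `LITTLE_ENDIAN` and apply it via `ByteBuffer.order(...)`.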
########## File path: docs/dev/table/connectors/formats/single-field.md ##########
@@ -0,0 +1,151 @@
+Note: this format encodes `null` values as `null` `byte[]`. This may have limitation when used in `upsert-kafka`, because `upsert-kafka` treats `null` values as a tombstone message (DELETE on the key). Therefore, we recommend avoiding using `upsert-kafka` connector and `single-field` format if the field can have a `null` value.

Review comment:
   I don't understand this comment:
   ```
   encodes `null` values as `null` `byte[]`
   ```
   Do you mean:
   ```
   encodes `null` values as `null` of `byte[]` type
   ```
########## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/formats/singlefield/SingleFieldDeserializationSchema.java ##########
@@ -0,0 +1,175 @@
+    private final LogicalType fieldType;

Review comment:
   nit: call this `deserializedType`?
########## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/formats/singlefield/SingleFieldDeserializationSchema.java ##########
@@ -0,0 +1,175 @@
+package org.apache.flink.table.formats.singlefield;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.BooleanSerializer;
+import org.apache.flink.api.common.typeutils.base.ByteSerializer;
+import org.apache.flink.api.common.typeutils.base.DoubleSerializer;
+import org.apache.flink.api.common.typeutils.base.FloatSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.ShortSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Objects;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Deserialization schema from single field data to Flink Table/SQL internal data structure {@link RowData}.
+ */
+@Internal
+public class SingleFieldDeserializationSchema implements DeserializationSchema<RowData> {
+
+    private static final long serialVersionUID = 1L;
+
+    private final LogicalType fieldType;
+    private final DeserializationRuntimeConverter converter;
+    private TypeInformation<RowData> typeInfo;
+
+    private transient GenericRowData reuse;
+
+    public SingleFieldDeserializationSchema(
+            LogicalType fieldType,
+            TypeInformation<RowData> resultTypeInfo) {
+        this.fieldType = checkNotNull(fieldType);
+        this.typeInfo = checkNotNull(resultTypeInfo);
+        this.converter = createConverter(fieldType);
+    }
+
+    @Override
+    public void open(InitializationContext context) throws Exception {
+        reuse = new GenericRowData(1);
+    }
+
+    @Override
+    public RowData deserialize(byte[] message) throws IOException {
+        reuse.setField(0, converter.convert(message));
+        return reuse;
+    }
+
+    @Override
+    public boolean isEndOfStream(RowData nextElement) {
+        return false;
+    }
+
+    @Override
+    public TypeInformation<RowData> getProducedType() {
+        return typeInfo;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        SingleFieldDeserializationSchema that = (SingleFieldDeserializationSchema) o;
+        return typeInfo.equals(that.typeInfo) && fieldType.equals(that.fieldType);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(typeInfo, fieldType);
+    }
+
+    // ------------------------------------------------------------------------
+
+    /**
+     * Runtime converter that convert byte[] to internal data structure object.
+     */
+    @FunctionalInterface
+    private interface DeserializationRuntimeConverter extends Serializable {
+        Object convert(byte[] data) throws IOException;
+    }
+
+    /**
+     * Creates a runtime converter.
+     */
+    private static DeserializationRuntimeConverter createConverter(LogicalType type) {
+        switch (type.getTypeRoot()) {
+            case CHAR:
+            case VARCHAR:
+                return data -> {
+                    // put null check logic in the lambda instead of wrapping outside
+                    // to avoid virtual method invoking.
+                    if (data == null) {
+                        return null;
+                    }
+                    return StringData.fromBytes(data);
+                };
+            case VARBINARY:
+            case BINARY:
+                return data -> data;
+            case TINYINT:
+                return createConverterUsingSerializer(ByteSerializer.INSTANCE);
+            case SMALLINT:
+                return createConverterUsingSerializer(ShortSerializer.INSTANCE);
+            case INTEGER:
+                return createConverterUsingSerializer(IntSerializer.INSTANCE);
+            case BIGINT:
+                return createConverterUsingSerializer(LongSerializer.INSTANCE);
+            case FLOAT:
+                return createConverterUsingSerializer(FloatSerializer.INSTANCE);
+            case DOUBLE:
+                return createConverterUsingSerializer(DoubleSerializer.INSTANCE);
+            case BOOLEAN:
+                return createConverterUsingSerializer(BooleanSerializer.INSTANCE);
+            default:
+                throw new UnsupportedOperationException("'single-format' currently doesn't support type: " + type);
+        }
+    }
+
+    private static DeserializationRuntimeConverter createConverterUsingSerializer(
+            final TypeSerializer<?> serializer) {
+        return new DelegatingDeserializationConverter(serializer);
+    }
+
+    private static final class DelegatingDeserializationConverter
+            implements DeserializationRuntimeConverter {
+        private static final long serialVersionUID = 1L;
+        private final DataInputDeserializer source = new DataInputDeserializer();
+        private final TypeSerializer<?> serializer;
+
+        protected DelegatingDeserializationConverter(TypeSerializer<?> serializer) {
+            this.serializer = serializer;
+        }
+
+        @Override
+        public Object convert(byte[] data) throws IOException {
+            if (data == null) {
+                return null;
+            }
+            source.setBuffer(data);

Review comment:
   Would we have a performance benefit from implementing the deserialization logic ourselves, instead of setting fields and delegating to other classes?
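For comparison, a hand-rolled decoding sketch in the same shape as `createConverter(...)` above, assuming big-endian input (the default order of `ByteBuffer.wrap`); `createManualConverter` is illustrative, not PR code.

```java
import java.nio.ByteBuffer;

// Hand-rolled decoding: no DataInputDeserializer, no buffer-field mutation,
// no virtual dispatch through TypeSerializer. Null input stays null.
private static DeserializationRuntimeConverter createManualConverter(LogicalType type) {
    switch (type.getTypeRoot()) {
        case BOOLEAN:
            return data -> data == null ? null : data[0] != 0;
        case TINYINT:
            return data -> data == null ? null : data[0];
        case SMALLINT:
            return data -> data == null ? null : ByteBuffer.wrap(data).getShort();
        case INTEGER:
            return data -> data == null ? null : ByteBuffer.wrap(data).getInt();
        case BIGINT:
            return data -> data == null ? null : ByteBuffer.wrap(data).getLong();
        case FLOAT:
            return data -> data == null ? null : ByteBuffer.wrap(data).getFloat();
        case DOUBLE:
            return data -> data == null ? null : ByteBuffer.wrap(data).getDouble();
        default:
            throw new UnsupportedOperationException("Unsupported type: " + type);
    }
}
```

Whether this measurably beats the delegating converters would need benchmarking; the clearer win is dropping the mutable `setBuffer(...)` state here and the extra copy of `getCopyOfBuffer()` on the serialization side.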
########## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/formats/singlefield/SingleFieldSerializationSchema.java ##########
@@ -0,0 +1,166 @@
+            case CHAR:
+            case VARCHAR:
+                return value -> {
+                    // put null check logic in the lambda instead of wrapping outside
+                    // to avoid virtual method invoking.
+                    if (value == null) {
+                        return null;
+                    }
+                    return ((StringData) value).toBytes();

Review comment:
   Is this Flink's representation of strings in bytes? I could imagine that most people assume `"string".getBytes()` semantics. Is the string length included in the bytes as well?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org