twalthr commented on a change in pull request #13909:
URL: https://github.com/apache/flink/pull/13909#discussion_r518007361



##########
File path: docs/dev/table/connectors/formats/index.md
##########
@@ -68,5 +68,10 @@ Flink supports the following formats:
          <td><a href="{% link dev/table/connectors/formats/orc.md %}">Apache 
ORC</a></td>
          <td><a href="{% link dev/table/connectors/filesystem.md 
%}">Filesystem</a></td>
         </tr>
+        <tr>
+        <td><a href="{% link dev/table/connectors/formats/singleValue.md 
%}">Single Value</a></td>

Review comment:
       According to the offline discussion, call the format `raw` instead.

##########
File path: docs/dev/table/connectors/formats/single-field.md
##########
@@ -0,0 +1,151 @@
+---
+title: "Single Field Format"
+nav-title: SingleField
+nav-parent_id: sql-formats
+nav-pos: 7
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<span class="label label-info">Format: Serialization Schema</span>
+<span class="label label-info">Format: Deserialization Schema</span>
+
+* This will be replaced by the TOC
+{:toc}
+
+The SingleField format allows reading and writing data that contains only a single field, where that field is not wrapped in a JSON object or an Avro record.
+
+Currently, the SingleField format supports `String`, `byte[]` and primitive types.
+
+Note: this format encodes `null` values as `null` `byte[]`. This may have 
limitation when used in `upsert-kafka`, because `upsert-kafka` treats `null` 
values as a tombstone message (DELETE on the key). Therefore, we recommend 
avoiding using `upsert-kafka` connector and `single-field` format if the field 
can have a `null` value.

Review comment:
       ```
   we recommend avoiding using `upsert-kafka` connector and the `raw` format as 
a `value.format`
   ```
   
   We should emphasize that a key format is fine.

##########
File path: docs/dev/table/connectors/formats/single-field.md
##########
@@ -0,0 +1,151 @@
+---
+title: "Single Field Format"
+nav-title: SingleField
+nav-parent_id: sql-formats
+nav-pos: 7
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<span class="label label-info">Format: Serialization Schema</span>
+<span class="label label-info">Format: Deserialization Schema</span>
+
+* This will be replaced by the TOC
+{:toc}
+
+The SingleField format allows reading and writing data that contains only a single field, where that field is not wrapped in a JSON object or an Avro record.
+
+Currently, the SingleField format supports `String`, `byte[]` and primitive types.
+
+Note: this format encodes `null` values as `null` `byte[]`. This may have 
limitation when used in `upsert-kafka`, because `upsert-kafka` treats `null` 
values as a tombstone message (DELETE on the key). Therefore, we recommend 
avoiding using `upsert-kafka` connector and `single-field` format if the field 
can have a `null` value.
+
+Example
+----------------
+
+For example, you may have the following raw log data in Kafka and want to read and analyse it using Flink SQL.
+
+```
+47.29.201.179 - - [28/Feb/2019:13:17:10 +0000] "GET /?p=1 HTTP/2.0" 200 5316 
"https://domain.com/?p=1"; "Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 
(KHTML, like Gecko) Chrome/72.0.3626.119 Safari/537.36" "2.75"
+```
+
+The following creates a table that reads from (and writes to) the underlying Kafka topic as an anonymous string value using the `single-field` format:
+
+<div class="codetabs" markdown="1">
+<div data-lang="SQL" markdown="1">
+{% highlight sql %}
+CREATE TABLE nginx_log (
+  log STRING
+) WITH (
+  'connector' = 'kafka',
+  'topic' = 'nginx_log',
+  'properties.bootstrap.servers' = 'localhost:9092',
+  'properties.group.id' = 'testGroup',
+  'format' = 'single-field'
+)
+{% endhighlight %}
+</div>
+</div>
+
+Then you can read the raw data as a plain string and split it into multiple fields using a user-defined function for further analysis, e.g. `my_split` in the example.
+
+<div class="codetabs" markdown="1">
+<div data-lang="SQL" markdown="1">
+{% highlight sql %}
+SELECT t.hostname, t.datetime, t.url, t.browser, ...
+FROM(
+  SELECT my_split(log) as t FROM nginx_log
+);
+{% endhighlight %}
+</div>
+</div>
+
+Conversely, you can also write a single field of STRING type into the Kafka topic as an anonymous string value.
+
+Format Options
+----------------
+
+<table class="table table-bordered">
+    <thead>
+      <tr>
+        <th class="text-left" style="width: 25%">Option</th>
+        <th class="text-center" style="width: 8%">Required</th>
+        <th class="text-center" style="width: 7%">Default</th>
+        <th class="text-center" style="width: 10%">Type</th>
+        <th class="text-center" style="width: 50%">Description</th>
+      </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td><h5>format</h5></td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Specify what format to use; here it should be 'single-field'.</td>
+    </tr>
+    </tbody>
+</table>
+
+Data Type Mapping
+----------------
+
+The table below details the SQL types the format supports, including the serializer and deserializer classes used for encoding and decoding.
+
+<table class="table table-bordered">
+    <thead>
+      <tr>
+        <th class="text-left">Flink SQL type</th>
+        <th class="text-left">Value</th>
+      </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td><code>CHAR / VARCHAR / STRING</code></td>
+      <td>A UTF-8 encoded text string</td>
+    </tr>
+    <tr>
+      <td><code>BOOLEAN</code></td>
+      <td>A single byte to indicate the boolean value: 0 means false, 1 means true.</td>
+    </tr>
+    <tr>
+      <td><code>TINYINT</code></td>
+      <td>An 8-bit signed integer</td>
+    </tr>
+    <tr>
+      <td><code>SMALLINT</code></td>
+      <td>A 16-bit signed integer</td>
+    </tr>
+    <tr>
+      <td><code>INT</code></td>
+      <td>A 32-bit signed integer</td>
+    </tr>
+    <tr>
+      <td><code>BIGINT</code></td>
+      <td>A 64-bit signed integer</td>
+    </tr>
+    <tr>
+      <td><code>FLOAT</code></td>
+      <td>A 32-bit floating point number</td>
+    </tr>
+    <tr>

Review comment:
       Let's also support BINARY/VARBINARY/BYTES and the RAW type itself to finalize this story.
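
   To illustrate what a RAW branch could look like, here is a minimal Java sketch, assuming `RawType#getTypeSerializer` can supply the serializer (the class and method names outside the PR are illustrative only, not a final design):

   ```java
   import org.apache.flink.api.common.typeutils.TypeSerializer;
   import org.apache.flink.table.types.logical.LogicalType;
   import org.apache.flink.table.types.logical.LogicalTypeRoot;
   import org.apache.flink.table.types.logical.RawType;

   /** Sketch: RAW carries its own TypeSerializer, so the format could simply delegate to it. */
   final class RawTypeBranchSketch {

       static TypeSerializer<?> serializerFor(LogicalType type) {
           if (type.getTypeRoot() == LogicalTypeRoot.RAW) {
               // the payload's serializer is part of the RAW type definition
               return ((RawType<?>) type).getTypeSerializer();
           }
           throw new UnsupportedOperationException("Not a RAW type: " + type);
       }
   }
   ```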

##########
File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/formats/singlefield/SingleFieldSerializationSchema.java
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.formats.singlefield;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.BooleanSerializer;
+import org.apache.flink.api.common.typeutils.base.ByteSerializer;
+import org.apache.flink.api.common.typeutils.base.DoubleSerializer;
+import org.apache.flink.api.common.typeutils.base.FloatSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.ShortSerializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.RowData.FieldGetter;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * Serialization schema that serializes an {@link RowData} object into a 
single field bytes.
+ */
+@Internal
+public class SingleFieldSerializationSchema implements 
SerializationSchema<RowData> {
+
+       private static final long serialVersionUID = 1L;
+
+       private final LogicalType fieldType;
+
+       private final SerializationRuntimeConverter converter;
+
+       private final FieldGetter fieldGetter;
+
+       public SingleFieldSerializationSchema(LogicalType fieldType) {
+               this.fieldType = fieldType;
+               this.fieldGetter = RowData.createFieldGetter(fieldType, 0);
+               this.converter = createConverter(fieldType);
+       }
+
+       @Override
+       public byte[] serialize(RowData row) {
+               try {
+                       return 
converter.convert(fieldGetter.getFieldOrNull(row));
+               } catch (IOException e) {
+                       throw new RuntimeException("Could not serialize row '" 
+ row + "'. ", e);
+               }
+       }
+
+       @Override
+       public boolean equals(Object o) {
+               if (this == o) {
+                       return true;
+               }
+               if (o == null || getClass() != o.getClass()) {
+                       return false;
+               }
+               SingleFieldSerializationSchema that = 
(SingleFieldSerializationSchema) o;
+               return fieldType.equals(that.fieldType);
+       }
+
+       @Override
+       public int hashCode() {
+               return Objects.hash(fieldType);
+       }
+
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Runtime converter that convert an object of internal data structure 
to byte[].
+        */
+       @FunctionalInterface
+       private interface SerializationRuntimeConverter extends Serializable {
+               byte[] convert(Object value) throws IOException;
+       }
+
+       /**
+        * Creates a runtime converter.
+        */
+       private SerializationRuntimeConverter createConverter(LogicalType type) 
{
+               switch (type.getTypeRoot()) {
+                       case CHAR:
+                       case VARCHAR:
+                               return value -> {
+                                       // put null check logic in the lambda 
instead of wrapping outside
+                                       // to avoid virtual method invoking.
+                                       if (value == null) {
+                                               return null;
+                                       }
+                                       return ((StringData) value).toBytes();
+                               };
+                       case VARBINARY:
+                       case BINARY:
+                               return value -> {
+                                       if (value == null) {
+                                               return null;
+                                       }
+                                       return (byte[]) value;
+                               };
+                       case TINYINT:
+                               return 
createConverterUsingSerializer(ByteSerializer.INSTANCE);
+                       case SMALLINT:
+                               return 
createConverterUsingSerializer(ShortSerializer.INSTANCE);
+                       case INTEGER:
+                               return 
createConverterUsingSerializer(IntSerializer.INSTANCE);
+                       case BIGINT:
+                               return 
createConverterUsingSerializer(LongSerializer.INSTANCE);
+                       case FLOAT:
+                               return 
createConverterUsingSerializer(FloatSerializer.INSTANCE);
+                       case DOUBLE:
+                               return 
createConverterUsingSerializer(DoubleSerializer.INSTANCE);
+                       case BOOLEAN:
+                               return 
createConverterUsingSerializer(BooleanSerializer.INSTANCE);
+                       default:
+                               throw new 
UnsupportedOperationException("'single-format' currently doesn't support type: 
" + type);
+               }
+       }
+
+       @SuppressWarnings("unchecked")
+       private static SerializationRuntimeConverter 
createConverterUsingSerializer(
+                       TypeSerializer<?> serializer) {
+               return new 
DelegatingSerializationConverter((TypeSerializer<Object>) serializer);
+       }
+
+       private static final class DelegatingSerializationConverter
+               implements SerializationRuntimeConverter {
+               private static final long serialVersionUID = 1L;
+               private final DataOutputSerializer dos = new 
DataOutputSerializer(16);
+               private final TypeSerializer<Object> delegatingSerializer;
+
+               protected 
DelegatingSerializationConverter(TypeSerializer<Object> delegatingSerializer) {
+                       this.delegatingSerializer = delegatingSerializer;
+               }
+
+               @Override
+               public byte[] convert(Object value) throws IOException {
+                       if (value == null) {
+                               return null;
+                       }
+                       delegatingSerializer.serialize(value, dos);
+                       byte[] ret = dos.getCopyOfBuffer();

Review comment:
       Same comment as before: this looks overly complicated just to convert a couple of data types to bytes. We should think about doing it manually.
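
   For reference, a minimal sketch of what a hand-rolled conversion could look like for the fixed-width types, using only the JDK's `ByteBuffer` (illustrative only, not the PR's implementation):

   ```java
   import java.nio.ByteBuffer;

   /** Sketch: manual encoding of a few fixed-width types without Flink serializers. */
   final class ManualSerializeSketch {

       static byte[] fromInt(int value) {
           // 4 bytes; ByteBuffer defaults to big-endian
           return ByteBuffer.allocate(4).putInt(value).array();
       }

       static byte[] fromLong(long value) {
           return ByteBuffer.allocate(8).putLong(value).array();
       }

       static byte[] fromDouble(double value) {
           return ByteBuffer.allocate(8).putDouble(value).array();
       }

       static byte[] fromBoolean(boolean value) {
           return new byte[] {(byte) (value ? 1 : 0)};
       }
   }
   ```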

##########
File path: 
flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/formats/singlefield/SingleFieldSerDeSchemaTest.java
##########
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.formats.singlefield;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.conversion.DataStructureConverter;
+import org.apache.flink.table.data.conversion.DataStructureConverters;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.types.Row;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.flink.table.api.DataTypes.BIGINT;
+import static org.apache.flink.table.api.DataTypes.BINARY;
+import static org.apache.flink.table.api.DataTypes.BOOLEAN;
+import static org.apache.flink.table.api.DataTypes.BYTES;
+import static org.apache.flink.table.api.DataTypes.DOUBLE;
+import static org.apache.flink.table.api.DataTypes.FIELD;
+import static org.apache.flink.table.api.DataTypes.FLOAT;
+import static org.apache.flink.table.api.DataTypes.INT;
+import static org.apache.flink.table.api.DataTypes.ROW;
+import static org.apache.flink.table.api.DataTypes.SMALLINT;
+import static org.apache.flink.table.api.DataTypes.STRING;
+import static org.apache.flink.table.api.DataTypes.TINYINT;
+import static org.apache.flink.table.api.DataTypes.VARCHAR;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests for {@link SingleFieldDeserializationSchema} {@link 
SingleFieldSerializationSchema}.
+ */
+public class SingleFieldSerDeSchemaTest {
+
+       @Test
+       public void testSerializationAndDeserialization() throws Exception {
+               for (TestSpec testSpec : testData) {
+                       runTest(testSpec);
+               }
+       }
+
+       private void runTest(TestSpec testSpec) throws Exception {
+               SingleFieldDeserializationSchema deserializationSchema = new 
SingleFieldDeserializationSchema(
+                       testSpec.type.getLogicalType(), 
TypeInformation.of(RowData.class));
+               SingleFieldSerializationSchema serializationSchema = new 
SingleFieldSerializationSchema(
+                       testSpec.type.getLogicalType());
+               
deserializationSchema.open(mock(DeserializationSchema.InitializationContext.class));
+               
serializationSchema.open(mock(SerializationSchema.InitializationContext.class));
+
+               Row row = Row.of(testSpec.value);
+               DataStructureConverter<Object, Object> converter = 
DataStructureConverters.getConverter(
+                       ROW(FIELD("single", testSpec.type)));
+               RowData originalRowData = (RowData) converter.toInternal(row);
+
+               byte[] serializedBytes = 
serializationSchema.serialize(originalRowData);
+               RowData deserializeRowData = 
deserializationSchema.deserialize(serializedBytes);
+
+               Row actual = (Row) converter.toExternal(deserializeRowData);
+               assertEquals(row, actual);
+       }
+
+       private static List<TestSpec> testData = Arrays.asList(

Review comment:
       Use a JUnit `@Parameterized` instead?
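
   A sketch of how the suggested JUnit 4 `@Parameterized` setup could look; the class name and the placeholder specs are made up for illustration:

   ```java
   import org.junit.Test;
   import org.junit.runner.RunWith;
   import org.junit.runners.Parameterized;
   import org.junit.runners.Parameterized.Parameter;
   import org.junit.runners.Parameterized.Parameters;

   import java.util.Arrays;
   import java.util.List;

   /** Sketch: each spec becomes its own named test case instead of a loop inside one @Test. */
   @RunWith(Parameterized.class)
   public class SingleFieldSerDeSchemaParameterizedSketch {

       @Parameters(name = "{0}")
       public static List<Object[]> specs() {
           // replace these placeholders with the existing testData entries
           return Arrays.asList(new Object[][] {
               {"string value", "hello"},
               {"int value", 42},
           });
       }

       @Parameter(0)
       public String description;

       @Parameter(1)
       public Object value;

       @Test
       public void testSerializationAndDeserialization() {
           // run the existing round-trip logic for this single spec
       }
   }
   ```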

##########
File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/formats/singlefield/SingleFieldDeserializationSchema.java
##########
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.formats.singlefield;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.BooleanSerializer;
+import org.apache.flink.api.common.typeutils.base.ByteSerializer;
+import org.apache.flink.api.common.typeutils.base.DoubleSerializer;
+import org.apache.flink.api.common.typeutils.base.FloatSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.ShortSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Objects;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Deserialization schema from single field data to Flink Table/SQL internal 
data structure {@link RowData}.
+ */
+@Internal
+public class SingleFieldDeserializationSchema implements 
DeserializationSchema<RowData> {
+
+       private static final long serialVersionUID = 1L;
+
+       private final LogicalType fieldType;
+       private final DeserializationRuntimeConverter converter;
+       private TypeInformation<RowData> typeInfo;

Review comment:
       nit: call this `producedTypeInfo` for consistency
   
   final?

##########
File path: docs/dev/table/connectors/formats/single-field.md
##########
@@ -0,0 +1,151 @@
+---
+title: "Single Field Format"
+nav-title: SingleField
+nav-parent_id: sql-formats
+nav-pos: 7
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<span class="label label-info">Format: Serialization Schema</span>
+<span class="label label-info">Format: Deserialization Schema</span>
+
+* This will be replaced by the TOC
+{:toc}
+
+The SingleField format allows reading and writing data that contains only a single field, where that field is not wrapped in a JSON object or an Avro record.
+
+Currently, the SingleField format supports `String`, `byte[]` and primitive types.
+
+Note: this format encodes `null` values as `null` `byte[]`. This may have 
limitation when used in `upsert-kafka`, because `upsert-kafka` treats `null` 
values as a tombstone message (DELETE on the key). Therefore, we recommend 
avoiding using `upsert-kafka` connector and `single-field` format if the field 
can have a `null` value.
+
+Example
+----------------
+
+For example, you may have the following raw log data in Kafka and want to read and analyse it using Flink SQL.
+
+```
+47.29.201.179 - - [28/Feb/2019:13:17:10 +0000] "GET /?p=1 HTTP/2.0" 200 5316 
"https://domain.com/?p=1"; "Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 
(KHTML, like Gecko) Chrome/72.0.3626.119 Safari/537.36" "2.75"
+```
+
+The following creates a table that reads from (and writes to) the underlying Kafka topic as an anonymous string value using the `single-field` format:
+
+<div class="codetabs" markdown="1">
+<div data-lang="SQL" markdown="1">
+{% highlight sql %}
+CREATE TABLE nginx_log (
+  log STRING
+) WITH (
+  'connector' = 'kafka',
+  'topic' = 'nginx_log',
+  'properties.bootstrap.servers' = 'localhost:9092',
+  'properties.group.id' = 'testGroup',
+  'format' = 'single-field'
+)
+{% endhighlight %}
+</div>
+</div>
+
+Then you can read the raw data as a plain string and split it into multiple fields using a user-defined function for further analysis, e.g. `my_split` in the example.
+
+<div class="codetabs" markdown="1">
+<div data-lang="SQL" markdown="1">
+{% highlight sql %}
+SELECT t.hostname, t.datetime, t.url, t.browser, ...
+FROM(
+  SELECT my_split(log) as t FROM nginx_log
+);
+{% endhighlight %}
+</div>
+</div>
+
+Conversely, you can also write a single field of STRING type into the Kafka topic as an anonymous string value.
+
+Format Options
+----------------
+
+<table class="table table-bordered">
+    <thead>
+      <tr>
+        <th class="text-left" style="width: 25%">Option</th>
+        <th class="text-center" style="width: 8%">Required</th>
+        <th class="text-center" style="width: 7%">Default</th>
+        <th class="text-center" style="width: 10%">Type</th>
+        <th class="text-center" style="width: 50%">Description</th>
+      </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td><h5>format</h5></td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Specify what format to use; here it should be 'single-field'.</td>
+    </tr>
+    </tbody>
+</table>
+
+Data Type Mapping
+----------------
+
+The table below details the SQL types the format supports, including the serializer and deserializer classes used for encoding and decoding.
+
+<table class="table table-bordered">
+    <thead>
+      <tr>
+        <th class="text-left">Flink SQL type</th>
+        <th class="text-left">Value</th>
+      </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td><code>CHAR / VARCHAR / STRING</code></td>
+      <td>A UTF-8 encoded text string</td>

Review comment:
       We should also allow specifying a different charset. This should be configurable.
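
   As an illustration, a hedged sketch of what a charset option and the corresponding conversion could look like; the option key `single-field.charset` is hypothetical and only used for the example:

   ```java
   import org.apache.flink.configuration.ConfigOption;
   import org.apache.flink.configuration.ConfigOptions;

   import java.nio.charset.Charset;

   /** Sketch: a configurable charset instead of hard-coded UTF-8. */
   final class CharsetOptionSketch {

       // hypothetical option key; the final name would need to be decided in the PR
       static final ConfigOption<String> CHARSET =
           ConfigOptions.key("single-field.charset")
               .stringType()
               .defaultValue("UTF-8")
               .withDescription("Charset used to encode and decode the text field.");

       static byte[] encode(String value, String charsetName) {
           return value.getBytes(Charset.forName(charsetName));
       }

       static String decode(byte[] bytes, String charsetName) {
           return new String(bytes, Charset.forName(charsetName));
       }
   }
   ```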

##########
File path: docs/dev/table/connectors/formats/single-field.md
##########
@@ -0,0 +1,151 @@
+---
+title: "Single Field Format"
+nav-title: SingleField
+nav-parent_id: sql-formats
+nav-pos: 7
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<span class="label label-info">Format: Serialization Schema</span>
+<span class="label label-info">Format: Deserialization Schema</span>
+
+* This will be replaced by the TOC
+{:toc}
+
+The SingleField format allows reading and writing data that contains only a single field, where that field is not wrapped in a JSON object or an Avro record.
+
+Currently, the SingleField format supports `String`, `byte[]` and primitive types.
+
+Note: this format encodes `null` values as `null` `byte[]`. This may have 
limitation when used in `upsert-kafka`, because `upsert-kafka` treats `null` 
values as a tombstone message (DELETE on the key). Therefore, we recommend 
avoiding using `upsert-kafka` connector and `single-field` format if the field 
can have a `null` value.
+
+Example
+----------------
+
+For example, you may have the following raw log data in Kafka and want to read and analyse it using Flink SQL.
+
+```
+47.29.201.179 - - [28/Feb/2019:13:17:10 +0000] "GET /?p=1 HTTP/2.0" 200 5316 
"https://domain.com/?p=1"; "Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 
(KHTML, like Gecko) Chrome/72.0.3626.119 Safari/537.36" "2.75"
+```
+
+The following creates a table that reads from (and writes to) the underlying Kafka topic as an anonymous string value using the `single-field` format:
+
+<div class="codetabs" markdown="1">
+<div data-lang="SQL" markdown="1">
+{% highlight sql %}
+CREATE TABLE nginx_log (
+  log STRING
+) WITH (
+  'connector' = 'kafka',
+  'topic' = 'nginx_log',
+  'properties.bootstrap.servers' = 'localhost:9092',
+  'properties.group.id' = 'testGroup',
+  'format' = 'single-field'
+)
+{% endhighlight %}
+</div>
+</div>
+
+Then you can read the raw data as a plain string and split it into multiple fields using a user-defined function for further analysis, e.g. `my_split` in the example.
+
+<div class="codetabs" markdown="1">
+<div data-lang="SQL" markdown="1">
+{% highlight sql %}
+SELECT t.hostname, t.datetime, t.url, t.browser, ...
+FROM(
+  SELECT my_split(log) as t FROM nginx_log
+);
+{% endhighlight %}
+</div>
+</div>
+
+Conversely, you can also write a single field of STRING type into the Kafka topic as an anonymous string value.
+
+Format Options
+----------------
+
+<table class="table table-bordered">
+    <thead>
+      <tr>
+        <th class="text-left" style="width: 25%">Option</th>
+        <th class="text-center" style="width: 8%">Required</th>
+        <th class="text-center" style="width: 7%">Default</th>
+        <th class="text-center" style="width: 10%">Type</th>
+        <th class="text-center" style="width: 50%">Description</th>
+      </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td><h5>format</h5></td>

Review comment:
       We should also allow configuring the endianness. Every format that encodes to bytes requires this information.
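
   For context, a small JDK-only sketch showing why the byte order matters when fixed-width numbers are written as raw bytes:

   ```java
   import java.nio.ByteBuffer;
   import java.nio.ByteOrder;
   import java.util.Arrays;

   /** Sketch: the same int produces different bytes depending on the configured byte order. */
   final class EndiannessSketch {
       public static void main(String[] args) {
           byte[] bigEndian = ByteBuffer.allocate(4).order(ByteOrder.BIG_ENDIAN).putInt(1).array();
           byte[] littleEndian = ByteBuffer.allocate(4).order(ByteOrder.LITTLE_ENDIAN).putInt(1).array();
           System.out.println(Arrays.toString(bigEndian));    // [0, 0, 0, 1]
           System.out.println(Arrays.toString(littleEndian)); // [1, 0, 0, 0]
       }
   }
   ```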

##########
File path: docs/dev/table/connectors/formats/single-field.md
##########
@@ -0,0 +1,151 @@
+---
+title: "Single Field Format"
+nav-title: SingleField
+nav-parent_id: sql-formats
+nav-pos: 7
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<span class="label label-info">Format: Serialization Schema</span>
+<span class="label label-info">Format: Deserialization Schema</span>
+
+* This will be replaced by the TOC
+{:toc}
+
+The SingleField format allows reading and writing data that contains only a single field, where that field is not wrapped in a JSON object or an Avro record.
+
+Currently, the SingleField format supports `String`, `byte[]` and primitive types.
+
+Note: this format encodes `null` values as `null` `byte[]`. This may have 
limitation when used in `upsert-kafka`, because `upsert-kafka` treats `null` 
values as a tombstone message (DELETE on the key). Therefore, we recommend 
avoiding using `upsert-kafka` connector and `single-field` format if the field 
can have a `null` value.

Review comment:
       I don't understand this comment:
   ```
   encodes `null` values as `null` `byte[]`
   ```
   
   Do you mean:
   ```
   encodes `null` values as `null` of `byte[]` type
   ```

##########
File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/formats/singlefield/SingleFieldDeserializationSchema.java
##########
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.formats.singlefield;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.BooleanSerializer;
+import org.apache.flink.api.common.typeutils.base.ByteSerializer;
+import org.apache.flink.api.common.typeutils.base.DoubleSerializer;
+import org.apache.flink.api.common.typeutils.base.FloatSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.ShortSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Objects;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Deserialization schema from single field data to Flink Table/SQL internal 
data structure {@link RowData}.
+ */
+@Internal
+public class SingleFieldDeserializationSchema implements 
DeserializationSchema<RowData> {
+
+       private static final long serialVersionUID = 1L;
+
+       private final LogicalType fieldType;

Review comment:
       nit: call this `deserializedType`?

##########
File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/formats/singlefield/SingleFieldDeserializationSchema.java
##########
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.formats.singlefield;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.BooleanSerializer;
+import org.apache.flink.api.common.typeutils.base.ByteSerializer;
+import org.apache.flink.api.common.typeutils.base.DoubleSerializer;
+import org.apache.flink.api.common.typeutils.base.FloatSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.ShortSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Objects;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Deserialization schema from single field data to Flink Table/SQL internal 
data structure {@link RowData}.
+ */
+@Internal
+public class SingleFieldDeserializationSchema implements 
DeserializationSchema<RowData> {
+
+       private static final long serialVersionUID = 1L;
+
+       private final LogicalType fieldType;
+       private final DeserializationRuntimeConverter converter;
+       private TypeInformation<RowData> typeInfo;
+
+       private transient GenericRowData reuse;
+
+       public SingleFieldDeserializationSchema(
+                       LogicalType fieldType,
+                       TypeInformation<RowData> resultTypeInfo) {
+               this.fieldType = checkNotNull(fieldType);
+               this.typeInfo = checkNotNull(resultTypeInfo);
+               this.converter = createConverter(fieldType);
+       }
+
+       @Override
+       public void open(InitializationContext context) throws Exception {
+               reuse = new GenericRowData(1);
+       }
+
+       @Override
+       public RowData deserialize(byte[] message) throws IOException {
+               reuse.setField(0, converter.convert(message));
+               return reuse;
+       }
+
+       @Override
+       public boolean isEndOfStream(RowData nextElement) {
+               return false;
+       }
+
+       @Override
+       public TypeInformation<RowData> getProducedType() {
+               return typeInfo;
+       }
+
+       @Override
+       public boolean equals(Object o) {
+               if (this == o) {
+                       return true;
+               }
+               if (o == null || getClass() != o.getClass()) {
+                       return false;
+               }
+               SingleFieldDeserializationSchema that = 
(SingleFieldDeserializationSchema) o;
+               return typeInfo.equals(that.typeInfo) && 
fieldType.equals(that.fieldType);
+       }
+
+       @Override
+       public int hashCode() {
+               return Objects.hash(typeInfo, fieldType);
+       }
+
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Runtime converter that convert byte[] to internal data structure 
object.
+        */
+       @FunctionalInterface
+       private interface DeserializationRuntimeConverter extends Serializable {
+               Object convert(byte[] data) throws IOException;
+       }
+
+       /**
+        * Creates a runtime converter.
+        */
+       private static DeserializationRuntimeConverter 
createConverter(LogicalType type) {
+               switch (type.getTypeRoot()) {
+                       case CHAR:
+                       case VARCHAR:
+                               return data -> {
+                                       // put null check logic in the lambda 
instead of wrapping outside
+                                       // to avoid virtual method invoking.
+                                       if (data == null) {
+                                               return null;
+                                       }
+                                       return StringData.fromBytes(data);
+                               };
+                       case VARBINARY:
+                       case BINARY:
+                               return data -> data;
+                       case TINYINT:
+                               return 
createConverterUsingSerializer(ByteSerializer.INSTANCE);
+                       case SMALLINT:
+                               return 
createConverterUsingSerializer(ShortSerializer.INSTANCE);
+                       case INTEGER:
+                               return 
createConverterUsingSerializer(IntSerializer.INSTANCE);
+                       case BIGINT:
+                               return 
createConverterUsingSerializer(LongSerializer.INSTANCE);
+                       case FLOAT:
+                               return 
createConverterUsingSerializer(FloatSerializer.INSTANCE);
+                       case DOUBLE:
+                               return 
createConverterUsingSerializer(DoubleSerializer.INSTANCE);
+                       case BOOLEAN:
+                               return 
createConverterUsingSerializer(BooleanSerializer.INSTANCE);
+                       default:
+                               throw new 
UnsupportedOperationException("'single-format' currently doesn't support type: 
" + type);
+               }
+       }
+
+       private static DeserializationRuntimeConverter 
createConverterUsingSerializer(
+                       final TypeSerializer<?> serializer) {
+               return new DelegatingDeserializationConverter(serializer);
+       }
+
+       private static final class DelegatingDeserializationConverter
+               implements DeserializationRuntimeConverter {
+               private static final long serialVersionUID = 1L;
+               private final DataInputDeserializer source = new 
DataInputDeserializer();
+               private final TypeSerializer<?> serializer;
+
+               protected DelegatingDeserializationConverter(TypeSerializer<?> 
serializer) {
+                       this.serializer = serializer;
+               }
+
+               @Override
+               public Object convert(byte[] data) throws IOException {
+                       if (data == null) {
+                               return null;
+                       }
+                       source.setBuffer(data);

Review comment:
       Would we have a performance benefit from implementing the deserialization logic ourselves, instead of setting fields and delegating to other classes?
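
   For comparison, a sketch of direct decoding without `DataInputDeserializer`; whether this is measurably faster would need a benchmark:

   ```java
   import java.nio.ByteBuffer;

   /** Sketch: reading fixed-width values straight from the message bytes. */
   final class ManualDeserializeSketch {

       static int toInt(byte[] data) {
           // wrap() does not copy the array; default byte order is big-endian
           return ByteBuffer.wrap(data).getInt();
       }

       static long toLong(byte[] data) {
           return ByteBuffer.wrap(data).getLong();
       }

       static boolean toBoolean(byte[] data) {
           return data[0] != 0;
       }
   }
   ```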

##########
File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/formats/singlefield/SingleFieldSerializationSchema.java
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.formats.singlefield;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.BooleanSerializer;
+import org.apache.flink.api.common.typeutils.base.ByteSerializer;
+import org.apache.flink.api.common.typeutils.base.DoubleSerializer;
+import org.apache.flink.api.common.typeutils.base.FloatSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.ShortSerializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.RowData.FieldGetter;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * Serialization schema that serializes an {@link RowData} object into a 
single field bytes.
+ */
+@Internal
+public class SingleFieldSerializationSchema implements 
SerializationSchema<RowData> {
+
+       private static final long serialVersionUID = 1L;
+
+       private final LogicalType fieldType;
+
+       private final SerializationRuntimeConverter converter;
+
+       private final FieldGetter fieldGetter;
+
+       public SingleFieldSerializationSchema(LogicalType fieldType) {
+               this.fieldType = fieldType;
+               this.fieldGetter = RowData.createFieldGetter(fieldType, 0);
+               this.converter = createConverter(fieldType);
+       }
+
+       @Override
+       public byte[] serialize(RowData row) {
+               try {
+                       return 
converter.convert(fieldGetter.getFieldOrNull(row));
+               } catch (IOException e) {
+                       throw new RuntimeException("Could not serialize row '" 
+ row + "'. ", e);
+               }
+       }
+
+       @Override
+       public boolean equals(Object o) {
+               if (this == o) {
+                       return true;
+               }
+               if (o == null || getClass() != o.getClass()) {
+                       return false;
+               }
+               SingleFieldSerializationSchema that = 
(SingleFieldSerializationSchema) o;
+               return fieldType.equals(that.fieldType);
+       }
+
+       @Override
+       public int hashCode() {
+               return Objects.hash(fieldType);
+       }
+
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Runtime converter that convert an object of internal data structure 
to byte[].
+        */
+       @FunctionalInterface
+       private interface SerializationRuntimeConverter extends Serializable {
+               byte[] convert(Object value) throws IOException;
+       }
+
+       /**
+        * Creates a runtime converter.
+        */
+       private SerializationRuntimeConverter createConverter(LogicalType type) 
{
+               switch (type.getTypeRoot()) {
+                       case CHAR:
+                       case VARCHAR:
+                               return value -> {
+                                       // put null check logic in the lambda 
instead of wrapping outside
+                                       // to avoid virtual method invoking.
+                                       if (value == null) {
+                                               return null;
+                                       }
+                                       return ((StringData) value).toBytes();

Review comment:
       Is this Flink's representation of strings in bytes? I could imagine that most people assume `"string".getBytes()` semantics. Is the string length included in the bytes as well?
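
   One way to check could be a quick comparison like the sketch below; whether the two byte arrays match (i.e. plain UTF-8 with no length prefix) is exactly the thing to verify, not a given:

   ```java
   import org.apache.flink.table.data.StringData;

   import java.nio.charset.StandardCharsets;
   import java.util.Arrays;

   /** Sketch: compare StringData#toBytes with plain UTF-8 getBytes(). */
   final class StringBytesCheck {
       public static void main(String[] args) {
           byte[] flinkBytes = StringData.fromString("string").toBytes();
           byte[] plainUtf8 = "string".getBytes(StandardCharsets.UTF_8);
           // prints true if the format's string encoding matches plain getBytes() semantics
           System.out.println(Arrays.equals(flinkBytes, plainUtf8));
       }
   }
   ```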




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

