dianfu commented on a change in pull request #16516:
URL: https://github.com/apache/flink/pull/16516#discussion_r671985396



##########
File path: flink-python/pyflink/table/schema.py
##########
@@ -0,0 +1,265 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from typing import Union, List
+
+from pyflink.java_gateway import get_gateway
+from pyflink.table import Expression
+from pyflink.table.expression import _get_java_expression
+from pyflink.table.types import DataType, _to_java_data_type
+from pyflink.util.java_utils import to_jarray
+
+__all__ = ['Schema']

Review comment:
       also added in __init__.py?
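
       For reference, a rough sketch of what that could look like in flink-python/pyflink/table/__init__.py (the surrounding imports and the existing __all__ entries are elided/assumed here, not copied from the repository):
       ```python
       # flink-python/pyflink/table/__init__.py (sketch, existing entries elided)
       from pyflink.table.schema import Schema

       __all__ = [
           # ... existing exports kept as-is ...
           'Schema',
       ]
       ```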

##########
File path: flink-python/pyflink/table/schema.py
##########
@@ -0,0 +1,265 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from typing import Union, List
+
+from pyflink.java_gateway import get_gateway
+from pyflink.table import Expression
+from pyflink.table.expression import _get_java_expression
+from pyflink.table.types import DataType, _to_java_data_type
+from pyflink.util.java_utils import to_jarray
+
+__all__ = ['Schema']
+
+
+class Schema(object):
+    """
+    Schema of a table or view.
+
+    A schema represents the schema part of a {@code CREATE TABLE (schema) WITH (options)} DDL
+    statement in SQL. It defines columns of different kind, constraints, time attributes, and
+    watermark strategies. It is possible to reference objects (such as functions or types) across
+    different catalogs.
+
+    This class is used in the API and catalogs to define an unresolved schema that will be
+    translated to ResolvedSchema. Some methods of this class perform basic validation, however, the
+    main validation happens during the resolution. Thus, an unresolved schema can be incomplete and
+    might be enriched or merged with a different schema at a later stage.
+
+    Since an instance of this class is unresolved, it should not be directly persisted. The str()
+    shows only a summary of the contained objects.
+    """
+
+    def __init__(self, j_schema):
+        self._j_schema = j_schema
+
+    @staticmethod
+    def new_builder() -> 'Schema.Builder':
+        gateway = get_gateway()
+        j_builder = gateway.jvm.Schema.newBuilder()
+        return Schema.Builder(j_builder)
+
+    def __str__(self):
+        return self._j_schema.toString()
+
+    def __eq__(self, other):
+        return self.__class__ == other.__class__ and self._j_schema.equals(other._j_schema)
+
+    def __hash__(self):
+        return self._j_schema.hashCode()
+
+    class Builder(object):
+        """
+        A builder for constructing an immutable but still unresolved Schema.
+        """
+
+        def __init__(self, j_builder):
+            self._j_builder = j_builder
+
+        def from_schema(self, unresolved_schema: 'Schema') -> 'Schema.Builder':
+            """
+            Adopts all members from the given unresolved schema.
+            """
+            self._j_builder.fromSchema(unresolved_schema)

Review comment:
       ```suggestion
               self._j_builder.fromSchema(unresolved_schema._j_schema)
       ```
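
       As a side note, a rough usage sketch of the builder under review (method names that are not visible in this hunk, e.g. column, primary_key and build, are assumptions based on the Java Schema.Builder API):
       ```python
       from pyflink.table import DataTypes
       from pyflink.table.schema import Schema

       # Declare an unresolved schema with two physical columns and a primary key.
       schema = Schema.new_builder() \
           .column("id", DataTypes.BIGINT(nullable=False)) \
           .column("name", DataTypes.STRING()) \
           .primary_key("id") \
           .build()

       # str() shows only a summary of the declared columns and constraints.
       print(schema)
       ```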

##########
File path: flink-python/pyflink/common/config_options.py
##########
@@ -0,0 +1,125 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from typing import TypeVar, Generic
+
+from pyflink.java_gateway import get_gateway
+
+T = TypeVar('T')
+
+__all__ = ['ConfigOptions', 'ConfigOption']
+
+
+class ConfigOptions(object):
+    """
+    {@code ConfigOptions} are used to build a :class:`~pyflink.table.ConfigOption`. The option is

Review comment:
       ```suggestion
       {@code ConfigOptions} are used to build a :class:`~pyflink.common.ConfigOption`. The option is
       ```

##########
File path: flink-python/pyflink/table/statement_set.py
##########
@@ -52,21 +55,70 @@ def add_insert_sql(self, stmt: str) -> 'StatementSet':
         self._j_statement_set.addInsertSql(stmt)
         return self
 
-    def add_insert(self, target_path: str, table, overwrite: bool = False) -> 'StatementSet':
+    def add_insert(self,
+                   target_path_or_descriptor: Union[str, TableDescriptor],
+                   table,
+                   overwrite: bool = False) -> 'StatementSet':
         """
-        add Table with the given sink table name to the set.
+        Adds a statement that the pipeline defined by the given Table object should be written to a
+        table (backed by a DynamicTableSink) that was registered under the specified path or
+        expressed via the given TableDescriptor.
+
+        1. When target_path_or_descriptor is a table path:
+
+            See the documentation of :func:`~TableEnvironment.use_database` or
+            :func:`~TableEnvironment.use_catalog` for the rules on the path resolution.
+
+        2. When target_path_or_descriptor is a table descriptor:
+
+            The given TableDescriptor is registered as an inline (i.e. anonymous) temporary catalog
+            table (see :func:`~TableEnvironment.create_temporary_table`).
+
+            Then a statement is added to the statement set that inserts the Table object's pipeline
+            into that temporary table.
+
+            This method allows to declare a Schema for the sink descriptor. The declaration is
+            similar to a {@code CREATE TABLE} DDL in SQL and allows to:
+
+            <ul>
+                <li>overwrite automatically derived columns with a custom DataType
+                <li>add metadata columns next to the physical columns
+                <li>declare a primary key
+            </ul>
+
+            It is possible to declare a schema without physical/regular columns. In this case, those
+            columns will be automatically derived and implicitly put at the beginning of the schema
+            declaration.
+
+            Examples:
+            ::
+
+                >>>stmt_set = table_env.create_statement_set()

Review comment:
       ```suggestion
                   >>> stmt_set = table_env.create_statement_set()
       ```
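
       For completeness, a rough sketch of how the descriptor variant could be used end-to-end (import paths and the TableDescriptor.for_connector(...).schema(...).build() chain are assumed to mirror the Java API introduced in this PR):
       ```python
       from pyflink.table import DataTypes, EnvironmentSettings, TableEnvironment, TableDescriptor
       from pyflink.table.schema import Schema

       t_env = TableEnvironment.create(EnvironmentSettings.new_instance().in_streaming_mode().build())
       table = t_env.from_elements([(1, 'hello')], ['id', 'name'])

       stmt_set = t_env.create_statement_set()
       # Register the sink as an inline, anonymous table described by a TableDescriptor.
       stmt_set.add_insert(
           TableDescriptor.for_connector("blackhole")
               .schema(Schema.new_builder()
                       .column("id", DataTypes.BIGINT())
                       .column("name", DataTypes.STRING())
                       .build())
               .build(),
           table)
       stmt_set.execute().wait()
       ```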

##########
File path: flink-python/pyflink/common/config_options.py
##########
@@ -0,0 +1,125 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from typing import TypeVar, Generic
+
+from pyflink.java_gateway import get_gateway
+
+T = TypeVar('T')
+
+__all__ = ['ConfigOptions', 'ConfigOption']

Review comment:
       What about also adding it to __init__.py? Otherwise, I'm afraid that it will not appear in the Python docs.
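
       A rough sketch of the corresponding addition to flink-python/pyflink/common/__init__.py, so that the new classes are picked up by the generated API docs (existing imports and __all__ entries are elided/assumed):
       ```python
       # flink-python/pyflink/common/__init__.py (sketch, existing entries elided)
       from pyflink.common.config_options import ConfigOption, ConfigOptions

       __all__ = [
           # ... existing exports kept as-is ...
           'ConfigOption',
           'ConfigOptions',
       ]
       ```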

##########
File path: flink-python/pyflink/table/schema.py
##########
@@ -0,0 +1,265 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from typing import Union, List
+
+from pyflink.java_gateway import get_gateway
+from pyflink.table import Expression
+from pyflink.table.expression import _get_java_expression
+from pyflink.table.types import DataType, _to_java_data_type
+from pyflink.util.java_utils import to_jarray
+
+__all__ = ['Schema']
+
+
+class Schema(object):
+    """
+    Schema of a table or view.
+
+    A schema represents the schema part of a {@code CREATE TABLE (schema) WITH (options)} DDL
+    statement in SQL. It defines columns of different kind, constraints, time attributes, and
+    watermark strategies. It is possible to reference objects (such as functions or types) across
+    different catalogs.
+
+    This class is used in the API and catalogs to define an unresolved schema that will be
+    translated to ResolvedSchema. Some methods of this class perform basic validation, however, the
+    main validation happens during the resolution. Thus, an unresolved schema can be incomplete and
+    might be enriched or merged with a different schema at a later stage.
+
+    Since an instance of this class is unresolved, it should not be directly persisted. The str()
+    shows only a summary of the contained objects.
+    """
+
+    def __init__(self, j_schema):
+        self._j_schema = j_schema
+
+    @staticmethod
+    def new_builder() -> 'Schema.Builder':
+        gateway = get_gateway()
+        j_builder = gateway.jvm.Schema.newBuilder()
+        return Schema.Builder(j_builder)
+
+    def __str__(self):
+        return self._j_schema.toString()
+
+    def __eq__(self, other):
+        return self.__class__ == other.__class__ and self._j_schema.equals(other._j_schema)
+
+    def __hash__(self):
+        return self._j_schema.hashCode()
+
+    class Builder(object):
+        """
+        A builder for constructing an immutable but still unresolved Schema.
+        """
+
+        def __init__(self, j_builder):
+            self._j_builder = j_builder
+
+        def from_schema(self, unresolved_schema: 'Schema') -> 'Schema.Builder':
+            """
+            Adopts all members from the given unresolved schema.
+            """
+            self._j_builder.fromSchema(unresolved_schema)
+            return self
+
+        def from_row_data_type(self, data_type: DataType) -> 'Schema.Builder':
+            """
+            Adopts all fields of the given row as physical columns of the schema.
+            """
+            self._j_builder.fromRowDataType(data_type)

Review comment:
       It would be great to improve the test case to also cover this method
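
       For example, a rough sketch of such a test (file placement, the PyFlinkTestCase base class and the exact assertions are assumptions; build() is assumed to return the wrapped Schema):
       ```python
       from pyflink.table import DataTypes
       from pyflink.table.schema import Schema
       from pyflink.testing.test_case_utils import PyFlinkTestCase


       class SchemaTests(PyFlinkTestCase):

           def test_from_row_data_type(self):
               # Adopt the fields of a ROW type as physical columns of the schema.
               schema = Schema.new_builder() \
                   .from_row_data_type(
                       DataTypes.ROW([DataTypes.FIELD("id", DataTypes.BIGINT()),
                                      DataTypes.FIELD("name", DataTypes.STRING())])) \
                   .build()
               # str() is only a summary, but it should mention both adopted columns.
               self.assertIn("id", str(schema))
               self.assertIn("name", str(schema))
       ```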

##########
File path: flink-python/pyflink/table/schema.py
##########
@@ -0,0 +1,265 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from typing import Union, List
+
+from pyflink.java_gateway import get_gateway
+from pyflink.table import Expression
+from pyflink.table.expression import _get_java_expression
+from pyflink.table.types import DataType, _to_java_data_type
+from pyflink.util.java_utils import to_jarray
+
+__all__ = ['Schema']
+
+
+class Schema(object):
+    """
+    Schema of a table or view.
+
+    A schema represents the schema part of a {@code CREATE TABLE (schema) WITH (options)} DDL
+    statement in SQL. It defines columns of different kind, constraints, time attributes, and
+    watermark strategies. It is possible to reference objects (such as functions or types) across
+    different catalogs.
+
+    This class is used in the API and catalogs to define an unresolved schema that will be
+    translated to ResolvedSchema. Some methods of this class perform basic validation, however, the
+    main validation happens during the resolution. Thus, an unresolved schema can be incomplete and
+    might be enriched or merged with a different schema at a later stage.
+
+    Since an instance of this class is unresolved, it should not be directly persisted. The str()
+    shows only a summary of the contained objects.
+    """
+
+    def __init__(self, j_schema):
+        self._j_schema = j_schema
+
+    @staticmethod
+    def new_builder() -> 'Schema.Builder':
+        gateway = get_gateway()
+        j_builder = gateway.jvm.Schema.newBuilder()
+        return Schema.Builder(j_builder)
+
+    def __str__(self):
+        return self._j_schema.toString()
+
+    def __eq__(self, other):
+        return self.__class__ == other.__class__ and self._j_schema.equals(other._j_schema)
+
+    def __hash__(self):
+        return self._j_schema.hashCode()
+
+    class Builder(object):
+        """
+        A builder for constructing an immutable but still unresolved Schema.
+        """
+
+        def __init__(self, j_builder):
+            self._j_builder = j_builder
+
+        def from_schema(self, unresolved_schema: 'Schema') -> 'Schema.Builder':
+            """
+            Adopts all members from the given unresolved schema.
+            """
+            self._j_builder.fromSchema(unresolved_schema)
+            return self
+
+        def from_row_data_type(self, data_type: DataType) -> 'Schema.Builder':
+            """
+            Adopts all fields of the given row as physical columns of the schema.
+            """
+            self._j_builder.fromRowDataType(data_type)

Review comment:
       ditto




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

