twalthr commented on code in PR #26121:
URL: https://github.com/apache/flink/pull/26121#discussion_r1959584163


##########
flink-python/pyflink/table/catalog.py:
##########
@@ -1391,3 +1398,418 @@ def of(catalog_name: str, configuration: Configuration, 
comment: str = None):
         j_catalog_descriptor = 
gateway.jvm.org.apache.flink.table.catalog.CatalogDescriptor.of(
             catalog_name, configuration._j_configuration, comment)
         return CatalogDescriptor(j_catalog_descriptor)
+
+
+class Column(metaclass=ABCMeta):

Review Comment:
   For my own Python understanding: what does `metaclass=ABCMeta` do here?



##########
flink-python/pyflink/table/catalog.py:
##########
@@ -1391,3 +1398,418 @@ def of(catalog_name: str, configuration: Configuration, 
comment: str = None):
         j_catalog_descriptor = 
gateway.jvm.org.apache.flink.table.catalog.CatalogDescriptor.of(
             catalog_name, configuration._j_configuration, comment)
         return CatalogDescriptor(j_catalog_descriptor)
+
+
+class Column(metaclass=ABCMeta):
+    """
+    Representation of a column in a :class:`~pyflink.table.ResolvedSchema`.
+
+    A table column describes either a :class:`PhysicalColumn`, 
:class:`ComputedColumn`, or
+    :class:`MetadataColumn`.
+    """
+
+    def __init__(self, j_column):
+        self._j_column = j_column
+
+    def __eq__(self, other):
+        return self.__class__ == other.__class__ and 
self._j_column.equals(other._j_column)
+
+    def __hash__(self):
+        return self._j_column.hashCode()
+
+    def __str__(self):
+        return self._j_column.toString()
+
+    @staticmethod
+    def _from_j_column(j_column) -> Optional["Column"]:
+        """
+        Returns a non-abstract column, either a :class:`PhysicalColumn`, a 
:class:`ComputedColumn`,
+        or a :class:`MetadataColumn` from an 
org.apache.flink.table.catalog.Column.
+        """
+        if j_column is None:
+            return None
+        gateway = get_gateway()
+        JColumn = gateway.jvm.org.apache.flink.table.catalog.Column
+        JPhysicalColumn = 
gateway.jvm.org.apache.flink.table.catalog.Column.PhysicalColumn
+        JComputedColumn = 
gateway.jvm.org.apache.flink.table.catalog.Column.ComputedColumn
+        JMetadataColumn = 
gateway.jvm.org.apache.flink.table.catalog.Column.MetadataColumn
+        j_clz = j_column.getClass()
+
+        if not get_java_class(JColumn).isAssignableFrom(j_clz):
+            raise TypeError("The input %s is not an instance of Column." % 
j_column)
+
+        if 
get_java_class(JPhysicalColumn).isAssignableFrom(j_column.getClass()):
+            return PhysicalColumn(j_physical_column=j_column.getClass())
+        elif 
get_java_class(JComputedColumn).isAssignableFrom(j_column.getClass()):
+            return MetaDataColumn(j_metadata_column=j_column.getClass())
+        elif 
get_java_class(JMetadataColumn).isAssignableFrom(j_column.getClass()):
+            return MetaDataColumn(j_metadata_column=j_column.getClass())
+        else:
+            return None
+
+    @staticmethod
+    def physical(name: str, data_type: DataType) -> "PhysicalColumn":
+        """
+        Creates a regular table column that represents physical data.
+        """
+        gateway = get_gateway()
+        j_data_type = _to_java_data_type(data_type)
+        j_physical_column = 
gateway.jvm.org.apache.flink.table.catalog.Column.physical(
+            name, j_data_type
+        )
+        return PhysicalColumn(j_physical_column)
+
+    @staticmethod
+    def computed(name: str, resolved_expression: ResolvedExpression) -> 
"ComputedColumn":
+        """
+        Creates a computed column that is computed from the given
+        :class:`~pyflink.table.ResolvedExpression`.
+        """
+        gateway = get_gateway()
+        j_resolved_expression = resolved_expression
+        j_computed_column = 
gateway.jvm.org.apache.flink.table.catalog.Column.computed(
+            name, j_resolved_expression
+        )
+        return ComputedColumn(j_computed_column)
+
+    @staticmethod
+    def metadata(
+        name: str, data_type: DataType, metadata_key: Optional[str], 
is_virtual: bool
+    ) -> "MetaDataColumn":
+        """
+        Creates a metadata column from metadata of the given column name or 
from metadata of the
+        given key (if not null).
+
+        Allows to specify whether the column is virtual or not.
+        """
+        gateway = get_gateway()
+        j_data_type = _to_java_data_type(data_type)
+        j_metadata_column = 
gateway.jvm.org.apache.flink.table.catalog.Column.metadata(
+            name, j_data_type, metadata_key, is_virtual
+        )
+        return MetaDataColumn(j_metadata_column)
+
+    @abstractmethod
+    def with_comment(self, comment: Optional[str]):
+        """
+        Add the comment to the column and return the new object.
+        """
+        pass
+
+    @abstractmethod
+    def is_physical(self) -> bool:
+        """
+        Returns whether the given column is a physical column of a table; 
neither computed nor
+        metadata.
+        """
+        pass
+
+    @abstractmethod
+    def is_persisted(self) -> bool:
+        """
+        Returns whether the given column is persisted in a sink operation.
+        """
+        pass
+
+    def get_data_type(self) -> DataType:
+        """
+        Returns the data type of this column.
+        """
+        j_data_type = self._j_column.getDataType()
+        return DataType(_from_java_data_type(j_data_type))
+
+    def get_name(self):
+        """
+        Returns the name of this column.
+        """
+        return self._j_column.getName()
+
+    def get_comment(self) -> Optional[str]:
+        """
+        Returns the comment of this column.
+        """
+        optional_result = self._j_column.getComment()
+        return optional_result.get() if optional_result.isPresent() else None
+
+    def as_summary_string(self) -> str:
+        """
+        Returns a string that summarizes this column for printing to a console.
+        """
+        return self._j_column.asSummaryString()
+
+    @abstractmethod
+    def explain_extras(self) -> Optional[str]:
+        """
+        Returns an explanation of specific column extras next to name and type.
+        """
+        pass
+
+    @abstractmethod
+    def copy(self, new_type: DataType) -> "Column":
+        """
+        Returns a copy of the column with a replaced 
:class:`~pyflink.table.types.DataType`.
+        """
+        pass
+
+    @abstractmethod
+    def rename(self, new_name: str) -> "Column":
+        """
+        Returns a copy of the column with a replaced name.
+        """
+        pass
+
+
+class PhysicalColumn(Column):
+    """
+    Representation of a physical columns.

Review Comment:
   ```suggestion
       Representation of a physical column.
   ```



##########
flink-python/pyflink/table/table.py:
##########
@@ -965,6 +966,14 @@ def get_schema(self) -> TableSchema:
         """
         return TableSchema(j_table_schema=self._j_table.getSchema())
 
+    def get_resolved_schema(self) -> ResolvedSchema:

Review Comment:
   Please deprecate the old `get_schema` method and the `TableSchema` class it returns.



##########
flink-python/pyflink/table/tests/test_catalog_completeness.py:
##########
@@ -160,6 +161,51 @@ def java_class(cls):
         return "org.apache.flink.table.catalog.stats.CatalogColumnStatistics"
 
 
+class ColumnAPICompletenessTests(PythonAPICompletenessTestCase, 
PyFlinkTestCase):
+    """
+    Tests whether the Python :class:`Column` is consistent with
+    Java `org.apache.flink.table.catalog.Column`.
+    """
+
+    @classmethod
+    def python_class(cls):
+        return Column
+
+    @classmethod
+    def java_class(cls):
+        return "org.apache.flink.table.catalog.Column"
+
+
+class WatermarkSpecAPICompletenessTests(PythonAPICompletenessTestCase, 
PyFlinkTestCase):
+    """
+    Tests whether the Python :class:`WatermarkSpec` is consistent with
+    Java `org.apache.flink.table.catalog.WatermarkSpec`.
+    """
+
+    @classmethod
+    def python_class(cls):
+        return WatermarkSpec
+
+    @classmethod
+    def java_class(cls):
+        return "org.apache.flink.table.catalog.WatermarkSpec"
+
+
+class ConstraintAPICompletenessTests(PythonAPICompletenessTestCase, 
PyFlinkTestCase):

Review Comment:
   Please add a completeness test for every class added in this PR. I think the
tests for resolved schema and resolved expression are missing, and possibly others?



##########
flink-python/pyflink/table/table_result.py:
##########
@@ -75,9 +75,9 @@ def wait(self, timeout_ms: int = None):
         else:
             get_method(self._j_table_result, "await")()
 
-    def get_table_schema(self) -> TableSchema:
+    def get_resolved_schema(self) -> ResolvedSchema:

Review Comment:
   We should not replace the method; the old one should be kept around for 1-2
Flink versions. We should offer both and deprecate the old one.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to