twalthr commented on code in PR #26167:
URL: https://github.com/apache/flink/pull/26167#discussion_r1973372794


##########
flink-python/pyflink/table/catalog.py:
##########
@@ -1391,3 +1391,90 @@ def of(catalog_name: str, configuration: Configuration, 
comment: str = None):
         j_catalog_descriptor = 
gateway.jvm.org.apache.flink.table.catalog.CatalogDescriptor.of(
             catalog_name, configuration._j_configuration, comment)
         return CatalogDescriptor(j_catalog_descriptor)
+
+
+class ObjectIdentifier(object):
+    """
+    Identifies an object in a catalog, including tables, views, function, or 
types.
+    An :class:`ObjectIdentifier` must be fully qualified. It is the 
responsibility of the catalog
+    manager to resolve an :class:`ObjectIdentifier` to an object.
+
+    While Path :class:`ObjectPath` is used within the same catalog, instances 
of this class can be
+    used across catalogs. An :class:`ObjectPath` only describes the name and 
database of an
+    object and so is scoped over a particular catalog, but an 
:class:`ObjectIdentifier` is fully
+    qualified and describes the name, database and catalog of the object.
+
+    Two objects are considered equal if they share the same 
:class:`ObjectIdentifier` in a session
+    context, such as a :class:`~pyflink.table.TableEnvironment`, where 
catalogs (or objects in a
+    catalog) have not been added, deleted or modified.
+    """
+
+    _UNKNOWN = "<UNKNOWN>"
+
+    def __init__(self, j_object_identifier):
+        self._j_object_identifier = j_object_identifier
+
+    def __str__(self):
+        return self._j_object_identifier.toString()
+
+    def __hash__(self):
+        return self._j_object_identifier.hashCode()
+
+    def __eq__(self, other):
+        return isinstance(other, self.__class__) and 
self._j_object_identifier.equals(
+            other._j_object_identifier
+        )
+
+    @staticmethod
+    def of(catalog_name: str, database_name: str, object_name: str) -> 
"ObjectIdentifier":
+        assert catalog_name is not None, "Catalog name must not be null."
+        assert database_name is not None, "Database name must not be null."
+        assert object_name is not None, "Object name must not be null."
+
+        if catalog_name == ObjectIdentifier._UNKNOWN or database_name == 
ObjectIdentifier._UNKNOWN:

Review Comment:
   why do we have to add this logic in Python? Isn't Java doing this for us 
already and raising an error?



##########
flink-python/pyflink/table/table.py:
##########
@@ -1077,6 +1078,89 @@ def explain(self, *extra_details: ExplainDetail) -> str:
         j_extra_details = to_j_explain_detail_arr(extra_details)
         return self._j_table.explain(TEXT, j_extra_details)
 
+    def insert_into(
+        self, table_path_or_descriptor: Union[str, TableDescriptor], 
overwrite: bool = False
+    ) -> TablePipeline:
+        """
+        When ``target_path_or_descriptor`` is a table path:
+
+            Declares that the pipeline defined by the given :class:`Table` 
(backed by a

Review Comment:
   The `(backed by a ...)` belongs after the `written to a table`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to