autophagy commented on code in PR #26167:
URL: https://github.com/apache/flink/pull/26167#discussion_r1961499980


##########
flink-python/pyflink/table/catalog.py:
##########
@@ -1391,3 +1391,87 @@ def of(catalog_name: str, configuration: Configuration, 
comment: str = None):
         j_catalog_descriptor = 
gateway.jvm.org.apache.flink.table.catalog.CatalogDescriptor.of(
             catalog_name, configuration._j_configuration, comment)
         return CatalogDescriptor(j_catalog_descriptor)
+
+
+class ObjectIdentifier(object):
+    """
+    Identifies an object in a catalog. It allows to identify objects such as 
tables, views,
+    function, or types in a catalog. An identifier must be fully qualified. It 
is the
+    responsibility of the catalog manager to resolve an identifier to an 
object.
+
+    While Path :class:`ObjectPath` is used within the same catalog, instances 
of this class can be
+    used across catalogs.
+
+    Two objects are considered equal if they share the same object identifier 
in a stable session
+    context.
+    """
+
+    _UNKNOWN = "<UNKNOWN>"
+
+    def __init__(self, j_object_identifier):
+        self._j_object_identifier = j_object_identifier
+
+    def __str__(self):
+        return self._j_object_identifier.toString()
+
+    def __hash__(self):
+        return self._j_object_identifier.hashCode()
+
+    def __eq__(self, other):
+        return isinstance(other, self.__class__) and 
self._j_object_identifier.equals(
+            other._j_object_identifier
+        )
+
+    @staticmethod
+    def of(catalog_name: str, database_name: str, object_name: str) -> 
"ObjectIdentifier":
+        assert catalog_name is not None, "Catalog name must not be null."
+        assert database_name is not None, "Database name must not be null."
+        assert object_name is not None, "Object name must not be null."
+
+        if catalog_name == ObjectIdentifier._UNKNOWN or database_name == 
ObjectIdentifier._UNKNOWN:
+            raise ValueError(f"Catalog or database cannot be named 
{ObjectIdentifier._UNKNOWN}")
+        else:
+            gateway = get_gateway()
+            j_object_identifier = 
gateway.jvm.org.apache.flink.table.catalog.ObjectIdentifier.of(
+                catalog_name, database_name, object_name
+            )
+            return ObjectIdentifier(j_object_identifier=j_object_identifier)
+
+    def get_catalog_name(self) -> str:
+        return self._j_object_identifier.getCatalogName()
+
+    def get_database_name(self) -> str:
+        return self._j_object_identifier.getDatabaseName()
+
+    def get_object_name(self) -> str:
+        return self._j_object_identifier.getObjectName()
+
+    def to_object_path(self) -> ObjectPath:
+        """
+        Convert this :class:`ObjectIdentifier` to :class:`ObjectPath`.
+
+        Throws a TableException if the identifier cannot be converted.
+        """
+        j_object_path = self._j_object_identifier.toObjectPath()
+        return ObjectPath(j_object_path=j_object_path)
+
+    def to_list(self) -> List[str]:
+        """
+        List of the component names of this object identifier.
+        """
+        return self._j_object_identifier.toList()
+
+    def as_serializable_string(self) -> str:
+        """
+        Returns a string that fully serializes this instance. The serialized 
string can be used for
+        transmitting or persisting an object identifier.
+
+        Throws a TableException if the identifier cannot be serialized.

Review Comment:
   In fact, digging into this further, there is a handler that maps the Java 
exception to a Python exception: 
https://github.com/apache/flink/blob/master/flink-python/pyflink/util/exceptions.py#L126:L127



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to