autophagy commented on code in PR #26167: URL: https://github.com/apache/flink/pull/26167#discussion_r1961499980
########## flink-python/pyflink/table/catalog.py: ########## @@ -1391,3 +1391,87 @@ def of(catalog_name: str, configuration: Configuration, comment: str = None): j_catalog_descriptor = gateway.jvm.org.apache.flink.table.catalog.CatalogDescriptor.of( catalog_name, configuration._j_configuration, comment) return CatalogDescriptor(j_catalog_descriptor) + + +class ObjectIdentifier(object): + """ + Identifies an object in a catalog. It allows to identify objects such as tables, views, + function, or types in a catalog. An identifier must be fully qualified. It is the + responsibility of the catalog manager to resolve an identifier to an object. + + While Path :class:`ObjectPath` is used within the same catalog, instances of this class can be + used across catalogs. + + Two objects are considered equal if they share the same object identifier in a stable session + context. + """ + + _UNKNOWN = "<UNKNOWN>" + + def __init__(self, j_object_identifier): + self._j_object_identifier = j_object_identifier + + def __str__(self): + return self._j_object_identifier.toString() + + def __hash__(self): + return self._j_object_identifier.hashCode() + + def __eq__(self, other): + return isinstance(other, self.__class__) and self._j_object_identifier.equals( + other._j_object_identifier + ) + + @staticmethod + def of(catalog_name: str, database_name: str, object_name: str) -> "ObjectIdentifier": + assert catalog_name is not None, "Catalog name must not be null." + assert database_name is not None, "Database name must not be null." + assert object_name is not None, "Object name must not be null." + + if catalog_name == ObjectIdentifier._UNKNOWN or database_name == ObjectIdentifier._UNKNOWN: + raise ValueError(f"Catalog or database cannot be named {ObjectIdentifier._UNKNOWN}") + else: + gateway = get_gateway() + j_object_identifier = gateway.jvm.org.apache.flink.table.catalog.ObjectIdentifier.of( + catalog_name, database_name, object_name + ) + return ObjectIdentifier(j_object_identifier=j_object_identifier) + + def get_catalog_name(self) -> str: + return self._j_object_identifier.getCatalogName() + + def get_database_name(self) -> str: + return self._j_object_identifier.getDatabaseName() + + def get_object_name(self) -> str: + return self._j_object_identifier.getObjectName() + + def to_object_path(self) -> ObjectPath: + """ + Convert this :class:`ObjectIdentifier` to :class:`ObjectPath`. + + Throws a TableException if the identifier cannot be converted. + """ + j_object_path = self._j_object_identifier.toObjectPath() + return ObjectPath(j_object_path=j_object_path) + + def to_list(self) -> List[str]: + """ + List of the component names of this object identifier. + """ + return self._j_object_identifier.toList() + + def as_serializable_string(self) -> str: + """ + Returns a string that fully serializes this instance. The serialized string can be used for + transmitting or persisting an object identifier. + + Throws a TableException if the identifier cannot be serialized. Review Comment: In fact, digging into this further, there is a handler that maps the Java exception to a Python exception: https://github.com/apache/flink/blob/master/flink-python/pyflink/util/exceptions.py#L126:L127 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org