twalthr commented on code in PR #26167: URL: https://github.com/apache/flink/pull/26167#discussion_r1973372794
########## flink-python/pyflink/table/catalog.py: ########## @@ -1391,3 +1391,90 @@ def of(catalog_name: str, configuration: Configuration, comment: str = None): j_catalog_descriptor = gateway.jvm.org.apache.flink.table.catalog.CatalogDescriptor.of( catalog_name, configuration._j_configuration, comment) return CatalogDescriptor(j_catalog_descriptor) + + +class ObjectIdentifier(object): + """ + Identifies an object in a catalog, including tables, views, function, or types. + An :class:`ObjectIdentifier` must be fully qualified. It is the responsibility of the catalog + manager to resolve an :class:`ObjectIdentifier` to an object. + + While Path :class:`ObjectPath` is used within the same catalog, instances of this class can be + used across catalogs. An :class:`ObjectPath` only describes the name and database of an + object and so is scoped over a particular catalog, but an :class:`ObjectIdentifier` is fully + qualified and describes the name, database and catalog of the object. + + Two objects are considered equal if they share the same :class:`ObjectIdentifier` in a session + context, such as a :class:`~pyflink.table.TableEnvironment`, where catalogs (or objects in a + catalog) have not been added, deleted or modified. + """ + + _UNKNOWN = "<UNKNOWN>" + + def __init__(self, j_object_identifier): + self._j_object_identifier = j_object_identifier + + def __str__(self): + return self._j_object_identifier.toString() + + def __hash__(self): + return self._j_object_identifier.hashCode() + + def __eq__(self, other): + return isinstance(other, self.__class__) and self._j_object_identifier.equals( + other._j_object_identifier + ) + + @staticmethod + def of(catalog_name: str, database_name: str, object_name: str) -> "ObjectIdentifier": + assert catalog_name is not None, "Catalog name must not be null." + assert database_name is not None, "Database name must not be null." + assert object_name is not None, "Object name must not be null." 
+ + if catalog_name == ObjectIdentifier._UNKNOWN or database_name == ObjectIdentifier._UNKNOWN: Review Comment: Why do we have to add this logic in Python? Isn't Java already doing this for us and raising an error? ########## flink-python/pyflink/table/table.py: ########## @@ -1077,6 +1078,89 @@ def explain(self, *extra_details: ExplainDetail) -> str: j_extra_details = to_j_explain_detail_arr(extra_details) return self._j_table.explain(TEXT, j_extra_details) + def insert_into( + self, table_path_or_descriptor: Union[str, TableDescriptor], overwrite: bool = False + ) -> TablePipeline: + """ + When ``target_path_or_descriptor`` is a table path: + + Declares that the pipeline defined by the given :class:`Table` (backed by a Review Comment: The `(backed by a ...)` part belongs after the phrase `written to a table`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org