Hi Flavio,

I'll open a JIRA for de/serializing TableSource to textual JSON.
Would something like this work for you?

main() {
  ExecutionEnvironment env = ...
  TableEnvironment tEnv = ...

  // accessing an external catalog
  YourTableSource ts = Catalog.getTableSource("someIdentifier");
  tEnv.registerTableSource("someId", ts);

  // preparing meta data
  MetaData meta = ts.getMetaData();
  DataSet<MetaData> metaDS = env.fromElements(meta);

  // read data, table transformations + conversion to DataSet
  Table t = tEnv.scan("someId"); // apply some Table transformations if necessary
  DataSet<TupleX<...>> ds = tEnv.toDataSet(t, TupleX);

  // apply custom functions on data set
  ds.map(new MetaMapFunctionWrapper(new MetaMapFunction()))
    .withBroadcastSet(metaDS, "meta");

  // continue program
}

The MetaMapFunctionWrapper could be a RichMapFunction that accesses the
metadata from the broadcast set and provides it to a wrapped
MetaMapFunction (an extended MapFunction with a custom interface for
metadata).
Depending on what kind of interface you plan to offer, you can hide most of
the complexity, e.g., users would only have to implement a MetaMapFunction
and would not have to deal with broadcasting and accessing the metadata
(this would be done by your wrapper).

Fabian

2016-05-05 10:08 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:

> Hi Fabian,
> thanks for your detailed answer, as usual ;)
>
> I think that an external service is OK; actually, I wasn't aware of the
> TableSource interface.
> As you said, a utility to serialize and deserialize them would be very
> helpful and would ease this.
> However, registering metadata for a table is a very common task.
> Wouldn't it be useful for other Flink-related projects (I was thinking of
> Nifi, for example) to define a common minimal set of (optional) metadata to
> display in a UI for a TableSource (like name, description, creationDate,
> creator, field aliases)?
>
> About point 2, I think that dataset broadcasting or closure variables are
> useful when you write a program, not when you try to "compose" it from
> reusable UDFs (using a script, as in Pig).
> Of course, the worst-case scenario for us (e.g. right now) is to connect
> to our repository within rich operators, but I thought that it could be easy
> to define a link from operators to the TableEnvironment, then to the
> TableSource (using the lineage tag/source-id you mentioned) and, finally,
> to its metadata. I don't know whether this is specific to us; I just wanted
> to share our needs and see if the Table API development could benefit from
> them.
>
> Best,
> Flavio
>
> On Wed, May 4, 2016 at 10:35 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Hi Flavio,
>>
>> I thought a bit about your proposal. I am not sure if it is actually
>> necessary to integrate a central source repository into Flink. It should be
>> possible to offer this as an external service which is based on the
>> recently added TableSource interface. TableSources could be extended to be
>> able to serialize and deserialize their configuration to/from JSON. When
>> the external repository service starts, it can read the JSON fields and
>> instantiate and register TableSource objects. The repository could also
>> hold metadata about the sources and serve a (web) UI to list available
>> sources. When a Flink program wants to access a data source which is
>> registered in the repository, it could look up the respective TableSource
>> object from the repository.
>>
>> Given that an integration of metadata with Flink user functions (point 2
>> in your proposal) is a very special requirement, I am not sure how much
>> "native" support should be added to Flink. Would it be possible to add a
>> lineage tag to each record and ship the metadata of all sources as a
>> broadcast set to each operator? Then user functions could look up the
>> metadata from the broadcast set.
>>
>> Best, Fabian
>>
>> 2016-04-29 12:49 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:
>>
>>> Hi to all,
>>>
>>> as discussed briefly with Fabian, for our products in Okkam we need a
>>> central repository of DataSources processed by Flink.
>>> With respect to existing external catalogs, such as Hive or Confluent's
>>> SchemaRegistry, whose objective is to provide the metadata necessary to
>>> read/write the registered tables, we would also need a way to access
>>> other general metadata (e.g. name, description, creator, creation date,
>>> lastUpdate date, processedRecords, certificationLevel of provided data,
>>> provenance, language, etc).
>>>
>>> This integration has 2 main goals:
>>>
>>>    1. In a UI: to enable the user to choose (or even create) a
>>>    datasource to process with some task (e.g. quality assessment) and
>>>    then see its metadata (name, description, creator user, etc)
>>>    2. During a Flink job: when 2 datasources get joined and we have
>>>    multiple values for an attribute (e.g. name or lastname) we can
>>>    access the datasource metadata to decide which value to retain (e.g.
>>>    the one coming from the most authoritative/certified source for that
>>>    attribute)
>>>
>>> We also think that this could be of interest for projects like Apache
>>> Zeppelin or Nifi, enabling them to suggest to the user the sources to
>>> start from.
>>>
>>> Do you think it makes sense to think about designing such a module for
>>> Flink?
>>>
>>> Best,
>>> Flavio
>>>
>>
>>
>
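
A rough, Flink-independent sketch of the wrapper idea from the top of this message, in case it helps the discussion. Everything here is hypothetical and only illustrates the pattern: SourceMeta and its fields, the two-argument MetaMapFunction interface, and the open(List) signature are assumptions, not existing Flink API. In a real job the wrapper would presumably extend RichMapFunction and fetch the list in open() via getRuntimeContext().getBroadcastVariable("meta").

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical metadata record; the field names are illustrative only.
final class SourceMeta {
    final String sourceId;
    final int certificationLevel;

    SourceMeta(String sourceId, int certificationLevel) {
        this.sourceId = sourceId;
        this.certificationLevel = certificationLevel;
    }
}

// Hypothetical user-facing interface: a map function that also sees the metadata.
interface MetaMapFunction<IN, OUT> {
    OUT map(IN value, List<SourceMeta> meta);
}

// Stand-in for the wrapper. It hides where the metadata comes from, so users
// only implement MetaMapFunction. In Flink this class would extend
// RichMapFunction and read the broadcast set in open().
final class MetaMapFunctionWrapper<IN, OUT> {
    private final MetaMapFunction<IN, OUT> wrapped;
    private List<SourceMeta> meta;

    MetaMapFunctionWrapper(MetaMapFunction<IN, OUT> wrapped) {
        this.wrapped = wrapped;
    }

    // Called once before any record is processed.
    void open(List<SourceMeta> broadcastMeta) {
        this.meta = broadcastMeta;
    }

    // Delegates each record to the wrapped function together with the metadata.
    OUT map(IN value) {
        if (meta == null) {
            throw new IllegalStateException("open() must be called before map()");
        }
        return wrapped.map(value, meta);
    }
}

class WrapperDemo {
    // Returns the id of the source with the highest certification level.
    static String mostCertified(List<SourceMeta> meta) {
        SourceMeta best = null;
        for (SourceMeta m : meta) {
            if (best == null || m.certificationLevel > best.certificationLevel) {
                best = m;
            }
        }
        return best == null ? "unknown" : best.sourceId;
    }

    public static void main(String[] args) {
        MetaMapFunctionWrapper<String, String> wrapper = new MetaMapFunctionWrapper<>(
                (value, meta) -> value + "@" + mostCertified(meta));
        wrapper.open(Arrays.asList(new SourceMeta("crm", 1), new SourceMeta("registry", 3)));
        System.out.println(wrapper.map("Rossi")); // prints "Rossi@registry"
    }
}
```

The user function never touches the broadcast mechanism; swapping the source of the metadata (broadcast set, external repository, closure variable) would only change the wrapper.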