Hi Jark,

You totally got my point. Actually, the ideal solution in my opinion is to support schema evolution within one query. Although classic SQL needs to know the schema before doing any computation, when integrating a NoSQL data source into a Flink DataStream, schema evolution, if possible, would save users a lot of time. For example, when I have some JSON docs in MongoDB, I want to expose the collections as tables in Flink SQL. But keeping the schema in the Flink catalog service aligned is not very friendly: I need to remember to update the catalog whenever I add a new field in my database.
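
To make the idea concrete, here is a minimal Python sketch of schema evolution by per-document inference: the schema grows as documents with new fields arrive. This is not Flink code. The helpers infer_type and merge_schema are hypothetical, and the coarse type names and the rule of falling back to STRING on a type conflict are assumptions chosen only for illustration.

```python
import json


def infer_type(value):
    """Map a parsed JSON value to a coarse, SQL-like type name (assumed names)."""
    if isinstance(value, bool):  # check bool first: bool is a subclass of int
        return "BOOLEAN"
    if isinstance(value, int):
        return "BIGINT"
    if isinstance(value, float):
        return "DOUBLE"
    if isinstance(value, str):
        return "STRING"
    if isinstance(value, list):
        return "ARRAY"
    if isinstance(value, dict):
        return "ROW"
    return "NULL"


def merge_schema(schema, doc):
    """Return a copy of schema extended with the fields seen in doc.

    A field seen with two different non-null types falls back to STRING.
    """
    merged = dict(schema)
    for field, value in doc.items():
        seen = infer_type(value)
        known = merged.get(field)
        if known is None or known == "NULL":
            merged[field] = seen
        elif seen != "NULL" and seen != known:
            merged[field] = "STRING"
    return merged


docs = [
    '{"category": "books", "amount": 12}',
    # The upstream application later starts writing a new field.
    '{"category": "games", "amount": 30, "currency": "USD"}',
]

schema = {}
for raw in docs:
    schema = merge_schema(schema, json.loads(raw))

print(schema)
# {'category': 'STRING', 'amount': 'BIGINT', 'currency': 'STRING'}
```

The second document adds the currency column without anyone touching a catalog, which is the convenience being asked for here.
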

That said, it is not easy to validate SQL correctly when there is no schema information about the table. For example, with "select sum(amount) from my_table group by category", a runtime error will be thrown if the amount field is not a number. I think this is another challenge in supporting schema evolution. Anyway, I think deferring the errors to runtime is fair when the user wants schema flexibility.

Guodong

On Mon, Jun 1, 2020 at 12:29 PM Jark Wu <imj...@gmail.com> wrote:

> Hi all,
>
> This is an interesting topic. Schema inference will be the next big
> feature planned in the next release.
> I added this thread link into FLINK-16420.
>
> I think Guodong's case is schema evolution, which I think is related
> to schema inference.
> I don't have a clear idea for this yet, but some initial thoughts are:
>
> 1) Schema inference can happen for each query, instead of when creating
> the table, so that once the data schema has evolved, the catalog table
> can have the new schema.
> However, this may break existing queries on this catalog table (e.g.
> SELECT * FROM T).
> 2) Manually create a new table with schema inference. We can use the
> LIKE grammar or SHOW CREATE TABLE to help create a table based on
> existing ones. The new table has the new schema because we infer the
> schema again.
> 3) Automatically create a new table with schema inference. This can be
> done with some catalogs, for example SchemaRegistryCatalog: once a new
> Avro schema (say schema id = 100) is added to the registry, users can
> use this new schema with table "mytopic-100".
>
> Best,
> Jark
>
> On Fri, 29 May 2020 at 22:05, Guodong Wang <wangg...@gmail.com> wrote:
>
>> Benchao,
>>
>> Thank you for your detailed explanation.
>>
>> Schema inference can solve my problem partially. For example, if
>> starting from some point in time all subsequent JSON contains a new
>> field, I think schema inference will help in this case.
>> But if I need to handle all the JSON events with different schemas in
>> one table (this is case 2), I agree with you: schema inference does
>> not help either.
>>
>> Guodong
>>
>> On Fri, May 29, 2020 at 11:02 AM Benchao Li <libenc...@gmail.com> wrote:
>>
>>> Hi Guodong,
>>>
>>> After an offline discussion with Leonard, I think you have the right
>>> understanding of schema inference.
>>> But there are two cases here:
>>> 1. The schema of the data is fixed. Here schema inference can save
>>> you the effort of writing the schema explicitly.
>>> 2. The schema of the data is dynamic. In this case schema inference
>>> cannot help, because SQL is a somewhat static language that needs to
>>> know all the data types at compile time.
>>>
>>> Maybe I misunderstood your question at the very beginning. I thought
>>> your case was #2. If your case is #1, then schema inference is a good
>>> choice.
>>>
>>> On Thu, May 28, 2020 at 11:39 PM Guodong Wang <wangg...@gmail.com> wrote:
>>>
>>>> Yes. Setting the value type to raw is one possible approach. And I
>>>> would like to vote for schema inference as well.
>>>>
>>>> Correct me if I am wrong: IMO schema inference means I can provide a
>>>> method in the table source to infer the data schema based on the
>>>> runtime computation, just like some Calcite adapters do. Right?
>>>> For SQL table registration, I think that requiring the table source
>>>> to provide a static schema might be too strict. Letting the planner
>>>> infer the table schema would be more flexible.
>>>>
>>>> Thank you for your suggestions.
>>>>
>>>> Guodong
>>>>
>>>> On Thu, May 28, 2020 at 11:11 PM Benchao Li <libenc...@gmail.com> wrote:
>>>>
>>>>> Hi Guodong,
>>>>>
>>>>> Does the RAW type meet your requirements? For example, you can
>>>>> specify a map<varchar, raw> type, where the value of the map is the
>>>>> raw JsonNode parsed by Jackson.
>>>>> This is not supported yet, but IMO it could be supported.
>>>>>
>>>>> On Thu, May 28, 2020 at 9:43 PM Guodong Wang <wangg...@gmail.com> wrote:
>>>>>
>>>>>> Benchao,
>>>>>>
>>>>>> Thank you for your quick reply.
>>>>>>
>>>>>> As you mentioned, for the current scenario, approach 2 should work
>>>>>> for me. But it is a little annoying that I have to modify the
>>>>>> schema to add new field types whenever the upstream app changes
>>>>>> the JSON format or adds new fields. Otherwise, my users cannot
>>>>>> refer to the field in their SQL.
>>>>>>
>>>>>> Per the description in the Jira, I think after implementing this,
>>>>>> all the JSON values will be converted to strings.
>>>>>> I am wondering if Flink SQL can/will support a flexible schema in
>>>>>> the future, for example, registering the table without defining a
>>>>>> specific schema for each field, and letting the user define a
>>>>>> generic map or array for one field whose values can be any object.
>>>>>> Then the type conversion cost might be saved.
>>>>>>
>>>>>> Guodong
>>>>>>
>>>>>> On Thu, May 28, 2020 at 7:43 PM Benchao Li <libenc...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Guodong,
>>>>>>>
>>>>>>> I think you almost have the answer:
>>>>>>> 1. Map type. It does not work with the current implementation.
>>>>>>> For example, with map<varchar, varchar>, if the value is a
>>>>>>> non-string JSON object, then `JsonNode.asText()` may not work as
>>>>>>> you wish.
>>>>>>> 2. List all the fields you care about. IMO, this can fit your
>>>>>>> scenario. And you can set format.fail-on-missing-field = false,
>>>>>>> so that fields missing from an event are set to null.
>>>>>>>
>>>>>>> For 1, I think maybe we can support it in the future, and I've
>>>>>>> created a Jira issue [1] to track this.
>>>>>>>
>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-18002
>>>>>>>
>>>>>>> On Thu, May 28, 2020 at 6:32 PM Guodong Wang <wangg...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi!
>>>>>>>>
>>>>>>>> I want to use Flink SQL to process some JSON events. It is quite
>>>>>>>> challenging to define a schema for the Flink SQL table.
>>>>>>>>
>>>>>>>> My data source's format is JSON like this:
>>>>>>>> {
>>>>>>>>   "top_level_key1": "some value",
>>>>>>>>   "nested_object": {
>>>>>>>>     "nested_key1": "abc",
>>>>>>>>     "nested_key2": 123,
>>>>>>>>     "nested_key3": ["element1", "element2", "element3"]
>>>>>>>>   }
>>>>>>>> }
>>>>>>>>
>>>>>>>> The big challenges for me in defining a schema for the data
>>>>>>>> source are:
>>>>>>>> 1. The keys in nested_object are flexible; there might be 3
>>>>>>>> unique keys or more. If I enumerate all the keys in the schema,
>>>>>>>> I think my code is fragile: how do I handle an event which
>>>>>>>> contains more nested keys in nested_object?
>>>>>>>> 2. I know the Table API supports the Map type, but I am not sure
>>>>>>>> if I can put a generic object as the value of the map, because
>>>>>>>> the values in nested_object are of different types: some of them
>>>>>>>> are ints, some of them are strings or arrays.
>>>>>>>>
>>>>>>>> So, how can I expose this kind of JSON data as a table in Flink
>>>>>>>> SQL without enumerating all the nested keys?
>>>>>>>>
>>>>>>>> Thanks.
>>>>>>>>
>>>>>>>> Guodong
>>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>>
>>>>>>> Best,
>>>>>>> Benchao Li
>>>>>>>
>>>>>
>>>>> --
>>>>>
>>>>> Best,
>>>>> Benchao Li
>>>>>
>>>
>>> --
>>>
>>> Best,
>>> Benchao Li
>>>
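
The approach discussed in this thread (one typed top-level column plus a map whose values stay untyped, with type errors deferred to runtime) can be sketched in plain Python using the sample event from the original question. This is only an illustration of the idea, not Flink or Jackson code: to_row and sum_field are hypothetical helpers, and raising TypeError stands in for whatever runtime error a real engine would report.

```python
import json

# The sample event from the original question.
EVENT = """
{
  "top_level_key1": "some value",
  "nested_object": {
    "nested_key1": "abc",
    "nested_key2": 123,
    "nested_key3": ["element1", "element2", "element3"]
  }
}
"""


def to_row(raw_event):
    """Build a row with one declared column and one map of raw values.

    Only top_level_key1 is declared; nested_object is kept as a map from
    key to whatever JSON value arrived, so new nested keys need no schema
    change.
    """
    doc = json.loads(raw_event)
    return {
        "top_level_key1": doc.get("top_level_key1"),
        "nested_object": dict(doc.get("nested_object") or {}),
    }


def sum_field(rows, key):
    """Sum nested_object[key] over rows, checking the type only at runtime."""
    total = 0
    for row in rows:
        value = row["nested_object"].get(key)
        if value is None:  # a missing key is treated like a SQL NULL
            continue
        if isinstance(value, bool) or not isinstance(value, (int, float)):
            raise TypeError(f"{key!r} is {type(value).__name__}, expected a number")
        total += value
    return total


rows = [to_row(EVENT)]
print(sum_field(rows, "nested_key2"))  # 123

try:
    sum_field(rows, "nested_key1")  # "abc" is not a number
except TypeError as err:
    print("runtime error:", err)
```

Nothing is validated when the row is built, so the mistake in the second call only surfaces when the aggregation actually runs. That is the trade-off between schema flexibility and early validation raised at the top of the thread.
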