I'm not sure that we could support EXCEPT statements, as that would require introspecting the unknown fields (what if the EXCEPT statement matches a field that is later added as an unknown field?). IMO this sort of behavior only makes sense on true pass-through queries. Anything that modifies the input record would be tricky to support.
Nested rows would work for proposal 2. You would need to make sure that the unknown-fields map is recursively added to all nested rows, and you would do this when you infer a schema from the Avro schema.

On Tue, Dec 8, 2020 at 9:58 AM Andrew Pilloud <apill...@google.com> wrote:

> Proposal 1 would also interact poorly with SELECT * EXCEPT ... statements, which return all columns except specific ones. Adding an unknown field does seem like a reasonable way to handle this. It probably needs to be something that is native to the Row type, so columns added to nested rows also work.
>
> Andrew
>
> On Tue, Dec 8, 2020 at 9:50 AM Reuven Lax <re...@google.com> wrote:
>
>> There's a difference between a fully dynamic schema and simply being able to forward "unknown" fields to the output.
>>
>> A fully dynamic schema is not really necessary unless we also had dynamic SQL statements. Since the existing SQL statements do not reference the new fields by name, there's no reason to add them to the main schema.
>>
>> However, if you have a SELECT * FROM ... WHERE XXXX statement that does no aggregation, there's fundamentally no reason we couldn't forward the messages exactly. In theory we could forward the exact bytes that are in the input PCollection, which would necessarily forward the new fields. In practice I believe that we convert the input messages to Beam Row objects in order to evaluate the WHERE clause, and then convert back to Avro to output those messages. I believe this is where we "lose" the unknown fields, but this is an implementation artifact - in theory we could output the original bytes whenever we see a SELECT *. This is not truly a dynamic schema, since you can't really do anything with these extra fields except forward them to your output.
>>
>> I see two possible ways to address this.
>>
>> 1. As I mentioned above, in the case of a SELECT * we could output the original bytes, and only use the Beam Row for evaluating the WHERE clause. This might be very expensive though - we risk having to keep two copies of every message around, one in the original Avro format and one in Row format.
>>
>> 2. The other way would be to do what protocol buffers do. We could add one extra field to the inferred Beam schema to store new, unknown fields (probably this would be a map-valued field). This extra field would simply store the raw bytes of these unknown fields, and when converting back to Avro they would be added to the output message. This might also add some overhead to the pipeline, so it might be best to make this behavior opt-in.
>>
>> Reuven
>>
>> On Tue, Dec 8, 2020 at 9:33 AM Brian Hulette <bhule...@google.com> wrote:
>>
>>> Reuven, could you clarify what you have in mind? I know multiple times we've discussed the possibility of adding update-compatibility support to SchemaCoder, including support for certain schema changes (field additions/deletions) - I think the most recent discussion was here [1].
>>>
>>> But it sounds like Talat is asking for something a little beyond that, effectively a dynamic schema. Is that something you think we can support?
>>>
>>> [1] https://lists.apache.org/thread.html/ref73a8c40e24e0b038b4e5b065cd502f4c5df2e5e15af6f7ea1cdaa7%40%3Cdev.beam.apache.org%3E
>>>
>>> On Tue, Dec 8, 2020 at 9:20 AM Reuven Lax <re...@google.com> wrote:
>>>
>>>> Thanks. It might be theoretically possible to do this (at least for the case where existing fields do not change). Whether anyone currently has available time to do this is a different question, but it's something that can be looked into.
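To make proposal 2 concrete, here is a minimal sketch against Beam's Java Schema API. The reserved field name and the attachUnknownFields helper are hypothetical - nothing like this exists in Beam today - but the Schema and FieldType calls are the real API:

    import org.apache.beam.sdk.schemas.Schema;
    import org.apache.beam.sdk.schemas.Schema.Field;
    import org.apache.beam.sdk.schemas.Schema.FieldType;

    public class UnknownFieldsSchemas {

      // Hypothetical reserved name for the map-valued field that carries
      // the raw bytes of fields the inferred schema does not know about.
      private static final String UNKNOWN_FIELDS = "__unknown_fields";

      // Returns a copy of the inferred schema with a nullable
      // map<string, bytes> field appended, recursing into nested row
      // fields so unknown fields survive at every level. (For brevity
      // this skips rows nested inside arrays and maps, which a real
      // implementation would also need to walk.)
      public static Schema attachUnknownFields(Schema schema) {
        Schema.Builder builder = Schema.builder();
        for (Field field : schema.getFields()) {
          FieldType type = field.getType();
          if (type.getTypeName() == Schema.TypeName.ROW) {
            builder.addField(
                field.getName(),
                FieldType.row(attachUnknownFields(type.getRowSchema()))
                    .withNullable(type.getNullable()));
          } else {
            builder.addField(field);
          }
        }
        builder.addField(
            UNKNOWN_FIELDS,
            FieldType.map(FieldType.STRING, FieldType.BYTES).withNullable(true));
        return builder.build();
      }
    }

Since the SQL statement never references the extra field by name, it would ride along untouched through SELECT * queries and be re-attached when converting back to Avro.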
>>>> On Mon, Dec 7, 2020 at 9:29 PM Talat Uyarer <tuya...@paloaltonetworks.com> wrote:
>>>>
>>>>> Adding new fields is more common than modifying existing fields. But type change is also possible for existing fields, such as a regular mandatory field (string, integer) changing to a union (nullable field). No field deletion.
>>>>>
>>>>> On Mon, Dec 7, 2020 at 9:22 PM Reuven Lax <re...@google.com> wrote:
>>>>>
>>>>>> And when you say schema changes, are these new fields being added to the schema? Or are you making changes to the existing fields?
>>>>>>
>>>>>> On Mon, Dec 7, 2020 at 9:02 PM Talat Uyarer <tuya...@paloaltonetworks.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>> For sure, let me explain a little bit about my pipeline. My pipeline is actually simple:
>>>>>>> Read Kafka -> Convert Avro Bytes to Beam Row (DoFn<KV<byte[], byte[]>, Row>) -> Apply Filter (SqlTransform.query(sql)) -> Convert back from Row to Avro (DoFn<Row, byte[]>) -> Write DB/GCS/GRPC etc.
>>>>>>>
>>>>>>> In our jobs we have three types of SQL:
>>>>>>> - SELECT * FROM PCOLLECTION
>>>>>>> - SELECT * FROM PCOLLECTION <with WHERE condition>
>>>>>>> - SQL projection with or without a WHERE clause: SELECT col1, col2 FROM PCOLLECTION
>>>>>>>
>>>>>>> We know the writer schema for each message. While deserializing the Avro binary we use the writer schema and reader schema in the Convert Avro Bytes to Beam Row step. It always produces the reader schema's generic record, and we convert that generic record to a Row. While submitting the DF job we use the latest schema to generate the Beam schema.
>>>>>>>
>>>>>>> In the current scenario, when we have schema changes, first we restart all 15k jobs with the latest updated schema, then when we are done we turn on the latest schema for the writers. Because of Avro's grammar-based schema resolution [1] we read different versions of the schema and we always produce the latest schema's record. Without breaking our pipeline we are able to handle multiple versions of data in the same streaming pipeline. If we can regenerate the SQL's Java code when we get notified of the latest schema, we will handle all schema changes. The only remaining obstacle is Beam's SQL Java code. That's why I am looking for some solution. We don't need multiple versions of SQL. We only need to regenerate the SQL schema with the latest schema on the fly.
>>>>>>>
>>>>>>> I hope I can explain it :)
>>>>>>>
>>>>>>> Thanks
>>>>>>>
>>>>>>> [1] https://avro.apache.org/docs/1.7.2/api/java/org/apache/avro/io/parsing/doc-files/parsing.html
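The resolving read described above looks roughly like this with Avro's Java API - a minimal sketch using standard Avro classes (the decode method name is just for illustration):

    import java.io.IOException;
    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericDatumReader;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.io.BinaryDecoder;
    import org.apache.avro.io.DecoderFactory;

    public class ResolvingAvroDecode {

      // Decodes bytes written with writerSchema into a record shaped by
      // readerSchema. Avro's schema resolution fills in defaults for added
      // fields and resolves type promotions/unions, so the output record
      // always matches the reader (latest) schema.
      public static GenericRecord decode(
          byte[] payload, Schema writerSchema, Schema readerSchema) throws IOException {
        GenericDatumReader<GenericRecord> reader =
            new GenericDatumReader<>(writerSchema, readerSchema);
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(payload, null);
        return reader.read(null, decoder);
      }
    }

This is why the Avro side of the pipeline tolerates old writer schemas; the sticking point is that the Beam schema fed to SqlTransform stays whatever it was at submission time.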
>>>>>>> On Mon, Dec 7, 2020 at 7:49 PM Reuven Lax <re...@google.com> wrote:
>>>>>>>
>>>>>>>> Can you explain the use case some more? Are you wanting to change your SQL statement as well when the schema changes? If not, what are those new fields doing in the pipeline? What I mean is that your old SQL statement clearly didn't reference those fields in a SELECT statement, since they didn't exist, so what are you missing by not having them unless you are also changing the SQL statement?
>>>>>>>>
>>>>>>>> Is this a case where you have a SELECT *, and just want to make sure those fields are included?
>>>>>>>>
>>>>>>>> Reuven
>>>>>>>>
>>>>>>>> On Mon, Dec 7, 2020 at 6:31 PM Talat Uyarer <tuya...@paloaltonetworks.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Andrew,
>>>>>>>>>
>>>>>>>>> I assume the SQL query is not going to change. What changes is the Row schema, by adding new columns or renaming columns. If we keep version information somewhere, for example a KV pair where the key is schema information and the value is the Row, can we not generate the SQL code? The reason I am asking: we have 15k pipelines. When we have a schema change we restart 15k DF jobs, which is painful. I am looking for a possible way to avoid the job restart. Don't you think it is still doable?
>>>>>>>>>
>>>>>>>>> Thanks
>>>>>>>>>
>>>>>>>>> On Mon, Dec 7, 2020 at 6:10 PM Andrew Pilloud <apill...@google.com> wrote:
>>>>>>>>>
>>>>>>>>>> Unfortunately we don't have a way to generate the SQL Java code on the fly, and even if we did, that wouldn't solve your problem. I believe our recommended practice is to run both the old and new pipeline for some time, then pick a window boundary to transition the output from the old pipeline to the new one.
>>>>>>>>>>
>>>>>>>>>> Beam doesn't handle changing the format of data sent between intermediate steps in a running pipeline. Beam uses "coders" to serialize data between steps of the pipeline. The built-in coders (including the schema Row coder used by SQL) have a fixed data format and don't handle schema evolution. They are optimized for performance at all costs.
>>>>>>>>>>
>>>>>>>>>> If you worked around this, the Beam model doesn't support changing the structure of the pipeline graph. This would significantly limit the changes you can make. It would also require some changes to SQL to try to produce the same plan for an updated SQL query.
>>>>>>>>>>
>>>>>>>>>> Andrew
>>>>>>>>>>
>>>>>>>>>> On Mon, Dec 7, 2020 at 5:44 PM Talat Uyarer <tuya...@paloaltonetworks.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi,
>>>>>>>>>>>
>>>>>>>>>>> We are using Beam SQL in our pipeline. Our data is written in Avro format. We generate our rows based on our Avro schema. Over time the schema changes. I believe Beam SQL generates Java code based on the Beam schema we define while submitting the pipeline. Do you have any idea how we can handle schema changes without resubmitting our Beam job? Is it possible to generate the SQL Java code on the fly?
>>>>>>>>>>>
>>>>>>>>>>> Thanks
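To make the fixed-schema coupling Andrew describes concrete, here is a minimal self-contained sketch: the schema handed to SqlTransform is fixed when the pipeline is built, and the planned SQL code refers to exactly those fields. The field names and predicate below are made up for illustration:

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.extensions.sql.SqlTransform;
    import org.apache.beam.sdk.schemas.Schema;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.Row;

    public class FixedSchemaSqlDemo {
      public static void main(String[] args) {
        // The schema is baked in at pipeline-construction time; Beam SQL
        // plans against exactly these fields, so adding a new Avro field
        // means rebuilding and resubmitting the job.
        Schema schema =
            Schema.builder().addStringField("source").addInt32Field("severity").build();

        Pipeline p = Pipeline.create();
        PCollection<Row> rows =
            p.apply(
                Create.of(Row.withSchema(schema).addValues("fw-1", 5).build())
                    .withRowSchema(schema));

        // "SELECT *" expands to the fields of the schema above, not to
        // whatever fields happen to arrive at runtime.
        rows.apply(SqlTransform.query("SELECT * FROM PCOLLECTION WHERE severity > 3"));

        p.run().waitUntilFinish();
      }
    }

Running old and new pipelines side by side, as Andrew suggests, sidesteps this by never asking a single job to change its schema.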