Yes Reuven, I would like to write a proposal for that. I also like Andrew Pilloud's idea: we can put only the necessary fields on the Row, and the rest can stay on the unknown-fields side. We are using Beam Calcite SQL. Is that okay?
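[Editor's note: to make the unknown-fields idea concrete, here is a minimal sketch in Beam Java of what such a Row schema could look like. Nothing like this exists in Beam today; the "unknownFields" column name and the bytes-valued map are assumptions.]

    import java.util.Collections;
    import org.apache.beam.sdk.schemas.Schema;
    import org.apache.beam.sdk.values.Row;

    public class UnknownFieldsShape {
      public static void main(String[] args) {
        // Only the fields the query references are first-class columns;
        // everything else would ride along in a catch-all map column.
        Schema schema =
            Schema.builder()
                .addStringField("col1")
                .addInt64Field("col2")
                .addMapField("unknownFields", Schema.FieldType.STRING, Schema.FieldType.BYTES)
                .build();

        Row row =
            Row.withSchema(schema)
                .withFieldValue("col1", "value")
                .withFieldValue("col2", 42L)
                // Hypothetical: raw Avro bytes of a field the query never references.
                .withFieldValue(
                    "unknownFields", Collections.singletonMap("new_field", new byte[] {0x01}))
                .build();
        System.out.println(row);
      }
    }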
On Tue, Dec 8, 2020 at 3:15 PM Reuven Lax <re...@google.com> wrote:

Talat, are you interested in writing a proposal and sending it to d...@beam.apache.org? We could help advise on the options.

Reuven

On Tue, Dec 8, 2020 at 10:28 AM Andrew Pilloud <apill...@google.com> wrote:

We could support EXCEPT statements in proposal 2 as long as we restricted it to known fields.

We are getting into implementation details now. Making unknown fields just a normal column introduces a number of problems. ZetaSQL doesn't support the Map type. All our IOs would need to explicitly deal with that special column. There would be a lack of consistency between the various formats (Avro, Proto, JSON), which should all support this.

We might also want something even more invasive: everything is an unknown field unless it is referenced in the SQL query. All of these options are possible. I guess we need someone who has time to work on it to write a proposal.

On Tue, Dec 8, 2020 at 10:03 AM Reuven Lax <re...@google.com> wrote:

I'm not sure that we could support EXCEPT statements, as that would require introspecting the unknown fields (what if the EXCEPT statement matches a field that is later added as an unknown field?). IMO this sort of behavior only makes sense on true pass-through queries. Anything that modifies the input record would be tricky to support.

Nested rows would work for proposal 2. You would need to make sure that the unknown-fields map is recursively added to all nested rows, and you would do this when you infer a schema from the Avro schema.

On Tue, Dec 8, 2020 at 9:58 AM Andrew Pilloud <apill...@google.com> wrote:

Proposal 1 would also interact poorly with SELECT * EXCEPT ... statements, which return all columns except specific ones. Adding an unknown field does seem like a reasonable way to handle this. It probably needs to be something that is native to the Row type, so that columns added to nested rows also work.

Andrew
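[Editor's note: a sketch of the "recursively added to all nested rows" point, assuming a hypothetical helper that walks a Beam schema and adds a catch-all map column at every level. The "unknownFields" name is illustrative and this is not an existing Beam API.]

    import org.apache.beam.sdk.schemas.Schema;

    public class UnknownFieldsSchemas {
      // Recursively add a catch-all map column to a schema and to every nested
      // row schema. Nullable rows and arrays/iterables of rows are omitted for
      // brevity.
      public static Schema addUnknownFieldsColumn(Schema schema) {
        Schema.Builder builder = Schema.builder();
        for (Schema.Field field : schema.getFields()) {
          if (field.getType().getTypeName() == Schema.TypeName.ROW) {
            builder.addRowField(
                field.getName(), addUnknownFieldsColumn(field.getType().getRowSchema()));
          } else {
            builder.addField(field);
          }
        }
        return builder
            .addMapField("unknownFields", Schema.FieldType.STRING, Schema.FieldType.BYTES)
            .build();
      }
    }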
On Tue, Dec 8, 2020 at 9:50 AM Reuven Lax <re...@google.com> wrote:

There's a difference between a fully dynamic schema and simply being able to forward "unknown" fields to the output.

A fully dynamic schema is not really necessary unless we also had dynamic SQL statements. Since the existing SQL statements do not reference the new fields by name, there's no reason to add them to the main schema.

However, if you have a SELECT * FROM WHERE XXXX statement that does no aggregation, there's fundamentally no reason we couldn't forward the messages exactly. In theory we could forward the exact bytes that are in the input PCollection, which would necessarily forward the new fields. In practice I believe that we convert the input messages to Beam Row objects in order to evaluate the WHERE clause, and then convert back to Avro to output those messages. I believe this is where we "lose" the unknown fields, but this is an implementation artifact: in theory we could output the original bytes whenever we see a SELECT *. This is not truly a dynamic schema, since you can't really do anything with these extra fields except forward them to your output.

I see two possible ways to address this.

1. As I mentioned above, in the case of a SELECT * we could output the original bytes, and only use the Beam Row for evaluating the WHERE clause. This might be very expensive though: we risk having to keep two copies of every message around, one in the original Avro format and one in Row format.

2. The other way would be to do what protocol buffers do. We could add one extra field to the inferred Beam schema to store new, unknown fields (probably this would be a map-valued field). This extra field would simply store the raw bytes of these unknown fields, and then when converting back to Avro they would be added to the output message. This might also add some overhead to the pipeline, so it might be best to make this behavior opt-in.

Reuven

On Tue, Dec 8, 2020 at 9:33 AM Brian Hulette <bhule...@google.com> wrote:

Reuven, could you clarify what you have in mind? I know multiple times we've discussed the possibility of adding update compatibility support to SchemaCoder, including support for certain schema changes (field additions/deletions). I think the most recent discussion was here [1].

But it sounds like Talat is asking for something a little beyond that, effectively a dynamic schema. Is that something you think we can support?

[1] https://lists.apache.org/thread.html/ref73a8c40e24e0b038b4e5b065cd502f4c5df2e5e15af6f7ea1cdaa7%40%3Cdev.beam.apache.org%3E

On Tue, Dec 8, 2020 at 9:20 AM Reuven Lax <re...@google.com> wrote:

Thanks. It might be theoretically possible to do this (at least for the case where existing fields do not change). Whether anyone currently has available time to do this is a different question, but it's something that can be looked into.

On Mon, Dec 7, 2020 at 9:29 PM Talat Uyarer <tuya...@paloaltonetworks.com> wrote:

Adding new fields is more common than modifying existing fields, but type changes are also possible for existing fields, such as a regular mandatory field (string, integer) becoming a union (nullable field). No field deletion.

On Mon, Dec 7, 2020 at 9:22 PM Reuven Lax <re...@google.com> wrote:

And when you say schema changes, are these new fields being added to the schema? Or are you making changes to the existing fields?
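[Editor's note: a rough sketch of Reuven's proposal 2 above, assuming the inferred Beam schema already carries a map-valued "unknownFields" column as in the earlier sketches. Nothing like this exists in Beam today, and stashing unknown values as strings is a simplification of the raw-bytes idea.]

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.beam.sdk.schemas.Schema;
    import org.apache.beam.sdk.values.Row;

    public class UnknownFieldsConversion {
      // Copy the fields the pipeline schema knows about into the Row and stash
      // the rest (fields only present in the record's writer schema) in the
      // catch-all map. On the way back to Avro, the map entries would be merged
      // into the output message.
      public static Row toRowKeepingUnknowns(GenericRecord record, Schema beamSchema) {
        Map<String, Object> known = new HashMap<>();
        Map<String, String> unknown = new HashMap<>();
        for (org.apache.avro.Schema.Field field : record.getSchema().getFields()) {
          Object value = record.get(field.name());
          if (beamSchema.hasField(field.name())) {
            // Assumes Beam-compatible values; real code would convert Avro
            // types (e.g. Utf8 to String) the way AvroUtils does.
            known.put(field.name(), value);
          } else {
            // Simplification: proposal 2 would store raw Avro bytes, not strings.
            unknown.put(field.name(), String.valueOf(value));
          }
        }
        known.put("unknownFields", unknown);
        return Row.withSchema(beamSchema).withFieldValues(known).build();
      }
    }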
On Mon, Dec 7, 2020 at 9:02 PM Talat Uyarer <tuya...@paloaltonetworks.com> wrote:

Hi,
Sure, let me explain a little bit about my pipeline.

My pipeline is actually simple:
Read Kafka -> Convert Avro bytes to Beam Row (DoFn<KV<byte[], byte[]>, Row>) -> Apply filter (SqlTransform.query(sql)) -> Convert back from Row to Avro (DoFn<Row, byte[]>) -> Write DB/GCS/GRPC etc.

In our jobs we have three types of SQL:
- SELECT * FROM PCOLLECTION
- SELECT * FROM PCOLLECTION <with WHERE condition>
- SQL projection, with or without a WHERE clause: SELECT col1, col2 FROM PCOLLECTION

We know the writer schema for each message. While deserializing the Avro binary, we use the writer schema and the reader schema in the "Convert Avro bytes to Beam Row" step. It always produces a generic record in the reader schema, and we convert that generic record to a Row. While submitting the DF job we use the latest schema to generate the Beam schema.

In the current scenario, when we have a schema change, we first restart all 15k jobs with the latest updated schema, and only once that is done do we turn on the latest schema for writers. Because of Avro's grammar-based schema resolution [1] we can read different versions of the schema and always produce a record in the latest schema. Without breaking our pipeline, we are able to handle multiple versions of data in the same streaming pipeline. If we could regenerate the SQL's Java code when we get notified of the latest schema, we would handle all schema changes. The only remaining obstacle is Beam's SQL Java code. That's why I am looking for a solution. We don't need multiple versions of SQL. We only need to regenerate the SQL schema with the latest schema on the fly.

I hope I explained it :)

Thanks

[1] https://avro.apache.org/docs/1.7.2/api/java/org/apache/avro/io/parsing/doc-files/parsing.html

On Mon, Dec 7, 2020 at 7:49 PM Reuven Lax <re...@google.com> wrote:

Can you explain the use case some more? Are you wanting to change your SQL statement as well when the schema changes? If not, what are those new fields doing in the pipeline? What I mean is that your old SQL statement clearly didn't reference those fields in a SELECT statement, since they didn't exist, so what are you missing by not having them, unless you are also changing the SQL statement?

Is this a case where you have a SELECT * and just want to make sure those fields are included?

Reuven
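[Editor's note: a minimal sketch of the pipeline shape Talat describes above. The broker, topic, query string, and one-field schema are placeholders; the per-message writer-schema lookup, error handling, and the Row-to-Avro output step are omitted.]

    import org.apache.avro.generic.GenericDatumReader;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.io.DecoderFactory;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.extensions.sql.SqlTransform;
    import org.apache.beam.sdk.io.kafka.KafkaIO;
    import org.apache.beam.sdk.schemas.Schema;
    import org.apache.beam.sdk.schemas.utils.AvroUtils;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.Row;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;

    public class AvroSqlFilterPipeline {

      /** Decodes Avro payloads with a single schema and emits Beam Rows. */
      static class AvroBytesToRow extends DoFn<KV<byte[], byte[]>, Row> {
        private final String schemaJson;
        private transient org.apache.avro.Schema avroSchema;
        private transient Schema beamSchema;
        private transient GenericDatumReader<GenericRecord> reader;

        AvroBytesToRow(String schemaJson) {
          this.schemaJson = schemaJson;
        }

        @Setup
        public void setup() {
          avroSchema = new org.apache.avro.Schema.Parser().parse(schemaJson);
          beamSchema = AvroUtils.toBeamSchema(avroSchema);
          reader = new GenericDatumReader<>(avroSchema);
        }

        @ProcessElement
        public void processElement(ProcessContext c) throws Exception {
          GenericRecord record =
              reader.read(null, DecoderFactory.get().binaryDecoder(c.element().getValue(), null));
          c.output(AvroUtils.toBeamRowStrict(record, beamSchema));
        }
      }

      public static void main(String[] args) {
        // Placeholder schema; the real job uses the latest schema known at
        // submission time.
        String schemaJson =
            "{\"type\":\"record\",\"name\":\"Event\",\"fields\":"
                + "[{\"name\":\"col1\",\"type\":\"string\"}]}";
        Schema beamSchema =
            AvroUtils.toBeamSchema(new org.apache.avro.Schema.Parser().parse(schemaJson));

        Pipeline p = Pipeline.create();
        p.apply(
                KafkaIO.<byte[], byte[]>read()
                    .withBootstrapServers("broker:9092") // placeholder
                    .withTopic("events") // placeholder
                    .withKeyDeserializer(ByteArrayDeserializer.class)
                    .withValueDeserializer(ByteArrayDeserializer.class)
                    .withoutMetadata())
            .apply(ParDo.of(new AvroBytesToRow(schemaJson)))
            .setRowSchema(beamSchema)
            .apply(SqlTransform.query("SELECT * FROM PCOLLECTION WHERE col1 = 'x'"));
        // The real job then converts Rows back to Avro bytes and writes to
        // DB/GCS/GRPC sinks.
        p.run();
      }
    }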
On Mon, Dec 7, 2020 at 6:31 PM Talat Uyarer <tuya...@paloaltonetworks.com> wrote:

Hi Andrew,

I assume the SQL query is not going to change. What changes is the Row schema, by adding new columns or renaming columns. If we keep version information somewhere, for example in a KV pair where the key is the schema information and the value is the Row, can't we generate the SQL code? The reason I am asking is that we have 15k pipelines. When we have a schema change, we restart 15k DF jobs, which is painful. I am looking for a possible way to avoid the job restart. Don't you think it is still doable?

Thanks

On Mon, Dec 7, 2020 at 6:10 PM Andrew Pilloud <apill...@google.com> wrote:

Unfortunately we don't have a way to generate the SQL Java code on the fly, and even if we did, that wouldn't solve your problem. I believe our recommended practice is to run both the old and new pipeline for some time, then pick a window boundary to transition the output from the old pipeline to the new one.

Beam doesn't handle changing the format of data sent between intermediate steps in a running pipeline. Beam uses "coders" to serialize data between steps of the pipeline. The built-in coders (including the schema Row coder used by SQL) have a fixed data format and don't handle schema evolution. They are optimized for performance at all costs.

Even if you worked around this, the Beam model doesn't support changing the structure of the pipeline graph. This would significantly limit the changes you could make. It would also require some changes to SQL to try to produce the same plan for an updated SQL query.

Andrew

On Mon, Dec 7, 2020 at 5:44 PM Talat Uyarer <tuya...@paloaltonetworks.com> wrote:

Hi,

We are using BeamSQL in our pipeline. Our data is written in Avro format, and we generate our rows based on our Avro schema. Over time the schema changes. I believe Beam SQL generates Java code based on the Beam schema we define while submitting the pipeline. Do you have any idea how we can handle schema changes without resubmitting our Beam job? Is it possible to generate the SQL Java code on the fly?

Thanks
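[Editor's note: for context on the schema-resolution step Talat relies on above, a minimal sketch of decoding a payload with its writer schema while resolving to a newer reader schema. Schemas and the class name are illustrative; this is plain Avro API, not Beam.]

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericDatumReader;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.io.BinaryDecoder;
    import org.apache.avro.io.DecoderFactory;

    public class ResolvingDecode {
      // Decode bytes written with writerSchema into a record shaped by
      // readerSchema: fields added in the reader schema are filled from their
      // defaults, and Avro's grammar-based resolution handles the rest.
      public static GenericRecord decode(
          byte[] payload, Schema writerSchema, Schema readerSchema) throws Exception {
        GenericDatumReader<GenericRecord> reader =
            new GenericDatumReader<>(writerSchema, readerSchema);
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(payload, null);
        return reader.read(null, decoder);
      }
    }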