Thanks. It might be theoretically possible to do this (at least for the case where existing fields do not change). Whether anyone currently has time available to do this is a different question, but it's something that can be looked into.
On Mon, Dec 7, 2020 at 9:29 PM Talat Uyarer <tuya...@paloaltonetworks.com> wrote:

> Adding new fields is more common than modifying existing fields. But type
> changes are also possible for existing fields, such as a regular mandatory
> field (string, integer) becoming a union (nullable field). No field deletion.
>
> On Mon, Dec 7, 2020 at 9:22 PM Reuven Lax <re...@google.com> wrote:
>
>> And when you say schema changes, are these new fields being added to the
>> schema? Or are you making changes to the existing fields?
>>
>> On Mon, Dec 7, 2020 at 9:02 PM Talat Uyarer <tuya...@paloaltonetworks.com> wrote:
>>
>>> Hi,
>>> Sure, let me explain a little bit about my pipeline.
>>> My pipeline is actually simple:
>>> Read Kafka -> Convert Avro Bytes to Beam Row (DoFn<KV<byte[], byte[]>, Row>)
>>> -> Apply Filter (SqlTransform.query(sql)) -> Convert back from Row to Avro
>>> (DoFn<Row, byte[]>) -> Write DB/GCS/GRPC etc.
>>>
>>> In our jobs we have three types of SQL:
>>> - SELECT * FROM PCOLLECTION
>>> - SELECT * FROM PCOLLECTION <with WHERE condition>
>>> - SQL projection with or without a WHERE clause: SELECT col1, col2 FROM PCOLLECTION
>>>
>>> We know the writer schema for each message. While deserializing the Avro
>>> binary, we use the writer schema and the reader schema in the Convert Avro
>>> Bytes to Beam Row step. It always produces a generic record in the reader
>>> schema, and we convert that generic record to a Row.
>>> While submitting the DF job we use the latest schema to generate the Beam schema.
>>>
>>> In the current scenario, when we have schema changes we first restart all
>>> 15k jobs with the latest schema, and only once that is done do we turn on
>>> the latest schema for the writers. Because of Avro's grammar-based schema
>>> resolution [1], we can read different versions of the schema and always
>>> produce a record in the latest schema. Without breaking our pipeline we are
>>> able to handle multiple versions of data in the same streaming pipeline. If
>>> we could regenerate the SQL's Java code when we are notified of the latest
>>> schema, we would be able to handle all schema changes. The only remaining
>>> obstacle is Beam SQL's generated Java code. That's why I am looking for a
>>> solution. We don't need multiple versions of the SQL; we only need to
>>> regenerate the SQL schema with the latest schema on the fly.
>>>
>>> I hope that explains it :)
>>>
>>> Thanks
>>>
>>> [1] https://avro.apache.org/docs/1.7.2/api/java/org/apache/avro/io/parsing/doc-files/parsing.html
>>>
>>> On Mon, Dec 7, 2020 at 7:49 PM Reuven Lax <re...@google.com> wrote:
>>>
>>>> Can you explain the use case some more? Do you want to change your SQL
>>>> statement as well when the schema changes? If not, what are those new
>>>> fields doing in the pipeline? What I mean is that your old SQL statement
>>>> clearly didn't reference those fields in a SELECT statement since they
>>>> didn't exist, so what are you missing by not having them, unless you are
>>>> also changing the SQL statement?
>>>>
>>>> Is this a case where you have a SELECT * and just want to make sure
>>>> those fields are included?
>>>>
>>>> Reuven
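A rough sketch of the "Convert Avro Bytes to Beam Row" step described above could look like the following (assuming Beam's AvroUtils helpers; the class and field names are illustrative, and a single writer/reader schema pair stands in for the per-message writer schemas the real jobs track):

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DecoderFactory;
import org.apache.beam.sdk.schemas.utils.AvroUtils;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.Row;

public class AvroBytesToRowFn extends DoFn<KV<byte[], byte[]>, Row> {
  // Schemas are carried as JSON strings because org.apache.avro.Schema is not
  // reliably serializable; they are parsed once per DoFn instance in @Setup.
  private final String writerSchemaJson;
  private final String readerSchemaJson;
  private transient Schema writerSchema;
  private transient Schema readerSchema;
  private transient org.apache.beam.sdk.schemas.Schema beamSchema;
  private transient GenericDatumReader<GenericRecord> datumReader;

  public AvroBytesToRowFn(String writerSchemaJson, String readerSchemaJson) {
    this.writerSchemaJson = writerSchemaJson;
    this.readerSchemaJson = readerSchemaJson;
  }

  @Setup
  public void setup() {
    writerSchema = new Schema.Parser().parse(writerSchemaJson);
    readerSchema = new Schema.Parser().parse(readerSchemaJson);
    // The Beam schema is derived from the reader (latest) schema once, here.
    beamSchema = AvroUtils.toBeamSchema(readerSchema);
    // Avro's schema resolution promotes writer-schema bytes to reader-schema records.
    datumReader = new GenericDatumReader<>(writerSchema, readerSchema);
  }

  @ProcessElement
  public void processElement(@Element KV<byte[], byte[]> message, OutputReceiver<Row> out)
      throws Exception {
    GenericRecord record =
        datumReader.read(null, DecoderFactory.get().binaryDecoder(message.getValue(), null));
    out.output(AvroUtils.toBeamRowStrict(record, beamSchema));
  }
}

Because the reader schema (and the Beam schema derived from it) is fixed when the DoFn is constructed, a newly added Avro field only shows up in the Rows after the job is resubmitted with an updated reader schema, which is the restart the thread is trying to avoid.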
>>>>
>>>> On Mon, Dec 7, 2020 at 6:31 PM Talat Uyarer <tuya...@paloaltonetworks.com> wrote:
>>>>
>>>>> Hi Andrew,
>>>>>
>>>>> I assume the SQL query is not going to change. What changes is the Row
>>>>> schema, by adding new columns or renaming columns. If we keep version
>>>>> information somewhere, for example in a KV pair where the key is the
>>>>> schema information and the value is the Row, can we not generate the SQL
>>>>> code? The reason I am asking is that we have 15k pipelines. When we have a
>>>>> schema change we restart 15k DF jobs, which is painful. I am looking for a
>>>>> possible way to avoid the job restart. Don't you think it is still doable?
>>>>>
>>>>> Thanks
>>>>>
>>>>> On Mon, Dec 7, 2020 at 6:10 PM Andrew Pilloud <apill...@google.com> wrote:
>>>>>
>>>>>> Unfortunately we don't have a way to generate the SQL Java code on the
>>>>>> fly, and even if we did, that wouldn't solve your problem. I believe our
>>>>>> recommended practice is to run both the old and the new pipeline for some
>>>>>> time, then pick a window boundary to transition the output from the old
>>>>>> pipeline to the new one.
>>>>>>
>>>>>> Beam doesn't handle changing the format of data sent between
>>>>>> intermediate steps in a running pipeline. Beam uses "coders" to serialize
>>>>>> data between steps of the pipeline. The built-in coders (including the
>>>>>> schema Row coder used by SQL) have a fixed data format and don't handle
>>>>>> schema evolution; they are optimized for performance at all costs.
>>>>>>
>>>>>> Even if you worked around this, the Beam model doesn't support changing
>>>>>> the structure of the pipeline graph, which would significantly limit the
>>>>>> changes you could make. It would also require some changes to SQL to try
>>>>>> to produce the same plan for an updated SQL query.
>>>>>>
>>>>>> Andrew
>>>>>>
>>>>>> On Mon, Dec 7, 2020 at 5:44 PM Talat Uyarer <tuya...@paloaltonetworks.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> We are using Beam SQL in our pipeline. Our data is written in Avro
>>>>>>> format, and we generate our rows based on our Avro schema. Over time the
>>>>>>> schema changes. I believe Beam SQL generates Java code based on the Beam
>>>>>>> schema we define when submitting the pipeline. Do you have any idea how
>>>>>>> we can handle schema changes without resubmitting our Beam job? Is it
>>>>>>> possible to generate the SQL Java code on the fly?
>>>>>>>
>>>>>>> Thanks
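For context, a small self-contained sketch of where the Row schema gets pinned at pipeline construction time (an in-memory Create source stands in for the KafkaIO source and the Avro conversion step discussed above; the schema and query here are illustrative):

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.sql.SqlTransform;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;

public class SchemaPinnedSqlSketch {
  public static void main(String[] args) {
    // The Row schema is fixed here, when the pipeline is constructed and submitted.
    Schema schema = Schema.builder()
        .addStringField("col1")
        .addInt64Field("col2")
        .build();

    Pipeline p = Pipeline.create();

    // Stand-in for: Read Kafka -> Convert Avro Bytes to Beam Row.
    PCollection<Row> rows =
        p.apply(Create.of(Row.withSchema(schema).addValues("a", 1L).build())
                      .withRowSchema(schema));

    // One of the three query shapes from the thread: projection with a WHERE clause.
    rows.apply(SqlTransform.query("SELECT col1, col2 FROM PCOLLECTION WHERE col2 > 0"));

    p.run().waitUntilFinish();
  }
}

Everything SqlTransform plans and code-generates is tied to the schema fixed here, which is why picking up a new Avro field currently means resubmitting the job rather than updating it on the fly.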