Reuven, could you clarify what you have in mind? I know we've discussed adding update compatibility support to SchemaCoder several times, including support for certain schema changes (field additions/deletions); I think the most recent discussion was here [1].
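The kinds of changes Talat describes in the quoted thread (adding fields, turning a required field into a nullable one, never deleting fields) can be written down as a simple compatibility rule. The following is a minimal, stdlib-only sketch of such a check. `FieldDef` and `SchemaEvolutionCheck` are hypothetical names, not Beam's `Schema` or `SchemaCoder` API. The rule that newly added fields must be nullable is an assumption of this sketch, so that records written before the change still have a value for them. Field order and encoding positions are deliberately ignored.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical checker modeling the evolution rules discussed in the thread.
// Declared first so the single-file launcher (java File.java) finds main().
final class SchemaEvolutionCheck {
    // Returns true if `next` is an allowed evolution of `prev`:
    // no deletions, no base-type changes, no nullable -> required changes,
    // and every newly added field is nullable (an assumption of this sketch).
    static boolean isAllowedEvolution(Map<String, FieldDef> prev, Map<String, FieldDef> next) {
        for (Map.Entry<String, FieldDef> e : prev.entrySet()) {
            FieldDef before = e.getValue();
            FieldDef after = next.get(e.getKey());
            if (after == null) {
                return false; // field was deleted
            }
            if (!before.type.equals(after.type)) {
                return false; // base type changed
            }
            if (before.nullable && !after.nullable) {
                return false; // nullable -> required would reject old null values
            }
        }
        for (Map.Entry<String, FieldDef> e : next.entrySet()) {
            if (!prev.containsKey(e.getKey()) && !e.getValue().nullable) {
                return false; // new required field: old records have no value for it
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Map<String, FieldDef> v1 = new LinkedHashMap<>();
        v1.put("id", new FieldDef("INT64", false));
        v1.put("name", new FieldDef("STRING", false));

        // v2 relaxes "name" to nullable and adds a nullable "email" field.
        Map<String, FieldDef> v2 = new LinkedHashMap<>(v1);
        v2.put("name", new FieldDef("STRING", true));
        v2.put("email", new FieldDef("STRING", true));

        // v3 deletes "name".
        Map<String, FieldDef> v3 = new LinkedHashMap<>(v1);
        v3.remove("name");

        System.out.println(isAllowedEvolution(v1, v2)); // true
        System.out.println(isAllowedEvolution(v1, v3)); // false
    }
}

// Hypothetical field descriptor for illustration only; not Beam's Schema.Field.
final class FieldDef {
    final String type;      // base type name, e.g. "STRING" or "INT64"
    final boolean nullable; // whether the field may hold null

    FieldDef(String type, boolean nullable) {
        this.type = type;
        this.nullable = nullable;
    }
}
```

Modeling nullability as a flag on an unchanged base type mirrors the "mandatory field to union (nullable field)" change from the thread. A real implementation would also have to decide how appended fields map onto the coder's encoding.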
But it sounds like Talat is asking for something a little beyond that, effectively a dynamic schema. Is that something you think we can support?

[1] https://lists.apache.org/thread.html/ref73a8c40e24e0b038b4e5b065cd502f4c5df2e5e15af6f7ea1cdaa7%40%3Cdev.beam.apache.org%3E

On Tue, Dec 8, 2020 at 9:20 AM Reuven Lax <[email protected]> wrote:

> Thanks. It might be theoretically possible to do this (at least for the
> case where existing fields do not change). Whether anyone currently has
> available time to do this is a different question, but it's something that
> can be looked into.
>
> On Mon, Dec 7, 2020 at 9:29 PM Talat Uyarer <[email protected]>
> wrote:
>
>> Adding new fields is more common than modifying existing fields, but type
>> changes to existing fields are also possible, such as changing a regular
>> mandatory field (string, integer) to a union (nullable field). No fields
>> are deleted.
>>
>> On Mon, Dec 7, 2020 at 9:22 PM Reuven Lax <[email protected]> wrote:
>>
>>> And when you say schema changes, are these new fields being added to the
>>> schema? Or are you making changes to the existing fields?
>>>
>>> On Mon, Dec 7, 2020 at 9:02 PM Talat Uyarer <
>>> [email protected]> wrote:
>>>
>>>> Hi,
>>>>
>>>> Sure, let me explain a little bit about my pipeline. It is actually
>>>> simple:
>>>> Read Kafka -> Convert Avro bytes to Beam Row (DoFn<KV<byte[], byte[]>,
>>>> Row>) -> Apply filter (SqlTransform.query(sql)) -> Convert back from
>>>> Row to Avro (DoFn<Row, byte[]>) -> Write to DB/GCS/gRPC etc.
>>>>
>>>> Our jobs use three types of SQL:
>>>> - SELECT * FROM PCOLLECTION
>>>> - SELECT * FROM PCOLLECTION <with WHERE condition>
>>>> - SQL projection with or without a WHERE clause:
>>>>   SELECT col1, col2 FROM PCOLLECTION
>>>>
>>>> We know the writer schema for each message. While deserializing the Avro
>>>> binary in the "Convert Avro bytes to Beam Row" step, we use both the
>>>> writer schema and the reader schema.
>>>> It always produces a generic record in the reader schema, and we
>>>> convert that generic record to a Row.
>>>> When submitting a Dataflow job, we use the latest schema to generate
>>>> the Beam schema.
>>>>
>>>> Currently, when we have a schema change, we first restart all 15k jobs
>>>> with the latest schema, and once that is done we switch the writers to
>>>> the latest schema. Because of Avro's grammar resolver [1], we can read
>>>> different versions of the schema and always produce a record in the
>>>> latest schema. Without breaking our pipeline, we are able to handle
>>>> multiple versions of data in the same streaming pipeline. If we could
>>>> regenerate the SQL's Java code when we are notified of the latest
>>>> schema, we could handle all schema changes. The only remaining obstacle
>>>> is Beam SQL's generated Java code. That's why I am looking for a
>>>> solution. We don't need multiple versions of the SQL; we only need to
>>>> regenerate the SQL schema with the latest schema on the fly.
>>>>
>>>> I hope that explains it :)
>>>>
>>>> Thanks
>>>>
>>>> [1] https://avro.apache.org/docs/1.7.2/api/java/org/apache/avro/io/parsing/doc-files/parsing.html
>>>>
>>>> On Mon, Dec 7, 2020 at 7:49 PM Reuven Lax <[email protected]> wrote:
>>>>
>>>>> Can you explain the use case some more? Do you want to change your
>>>>> SQL statement as well when the schema changes? If not, what are those
>>>>> new fields doing in the pipeline? What I mean is that your old SQL
>>>>> statement clearly didn't reference those fields in a SELECT statement
>>>>> since they didn't exist, so what are you missing by not having them
>>>>> unless you are also changing the SQL statement?
>>>>>
>>>>> Is this a case where you have a SELECT *, and just want to make sure
>>>>> those fields are included?
>>>>>
>>>>> Reuven
>>>>>
>>>>> On Mon, Dec 7, 2020 at 6:31 PM Talat Uyarer <
>>>>> [email protected]> wrote:
>>>>>
>>>>>> Hi Andrew,
>>>>>>
>>>>>> I assume the SQL query is not going to change. What changes is the Row
>>>>>> schema, by adding new columns or renaming columns. What if we keep
>>>>>> version information somewhere, for example in a KV pair where the key
>>>>>> is the schema information and the value is the Row? Couldn't we
>>>>>> generate the SQL code then? The reason I am asking is that we have 15k
>>>>>> pipelines. When we have a schema change, we restart 15k Dataflow jobs,
>>>>>> which is a pain. I am looking for a possible way to avoid job
>>>>>> restarts. Don't you think it is still doable?
>>>>>>
>>>>>> Thanks
>>>>>>
>>>>>>
>>>>>> On Mon, Dec 7, 2020 at 6:10 PM Andrew Pilloud <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> Unfortunately we don't have a way to generate the SQL Java code on
>>>>>>> the fly, and even if we did, that wouldn't solve your problem. I
>>>>>>> believe our recommended practice is to run both the old and new
>>>>>>> pipeline for some time, then pick a window boundary to transition
>>>>>>> the output from the old pipeline to the new one.
>>>>>>>
>>>>>>> Beam doesn't handle changing the format of data sent between
>>>>>>> intermediate steps in a running pipeline. Beam uses "coders" to
>>>>>>> serialize data between steps of the pipeline. The built-in coders
>>>>>>> (including the Schema Row Coder used by SQL) have a fixed data format
>>>>>>> and don't handle schema evolution. They are optimized for performance
>>>>>>> at all costs.
>>>>>>>
>>>>>>> Even if you worked around this, the Beam model doesn't support
>>>>>>> changing the structure of the pipeline graph. This would
>>>>>>> significantly limit the changes you can make. It would also require
>>>>>>> some changes to SQL to try to produce the same plan for an updated
>>>>>>> SQL query.
>>>>>>>
>>>>>>> Andrew
>>>>>>>
>>>>>>> On Mon, Dec 7, 2020 at 5:44 PM Talat Uyarer <
>>>>>>> [email protected]> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> We are using Beam SQL in our pipeline. Our data is written in Avro
>>>>>>>> format, and we generate our rows based on our Avro schema. Over time
>>>>>>>> the schema changes. I believe Beam SQL generates Java code based on
>>>>>>>> the Beam schema we define when submitting the pipeline. Do you have
>>>>>>>> any idea how we can handle schema changes without resubmitting our
>>>>>>>> Beam job? Is it possible to generate the SQL Java code on the fly?
>>>>>>>>
>>>>>>>> Thanks
>>>>>>>>
>>>>>>>
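Talat's "Convert Avro bytes to Beam Row" step relies on Avro schema resolution: each message is decoded with its writer schema and projected onto the reader (latest) schema. In Avro's Java API, a `GenericDatumReader` constructed with both a writer and a reader schema performs this resolution. The toy model below is plain Java with hypothetical `ToyResolver` and `ReaderField` classes, not Avro's implementation. It shows only the record-level rules from the Avro specification: fields present in both schemas are copied, fields only in the reader schema take the reader's default, and fields only in the writer schema are ignored. It does not model type promotion or unions. It also treats a `null` default as "defaults to null" and never reports a missing default, which real Avro resolution would reject.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of Avro record resolution; ToyResolver and ReaderField are
// hypothetical names, not part of the Avro or Beam APIs.
final class ToyResolver {
    // Projects a record decoded with the writer schema onto the reader schema:
    // shared fields are copied, reader-only fields get the reader's default,
    // and writer-only fields are ignored. Output follows reader field order.
    static Map<String, Object> resolve(Map<String, Object> writerRecord, List<ReaderField> readerSchema) {
        Map<String, Object> out = new LinkedHashMap<>();
        for (ReaderField field : readerSchema) {
            if (writerRecord.containsKey(field.name)) {
                out.put(field.name, writerRecord.get(field.name));
            } else {
                out.put(field.name, field.defaultValue);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Record written with an older writer schema {id, legacy}.
        Map<String, Object> oldRecord = new LinkedHashMap<>();
        oldRecord.put("id", 7L);
        oldRecord.put("legacy", "x");

        // Latest reader schema {id, email}; email defaults to null.
        List<ReaderField> latest = Arrays.asList(
                new ReaderField("id", null),
                new ReaderField("email", null));

        System.out.println(resolve(oldRecord, latest)); // {id=7, email=null}
    }
}

// Reader-schema field with the default used when the writer lacks the field.
final class ReaderField {
    final String name;
    final Object defaultValue; // null models a nullable field defaulting to null

    ReaderField(String name, Object defaultValue) {
        this.name = name;
        this.defaultValue = defaultValue;
    }
}
```

Because every record comes out in the latest reader schema, the downstream Row conversion only ever sees one shape. That is why, as Talat notes, Avro resolution is not the obstacle: it is the Beam schema fixed at submission time.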
