Thanks. It might be theoretically possible to do this (at least for the
case where existing fields do not change). Whether anyone currently has
available time to do this is a different question, but it's something that
can be looked into.

On Mon, Dec 7, 2020 at 9:29 PM Talat Uyarer <tuya...@paloaltonetworks.com>
wrote:

> Adding new fields is more common than modifying existing fields. But a type
> change is also possible for existing fields, for example a regular mandatory
> field (string, integer) becoming a union (nullable field). There is no field deletion.
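> For example (an illustrative Avro schema fragment; the field name is just a
> placeholder), a mandatory field declared as
>   {"name": "col1", "type": "string"}
> would later become nullable as
>   {"name": "col1", "type": ["null", "string"], "default": null}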
>
> On Mon, Dec 7, 2020 at 9:22 PM Reuven Lax <re...@google.com> wrote:
>
>> And when you say schema changes, are these new fields being added to the
>> schema? Or are you making changes to the existing fields?
>>
>> On Mon, Dec 7, 2020 at 9:02 PM Talat Uyarer <tuya...@paloaltonetworks.com>
>> wrote:
>>
>>> Hi,
>>> Sure, let me explain a little bit about my pipeline.
>>> My pipeline is actually simple:
>>> Read Kafka -> Convert Avro Bytes to Beam Row (DoFn<KV<byte[], byte[]>,
>>> Row>) -> Apply Filter (SqlTransform.query(sql)) -> Convert back from Row
>>> to Avro (DoFn<Row, byte[]>) -> Write to DB/GCS/gRPC etc.
>>>
>>> In our jobs we have three types of SQL:
>>> - SELECT * FROM PCOLLECTION
>>> - SELECT * FROM PCOLLECTION <with a WHERE condition>
>>> - A SQL projection, with or without a WHERE clause: SELECT col1, col2 FROM
>>> PCOLLECTION
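>>>
>>> To make the shape concrete, here is a rough sketch of that pipeline in Beam
>>> Java. It is only an illustration, not our exact code: the Kafka settings,
>>> the DoFn names (AvroBytesToRowFn, RowToAvroBytesFn), the sink transform
>>> (WriteToSinkTransform), and the options/readerSchema/beamSchema variables
>>> are all placeholders assumed to exist.
>>>
>>>   Pipeline p = Pipeline.create(options);
>>>   p.apply(KafkaIO.<byte[], byte[]>read()
>>>           .withBootstrapServers("broker:9092")               // placeholder
>>>           .withTopic("events")                               // placeholder
>>>           .withKeyDeserializer(ByteArrayDeserializer.class)
>>>           .withValueDeserializer(ByteArrayDeserializer.class)
>>>           .withoutMetadata())                                // PCollection<KV<byte[], byte[]>>
>>>    .apply("AvroBytesToRow", ParDo.of(new AvroBytesToRowFn(readerSchema)))
>>>    .setRowSchema(beamSchema)                                 // Beam schema from the latest Avro schema
>>>    .apply(SqlTransform.query("SELECT col1, col2 FROM PCOLLECTION WHERE col1 = 'x'"))
>>>    .apply("RowToAvroBytes", ParDo.of(new RowToAvroBytesFn(readerSchema)))
>>>    .apply(/* DB/GCS/gRPC sink */ new WriteToSinkTransform());
>>>   p.run();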
>>>
>>> We know the writer schema for each message. While deserializing the Avro
>>> binary in the Convert Avro Bytes to Beam Row step, we use both the writer
>>> schema and the reader schema. This always produces a generic record in the
>>> reader schema, and we convert that generic record to a Row.
>>> While submitting the DF job we use the latest schema to generate the Beam schema.
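>>>
>>> As a rough sketch of the inside of that conversion DoFn (our real code
>>> differs; writerSchema, readerSchema, and beamSchema are assumed to be
>>> available on the DoFn, and AvroUtils is Beam's
>>> org.apache.beam.sdk.schemas.utils.AvroUtils):
>>>
>>>   // In DoFn<KV<byte[], byte[]>, Row>.processElement, roughly:
>>>   GenericDatumReader<GenericRecord> reader =
>>>       new GenericDatumReader<>(writerSchema, readerSchema); // Avro resolves writer -> reader
>>>   BinaryDecoder decoder =
>>>       DecoderFactory.get().binaryDecoder(element.getValue(), null);
>>>   GenericRecord record = reader.read(null, decoder);        // record uses the reader schema
>>>   Row row = AvroUtils.toBeamRowStrict(record, beamSchema);  // reader schema -> Beam Row
>>>   receiver.output(row);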
>>>
>>> In the current scenario, when we have schema changes, we first restart all
>>> 15k jobs with the latest updated schema, and once that is done we turn on
>>> the latest schema for the writers. Because of Avro's grammar resolution [1]
>>> we can read different versions of the schema, and we always produce a record
>>> in the latest schema. So without breaking our pipeline we are able to handle
>>> multiple versions of data in the same streaming pipeline. If we could
>>> regenerate the SQL's Java code when we get notified of the latest schema, we
>>> would be able to handle all schema changes. The only remaining obstacle is
>>> Beam's generated SQL Java code. That's why I am looking for a solution. We
>>> don't need multiple versions of the SQL. We only need to regenerate the SQL
>>> schema with the latest schema on the fly.
>>>
>>> I hope that explains it :)
>>>
>>> Thanks
>>>
>>> [1]
>>> https://avro.apache.org/docs/1.7.2/api/java/org/apache/avro/io/parsing/doc-files/parsing.html
>>>
>>> On Mon, Dec 7, 2020 at 7:49 PM Reuven Lax <re...@google.com> wrote:
>>>
>>>> Can you explain the use case some more? Are you wanting to change your
>>>> SQL statement as well when the schema changes? If not, what are those new
>>>> fields doing in the pipeline? What I mean is that your old SQL statement
>>>> clearly didn't reference those fields in a SELECT statement since they
>>>> didn't exist, so what are you missing by not having them unless you are
>>>> also changing the SQL statement?
>>>>
>>>> Is this a case where you have a SELECT *, and just want to make sure
>>>> those fields are included?
>>>>
>>>> Reuven
>>>>
>>>> On Mon, Dec 7, 2020 at 6:31 PM Talat Uyarer <
>>>> tuya...@paloaltonetworks.com> wrote:
>>>>
>>>>> Hi Andrew,
>>>>>
>>>>> I assume the SQL query is not going to change. What changes is the Row
>>>>> schema, by adding new columns or renaming columns. If we keep version
>>>>> information somewhere, for example as a KV pair where the key is the schema
>>>>> information and the value is the Row, can we not regenerate the SQL code?
>>>>> The reason I am asking is that we have 15k pipelines. When we have a schema
>>>>> change we restart 15k DF jobs, which is painful. I am looking for a possible
>>>>> way to avoid the job restarts. Don't you think it is still doable?
>>>>>
>>>>> Thanks
>>>>>
>>>>>
>>>>> On Mon, Dec 7, 2020 at 6:10 PM Andrew Pilloud <apill...@google.com>
>>>>> wrote:
>>>>>
>>>>>> Unfortunately we don't have a way to generate the SQL Java code on the
>>>>>> fly, and even if we did, that wouldn't solve your problem. I believe our
>>>>>> recommended practice is to run both the old and new pipeline for some
>>>>>> time, then pick a window boundary to transition the output from the old
>>>>>> pipeline to the new one.
>>>>>>
>>>>>> Beam doesn't handle changing the format of data sent between
>>>>>> intermediate steps in a running pipeline. Beam uses "coders" to serialize
>>>>>> data between steps of the pipeline. The builtin coders (including the
>>>>>> Schema Row Coder used by SQL) have a fixed data format and don't handle
>>>>>> schema evolution. They are optimized for performance at all costs.
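>>>>>>
>>>>>> For example (the field names are purely illustrative), the Row coder is
>>>>>> built from a schema that is fixed at pipeline construction time:
>>>>>>
>>>>>>   Schema schema = Schema.builder()
>>>>>>       .addStringField("col1")
>>>>>>       .addInt64Field("col2")
>>>>>>       .build();
>>>>>>   RowCoder coder = RowCoder.of(schema); // field order and types are baked into the encoded bytes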
>>>>>>
>>>>>> Even if you worked around this, the Beam model doesn't support changing
>>>>>> the structure of the pipeline graph. This would significantly limit the
>>>>>> changes you can make. It would also require some changes to SQL to try to
>>>>>> produce the same plan for an updated SQL query.
>>>>>>
>>>>>> Andrew
>>>>>>
>>>>>> On Mon, Dec 7, 2020 at 5:44 PM Talat Uyarer <
>>>>>> tuya...@paloaltonetworks.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> We are using Beam SQL in our pipeline. Our data is written in Avro
>>>>>>> format. We generate our rows based on our Avro schema. Over time the
>>>>>>> schema changes. I believe Beam SQL generates Java code based on what we
>>>>>>> define as the Beam schema while submitting the pipeline. Do you have any
>>>>>>> idea how we can handle schema changes without resubmitting our Beam job?
>>>>>>> Is it possible to generate the SQL Java code on the fly?
>>>>>>>
>>>>>>> Thanks
>>>>>>>
>>>>>>
