Thanks Reuven,

I can work on that. I know the internals of BeamSQL. I could not figure out
How to replace Step's code with new generated code after the pipeline is
submitted. Could you share your thoughts on this?

Thanks

On Tue, Dec 8, 2020 at 9:20 AM Reuven Lax <re...@google.com> wrote:

> Thanks. It might be theoretically possible to do this (at least for the
> case where existing fields do not change). Whether anyone currently has
> available time to do this is a different question, but it's something that
> can be looked into.
>
> On Mon, Dec 7, 2020 at 9:29 PM Talat Uyarer <tuya...@paloaltonetworks.com>
> wrote:
>
>> Adding new fields is more common than modifying existing fields. But type
>> change is also possible for existing fields, such as regular mandatory
>> field(string,integer) to union(nullable field). No field deletion.
>>
>> On Mon, Dec 7, 2020 at 9:22 PM Reuven Lax <re...@google.com> wrote:
>>
>>> And when you say schema changes, are these new fields being added to the
>>> schema? Or are you making changes to the existing fields?
>>>
>>> On Mon, Dec 7, 2020 at 9:02 PM Talat Uyarer <
>>> tuya...@paloaltonetworks.com> wrote:
>>>
>>>> Hi,
>>>> For sure let me explain a little bit about my pipeline.
>>>> My Pipeline is actually simple
>>>> Read Kafka -> Convert Avro Bytes to Beam Row(DoFn<KV<byte[], byte[]>,
>>>> Row>) -> Apply Filter(SqlTransform.query(sql)) -> Convert back from
>>>> Row to Avro (DoFn<Row, byte[]>)-> Write DB/GCS/GRPC etc
>>>>
>>>> On our jobs We have three type sqls
>>>> - SELECT * FROM PCOLLECTION
>>>> - SELECT * FROM PCOLLECTION <with Where Condition>
>>>> - SQL Projection with or without Where clause  SELECT col1, col2 FROM
>>>> PCOLLECTION
>>>>
>>>> We know writerSchema for each message. While deserializing avro binary
>>>> we use writer schema and reader schema on Convert Avro Bytes to Beam Row
>>>> step. It always produces a reader schema's generic record and we convert
>>>> that generic record to Row.
>>>> While submitting DF job we use latest schema to generate beamSchema.
>>>>
>>>> In the current scenario When we have schema changes first we restart
>>>> all 15k jobs with the latest updated schema then whenever we are done we
>>>> turn on the latest schema for writers. Because of Avro's GrammerResolver[1]
>>>> we read different versions of the schema and we always produce the latest
>>>> schema's record. Without breaking our pipeline we are able to handle
>>>> multiple versions of data in the same streaming pipeline. If we can
>>>> generate SQL's java code when we get notified wirth latest schema we will
>>>> handle all schema changes. The only remaining obstacle is Beam's SQL Java
>>>> code. That's why I am looking for some solution. We dont need multiple
>>>> versions of SQL. We only need to regenerate SQL schema with the latest
>>>> schema on the fly.
>>>>
>>>> I hope I can explain it :)
>>>>
>>>> Thanks
>>>>
>>>> [1]
>>>> https://avro.apache.org/docs/1.7.2/api/java/org/apache/avro/io/parsing/doc-files/parsing.html
>>>> <https://urldefense.proofpoint.com/v2/url?u=https-3A__avro.apache.org_docs_1.7.2_api_java_org_apache_avro_io_parsing_doc-2Dfiles_parsing.html&d=DwMFaQ&c=V9IgWpI5PvzTw83UyHGVSoW3Uc1MFWe5J8PTfkrzVSo&r=BkW1L6EF7ergAVYDXCo-3Vwkpy6qjsWAz7_GD7pAR8g&m=0qahAe7vDisJq_hMYGY8F-Bp7-_5lOwOKzNoQ3r3-IQ&s=lwwIMsJO9nmj6_xZcSG_7qkBIaxOwyUXry4st1q70Rc&e=>
>>>>
>>>> On Mon, Dec 7, 2020 at 7:49 PM Reuven Lax <re...@google.com> wrote:
>>>>
>>>>> Can you explain the use case some more? Are you wanting to change your
>>>>> SQL statement as well when the schema changes? If not, what are those new
>>>>> fields doing in the pipeline? What I mean is that your old SQL statement
>>>>> clearly didn't reference those fields in a SELECT statement since they
>>>>> didn't exist, so what are you missing by not having them unless you are
>>>>> also changing the SQL statement?
>>>>>
>>>>> Is this a case where you have a SELECT *, and just want to make sure
>>>>> those fields are included?
>>>>>
>>>>> Reuven
>>>>>
>>>>> On Mon, Dec 7, 2020 at 6:31 PM Talat Uyarer <
>>>>> tuya...@paloaltonetworks.com> wrote:
>>>>>
>>>>>> Hi Andrew,
>>>>>>
>>>>>> I assume SQL query is not going to change. Changing things is the Row
>>>>>> schema by adding new columns or rename columns. if we keep a version
>>>>>> information on somewhere for example a KV pair. Key is schema 
>>>>>> information,
>>>>>> value is Row. Can not we generate SQL code ? Why I am asking We have 15k
>>>>>> pipelines. When we have a schema change we restart a 15k DF job which is
>>>>>> pain. I am looking for a possible way to avoid job restart. Dont you 
>>>>>> think
>>>>>> it is not still doable ?
>>>>>>
>>>>>> Thanks
>>>>>>
>>>>>>
>>>>>> On Mon, Dec 7, 2020 at 6:10 PM Andrew Pilloud <apill...@google.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Unfortunately we don't have a way to generate the SQL Java code on
>>>>>>> the fly, even if we did, that wouldn't solve your problem. I believe our
>>>>>>> recommended practice is to run both the old and new pipeline for some 
>>>>>>> time,
>>>>>>> then pick a window boundary to transition the output from the old 
>>>>>>> pipeline
>>>>>>> to the new one.
>>>>>>>
>>>>>>> Beam doesn't handle changing the format of data sent between
>>>>>>> intermediate steps in a running pipeline. Beam uses "coders" to 
>>>>>>> serialize
>>>>>>> data between steps of the pipeline. The builtin coders (including the
>>>>>>> Schema Row Coder used by SQL) have a fixed data format and don't handle
>>>>>>> schema evolution. They are optimized for performance at all costs.
>>>>>>>
>>>>>>> If you worked around this, the Beam model doesn't support changing
>>>>>>> the structure of the pipeline graph. This would significantly limit the
>>>>>>> changes you can make. It would also require some changes to SQL to try 
>>>>>>> to
>>>>>>> produce the same plan for an updated SQL query.
>>>>>>>
>>>>>>> Andrew
>>>>>>>
>>>>>>> On Mon, Dec 7, 2020 at 5:44 PM Talat Uyarer <
>>>>>>> tuya...@paloaltonetworks.com> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> We are using Beamsql on our pipeline. Our Data is written in Avro
>>>>>>>> format. We generate our rows based on our Avro schema. Over time the 
>>>>>>>> schema
>>>>>>>> is changing. I believe Beam SQL generates Java code based on what we 
>>>>>>>> define
>>>>>>>> as BeamSchema while submitting the pipeline. Do you have any idea How 
>>>>>>>> can
>>>>>>>> we handle schema changes with resubmitting our beam job. Is it 
>>>>>>>> possible to
>>>>>>>> generate SQL java code on the fly ?
>>>>>>>>
>>>>>>>> Thanks
>>>>>>>>
>>>>>>>

Reply via email to