Yes Reuven, I would like to write a proposal for that. I also like
Andrew Pilloud's idea: we can put only the necessary fields on the Row, and
the rest can stay on the unknown-fields side. We are using Beam Calcite SQL.
Is that OK?
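
The split between necessary fields and the unknown-field side could look roughly like the sketch below. It is plain Java, not Beam code: `FieldSplitter`, `Split` and the field names are hypothetical, and it assumes the set of columns referenced by the query is known when the record is decoded.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical helper, not a Beam API: partitions a decoded record into the
// columns the SQL query references and the fields that are only passed through.
final class FieldSplitter {

    /** Holds both halves of a split record. */
    static final class Split {
        final Map<String, Object> known = new LinkedHashMap<>();   // would go on the Row
        final Map<String, Object> unknown = new LinkedHashMap<>(); // unknown-field side
    }

    static Split split(Map<String, Object> record, Set<String> referenced) {
        Split out = new Split();
        for (Map.Entry<String, Object> e : record.entrySet()) {
            Map<String, Object> target = referenced.contains(e.getKey()) ? out.known : out.unknown;
            target.put(e.getKey(), e.getValue());
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, Object> record = new LinkedHashMap<>();
        record.put("col1", 1);
        record.put("col2", "a");
        record.put("added_later", true); // assumed field that the query never references
        Split s = split(record, Set.of("col1", "col2"));
        System.out.println("known=" + s.known + " unknown=" + s.unknown);
    }
}
```

Only `known` would need a fixed schema; `unknown` can grow as writers add fields without the query plan changing.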


On Tue, Dec 8, 2020 at 3:15 PM Reuven Lax <re...@google.com> wrote:

> Talat, are you interested in writing a proposal and sending it to
> d...@beam.apache.org? We could help advise on the options.
>
> Reuven
>
> On Tue, Dec 8, 2020 at 10:28 AM Andrew Pilloud <apill...@google.com>
> wrote:
>
>> We could support EXCEPT statements in proposal 2 as long as we restricted
>> it to known fields.
>>
>> We are getting into implementation details now. Making unknown fields
>> just a normal column introduces a number of problems. ZetaSQL doesn't
>> support Map type. All our IOs would need to explicitly deal with that
>> special column. There would be a lack of consistency between the various
>> types (Avro, Proto, Json) which should all support this.
>>
>> We might also want something even more invasive: everything is an unknown
>> field unless it is referenced in the SQL query. All of these options are
>> possible. I guess we need someone who has time to work on it to write a
>> proposal.
>>
>> On Tue, Dec 8, 2020 at 10:03 AM Reuven Lax <re...@google.com> wrote:
>>
>>> I'm not sure that we could support EXCEPT statements, as that would
>>> require introspecting the unknown fields (what if the EXCEPT statement
>>> matches a field that later is added as an unknown field?). IMO this sort of
>>> behavior only makes sense on true pass-through queries. Anything that
>>> modifies the input record would be tricky to support.
>>>
>>> Nested rows would work for proposal 2. You would need to make sure that
>>> the unknown-fields map is recursively added to all nested rows, and you
>>> would do this when you infer a schema from the avro schema.
>>>
>>> On Tue, Dec 8, 2020 at 9:58 AM Andrew Pilloud <apill...@google.com>
>>> wrote:
>>>
>>>> Proposal 1 would also interact poorly with SELECT * EXCEPT ...
>>>> statements, which return all columns except specific ones. Adding an
>>>> unknown field does seem like a reasonable way to handle this. It probably
>>>> needs to be something that is native to the Row type, so columns added to
>>>> nested rows also work.
>>>>
>>>> Andrew
>>>>
>>>> On Tue, Dec 8, 2020 at 9:50 AM Reuven Lax <re...@google.com> wrote:
>>>>
>>>>> There's a difference between a fully dynamic schema and simply being
>>>>> able to forward "unknown" fields to the output.
>>>>>
>>>>> A fully-dynamic schema is not really necessary unless we also had
>>>>> dynamic SQL statements. Since the existing SQL statements do not reference
>>>>> the new fields by name, there's no reason to add them to the main schema.
>>>>>
>>>>> However, if you have a SELECT * FROM WHERE XXXX statement that does no
>>>>> aggregation, there's fundamentally no reason we couldn't forward the
>>>>> messages exactly. In theory we could forward the exact bytes that are in
>>>>> the input PCollection, which would necessarily forward the new fields. In
>>>>> practice I believe that we convert the input messages to Beam Row objects
>>>>> in order to evaluate the WHERE clause, and then convert back to Avro to
>>>>> output those messages. I believe this is where we "lose" the unknown
>>>>> fields, but this is an implementation artifact - in theory we could output
>>>>> the original bytes whenever we see a SELECT *. This is not truly a dynamic
>>>>> schema, since you can't really do anything with these extra fields except
>>>>> forward them to your output.
>>>>>
>>>>> I see two possible ways to address this.
>>>>>
>>>>> 1. As I mentioned above, in the case of a SELECT * we could output the
>>>>> original bytes, and only use the Beam Row for evaluating the WHERE clause.
>>>>> This might be very expensive though - we risk having to keep two copies of
>>>>> every message around, one in the original Avro format and one in Row format.
>>>>>
>>>>> 2. The other way would be to do what protocol buffers do. We could add
>>>>> one extra field to the inferred Beam schema to store new, unknown fields
>>>>> (probably this would be a map-valued field). This extra field would simply
>>>>> store the raw bytes of these unknown fields, and then when converting back
>>>>> to Avro they would be added to the output message. This might also add some
>>>>> overhead to the pipeline, so it might be best to make this behavior opt-in.
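
Proposal 2 can be modeled in a few lines of plain Java. This is only a sketch under simplifying assumptions: a message is treated as a map from field name to encoded bytes, every known field is a UTF-8 string, and `UnknownFieldsRow` with its methods is a made-up type, not a Beam or Avro class.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Toy model of proposal 2. None of these types are Beam or Avro classes; a
// "message" is modeled as field name -> encoded bytes.
final class UnknownFieldsRow {
    final Map<String, String> known = new LinkedHashMap<>();   // decoded schema fields
    final Map<String, byte[]> unknown = new LinkedHashMap<>(); // raw bytes, never decoded

    // Input conversion: decode fields named in the schema, keep the rest as raw bytes.
    static UnknownFieldsRow fromMessage(Map<String, byte[]> message, Set<String> schemaFields) {
        UnknownFieldsRow row = new UnknownFieldsRow();
        for (Map.Entry<String, byte[]> f : message.entrySet()) {
            if (schemaFields.contains(f.getKey())) {
                row.known.put(f.getKey(), new String(f.getValue(), StandardCharsets.UTF_8));
            } else {
                row.unknown.put(f.getKey(), f.getValue());
            }
        }
        return row;
    }

    // Output conversion: re-encode known fields and re-attach the unknown bytes as-is.
    Map<String, byte[]> toMessage() {
        Map<String, byte[]> message = new LinkedHashMap<>();
        known.forEach((name, value) -> message.put(name, value.getBytes(StandardCharsets.UTF_8)));
        message.putAll(unknown);
        return message;
    }

    public static void main(String[] args) {
        Map<String, byte[]> in = new LinkedHashMap<>();
        in.put("severity", "high".getBytes(StandardCharsets.UTF_8));
        in.put("added_later", new byte[] {1, 2, 3}); // field the schema does not know
        UnknownFieldsRow row = fromMessage(in, Set.of("severity"));
        // A WHERE clause would only ever look at row.known.
        if (row.known.get("severity").equals("high")) {
            System.out.println(row.toMessage().keySet());
        }
    }
}
```

The filter never touches `unknown`, which is why this only fits pass-through queries: anything that reshapes the record would have to decide what to do with bytes it cannot interpret.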
>>>>>
>>>>> Reuven
>>>>>
>>>>> On Tue, Dec 8, 2020 at 9:33 AM Brian Hulette <bhule...@google.com>
>>>>> wrote:
>>>>>
>>>>>> Reuven, could you clarify what you have in mind? I know multiple
>>>>>> times we've discussed the possibility of adding update compatibility
>>>>>> support to SchemaCoder, including support for certain schema changes (field
>>>>>> additions/deletions) - I think the most recent discussion was here [1].
>>>>>>
>>>>>> But it sounds like Talat is asking for something a little beyond
>>>>>> that, effectively a dynamic schema. Is that something you think we can
>>>>>> support?
>>>>>>
>>>>>> [1]
>>>>>> https://lists.apache.org/thread.html/ref73a8c40e24e0b038b4e5b065cd502f4c5df2e5e15af6f7ea1cdaa7%40%3Cdev.beam.apache.org%3E
>>>>>>
>>>>>> On Tue, Dec 8, 2020 at 9:20 AM Reuven Lax <re...@google.com> wrote:
>>>>>>
>>>>>>> Thanks. It might be theoretically possible to do this (at least for
>>>>>>> the case where existing fields do not change). Whether anyone currently has
>>>>>>> available time to do this is a different question, but it's something that
>>>>>>> can be looked into.
>>>>>>>
>>>>>>> On Mon, Dec 7, 2020 at 9:29 PM Talat Uyarer <
>>>>>>> tuya...@paloaltonetworks.com> wrote:
>>>>>>>
>>>>>>>> Adding new fields is more common than modifying existing fields.
>>>>>>>> But type changes are also possible for existing fields, such as a regular
>>>>>>>> mandatory field (string, integer) becoming a union (nullable field). There
>>>>>>>> are no field deletions.
>>>>>>>>
>>>>>>>> On Mon, Dec 7, 2020 at 9:22 PM Reuven Lax <re...@google.com> wrote:
>>>>>>>>
>>>>>>>>> And when you say schema changes, are these new fields being added
>>>>>>>>> to the schema? Or are you making changes to the existing fields?
>>>>>>>>>
>>>>>>>>> On Mon, Dec 7, 2020 at 9:02 PM Talat Uyarer <
>>>>>>>>> tuya...@paloaltonetworks.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>> Sure, let me explain a little bit about my pipeline.
>>>>>>>>>> My pipeline is actually simple:
>>>>>>>>>> Read Kafka -> Convert Avro Bytes to Beam Row (DoFn<KV<byte[], byte[]>,
>>>>>>>>>> Row>) -> Apply Filter (SqlTransform.query(sql)) -> Convert back
>>>>>>>>>> from Row to Avro (DoFn<Row, byte[]>) -> Write DB/GCS/GRPC etc.
>>>>>>>>>>
>>>>>>>>>> In our jobs we have three types of SQL:
>>>>>>>>>> - SELECT * FROM PCOLLECTION
>>>>>>>>>> - SELECT * FROM PCOLLECTION <with WHERE condition>
>>>>>>>>>> - SQL projection with or without a WHERE clause: SELECT col1, col2
>>>>>>>>>> FROM PCOLLECTION
>>>>>>>>>>
>>>>>>>>>> We know the writerSchema for each message. While deserializing the Avro
>>>>>>>>>> binary we use the writer schema and the reader schema in the Convert Avro
>>>>>>>>>> Bytes to Beam Row step. It always produces a generic record in the reader
>>>>>>>>>> schema, and we convert that generic record to a Row.
>>>>>>>>>> When submitting the DF job we use the latest schema to generate the
>>>>>>>>>> beamSchema.
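
The effect of decoding with a writer schema and a reader schema can be sketched without the Avro library. The model below is a simplification, not the Avro API: `ReaderProjection` is a hypothetical class, records are plain maps, and every reader field is assumed to have a default. It follows the resolution rule that fields only the writer has are ignored and fields only the reader has take the reader's default.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified model of writer/reader schema resolution; this is not the Avro API.
final class ReaderProjection {

    // readerDefaults maps each reader-schema field to the default used when the
    // writer did not supply that field.
    static Map<String, Object> resolve(Map<String, Object> written, Map<String, Object> readerDefaults) {
        Map<String, Object> resolved = new LinkedHashMap<>();
        for (Map.Entry<String, Object> field : readerDefaults.entrySet()) {
            String name = field.getKey();
            resolved.put(name, written.containsKey(name) ? written.get(name) : field.getValue());
        }
        return resolved; // fields present only in the writer's data are dropped here
    }

    public static void main(String[] args) {
        Map<String, Object> readerDefaults = new LinkedHashMap<>();
        readerDefaults.put("id", 0);
        readerDefaults.put("name", "n/a");

        Map<String, Object> written = new LinkedHashMap<>();
        written.put("id", 7);
        written.put("added_later", "x"); // written with a newer schema than the reader has

        System.out.println(resolve(written, readerDefaults));
    }
}
```

Because the result always has exactly the reader's fields, a job submitted with an older reader schema never sees fields that writers add afterwards.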
>>>>>>>>>>
>>>>>>>>>> In the current scenario, when we have schema changes we first
>>>>>>>>>> restart all 15k jobs with the latest updated schema, and once that is
>>>>>>>>>> done we turn on the latest schema for the writers. Because of Avro's
>>>>>>>>>> GrammerResolver[1] we read different versions of the schema and we always
>>>>>>>>>> produce the latest schema's record. Without breaking our pipeline we are
>>>>>>>>>> able to handle multiple versions of data in the same streaming pipeline. If
>>>>>>>>>> we could generate the SQL's Java code when we get notified of the latest
>>>>>>>>>> schema, we could handle all schema changes. The only remaining obstacle is
>>>>>>>>>> Beam's SQL Java code. That's why I am looking for a solution. We don't need
>>>>>>>>>> multiple versions of SQL. We only need to regenerate the SQL schema with the
>>>>>>>>>> latest schema on the fly.
>>>>>>>>>>
>>>>>>>>>> I hope that explains it :)
>>>>>>>>>>
>>>>>>>>>> Thanks
>>>>>>>>>>
>>>>>>>>>> [1]
>>>>>>>>>> https://avro.apache.org/docs/1.7.2/api/java/org/apache/avro/io/parsing/doc-files/parsing.html
>>>>>>>>>>
>>>>>>>>>> On Mon, Dec 7, 2020 at 7:49 PM Reuven Lax <re...@google.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Can you explain the use case some more? Are you wanting to
>>>>>>>>>>> change your SQL statement as well when the schema changes? If not, what are
>>>>>>>>>>> those new fields doing in the pipeline? What I mean is that your old SQL
>>>>>>>>>>> statement clearly didn't reference those fields in a SELECT statement since
>>>>>>>>>>> they didn't exist, so what are you missing by not having them unless you
>>>>>>>>>>> are also changing the SQL statement?
>>>>>>>>>>>
>>>>>>>>>>> Is this a case where you have a SELECT *, and just want to make
>>>>>>>>>>> sure those fields are included?
>>>>>>>>>>>
>>>>>>>>>>> Reuven
>>>>>>>>>>>
>>>>>>>>>>> On Mon, Dec 7, 2020 at 6:31 PM Talat Uyarer <
>>>>>>>>>>> tuya...@paloaltonetworks.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Andrew,
>>>>>>>>>>>>
>>>>>>>>>>>> I assume the SQL query is not going to change. What changes is
>>>>>>>>>>>> the Row schema, by adding new columns or renaming columns. If we keep
>>>>>>>>>>>> version information somewhere, for example in a KV pair where the key is
>>>>>>>>>>>> the schema information and the value is the Row, can't we generate the SQL
>>>>>>>>>>>> code? The reason I am asking is that we have 15k pipelines. When we have a
>>>>>>>>>>>> schema change we restart 15k DF jobs, which is a pain. I am looking for a
>>>>>>>>>>>> possible way to avoid the job restarts. Do you think it is still not
>>>>>>>>>>>> doable?
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Mon, Dec 7, 2020 at 6:10 PM Andrew Pilloud <
>>>>>>>>>>>> apill...@google.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Unfortunately we don't have a way to generate the SQL Java
>>>>>>>>>>>>> code on the fly, even if we did, that wouldn't solve your problem. I
>>>>>>>>>>>>> believe our recommended practice is to run both the old and new pipeline
>>>>>>>>>>>>> for some time, then pick a window boundary to transition the output from
>>>>>>>>>>>>> the old pipeline to the new one.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Beam doesn't handle changing the format of data sent between
>>>>>>>>>>>>> intermediate steps in a running pipeline. Beam uses "coders" to serialize
>>>>>>>>>>>>> data between steps of the pipeline. The builtin coders (including the
>>>>>>>>>>>>> Schema Row Coder used by SQL) have a fixed data format and don't handle
>>>>>>>>>>>>> schema evolution. They are optimized for performance at all costs.
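
To see why a fixed data format and schema evolution do not mix, consider a toy positional coder in plain Java. It is an illustration only and does not reproduce Beam's actual Row encoding: `PositionalCoder` and its methods are made up, and every field is assumed to be a string. Values are written in schema order with no names or tags, so the decoder has to get the field count from its own schema.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Toy positional coder; this is NOT Beam's actual Row wire format.
final class PositionalCoder {

    // Writes the values in schema order with no field names or tags.
    static byte[] encode(List<String> values) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            for (String v : values) {
                out.writeUTF(v);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Reads exactly fieldCount values; the count comes from the decoder's schema,
    // not from the data itself.
    static List<String> decode(byte[] data, int fieldCount) {
        List<String> values = new ArrayList<>();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            for (int i = 0; i < fieldCount; i++) {
                values.add(in.readUTF());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e); // e.g. the data ran out before the schema did
        }
        return values;
    }

    public static void main(String[] args) {
        byte[] data = encode(List.of("a", "b"));   // encoded under a two-field schema
        System.out.println(decode(data, 2));       // same schema: decodes fine
        try {
            decode(data, 3);                       // schema gained a field: decoding fails
        } catch (UncheckedIOException e) {
            System.out.println("decode failed: " + e.getCause());
        }
    }
}
```

A format like this is compact and fast, but data encoded under one schema cannot be read under another without extra information in the bytes.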
>>>>>>>>>>>>>
>>>>>>>>>>>>> If you worked around this, the Beam model doesn't support
>>>>>>>>>>>>> changing the structure of the pipeline graph. This would significantly
>>>>>>>>>>>>> limit the changes you can make. It would also require some changes to SQL
>>>>>>>>>>>>> to try to produce the same plan for an updated SQL query.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Andrew
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Mon, Dec 7, 2020 at 5:44 PM Talat Uyarer <
>>>>>>>>>>>>> tuya...@paloaltonetworks.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> We are using Beam SQL in our pipeline. Our data is written in
>>>>>>>>>>>>>> Avro format. We generate our rows based on our Avro schema. Over time the
>>>>>>>>>>>>>> schema is changing. I believe Beam SQL generates Java code based on what we
>>>>>>>>>>>>>> define as the BeamSchema while submitting the pipeline. Do you have any idea
>>>>>>>>>>>>>> how we can handle schema changes without resubmitting our Beam job? Is it
>>>>>>>>>>>>>> possible to generate the SQL Java code on the fly?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
