I'm not sure that we could support EXCEPT statements, as that would require
introspecting the unknown fields (what if the EXCEPT statement matches a
field that later is added as an unknown field?). IMO this sort of behavior
only makes sense on true pass-through queries. Anything that modifies the
input record would be tricky to support.

Nested rows would work for proposal 2. You would need to make sure that the
unknown-fields map is recursively added to all nested rows, and you would
do this when you infer a schema from the avro schema.

On Tue, Dec 8, 2020 at 9:58 AM Andrew Pilloud <apill...@google.com> wrote:

> Proposal 1 would also interact poorly with SELECT * EXCEPT ... statements,
> which returns all columns except specific ones. Adding an unknown field
> does seem like a reasonable way to handle this. It probably needs to be
> something that is native to the Row type, so columns added to nested rows
> also work.
>
> Andrew
>
> On Tue, Dec 8, 2020 at 9:50 AM Reuven Lax <re...@google.com> wrote:
>
>> There's a difference between a fully dynamic schema and simply being able
>> to forward "unknown" fields to the output.
>>
>> A fully-dynamic schema is not really necessary unless we also had dynamic
>> SQL statements. Since the existing SQL statements do not reference the new
>> fields by name, there's no reason to add them to the main schema.
>>
>> However, if you have a SELECT * FROM WHERE XXXX statement that does no
>> aggregation, there's fundamentally no reason we couldn't forward the
>> messages exactly. In theory we could forward the exact bytes that are in
>> the input PCollection, which would necessarily forward the new fields. In
>> practice I believe that we convert the input messages to Beam Row objects
>> in order to evaluate the WHERE clause, and then convert back to Avro to
>> output those messages. I believe this is where we "lose" the unknown
>> messages,but this is an implementation artifact - in theory we could output
>> the original bytes whenever we see a SELECT *. This is not truly a dynamic
>> schema, since you can't really do anything with these extra fields except
>> forward them to your output.
>>
>> I see two possible ways to address this.
>>
>> 1. As I mentioned above, in the case of a SELECT * we could output the
>> original bytes, and only use the Beam Row for evaluating the WHERE clause.
>> This might be very expensive though - we risk having to keep two copies of
>> every message around, one in the original Avro format and one in Row format.
>>
>> 2. The other way would be to do what protocol buffers do. We could add
>> one extra field to the inferred Beam schema to store new, unknown fields
>> (probably this would be a map-valued field). This extra field would simply
>> store the raw bytes of these unknown fields, and then when converting back
>> to Avro they would be added to the output message. This might also add some
>> overhead to the pipeline, so might be best to make this behavior opt in.
>>
>> Reuven
>>
>> On Tue, Dec 8, 2020 at 9:33 AM Brian Hulette <bhule...@google.com> wrote:
>>
>>> Reuven, could you clarify what you have in mind? I know multiple times
>>> we've discussed the possibility of adding update compatibility support to
>>> SchemaCoder, including support for certain schema changes (field
>>> additions/deletions) - I think the most recent discussion was here [1].
>>>
>>> But it sounds like Talat is asking for something a little beyond that,
>>> effectively a dynamic schema. Is that something you think we can support?
>>>
>>> [1]
>>> https://lists.apache.org/thread.html/ref73a8c40e24e0b038b4e5b065cd502f4c5df2e5e15af6f7ea1cdaa7%40%3Cdev.beam.apache.org%3E
>>>
>>> On Tue, Dec 8, 2020 at 9:20 AM Reuven Lax <re...@google.com> wrote:
>>>
>>>> Thanks. It might be theoretically possible to do this (at least for the
>>>> case where existing fields do not change). Whether anyone currently has
>>>> available time to do this is a different question, but it's something that
>>>> can be looked into.
>>>>
>>>> On Mon, Dec 7, 2020 at 9:29 PM Talat Uyarer <
>>>> tuya...@paloaltonetworks.com> wrote:
>>>>
>>>>> Adding new fields is more common than modifying existing fields. But
>>>>> type change is also possible for existing fields, such as regular 
>>>>> mandatory
>>>>> field(string,integer) to union(nullable field). No field deletion.
>>>>>
>>>>> On Mon, Dec 7, 2020 at 9:22 PM Reuven Lax <re...@google.com> wrote:
>>>>>
>>>>>> And when you say schema changes, are these new fields being added to
>>>>>> the schema? Or are you making changes to the existing fields?
>>>>>>
>>>>>> On Mon, Dec 7, 2020 at 9:02 PM Talat Uyarer <
>>>>>> tuya...@paloaltonetworks.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>> For sure let me explain a little bit about my pipeline.
>>>>>>> My Pipeline is actually simple
>>>>>>> Read Kafka -> Convert Avro Bytes to Beam Row(DoFn<KV<byte[], byte[]>,
>>>>>>> Row>) -> Apply Filter(SqlTransform.query(sql)) -> Convert back from
>>>>>>> Row to Avro (DoFn<Row, byte[]>)-> Write DB/GCS/GRPC etc
>>>>>>>
>>>>>>> On our jobs We have three type sqls
>>>>>>> - SELECT * FROM PCOLLECTION
>>>>>>> - SELECT * FROM PCOLLECTION <with Where Condition>
>>>>>>> - SQL Projection with or without Where clause  SELECT col1, col2
>>>>>>> FROM PCOLLECTION
>>>>>>>
>>>>>>> We know writerSchema for each message. While deserializing avro
>>>>>>> binary we use writer schema and reader schema on Convert Avro Bytes to 
>>>>>>> Beam
>>>>>>> Row step. It always produces a reader schema's generic record and we
>>>>>>> convert that generic record to Row.
>>>>>>> While submitting DF job we use latest schema to generate beamSchema.
>>>>>>>
>>>>>>> In the current scenario When we have schema changes first we restart
>>>>>>> all 15k jobs with the latest updated schema then whenever we are done we
>>>>>>> turn on the latest schema for writers. Because of Avro's 
>>>>>>> GrammerResolver[1]
>>>>>>> we read different versions of the schema and we always produce the 
>>>>>>> latest
>>>>>>> schema's record. Without breaking our pipeline we are able to handle
>>>>>>> multiple versions of data in the same streaming pipeline. If we can
>>>>>>> generate SQL's java code when we get notified wirth latest schema we 
>>>>>>> will
>>>>>>> handle all schema changes. The only remaining obstacle is Beam's SQL 
>>>>>>> Java
>>>>>>> code. That's why I am looking for some solution. We dont need multiple
>>>>>>> versions of SQL. We only need to regenerate SQL schema with the latest
>>>>>>> schema on the fly.
>>>>>>>
>>>>>>> I hope I can explain it :)
>>>>>>>
>>>>>>> Thanks
>>>>>>>
>>>>>>> [1]
>>>>>>> https://avro.apache.org/docs/1.7.2/api/java/org/apache/avro/io/parsing/doc-files/parsing.html
>>>>>>> <https://urldefense.proofpoint.com/v2/url?u=https-3A__avro.apache.org_docs_1.7.2_api_java_org_apache_avro_io_parsing_doc-2Dfiles_parsing.html&d=DwMFaQ&c=V9IgWpI5PvzTw83UyHGVSoW3Uc1MFWe5J8PTfkrzVSo&r=BkW1L6EF7ergAVYDXCo-3Vwkpy6qjsWAz7_GD7pAR8g&m=0qahAe7vDisJq_hMYGY8F-Bp7-_5lOwOKzNoQ3r3-IQ&s=lwwIMsJO9nmj6_xZcSG_7qkBIaxOwyUXry4st1q70Rc&e=>
>>>>>>>
>>>>>>> On Mon, Dec 7, 2020 at 7:49 PM Reuven Lax <re...@google.com> wrote:
>>>>>>>
>>>>>>>> Can you explain the use case some more? Are you wanting to change
>>>>>>>> your SQL statement as well when the schema changes? If not, what are 
>>>>>>>> those
>>>>>>>> new fields doing in the pipeline? What I mean is that your old SQL
>>>>>>>> statement clearly didn't reference those fields in a SELECT statement 
>>>>>>>> since
>>>>>>>> they didn't exist, so what are you missing by not having them unless 
>>>>>>>> you
>>>>>>>> are also changing the SQL statement?
>>>>>>>>
>>>>>>>> Is this a case where you have a SELECT *, and just want to make
>>>>>>>> sure those fields are included?
>>>>>>>>
>>>>>>>> Reuven
>>>>>>>>
>>>>>>>> On Mon, Dec 7, 2020 at 6:31 PM Talat Uyarer <
>>>>>>>> tuya...@paloaltonetworks.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Andrew,
>>>>>>>>>
>>>>>>>>> I assume SQL query is not going to change. Changing things is the
>>>>>>>>> Row schema by adding new columns or rename columns. if we keep a 
>>>>>>>>> version
>>>>>>>>> information on somewhere for example a KV pair. Key is schema 
>>>>>>>>> information,
>>>>>>>>> value is Row. Can not we generate SQL code ? Why I am asking We have 
>>>>>>>>> 15k
>>>>>>>>> pipelines. When we have a schema change we restart a 15k DF job which 
>>>>>>>>> is
>>>>>>>>> pain. I am looking for a possible way to avoid job restart. Dont you 
>>>>>>>>> think
>>>>>>>>> it is not still doable ?
>>>>>>>>>
>>>>>>>>> Thanks
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Mon, Dec 7, 2020 at 6:10 PM Andrew Pilloud <apill...@google.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Unfortunately we don't have a way to generate the SQL Java code
>>>>>>>>>> on the fly, even if we did, that wouldn't solve your problem. I 
>>>>>>>>>> believe our
>>>>>>>>>> recommended practice is to run both the old and new pipeline for 
>>>>>>>>>> some time,
>>>>>>>>>> then pick a window boundary to transition the output from the old 
>>>>>>>>>> pipeline
>>>>>>>>>> to the new one.
>>>>>>>>>>
>>>>>>>>>> Beam doesn't handle changing the format of data sent between
>>>>>>>>>> intermediate steps in a running pipeline. Beam uses "coders" to 
>>>>>>>>>> serialize
>>>>>>>>>> data between steps of the pipeline. The builtin coders (including the
>>>>>>>>>> Schema Row Coder used by SQL) have a fixed data format and don't 
>>>>>>>>>> handle
>>>>>>>>>> schema evolution. They are optimized for performance at all costs.
>>>>>>>>>>
>>>>>>>>>> If you worked around this, the Beam model doesn't support
>>>>>>>>>> changing the structure of the pipeline graph. This would 
>>>>>>>>>> significantly
>>>>>>>>>> limit the changes you can make. It would also require some changes 
>>>>>>>>>> to SQL
>>>>>>>>>> to try to produce the same plan for an updated SQL query.
>>>>>>>>>>
>>>>>>>>>> Andrew
>>>>>>>>>>
>>>>>>>>>> On Mon, Dec 7, 2020 at 5:44 PM Talat Uyarer <
>>>>>>>>>> tuya...@paloaltonetworks.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi,
>>>>>>>>>>>
>>>>>>>>>>> We are using Beamsql on our pipeline. Our Data is written in
>>>>>>>>>>> Avro format. We generate our rows based on our Avro schema. Over 
>>>>>>>>>>> time the
>>>>>>>>>>> schema is changing. I believe Beam SQL generates Java code based on 
>>>>>>>>>>> what we
>>>>>>>>>>> define as BeamSchema while submitting the pipeline. Do you have any 
>>>>>>>>>>> idea
>>>>>>>>>>> How can we handle schema changes with resubmitting our beam job. Is 
>>>>>>>>>>> it
>>>>>>>>>>> possible to generate SQL java code on the fly ?
>>>>>>>>>>>
>>>>>>>>>>> Thanks
>>>>>>>>>>>
>>>>>>>>>>

Reply via email to