That makes sense Fabian!
So I want to make sure I fully understand how this should look.
Would the same expression look like:

custom_groupby(my_group_fields, map[ 'a', sum(a)...])
?
Will I be able to use the builtin aggregation functions internally, such as 
sum/avg etc., or would I need to reimplement all such functions?
In terms of schema evolution, if these are implemented as map state, will I 
be OK when new items are added to that map?
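In code I imagine the setup would look something like this (just a sketch on my side, all names made up):

    // hypothetical UDAGG that computes all metrics into a single MAP
    tableEnv.registerFunction("metrics_agg", new MetricsMapAggregate());
    Table result = tableEnv.sqlQuery(
        "SELECT a, metrics_agg(b, c) AS metric_map FROM events GROUP BY a");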

Thanks again, and congrats on an awesome conference - I learned a lot
Shahar

From: Fabian Hueske
Sent: Monday, April 8, 02:54
Subject: Re: Schema Evolution on Dynamic Schema
To: Shahar Cizer Kobrinsky
Cc: Rong Rong, user


Hi Shahar,

Sorry for the late response.

The problem is not with the type of the retract stream, but with the GROUP BY 
aggregation operator.
The optimizer is converting the plan into an aggregation operator that computes 
all aggregates followed by a projection that inserts the aggregation results 
into a MAP type.
The problem is the state of the aggregation operator. By adding a new field to 
the map, the state of the operator changes and you cannot restore it.
The only workaround I can think of would be to implement a user-defined 
aggregation function [1] that performs all aggregations internally and to 
manually maintain state compatibility for the accumulator of the UDAGG.
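A rough sketch of such a UDAGG (untested, assuming Long-valued metrics; class and key names are placeholders):

import java.util.HashMap;
import java.util.Map;
import org.apache.flink.table.functions.AggregateFunction;

// Accumulates all metrics into a single MAP, so adding a metric only adds
// a map key instead of changing the accumulator schema.
public class MetricsMapAggregate
    extends AggregateFunction<Map<String, Long>, Map<String, Long>> {

  @Override
  public Map<String, Long> createAccumulator() {
    return new HashMap<>();
  }

  // Called once per input row; sums b and c under fixed keys.
  public void accumulate(Map<String, Long> acc, Long b, Long c) {
    acc.merge("sumB", b, Long::sum);
    acc.merge("sumC", c, Long::sum);
  }

  @Override
  public Map<String, Long> getValue(Map<String, Long> acc) {
    return acc;
  }
}

To keep the accumulator state compatible across versions, you would also need to pin its serializer, e.g. by overriding getAccumulatorType().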

Best,
Fabian

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/udfs.html#aggregation-functions

On Thu, Mar 28, 2019 at 10:28 PM Shahar Cizer Kobrinsky 
<shahar.kobrin...@gmail.com> wrote:
Hmm, kinda stuck here. It seems like SQL GROUP BY is translated to a 
GroupAggProcessFunction which stores state for every aggregation element 
(thus flattening the map items for the state store). Seems like there's no way 
around it. Am I wrong? Is there any way to evolve the map elements when doing 
SELECT map['a', sum(a), 'b', sum(b).. ] FROM .. GROUP BY ..  ?

On Wed, Mar 20, 2019 at 2:00 AM Fabian Hueske <fhue...@gmail.com> wrote:
Hi,

I think this would work.
However, you should be aware that all keys are kept forever in state (unless 
you configure idle state retention time [1]).
This includes previous versions of keys.
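For reference, setting the retention time looks roughly like this (a sketch against the 1.7-era API):

    StreamQueryConfig qConfig = tableEnv.queryConfig();
    // a key's state is kept for at least 12h and at most 24h
    // after its last update, then cleaned up
    qConfig.withIdleStateRetentionTime(Time.hours(12), Time.hours(24));
    DataStream<Tuple2<Boolean, Row>> stream =
        tableEnv.toRetractStream(result, Row.class, qConfig);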

Also note that we are not guaranteeing savepoint compatibility across Flink 
versions yet.
If the state of the aggregation operator changes in a later version (say Flink 
1.9.x), it might not be possible to migrate to a later Flink version.
Compatibility for bugfix releases (1.8.0 -> 1.8.1) is of course provided.

Best,
Fabian

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/table/streaming/query_configuration.html#idle-state-retention-time

On Tue, Mar 19, 2019 at 10:27 PM Shahar Cizer Kobrinsky 
<shahar.kobrin...@gmail.com> wrote:
My bad, it actually did work with
Select a, map['sumB', sum(b), 'sumC', sum(c)] as metric_map
group by a

Do you think that's OK as a workaround? The main schema shouldn't change that 
way - only the keys in the map.

On Tue, Mar 19, 2019 at 11:50 AM Shahar Cizer Kobrinsky 
<shahar.kobrin...@gmail.com> wrote:
Thanks Fabian,

I'm thinking about how to work around that issue, and one thing that came to my 
mind is to create a map that holds keys & values that can be edited without 
changing the schema, though I'm still thinking about how to implement it in Calcite.
Consider the following original SQL, in which "metrics" can be 
added/deleted/renamed:
Select a, sum(b) as metric_sum_b, sum(c) as metric_sum_c
Group by a

I'm looking at both json_objectagg & map to change it, but it seems that 
json_objectagg is in a later Calcite version, and map doesn't work for me.
Trying something like
Select a, map(sum(b) as metric_sum_b, sum(c) as metric_sum_c) as metric_map
group by a

results in "Non-query expression encountered in illegal context".
Is my train of thought the right one? If so, do I have a mistake in the way I'm 
trying to implement it?

Thanks!

On Tue, Mar 19, 2019 at 2:03 AM Fabian Hueske <fhue...@gmail.com> wrote:
Hi,

Restarting a changed query from a savepoint is currently not supported.
In general, this is a very difficult problem, as new queries might result in 
completely different execution plans.
The special case of adding and removing aggregates is easier to solve, but the 
schema of the stored state changes and we would need to analyze the previous 
and current query and generate compatible serializers.
So far we did not explore this rabbit hole.

Also, starting a different query from a savepoint can lead to weird result 
semantics.
I'd recommend bootstrapping the state of the new query from scratch.

Best, Fabian

On Mon, Mar 18, 2019 at 8:02 PM Shahar Cizer Kobrinsky 
<shahar.kobrin...@gmail.com> wrote:
Or is it the SQL state that is incompatible.. ?

On Mon, Mar 18, 2019 at 11:44 AM Shahar Cizer Kobrinsky 
<shahar.kobrin...@gmail.com> wrote:
Thanks Guys,

I actually got an error now after adding some fields to the select statement:

java.lang.RuntimeException: Error while getting state
    at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:62)
    at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:135)
    at org.apache.flink.table.runtime.aggregate.GroupAggProcessFunction.open(GroupAggProcessFunction.scala:74)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
    at org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.open(LegacyKeyedProcessOperator.java:60)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.StateMigrationException: For heap backends, the new state serializer must not be incompatible.
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.tryRegisterStateTable(HeapKeyedStateBackend.java:301)
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.createInternalState(HeapKeyedStateBackend.java:341)
    at org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
    at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:63)
    at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:241)
    at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:290)
    at org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124)
    at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60)
    ... 9 more

Does that mean I should move from having a Pojo storing the result of the SQL 
retracted stream to Avro? I'm trying to understand how to mitigate it.

Thanks

On Sat, Mar 9, 2019 at 4:41 PM Rong Rong <walter...@gmail.com> wrote:
Hi Shahar,

From my understanding, if you use "group by" with AggregateFunctions, they save 
the accumulators to SQL internal states, which are invariant to your input 
schema. Based on what you described, I think that's why recovering from 
existing state works fine for you.
I think one confusion you might have is the "toRetractStream" syntax. This 
actually passes the "retracting" flag to the Flink planner to indicate how the 
DataStream operator gets generated based on your SQL.
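i.e., something like:

    // the Boolean flag marks insert (true) vs. retract (false) messages;
    // it is part of the emitted records, not of any operator state
    DataStream<Tuple2<Boolean, Row>> retractStream =
        tableEnv.toRetractStream(resultTable, Row.class);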

So in my understanding, there's really no "state" associated with the 
"retracting stream", but rather with the generated operators.
However, I am not an expert in Table/SQL state recovery: I recall there was an 
open JIRA [1] that might be related to your question regarding SQL/Table 
generated operator recovery. Maybe @Fabian can provide more insight here?

Regarding the rest of the pipeline: both "filter" and "map" operators are 
stateless, and sink state recovery depends on what your sink does.

--
Rong

[1] https://issues.apache.org/jira/browse/FLINK-6966

On Fri, Mar 8, 2019 at 12:07 PM shkob1 <shahar.kobrin...@gmail.com> wrote:
Thanks Rong,

I made a quick test changing the SQL select (adding a select field in the
middle) and reran the job from a savepoint, and it worked without any errors.
I want to make sure I understand at what point the state is stored and how it
works.

Let's simplify the scenario and forget my specific case of a dynamically
generated Pojo. Let's focus on the generic steps of:
Source -> register table -> SQL select and group by session -> retracted stream
(Row) -> transformToPojo (custom Map function) -> pushToSink
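In code that pipeline is roughly (a simplified sketch, names made up):

    tableEnv.registerDataStream("events", sourceStream, "a, b, c");
    Table result = tableEnv.sqlQuery(
        "SELECT a, sum(b) AS sumB, sum(c) AS sumC FROM events GROUP BY a");
    DataStream<Tuple2<Boolean, Row>> retracted =
        tableEnv.toRetractStream(result, Row.class);
    retracted
        .filter(t -> t.f0)            // keep accumulate messages
        .map(new RowToPojoFunction()) // custom Map function: Row -> Pojo
        .addSink(sink);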

And let's assume the SQL select is changed (a field is added somewhere in
the middle of the select list).
So:
We had intermediate results in the old format that are loaded from state into
the new Row object in the retracted stream. Is that an accurate statement? At
what operator/format is the state stored in this case? Is it the SQL
result/Row? Is it the Pojo? As this scenario does not fail for me, I'm trying
to understand how/where it is handled in Flink.

--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

