Hi Shahar,

Thanks!

The UDAGG approach would be very manual. You could not reuse the
built-in functions.
There are several ways to achieve this. One approach could be to have a
map-based UDAGG for each type of aggregation that you'd like to support
(SUM, COUNT, ...).
Let's say we have a sumMap function. It could take a MAP(String, Double) as
input parameter and produce a MAP(String, Double) as result. Internally,
the function would create and maintain a sum aggregate for each unique
String key of the map.
The same could be done for countMap, minMap, etc.
Since the accumulator of the UDAGGs would be a map, it should be state
compatible and support a growing number of aggregates. It would not be
easily possible (without injecting marker records) to delete aggregates.
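
To make the sumMap idea concrete, here is a minimal sketch in plain Java.
It has no Flink dependency, so it only mirrors the createAccumulator /
accumulate / getValue convention of the Table API's AggregateFunction. The
class name SumMap is hypothetical, and whether a plain java.util.Map
accumulator is restored compatibly from a savepoint depends on the Flink
version and the serializer chosen for it, which I have not verified.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Sketch of a map-based SUM aggregate (hypothetical name: SumMap).
 * Plain Java only; in a real job the class would extend Flink's
 * org.apache.flink.table.functions.AggregateFunction instead.
 */
public class SumMap {

    /** The accumulator is a map from metric name to its running sum. */
    public Map<String, Double> createAccumulator() {
        return new HashMap<>();
    }

    /** Adds every entry of the input map to the running sum of its key. */
    public void accumulate(Map<String, Double> acc, Map<String, Double> input) {
        if (input == null) {
            return;
        }
        for (Map.Entry<String, Double> entry : input.entrySet()) {
            if (entry.getValue() != null) {
                // Unknown keys start a new sum, known keys are added up.
                acc.merge(entry.getKey(), entry.getValue(), Double::sum);
            }
        }
    }

    /** Returns a copy of the sums, one entry per key seen so far. */
    public Map<String, Double> getValue(Map<String, Double> acc) {
        return new HashMap<>(acc);
    }

    public static void main(String[] args) {
        SumMap fn = new SumMap();
        Map<String, Double> acc = fn.createAccumulator();

        Map<String, Double> oldRow = new HashMap<>();
        oldRow.put("a", 1.0);
        oldRow.put("b", 2.0);
        fn.accumulate(acc, oldRow);

        // A later version of the query adds metric "c"; the accumulator
        // simply gains a key, its type does not change.
        Map<String, Double> newRow = new HashMap<>();
        newRow.put("a", 3.0);
        newRow.put("c", 5.0);
        fn.accumulate(acc, newRow);

        System.out.println(fn.getValue(acc));
    }
}
```

Note that nothing here ever removes a key, which is the deletion problem
mentioned above: a dropped metric stays in the accumulator with its last sum.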

I don't think this would be very efficient, but it should work.

Best, Fabian

On Tue, Apr 9, 2019 at 01:35, Shahar Cizer Kobrinsky <
shahar.kobrin...@gmail.com> wrote:

> That makes sense Fabian!
> So I want to make sure I fully understand how this should look.
> Would the same expression look like:
>
> custom_groupby(my_group_fields, map[ 'a', sum(a)...])
> ?
> Will I be able to use the built-in aggregation functions internally, such as
> sum/avg etc.? Or would I need to reimplement all such functions?
> In terms of schema evolution, if these are implemented as a map state,
> will I be OK as new items are added to that map?
>
> Thanks again, and congrats on an awesome conference, I learned a lot.
> Shahar
>
> From: Fabian Hueske
> Sent: Monday, April 8, 02:54
> Subject: Re: Schema Evolution on Dynamic Schema
> To: Shahar Cizer Kobrinsky
> Cc: Rong Rong, user
>
>
> Hi Shahar,
>
> Sorry for the late response.
>
> The problem is not with the type of the retract stream, but with the GROUP
> BY aggregation operator.
> The optimizer is converting the plan into an aggregation operator that
> computes all aggregates followed by a projection that inserts the
> aggregation results into a MAP type.
> The problem is the state of the aggregation operator. By adding a new
> field to the map, the state of the operator changes and you cannot restore
> it.
> The only workaround that I can think of would be to implement a
> user-defined aggregation function [1] that performs all aggregations
> internally and manually maintain state compatibility for the accumulator of
> the UDAGG.
>
> Best,
> Fabian
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/udfs.html#aggregation-functions
>
> On Thu, Mar 28, 2019 at 22:28, Shahar Cizer Kobrinsky <
> shahar.kobrin...@gmail.com> wrote:
>
> Hmm, kind of stuck here. It seems that SQL GROUP BY is translated to a
> *GroupAggProcessFunction*, which stores state for every aggregation
> element (thus flattening the map items for the state store). It seems there's
> no way around it. Am I wrong? Is there any way to evolve the map elements
> when doing *SELECT map['a', sum(a), 'b', sum(b).. ] FROM.. group by ..  *?
>
> On Wed, Mar 20, 2019 at 2:00 AM Fabian Hueske <fhue...@gmail.com> wrote:
>
> Hi,
>
> I think this would work.
> However, you should be aware that all keys are kept forever in state
> (unless you configure idle state retention time [1]).
> This includes previous versions of keys.
>
> Also note that we are not guaranteeing savepoint compatibility across
> Flink versions yet.
> If the state of the aggregation operator changes in a later version (say
> Flink 1.9.x), it might not be possible to migrate to a later Flink version.
> Compatibility for bugfix releases (1.8.0 -> 1.8.1) is of course provided.
>
> Best,
> Fabian
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/table/streaming/query_configuration.html#idle-state-retention-time
>
> On Tue, Mar 19, 2019 at 22:27, Shahar Cizer Kobrinsky <
> shahar.kobrin...@gmail.com> wrote:
>
> My bad. It actually did work with
> Select a, map['sumB', sum(b), 'sumC', sum(c)] as metric_map
> group by a
>
> Do you think that's OK as a workaround? That way the main schema should
> not have to change, only the keys in the map.
>
> On Tue, Mar 19, 2019 at 11:50 AM Shahar Cizer Kobrinsky <
> shahar.kobrin...@gmail.com> wrote:
>
> Thanks Fabian,
>
> I'm thinking about how to work around that issue, and one thing that came to
> my mind is to create a map that holds keys & values that can be edited
> without changing the schema, though I'm still thinking about how to implement
> it in Calcite.
> Consider the following original SQL, in which "metrics" can be
> added/deleted/renamed:
> Select a, sum(b) as metric_sum_b, sum(c) as metric_sum_c
> Group by a
>
> I'm looking at both json_objectagg & map to change it, but it seems that
> json_objectagg is only in a later Calcite version and map doesn't work for me.
> Trying something like
> Select a, map(sum(b) as metric_sum_b, sum(c) as metric_sum_c) as metric_map
> group by a
>
> results in "Non-query expression encountered in illegal context".
> Is my train of thought the right one? If so, do I have a mistake in the
> way I'm trying to implement it?
>
> Thanks!
>
>
>
>
>
>
>
> On Tue, Mar 19, 2019 at 2:03 AM Fabian Hueske <fhue...@gmail.com> wrote:
>
> Hi,
>
> Restarting a changed query from a savepoint is currently not supported.
> In general this is a very difficult problem as new queries might result in
> completely different execution plans.
> The special case of adding and removing aggregates is easier to solve, but
> the schema of the stored state changes and we would need to analyze the
> previous and current query and generate compatible serializers.
> So far we did not explore this rabbit hole.
>
> Also, starting a different query from a savepoint can lead to weird
> result semantics.
> I'd recommend bootstrapping the state of the new query from scratch.
>
> Best, Fabian
>
>
>
> On Mon, Mar 18, 2019 at 20:02, Shahar Cizer Kobrinsky <
> shahar.kobrin...@gmail.com> wrote:
>
> Or is it the SQL state that is incompatible?
>
> On Mon, Mar 18, 2019 at 11:44 AM Shahar Cizer Kobrinsky <
> shahar.kobrin...@gmail.com> wrote:
>
> Thanks Guys,
>
> I actually got an error now after adding some fields to the select statement:
>
> java.lang.RuntimeException: Error while getting state
> at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:62)
> at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:135)
> at org.apache.flink.table.runtime.aggregate.GroupAggProcessFunction.open(GroupAggProcessFunction.scala:74)
> at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
> at org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.open(LegacyKeyedProcessOperator.java:60)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.StateMigrationException: For heap backends, the new state serializer must not be incompatible.
> at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.tryRegisterStateTable(HeapKeyedStateBackend.java:301)
> at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.createInternalState(HeapKeyedStateBackend.java:341)
> at org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
> at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:63)
> at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:241)
> at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:290)
> at org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124)
> at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60)
> ... 9 more
>
> Does that mean I should move from a Pojo that stores the result of the
> SQL retract stream to Avro? I'm trying to understand how to mitigate it.
>
> Thanks
>
> On Sat, Mar 9, 2019 at 4:41 PM Rong Rong <walter...@gmail.com> wrote:
>
> Hi Shahar,
>
> From my understanding, if you use "groupby" with AggregateFunctions, they
> save the accumulators to SQL internal state, which is independent of your
> input schema. Based on what you described, I think that's why recovering
> from existing state works fine.
> I think one confusion you might have is the "toRetractStream" syntax. This
> actually passes the "retracting" flag to the Flink planner to indicate how
> the DataStream operator gets generated based on your SQL.
>
> So in my understanding, there's really no "state" associated with the
> "retracting stream"; the state is associated with the generated operators.
> However, I am not an expert in Table/SQL state recovery. I recall there was
> an open JIRA [1] that might be related to your question regarding SQL/Table
> generated operator recovery. Maybe @Fabian can provide more insight here?
>
> Regarding the rest of the pipeline, both "filter" and "map" operators are
> stateless; and sink state recovery depends on what you do.
>
> --
> Rong
>
> [1] https://issues.apache.org/jira/browse/FLINK-6966
>
> On Fri, Mar 8, 2019 at 12:07 PM shkob1 <shahar.kobrin...@gmail.com> wrote:
>
> Thanks Rong,
>
> I ran a quick test, changing the SQL select (adding a select field
> in the middle) and rerunning the job from a savepoint, and it worked without
> any errors. I want to make sure I understand at what point the state is
> stored and how it works.
>
> Let's simplify the scenario and forget my specific case of a dynamically
> generated Pojo. Let's focus on the generic steps of:
> Source->register table->SQL select and group by session->retracted stream
> (Row)->transformToPojo (Custom Map function) ->pushToSink
>
> And let's assume the SQL select is changed (a field is added somewhere in
> the middle of the select list).
> So:
> We had intermediate results in the old format that are loaded from
> state into the new Row object in the retract stream. Is that an accurate
> statement? At what operator, and in what format, is the state stored in this
> case? Is it the SQL result/Row? Is it the Pojo? As this scenario does not
> fail for me, I'm trying to understand how and where it is handled in Flink.
>
>
>
>
>