Hi Elias, it would be great if you could let us know if the approach works.
Btw. I should point out that the join in a query like: SELECT s.tstamp, s.item, s.score, t.source FROM ( SELECT item, source, MAX(tstamp), LAST_VAL(score, tstamp) FROM Telemetry GROUP BY item, source ) INNER JOIN Scores s ON s.item = t.item WHERE s.score <> t.score is a full-history join and will fully materialize both inputs, the upserted Telemetry table and the append-only Scores table. The query would hold a left join state with one row per item-store combination in Telemetry, and a right join state for each row of Scores. Moreover, each update on the Telemetry table will change the output for all rows of Scores that are affected by the update. You can configure a state retention time. This will clean up state per key (in case of the join above based on the equi-join attribute) if a key did not receive new data within the retention time. The typical use case for full-history joins is to join two upserted or GROUP-BY-aggregated tables, i.e,. tables that are updated but remain more or less constant in size. Best, Fabian 2018-02-21 20:00 GMT+01:00 Elias Levy <fearsome.lucid...@gmail.com>: > On Wed, Feb 21, 2018 at 3:24 AM, Fabian Hueske <fhue...@gmail.com> wrote: > >> Hi Elias, >> >> Flink does not have built-in support for upsert stream -> table >> conversions, yet. However, the community is working on that (see FLINK-8545 >> [1]). >> With a workaround, you can also solve the issue with what Flink supports >> so far. >> > > Fabian, > > Thanks for the reply. Great to see some progress on this area. If we > could implement this job in Flink rather than Kafka Stream it would mean > one less technology to support and to train our developers on, which is > always a plus. > > > >> The approach with the MAX(tstamp) query was good idea, but the query >> needs another join predicate on time. >> >> tableEnv.sqlQuery(""" >> SELECT a.tstamp, a.item, a.score, a.source >> FROM Telemetry a >> INNER JOIN ( >> SELECT MAX(tstamp) AS maxT, item, source >> FROM Telemetry >> GROUP BY item, source >> ) b ON a.item = b.item AND a.source = b.source AND a.tstamp = maxT >> """) >> >> Otherwise, the table will have multiple records for each combination of >> item and score as you noticed. >> >> HOWEVER, you might not want to use the query above because it will >> accumulate all records from Telemetry in state and never clean them up. >> The reason for this is that the query planner is not smart enough yet to >> infer that old records will never be joined (this is implied by the join >> condition on time). >> > > Thanks for the correction. But, yes, the indefinite accumulation is a > deal breakers for using this approach. > > > A better solution is to use a custom user-defined aggregation function [2] >> (LAST_VAL) that returns the value with associated max timestamp. >> >> SELECT item, source, MAX(tstamp), LAST_VAL(score, tstamp) >> FROM Telemetry >> GROUP BY item, source >> >> LAST_VAL would have an accumulator that stores a score and its associated >> timestamp. >> When a new (score, timestamp) pair is accumulated, the UDAGG compares the >> timestamps and only updates the accumulator if the new timestamp is larger. >> > > I'll give this approach a try. > > > Btw. I'm not sure if KStreams only updates the KTable if the update has a >> higher timestamp or just take the last received record. >> That might be an issue with out-of-order data. I would check the behavior >> if you expect data with out-of-order timestamps. >> > > I believe you are correct. KS will attempt to consume records from across > partitions by attempting to align their timestamps, but it won't reorder > records within a partition, which can be problematic if you can't guarantee > ordered records within a partition. While I talked about KTables, in > reality the job I wrote is a combination of the KS Stream DSL and Operator > API, to get around some of these issues. > > The upsert stream table conversion that we are working on will support >> event time (max timestamp) or processing time (last value) upserts. >> > > Excellent. > > > Best, Fabian >> >> [1] https://issues.apache.org/jira/browse/FLINK-8545 >> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.4/ >> dev/table/udfs.html#aggregation-functions >> >