I think this requirement can be satisfied by the temporal table function [1]. Am I missing anything?

[1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/temporal_tables.html#temporal-table-function

Best,
Kurt
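As a rough illustration of Kurt's pointer, a temporal table function join might look like the sketch below (Scala Table API, assuming a Flink 1.9-style setup; the UsersHistory and Orders tables, their columns, and the time attributes are made-up names for this example, not something from the thread):

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.scala._

object TemporalTableFunctionSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = StreamTableEnvironment.create(env)

    // UsersHistory is assumed to be registered elsewhere as an append-only
    // changelog of user rows (uid, zip, update_time), where update_time is an
    // event-time attribute with watermarks. Orders (oid, product, uid, order_time)
    // is likewise assumed to be registered with an event-time attribute.
    val usersHistory = tEnv.scan("UsersHistory")

    // Turn the history into a temporal table function keyed by uid and
    // versioned by update_time.
    val users = usersHistory.createTemporalTableFunction('update_time, 'uid)
    tEnv.registerFunction("Users", users)

    // Join each order with the user version that was valid at order time.
    val result = tEnv.sqlQuery(
      """
        |SELECT o.oid, o.product, u.zip
        |FROM Orders AS o,
        |     LATERAL TABLE (Users(o.order_time)) AS u
        |WHERE o.uid = u.uid
      """.stripMargin)

    // result can then be converted to a DataStream or written to a sink.
  }
}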
On Sat, Mar 28, 2020 at 2:47 PM Maatary Okouya <maatarioko...@gmail.com> wrote:

> Hi all,
>
> Just wondering, what is the status at this point?
>
> On Thu, Sep 19, 2019 at 4:38 PM Hequn Cheng <chenghe...@gmail.com> wrote:
>
>> Hi,
>>
>> Fabian is totally right. Big thanks for the detailed answers and nice examples above.
>>
>> As for the PR, I am very sorry about the delay. It is mainly because of the merge of Blink and my work switching to Flink Python recently.
>> However, I think a later version of Blink will cover this feature natively with further merges.
>>
>> Before that, I think we can use the solution Fabian provided above.
>>
>> There are some examples here [1][2] which may be helpful to you, @Casado @Maatary.
>> In [1], the test case closely matches your scenario (perform a join after groupby + last_value). It also provides the UDAF you want and shows how to register it.
>> In [2], the test shows how to use the built-in last_value in SQL. Note that the built-in last_value UDAF is only supported in the blink planner from Flink 1.9.0. If you are using the flink planner (or an earlier version), you can register the last_value UDAF with the TableEnvironment as shown in [1].
>>
>> Feel free to ask if there are other problems.
>>
>> Best, Hequn
>>
>> [1] https://github.com/apache/flink/blob/master/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/SubplanReuseTest.scala#L207
>> [2] https://github.com/apache/flink/blob/master/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SplitAggregateITCase.scala#L228
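To make Hequn's note about the built-in last_value concrete, a query using it with the blink planner (Flink 1.9.0+) might look roughly like this; the users_changelog table and its columns are made-up example names, not taken from the linked tests:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.scala._

object LastValueSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // LAST_VALUE is only built in when the blink planner is used.
    val settings = EnvironmentSettings.newInstance()
      .useBlinkPlanner()
      .inStreamingMode()
      .build()
    val tEnv = StreamTableEnvironment.create(env, settings)

    // users_changelog is assumed to be an append-only table registered elsewhere.
    val latestUsers = tEnv.sqlQuery(
      """
        |SELECT uid,
        |       LAST_VALUE(name) AS name,
        |       LAST_VALUE(zip)  AS zip
        |FROM users_changelog
        |GROUP BY uid
      """.stripMargin)

    // latestUsers now behaves like an upsert view with one row per uid.
  }
}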
>> On Thu, Sep 19, 2019 at 9:40 PM Casado Tejedor, Rubén <ruben.casado.teje...@accenture.com> wrote:
>>
>>> Thanks Fabian. @Hequn Cheng <chenghe...@gmail.com> Could you share the status? Thanks for your amazing work!
>>>
>>> From: Fabian Hueske <fhue...@gmail.com>
>>> Date: Friday, August 16, 2019, 9:30
>>> To: "Casado Tejedor, Rubén" <ruben.casado.teje...@accenture.com>
>>> CC: Maatary Okouya <maatarioko...@gmail.com>, miki haiat <miko5...@gmail.com>, user <user@flink.apache.org>, Hequn Cheng <chenghe...@gmail.com>
>>> Subject: Re: [External] Re: From Kafka Stream to Flink
>>>
>>> Hi Ruben,
>>>
>>> Work on this feature has already started [1], but it stalled a bit (probably due to the effort of merging the new Blink query processor).
>>> Hequn (in CC) is the one working on upsert table ingestion; maybe he can share what the status of this feature is.
>>>
>>> Best, Fabian
>>>
>>> [1] https://github.com/apache/flink/pull/6787
>>>
>>> On Tue, Aug 13, 2019 at 11:05 AM Casado Tejedor, Rubén <ruben.casado.teje...@accenture.com> wrote:
>>>
>>> Hi,
>>>
>>> Do you have an expected version of Flink that will include the capability to ingest an upsert stream as a dynamic table? We have such a need in our current project. What we have done is to emulate that behavior by working at a low level with state (e.g. update the existing value if the key exists, create a new value if the key does not exist). But we cannot use SQL, which would help us do it faster.
>>>
>>> Our use case is many small Flink jobs that have to do something like:
>>>
>>> SELECT some fields
>>> FROM t1 INNER JOIN t2 ON t1.id = t2.id (maybe joining 3+ tables)
>>> WHERE some conditions on fields;
>>>
>>> We need the result of those queries taking into account only the last values of each row. The result is inserted/updated in an in-memory K-V database for fast access.
>>>
>>> Thanks in advance!
>>>
>>> Best
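The low-level state emulation Rubén describes (update if the key exists, create it if not) could be sketched with a keyed ProcessFunction and a ValueState along the following lines; the Record type, field names, and the change-detection policy are illustrative assumptions, not code from his project:

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector

// Illustrative record type; the real schema comes from the mirrored table.
case class Record(key: String, payload: String)

class UpsertLatest extends KeyedProcessFunction[String, Record, Record] {

  // Holds the latest version of the row for the current key.
  private var latest: ValueState[Record] = _

  override def open(parameters: Configuration): Unit = {
    latest = getRuntimeContext.getState(
      new ValueStateDescriptor[Record]("latest-row", classOf[Record]))
  }

  override def processElement(row: Record,
                              ctx: KeyedProcessFunction[String, Record, Record]#Context,
                              out: Collector[Record]): Unit = {
    val previous = latest.value()
    // Create if absent, overwrite if present; only forward when something changed,
    // so downstream operators and the K-V sink always see the current row per key.
    if (previous == null || previous != row) {
      latest.update(row)
      out.collect(row)
    }
  }
}

// usage: changelog.keyBy(_.key).process(new UpsertLatest)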
>>> From: Fabian Hueske <fhue...@gmail.com>
>>> Date: Wednesday, August 7, 2019, 11:08
>>> To: Maatary Okouya <maatarioko...@gmail.com>
>>> CC: miki haiat <miko5...@gmail.com>, user <user@flink.apache.org>
>>> Subject: [External] Re: From Kafka Stream to Flink
>>>
>>> Hi,
>>>
>>> LAST_VAL is not a built-in function, so you'd need to implement it as a user-defined aggregate function (UDAGG) and register it.
>>>
>>> The problem with joining an append-only table with an updating table is the following.
>>>
>>> Consider two tables: users (uid, name, zip) and orders (oid, uid, product), with users being an updating table and orders being append-only.
>>>
>>> On January 1st, the tables look like this:
>>>
>>> Users:
>>> uid_1, Fred, 12345
>>> uid_2, Mary, 67890
>>>
>>> Orders:
>>> oid_1, uid_1, Popcorn
>>> oid_2, uid_2, Carrots
>>>
>>> Joining both tables with the query SELECT oid, product, name, zip FROM users u, orders o WHERE u.uid = o.uid results in:
>>>
>>> oid_1, Popcorn, Fred, 12345
>>> oid_2, Carrots, Mary, 67890
>>>
>>> Whenever a new order is appended, we look up the corresponding user data, perform the join, and emit the result.
>>> Let's say that by July 1st we have received 100 orders from our two users and all is fine. However, on July 2nd Fred updates his zip code because he moved to another city.
>>> Our data now looks like this:
>>>
>>> Users:
>>> uid_1, Fred, 24680
>>> uid_2, Mary, 67890
>>>
>>> Orders:
>>> oid_1, uid_1, Popcorn
>>> oid_2, uid_2, Carrots
>>> ....
>>> oid_100, uid_2, Potatoes
>>>
>>> The result of the same query as before is:
>>>
>>> oid_1, Popcorn, Fred, 24680
>>> oid_2, Carrots, Mary, 67890
>>> ....
>>> oid_100, Potatoes, Mary, 67890
>>>
>>> Notice how the first row changed? If we strictly follow SQL semantics (which we do in Flink SQL), the query needs to update the ZIP code of the first result row.
>>> In order to do so, we need access to the original data of the orders table, which is the append-only table in our scenario.
>>> Consequently, we need to fully materialize append-only tables when they are joined with an updating table without temporal constraints.
>>>
>>> In many situations, the intended semantics for such a query would be to join the order with the ZIP code of the user *that was valid at the time when the order was placed*.
>>> However, this is *not* the semantics of the query in our example. For such a query, we need to model the data differently. The users table needs to store all modifications, i.e., the full history of all updates.
>>> Each update needs a timestamp, and each order needs a timestamp as well. With these timestamps, we can write a query that joins an order with the user data that was valid at the time when the order was placed.
>>> This is the temporal constraint that I mentioned before. With this constraint, Flink can use the information about progressing time to reason about how much state it needs to keep, because a change of the users table will only affect future orders.
>>>
>>> Flink makes this a lot easier with the concept of temporal tables [1] and temporal table joins [2].
>>>
>>> Best,
>>> Fabian
>>>
>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/streaming/temporal_tables.html
>>> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/streaming/joins.html#join-with-a-temporal-table
>>>
>>> On Tue, Aug 6, 2019 at 9:09 PM Maatary Okouya <maatarioko...@gmail.com> wrote:
>>>
>>> Fabian,
>>>
>>> Ultimately, I just want to perform a join on the last values for each key.
>>>
>>> On Tue, Aug 6, 2019 at 8:07 PM Maatary Okouya <maatarioko...@gmail.com> wrote:
>>>
>>> Fabian,
>>>
>>> Could you please clarify the following statement:
>>>
>>> "However, joining an append-only table with this view without adding a temporal join condition means that the stream is fully materialized as state. This is because previously emitted results must be updated when the view changes. It really depends on the semantics of the join and query that you need, how much state the query will need to maintain."
>>>
>>> I am not sure I understand the problem. If I have two append-only tables and perform some join on them, what's the issue?
>>>
>>> On Tue, Aug 6, 2019 at 8:03 PM Maatary Okouya <maatarioko...@gmail.com> wrote:
>>>
>>> Thank you for the clarification. Really appreciated.
>>>
>>> Is LAST_VAL part of the API?
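As Fabian notes above, LAST_VAL is not built in (outside the blink planner's LAST_VALUE), so it would have to be written as a user-defined aggregate. A minimal sketch of such an aggregate (String-valued only; names and null handling are illustrative assumptions) could look like this:

import org.apache.flink.table.functions.AggregateFunction

// Accumulator for the hypothetical LAST_VAL aggregate: just the most recent value seen.
class LastValAcc {
  var value: String = _
}

// Note that "last" here means last in arrival order; if updates can arrive out of
// order, the accumulator would also need a version or timestamp to compare against.
class LastVal extends AggregateFunction[String, LastValAcc] {

  override def createAccumulator(): LastValAcc = new LastValAcc

  override def getValue(acc: LastValAcc): String = acc.value

  // Called once per input row of the group; keeps the latest non-null value.
  def accumulate(acc: LastValAcc, in: String): Unit = {
    if (in != null) {
      acc.value = in
    }
  }
}

// Registration and use, following Fabian's and Hequn's descriptions:
// tEnv.registerFunction("LAST_VAL", new LastVal)
// tEnv.sqlQuery("SELECT key, LAST_VAL(v1), LAST_VAL(v2) FROM appendOnlyTable GROUP BY key")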
>>> On Fri, Aug 2, 2019 at 10:49 AM Fabian Hueske <fhue...@gmail.com> wrote:
>>>
>>> Hi,
>>>
>>> Flink does not distinguish between streams and tables. For the Table API / SQL, there are only tables that are changing over time, i.e., dynamic tables.
>>> A stream in the Kafka Streams or KSQL sense is in Flink a table with append-only changes, i.e., records are only inserted and never deleted or modified.
>>> A table in the Kafka Streams or KSQL sense is in Flink a table that has upsert and delete changes, i.e., the table has a unique key and records are inserted, deleted, or updated per key.
>>>
>>> In the current version, Flink does not have native support to ingest an upsert stream as a dynamic table (right now only append-only tables can be ingested; native support for upsert tables will be added soon).
>>> However, you can create a view with the following SQL query on an append-only table that creates an upsert table:
>>>
>>> SELECT key, LAST_VAL(v1), LAST_VAL(v2), ...
>>> FROM appendOnlyTable
>>> GROUP BY key
>>>
>>> Given this view, you can run all kinds of SQL queries on it.
>>> However, joining an append-only table with this view without adding a temporal join condition means that the stream is fully materialized as state.
>>> This is because previously emitted results must be updated when the view changes.
>>> It really depends on the semantics of the join and query that you need, how much state the query will need to maintain.
>>>
>>> An alternative to using the Table API / SQL and its dynamic table abstraction is to use Flink's DataStream API and ProcessFunctions.
>>> These APIs are more low-level and expose access to state and timers, which are the core ingredients of stream processing.
>>> You can implement pretty much all the logic of KStreams, and more, in these APIs.
>>>
>>> Best, Fabian
>>>
>>> On Tue, Jul 23, 2019 at 1:06 PM Maatary Okouya <maatarioko...@gmail.com> wrote:
>>>
>>> I would like to have a KTable, or maybe in Flink terms a dynamic table, that only contains the latest value for each keyed record. This would allow me to perform aggregations and joins based on the latest state of every record, as opposed to every record over time, or over a period of time.
>>>
>>> On Sun, Jul 21, 2019 at 8:21 AM miki haiat <miko5...@gmail.com> wrote:
>>>
>>> Can you elaborate more about your use case?
>>>
>>> On Sat, Jul 20, 2019 at 1:04 AM Maatary Okouya <maatarioko...@gmail.com> wrote:
>>>
>>> Hi,
>>>
>>> I have been a user of Kafka Streams so far. However, I have been faced with several limitations, in particular when performing joins on KTables.
>>>
>>> I was wondering what the approach in Flink is to achieve (1) the concept of a KTable, i.e. a table that represents a changelog, i.e. only the latest version of all keyed records, and (2) joining those.
>>>
>>> There are currently a lot of limitations around that in Kafka Streams, and I need this for an ETL process where I need to mirror entire databases into Kafka and then join the tables to emit the logical entities into Kafka topics. I was hoping that somehow I could achieve that by using Flink as an intermediary.
>>>
>>> I can see that you support any kind of join, but I just don't see the notion of a KTable.
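For the DataStream API / ProcessFunction route Fabian mentions, a "join on the latest value per key" could be sketched as below. The User/Order types and the one-sided emission are illustrative assumptions; unlike a full KTable-KTable join, this version only emits when an order arrives and would need extra logic to re-emit or retract results when the user side changes later:

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.co.CoProcessFunction
import org.apache.flink.util.Collector

// Illustrative row types.
case class User(uid: String, name: String, zip: String)
case class Order(oid: String, uid: String, product: String)

// Enriches each order with the latest known version of its user.
class LatestUserJoin extends CoProcessFunction[User, Order, (String, String, String, String)] {

  private var latestUser: ValueState[User] = _

  override def open(parameters: Configuration): Unit = {
    latestUser = getRuntimeContext.getState(
      new ValueStateDescriptor[User]("latest-user", classOf[User]))
  }

  // User changelog side: keep only the newest version per key.
  override def processElement1(user: User,
                               ctx: CoProcessFunction[User, Order, (String, String, String, String)]#Context,
                               out: Collector[(String, String, String, String)]): Unit = {
    latestUser.update(user)
  }

  // Order side: join against whatever user version is currently known.
  override def processElement2(order: Order,
                               ctx: CoProcessFunction[User, Order, (String, String, String, String)]#Context,
                               out: Collector[(String, String, String, String)]): Unit = {
    val user = latestUser.value()
    if (user != null) {
      out.collect((order.oid, order.product, user.name, user.zip))
    }
    // A real job would buffer the order (or register a timer) when the user is not known yet.
  }
}

// usage sketch:
// users.keyBy(_.uid)
//   .connect(orders.keyBy(_.uid))
//   .process(new LatestUserJoin)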