Hi Ghassan,

TableSource in Flink does not support primary keys yet, but you can achieve the same effect by doing the grouping manually.
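The manual grouping relies on a last-value aggregate. As an editorial illustration, here is a minimal plain-Scala model of its accumulate logic; in Flink 1.4 this would extend `org.apache.flink.table.functions.AggregateFunction`, and the class and method names below only mirror that API without carrying a Flink dependency:

```scala
// Plain-Scala sketch of a LAST_VALUE aggregate's accumulator logic.
// Names are illustrative; a real Flink UDAF would extend AggregateFunction.
case class LastValueAcc[T](var value: Option[T] = None)

class LastValue[T] {
  def createAccumulator(): LastValueAcc[T] = LastValueAcc[T]()

  // Unlike MAX, which keeps the largest value seen so far,
  // LAST_VALUE simply overwrites the accumulator with the newest input.
  def accumulate(acc: LastValueAcc[T], input: T): Unit =
    acc.value = Some(input)

  def getValue(acc: LastValueAcc[T]): Option[T] = acc.value
}
```

Used together with `GROUP BY ReviewID`, this collapses every review to its most recent version before the outer average runs.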
Such as:

  val resultTable = tableEnv.sql(
    """SELECT ProductID, AVG(Rating)
      |FROM (
      |  SELECT ReviewID,
      |         LAST_VALUE(ProductID) AS ProductID,
      |         LAST_VALUE(Approved) AS Approved,
      |         LAST_VALUE(Rating) AS Rating
      |  FROM Reviews
      |  GROUP BY ReviewID)
      |WHERE Approved = true
      |GROUP BY ProductID""".stripMargin
  )

You have to implement LAST_VALUE as a user-defined AggregateFunction. For the implementation, you can refer to the MAX AggregateFunction (MAX always returns the maximum value, while LAST_VALUE always returns the latest value). You can also find documentation about aggregate functions here:
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/table/udfs.html#aggregation-functions

Best,
Hequn

2018-01-05 18:31 GMT+08:00 Fabian Hueske <fhue...@gmail.com>:
> Hi Ghassan,
>
> Flink's Table API / SQL does not support the upsert ingestion mode
> (updating table rows by key) yet, but only append mode, i.e., each event of
> the data stream is appended to the table.
> Hence, it is not possible to implement your use case with SQL alone.
>
> An upsert ingestion mode will be added in a future version of Flink.
>
> Best,
> Fabian
>
> 2017-12-21 5:20 GMT+01:00 Ghassan Yammine <ghassan.yamm...@bazaarvoice.com>:
>
> > Hi Timo,
> >
> > Thanks for your quick reply.
> >
> > I understand your statement about the SQL expression: it groups by
> > ProductID and performs the average computation over all the records it
> > sees. There is no concept of a primary key or INSERT/UPDATE.
> >
> > Is there a way to do what I want in SQL, with or without
> > pre-processing of the stream before the SQL expression is executed?
> >
> > One thought I had was to transform the stream, prior to the SQL
> > expression, by morphing the value of each Rating to be relative
> > (instead of absolute) to the prior Rating for the same review.
> > However, this would entail some review-tracking mechanism.
> >
> > As I mentioned before, I've implemented this using the DataStream API,
> > internally maintaining a map of reviews and updating the average
> > calculation as each record is processed. Using SQL is very important to
> > us, and I'd like to see if I can make it work.
> >
> > Regarding the output sink, I previously looked into that and concluded
> > that I could not perform any "adjustment" to the average calculation
> > because of what the current SQL expression emits (i.e. the incorrect
> > average calculation). Here is what the (retract) stream looks like post-SQL:
> >
> > (true,(product-100,1.0))
> > (false,(product-100,1.0))
> > (true,(product-100,2.5))
> > (false,(product-100,2.5))
> > (true,(product-100,3.6666666666666665))
> > (false,(product-100,3.6666666666666665))
> > (true,(product-100,3.5))   <-- This is the correct value (per the SQL)
> >                                but not what I want it to be.
> >
> > Even if I change what the SQL query returns/emits so that the TableSink
> > can perform the average calculation, the latter will have to track prior
> > reviews in order to update its average calculation. If I'm correct, then
> > this is essentially no different from the DataStream API implementation
> > that I have.
> >
> > Again, thanks for your quick response and help.
> >
> > Regards,
> >
> > Ghassan
> >
> >
> > On Dec 20, 2017, at 11:50 AM, Timo Walther <twal...@apache.org> wrote:
> >
> > Hi Ghassan,
> >
> > in your example the result 3.5 is correct. The query is executed with
> > standard SQL semantics. You only group by ProductID, and since it is the
> > same for all elements, the average is 3.5.
> >
> > The second "review-3" does not replace anything. In general, the
> > replacement would happen in the TableSink. The dynamic table performs
> > view maintenance; the TableSink materializes the result to some
> > key-value store or database.
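To make that materialization step concrete, the retract stream quoted above can be folded into the current per-key view a sink would hold. This is an editorial plain-Scala sketch, not an actual Flink TableSink; `materialize` is a name invented here for illustration:

```scala
import scala.collection.mutable

// Folds a retract stream into the view a key-value sink would maintain:
// (true, row) upserts the row's value, (false, row) retracts it.
def materialize(retracts: Seq[(Boolean, (String, Double))]): Map[String, Double] = {
  val view = mutable.Map[String, Double]()
  for ((isAdd, (key, value)) <- retracts) {
    if (isAdd) view(key) = value
    else if (view.get(key).contains(value)) view.remove(key)
  }
  view.toMap
}
```

Applied to the stream above, each (false, ...) message cancels the previous average, so the view always ends up holding only the latest result per product.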
> >
> > It might be worth looking into TableSinks [0] and the JavaDocs of the
> > mentioned classes.
> >
> > Feel free to ask further questions if necessary.
> >
> > Regards,
> > Timo
> >
> > [0] https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/table/sourceSinks.html#define-a-tablesink
> >
> >
> > On 12/19/17 at 9:08 PM, Ghassan Yammine wrote:
> > Hello,
> >
> > I'm new to Flink and I need some help. I'd like to use the SQL API for
> > processing an incoming stream that has the following characteristics:
> >
> >   * Each stream record has a key
> >   * The record can be updated
> >   * The record is of the form: reviewId -> (productId, rating)
> >
> > For the above stream, I want to compute the average rating for each
> > product ID. The key is the reviewId.
> > With the SQL API, I get incorrect results. However, I have been able to
> > make it work through the use of a RichFlatMapFunction and the DataStream API.
> >
> > Below is the entire code listing, which does not work as intended. I know
> > I'm missing the definition/use of a primary key so that an update on the
> > same key can occur. However, I'm not sure how to go about doing this.
> > Any help/comments are welcome.
> >
> > Thank you,
> >
> > Ghassan
> >
> >
> > package com.bazaarvoice.flink_poc
> >
> > import com.bazaarvoice.flink_poc.flink_poc.{ProductId, ReviewId}
> > import org.apache.flink.api.common.time.Time
> > import org.apache.flink.streaming.api.TimeCharacteristic
> > import org.apache.flink.streaming.api.scala.{DataStream, createTypeInformation, _}
> > import org.apache.flink.table.api.{StreamQueryConfig, TableEnvironment}
> > import org.apache.flink.table.api.scala._
> >
> > package object flink_poc {
> >   type ProductId = String
> >   type ReviewId = String
> > }
> >
> > case class SummaryReview(reviewId: ReviewId, productId: ProductId,
> >     approved: Boolean, rating: Double) extends Serializable {
> >   override def toString: String = {
> >     s"$reviewId, $productId, ${if (approved) "APPROVED" else "REJECTED"}, $rating"
> >   }
> > }
> >
> > object AverageRatingWithSQL {
> >
> >   def main(args: Array[String]) {
> >
> >     val events = List(
> >       SummaryReview("review-1", "product-100", approved = true, 1),
> >       SummaryReview("review-2", "product-100", approved = true, 4),
> >       SummaryReview("review-3", "product-100", approved = true, 6),
> >       SummaryReview("review-3", "product-100", approved = true, 3)  // <-- this should override the previous record
> >     ).toSeq
> >     // Average rating should be equal to (1+4+3)/3 = 2.666667
> >
> >     val env = StreamExecutionEnvironment.getExecutionEnvironment
> >     env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
> >
> >     val inputStream: DataStream[SummaryReview] = env.fromCollection(events)
> >
> >     val tableEnv = TableEnvironment.getTableEnvironment(env)
> >
> >     tableEnv.registerDataStream("Reviews", inputStream, 'ReviewID, 'ProductID, 'Approved, 'Rating)
> >
> >     val resultTable = tableEnv.sql(
> >       "SELECT ProductID, AVG(Rating) FROM Reviews WHERE Approved = true GROUP BY ProductID"
> >     )
> >
> >     val typeInfo = createTypeInformation[(ProductId, Double)]
> >     val outStream = resultTable.toRetractStream(typeInfo)
> >
> >     outStream.print()
> >
> >     env.execute("Flink SQL Average rating")
> >   }
> > }
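For reference, the semantics the listing above is after (the latest record per reviewId wins, then approved ratings are averaged per productId) can be modeled in plain Scala. This editorial sketch only illustrates the expected result of 2.666...; it is not a Flink solution, and `averageRatings` is a name invented here:

```scala
// Mirrors the SummaryReview case class from the listing above.
case class SummaryReview(reviewId: String, productId: String,
                         approved: Boolean, rating: Double)

def averageRatings(events: Seq[SummaryReview]): Map[String, Double] = {
  // Keep only the last occurrence of each reviewId (the "upsert" step).
  val latest = events.groupBy(_.reviewId).map { case (_, versions) => versions.last }
  // Then average approved ratings per product, as the outer query would.
  latest.toSeq
    .filter(_.approved)
    .groupBy(_.productId)
    .map { case (pid, rs) => pid -> rs.map(_.rating).sum / rs.size }
}
```

On the four events from the listing, the second "review-3" replaces the first, so the average comes out to (1+4+3)/3 rather than 3.5.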