hi Ghassan,

TableSource in Flink doesn't support primary key now, but you can achieve
it by doing a group by manually.

Such as:
val resultTable = tableEnv.sql(
      "
           SELECT ProductID, AVG(Rating) FROM
                 ( SELECT ReviewID, LAST_VALUE(ProductID),
LAST_VALUE(Approved) , LAST_VALUE(Rating) FROM Reviews GROUP BY  ReviewID)
           WHERE Approved = true GROUP BY ProductID
        "
    )

You have to implement the LAST_VALUE AggregateFunction. For implementation,
you can refer to the MAX AggregateFunction(MAX always return the max value
while LAST_VALUE always return the latest value). Also, you can find
documents about Aggregate Functions here: https://ci.apache.org/
projects/flink/flink-docs-release-1.4/dev/table/udfs.
html#aggregation-functions

Best, Hequn


2018-01-05 18:31 GMT+08:00 Fabian Hueske <fhue...@gmail.com>:

> Hi Ghassan,
>
> Flink's Table API / SQL does not support the upsert ingestion mode
> (updating table rows by key) yet but only append mode, i.e, each event of
> the data stream is appended to the table.
> Hence, it is not possible to implement your use case using SQL.
>
> An upsert ingestion mode will be added in future version of Flink.
>
> Best, Fabian
>
> 2017-12-21 5:20 GMT+01:00 Ghassan Yammine <ghassan.yamm...@bazaarvoice.com
> >:
>
> > Hi Timo,
> >
> > Thanks for your quick reply.
> >
> > I understand your statement about the SQL expression:  It is grouped by
> > ProductID and performs the average computation over all the records it
> > sees.  There is no concept of primary key and INSERT/UPDATE.
> >
> > Is there a way to do what I want using SQL without, or with, the
> > pre-processing of the stream before a SQL expression is executed?
> >
> > One thought I had was to transform the stream - prior to the SQL
> > expression - by doing something like morphing the value of the Rating to
> be
> > relative (instead of absolute) to the prior Rating, for the same review.
> > However, this would entail some review tracking mechanism.
> >
> > As I mentioned before, I’ve implemented this using the Datastream API -
> > internally maintaining a map of reviews and updating the average
> > calculation as each record is processed.  Using SQL is very important to
> > us, and I’d like to see if I can make it work.
> >
> > Regarding the output sink, I previously looked into that and concluded
> > that I could not perform any “adjustment” to the average calculation
> > because of what the current SQL expression emits (i.e. incorrect average
> > calc.).  Here is what the (retract) stream looks like post-SQL:
> >
> > (true,(product-100,1.0))
> > (false,(product-100,1.0))
> > (true,(product-100,2.5))
> > (false,(product-100,2.5))
> > (true,(product-100,3.6666666666666665))
> > (false,(product-100,3.6666666666666665))
> > (true,(product-100,3.5)).   <———————— This is the correct value (per the
> > SQL) but not what I want it to be.
> >
> > Even if I change what the SQL query returns/emits so that the TableSink
> > can perform the average calculation, the latter will have to track prior
> > reviews in order to update its average calculation.  If I’m correct, then
> > this is essentially no different than the Datastream API implementation
> > that I have.
> >
> > Agains, thanks for your quick response and help.
> >
> > Regards,
> >
> > Ghassan
> >
> >
> >
> > On Dec 20, 2017, at 11:50 AM, Timo Walther <twal...@apache.org<mailto:
> twa
> > l...@apache.org>> wrote:
> >
> > Hi Ghassan,
> >
> > in your example the result 3.5 is correct. The query is executed with
> > standard SQL semantics. You only group by ProductID and since it is the
> > same for all elements, the average is 3.5.
> >
> > The second "review-3" does not replace anything. In general, the
> > replacement would happen in the TableSink. The dynamic table performs
> view
> > maintenance. The TableSink materializes the result to some key-value
> store
> > or database.
> >
> > It might be worth to look into TableSinks [0] and the JavaDocs of the
> > mentioned classes.
> >
> > Feel free to ask further questions if necessary.
> >
> > Regards,
> > Timo
> >
> > [0] https://ci.apache.org/projects/flink/flink-docs-release-1.4/
> dev/table/
> > sourceSinks.html#define-a-tablesink
> >
> >
> >
> > Am 12/19/17 um 9:08 PM schrieb Ghassan Yammine:
> > Hello,
> >
> > I’m knew to Flink and I need some help.  I’d like to use the SQL API for
> > processing an incoming stream that has the following characteristics:
> >
> >   *   Each stream record has a key
> >   *   The record can be updated
> >   *   The record is of the form: reviewId -> (productId, rating)
> >
> > For the above stream, I want to compute the average rating for each
> > product ID.  The key is the reviewId
> > With the SQL API, I get incorrect results.  However, I’ve been able to
> > make it work through the use of RichFlatMapFunction and the Datastream
> API.
> >
> > Below is the entire code listing, which does not work.  I know I’m
> missing
> > the definition/use of a primary key so that an update on the same key can
> > occur.
> > However, I’m not sure how to go about doing this.  Any help/comments are
> > welcome.
> >
> > Thank you,
> >
> > Ghassan
> >
> >
> > package com.bazaarvoice.flink_poc
> >
> > import com.bazaarvoice.flink_poc.flink_poc.{ProductId, ReviewId}
> > import org.apache.flink.api.common.time.Time
> > import org.apache.flink.streaming.api.TimeCharacteristic
> > import org.apache.flink.streaming.api.scala.{DataStream,
> > createTypeInformation, _}
> > import org.apache.flink.table.api.{StreamQueryConfig, TableEnvironment}
> > import org.apache.flink.table.api.scala._
> >
> > package object flink_poc{
> >   type ProductId = String
> >   type ReviewId = String
> > }
> >
> > case class SummaryReview(reviewId: ReviewId, productId: ProductId,
> > approved: Boolean, rating: Double) extends Serializable {
> >   override def toString: String = {
> >     s"$reviewId,  $productId,  ${if (approved) "APPROVED" else
> > "REJECTED"},  $rating"
> >   }
> > }
> >
> > object AverageRatingWithSQL {
> >
> >   def main(args: Array[String]) {
> >
> >     val events = List(
> >       SummaryReview("review-1", "product-100", approved = true, 1),
> >       SummaryReview("review-2", "product-100", approved = true, 4),
> >       SummaryReview("review-3", "product-100", approved = true, 6),
> >       SummaryReview("review-3", "product-100", approved = true, 3)    //
> > <-- this should override the previous record
> >     ).toSeq
> >     // Average rating should be equal to (1+4+3)/3 = 2.666667
> >
> >     val env = StreamExecutionEnvironment.getExecutionEnvironment
> >     env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
> >
> >     val inputStream: DataStream[SummaryReview] =
> env.fromCollection(events)
> >
> >     val tableEnv = TableEnvironment.getTableEnvironment(env)
> >
> >     tableEnv.registerDataStream("Reviews", inputStream, 'ReviewID,
> > 'ProductID, 'Approved, 'Rating)
> >
> >     val resultTable = tableEnv.sql(
> >       "SELECT ProductID, AVG(Rating) FROM Reviews WHERE Approved = true
> > GROUP BY ProductID"
> >     )
> >
> >     val typeInfo = createTypeInformation[(ProductId, Double)]
> >     val outStream = resultTable.toRetractStream(typeInfo)
> >
> >     outStream.print()
> >
> >     env.execute("Flink SQL Average rating")
> >
> >   }
> > }
> >
> >
> >
> >
> >
> >
> >
>

Reply via email to