Thanks very much!

Regards,
Ghassan


> On Jan 5, 2018, at 7:55 AM, Hequn Cheng <chenghe...@gmail.com> wrote:
> 
> hi Ghassan,
> 
> TableSource in Flink doesn't support primary key now, but you can achieve
> it by doing a group by manually.
> 
> Such as:
> val resultTable = tableEnv.sql(
>      "
>           SELECT ProductID, AVG(Rating) FROM
>                 ( SELECT ReviewID, LAST_VALUE(ProductID),
> LAST_VALUE(Approved) , LAST_VALUE(Rating) FROM Reviews GROUP BY  ReviewID)
>           WHERE Approved = true GROUP BY ProductID
>        "
>    )
> 
> You have to implement the LAST_VALUE AggregateFunction. For implementation,
> you can refer to the MAX AggregateFunction(MAX always return the max value
> while LAST_VALUE always return the latest value). Also, you can find
> documents about Aggregate Functions here: https://ci.apache.org/
> projects/flink/flink-docs-release-1.4/dev/table/udfs.
> html#aggregation-functions
> 
> Best, Hequn
> 
> 
> 2018-01-05 18:31 GMT+08:00 Fabian Hueske <fhue...@gmail.com>:
> 
>> Hi Ghassan,
>> 
>> Flink's Table API / SQL does not support the upsert ingestion mode
>> (updating table rows by key) yet but only append mode, i.e, each event of
>> the data stream is appended to the table.
>> Hence, it is not possible to implement your use case using SQL.
>> 
>> An upsert ingestion mode will be added in future version of Flink.
>> 
>> Best, Fabian
>> 
>> 2017-12-21 5:20 GMT+01:00 Ghassan Yammine <ghassan.yamm...@bazaarvoice.com
>>> :
>> 
>>> Hi Timo,
>>> 
>>> Thanks for your quick reply.
>>> 
>>> I understand your statement about the SQL expression:  It is grouped by
>>> ProductID and performs the average computation over all the records it
>>> sees.  There is no concept of primary key and INSERT/UPDATE.
>>> 
>>> Is there a way to do what I want using SQL without, or with, the
>>> pre-processing of the stream before a SQL expression is executed?
>>> 
>>> One thought I had was to transform the stream - prior to the SQL
>>> expression - by doing something like morphing the value of the Rating to
>> be
>>> relative (instead of absolute) to the prior Rating, for the same review.
>>> However, this would entail some review tracking mechanism.
>>> 
>>> As I mentioned before, I’ve implemented this using the Datastream API -
>>> internally maintaining a map of reviews and updating the average
>>> calculation as each record is processed.  Using SQL is very important to
>>> us, and I’d like to see if I can make it work.
>>> 
>>> Regarding the output sink, I previously looked into that and concluded
>>> that I could not perform any “adjustment” to the average calculation
>>> because of what the current SQL expression emits (i.e. incorrect average
>>> calc.).  Here is what the (retract) stream looks like post-SQL:
>>> 
>>> (true,(product-100,1.0))
>>> (false,(product-100,1.0))
>>> (true,(product-100,2.5))
>>> (false,(product-100,2.5))
>>> (true,(product-100,3.6666666666666665))
>>> (false,(product-100,3.6666666666666665))
>>> (true,(product-100,3.5)).   <———————— This is the correct value (per the
>>> SQL) but not what I want it to be.
>>> 
>>> Even if I change what the SQL query returns/emits so that the TableSink
>>> can perform the average calculation, the latter will have to track prior
>>> reviews in order to update its average calculation.  If I’m correct, then
>>> this is essentially no different than the Datastream API implementation
>>> that I have.
>>> 
>>> Agains, thanks for your quick response and help.
>>> 
>>> Regards,
>>> 
>>> Ghassan
>>> 
>>> 
>>> 
>>> On Dec 20, 2017, at 11:50 AM, Timo Walther <twal...@apache.org<mailto:
>> twa
>>> l...@apache.org>> wrote:
>>> 
>>> Hi Ghassan,
>>> 
>>> in your example the result 3.5 is correct. The query is executed with
>>> standard SQL semantics. You only group by ProductID and since it is the
>>> same for all elements, the average is 3.5.
>>> 
>>> The second "review-3" does not replace anything. In general, the
>>> replacement would happen in the TableSink. The dynamic table performs
>> view
>>> maintenance. The TableSink materializes the result to some key-value
>> store
>>> or database.
>>> 
>>> It might be worth to look into TableSinks [0] and the JavaDocs of the
>>> mentioned classes.
>>> 
>>> Feel free to ask further questions if necessary.
>>> 
>>> Regards,
>>> Timo
>>> 
>>> [0] https://ci.apache.org/projects/flink/flink-docs-release-1.4/
>> dev/table/
>>> sourceSinks.html#define-a-tablesink
>>> 
>>> 
>>> 
>>> Am 12/19/17 um 9:08 PM schrieb Ghassan Yammine:
>>> Hello,
>>> 
>>> I’m knew to Flink and I need some help.  I’d like to use the SQL API for
>>> processing an incoming stream that has the following characteristics:
>>> 
>>>  *   Each stream record has a key
>>>  *   The record can be updated
>>>  *   The record is of the form: reviewId -> (productId, rating)
>>> 
>>> For the above stream, I want to compute the average rating for each
>>> product ID.  The key is the reviewId
>>> With the SQL API, I get incorrect results.  However, I’ve been able to
>>> make it work through the use of RichFlatMapFunction and the Datastream
>> API.
>>> 
>>> Below is the entire code listing, which does not work.  I know I’m
>> missing
>>> the definition/use of a primary key so that an update on the same key can
>>> occur.
>>> However, I’m not sure how to go about doing this.  Any help/comments are
>>> welcome.
>>> 
>>> Thank you,
>>> 
>>> Ghassan
>>> 
>>> 
>>> package com.bazaarvoice.flink_poc
>>> 
>>> import com.bazaarvoice.flink_poc.flink_poc.{ProductId, ReviewId}
>>> import org.apache.flink.api.common.time.Time
>>> import org.apache.flink.streaming.api.TimeCharacteristic
>>> import org.apache.flink.streaming.api.scala.{DataStream,
>>> createTypeInformation, _}
>>> import org.apache.flink.table.api.{StreamQueryConfig, TableEnvironment}
>>> import org.apache.flink.table.api.scala._
>>> 
>>> package object flink_poc{
>>>  type ProductId = String
>>>  type ReviewId = String
>>> }
>>> 
>>> case class SummaryReview(reviewId: ReviewId, productId: ProductId,
>>> approved: Boolean, rating: Double) extends Serializable {
>>>  override def toString: String = {
>>>    s"$reviewId,  $productId,  ${if (approved) "APPROVED" else
>>> "REJECTED"},  $rating"
>>>  }
>>> }
>>> 
>>> object AverageRatingWithSQL {
>>> 
>>>  def main(args: Array[String]) {
>>> 
>>>    val events = List(
>>>      SummaryReview("review-1", "product-100", approved = true, 1),
>>>      SummaryReview("review-2", "product-100", approved = true, 4),
>>>      SummaryReview("review-3", "product-100", approved = true, 6),
>>>      SummaryReview("review-3", "product-100", approved = true, 3)    //
>>> <-- this should override the previous record
>>>    ).toSeq
>>>    // Average rating should be equal to (1+4+3)/3 = 2.666667
>>> 
>>>    val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
>>> 
>>>    val inputStream: DataStream[SummaryReview] =
>> env.fromCollection(events)
>>> 
>>>    val tableEnv = TableEnvironment.getTableEnvironment(env)
>>> 
>>>    tableEnv.registerDataStream("Reviews", inputStream, 'ReviewID,
>>> 'ProductID, 'Approved, 'Rating)
>>> 
>>>    val resultTable = tableEnv.sql(
>>>      "SELECT ProductID, AVG(Rating) FROM Reviews WHERE Approved = true
>>> GROUP BY ProductID"
>>>    )
>>> 
>>>    val typeInfo = createTypeInformation[(ProductId, Double)]
>>>    val outStream = resultTable.toRetractStream(typeInfo)
>>> 
>>>    outStream.print()
>>> 
>>>    env.execute("Flink SQL Average rating")
>>> 
>>>  }
>>> }
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>> 

Reply via email to