Hi Timo,

Thanks for your quick reply.

I understand your statement about the SQL expression:  It is grouped by 
ProductID and performs the average computation over all the records it sees.  
There is no concept of primary key and INSERT/UPDATE.

Is there a way to do what I want using SQL, with or without pre-processing the 
stream before the SQL expression is executed?

One thought I had was to transform the stream before the SQL expression, 
converting each Rating into a value relative to the prior Rating for the same 
review (instead of an absolute value).  However, this would require some 
review tracking mechanism.
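
As a rough illustration of this idea, here is a minimal sketch in plain Scala 
(no Flink dependency).  All names (Review, Delta, toDeltas, runningAverages) 
are hypothetical, the in-memory map stands in for the review tracking 
mechanism, and the sketch assumes a review never moves to a different product.

```scala
// Hypothetical names throughout; plain Scala, no Flink dependency.
final case class Review(reviewId: String, productId: String, rating: Double)

// A rating change relative to the previous rating of the same review.
// countDelta is 1 for a review seen for the first time, 0 for an update.
final case class Delta(productId: String, ratingDelta: Double, countDelta: Int)

object RelativeRatings {

  // Turns absolute ratings into deltas. The map of last-seen ratings is the
  // review tracking mechanism; here it is just an in-memory Map.
  // Assumption: a review never changes its productId.
  def toDeltas(events: Seq[Review]): Seq[Delta] = {
    val start = (Map.empty[String, Double], Vector.empty[Delta])
    val (_, deltas) = events.foldLeft(start) { case ((lastRating, out), r) =>
      val delta = lastRating.get(r.reviewId) match {
        case Some(prev) => Delta(r.productId, r.rating - prev, 0)
        case None       => Delta(r.productId, r.rating, 1)
      }
      (lastRating.updated(r.reviewId, r.rating), out :+ delta)
    }
    deltas
  }

  // Sum of ratingDelta divided by sum of countDelta, per product,
  // emitted after every record.
  def runningAverages(deltas: Seq[Delta]): Seq[(String, Double)] = {
    val start = (Map.empty[String, (Double, Int)], Vector.empty[(String, Double)])
    val (_, out) = deltas.foldLeft(start) { case ((totals, acc), d) =>
      val (sum, count) = totals.getOrElse(d.productId, (0.0, 0))
      val newSum = sum + d.ratingDelta
      val newCount = count + d.countDelta
      val avg = newSum / newCount
      (totals.updated(d.productId, (newSum, newCount)), acc :+ ((d.productId, avg)))
    }
    out
  }
}
```

With such a transformed stream, the aggregation would presumably no longer be 
AVG(Rating) but the sum of the rating deltas divided by the sum of the count 
deltas, grouped by ProductID.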

As I mentioned before, I’ve implemented this using the DataStream API by 
internally maintaining a map of reviews and updating the average calculation as 
each record is processed.  Using SQL is very important to us, and I’d like to 
see if I can make it work.

Regarding the output sink, I previously looked into that and concluded that I 
could not perform any “adjustment” to the average calculation because of what 
the current SQL expression emits (i.e. an average that still includes the 
superseded rating).  Here is what the (retract) stream looks like post-SQL:

(true,(product-100,1.0))
(false,(product-100,1.0))
(true,(product-100,2.5))
(false,(product-100,2.5))
(true,(product-100,3.6666666666666665))
(false,(product-100,3.6666666666666665))
(true,(product-100,3.5))    <-- This is the correct value (per the SQL) but not what I want it to be.

Even if I change what the SQL query returns/emits so that the TableSink can 
perform the average calculation, the TableSink will have to track prior reviews 
in order to update its average calculation.  If I’m correct, then this is 
essentially no different from the DataStream API implementation that I have.

Again, thanks for your quick response and help.

Regards,

Ghassan



On Dec 20, 2017, at 11:50 AM, Timo Walther <twal...@apache.org> wrote:

Hi Ghassan,

In your example the result 3.5 is correct. The query is executed with standard 
SQL semantics. You only group by ProductID and since it is the same for all 
elements, the average is 3.5.

The second "review-3" does not replace anything. In general, the replacement 
would happen in the TableSink. The dynamic table performs view maintenance. The 
TableSink materializes the result to some key-value store or database.
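
As a rough illustration of that last step, the sketch below folds a retract 
stream of (flag, (key, value)) messages into an in-memory map, which stands in 
for the key-value store.  It is plain Scala and does not use the actual 
TableSink interfaces; RetractMaterializer and materialize are hypothetical 
names.

```scala
// Hypothetical helper; plain Scala, not the Flink TableSink API.
object RetractMaterializer {

  // (true, row) adds a row, (false, row) retracts a previously added row.
  type Message = (Boolean, (String, Double))

  // Applies the messages in order and returns the materialized view.
  def materialize(messages: Seq[Message]): Map[String, Double] =
    messages.foldLeft(Map.empty[String, Double]) {
      // Add message: insert or overwrite the value for this key.
      case (store, (true, (key, value))) => store.updated(key, value)
      // Retract message: remove the key only if it still holds that value.
      case (store, (false, (key, value))) =>
        if (store.get(key).contains(value)) store - key else store
    }
}
```

The materialized view only ever reflects what the query emits, so any 
replacement of an earlier record has to be expressed before or inside the 
query for the stored average to change.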

It might be worth looking into TableSinks [0] and the JavaDocs of the mentioned 
classes.

Feel free to ask further questions if necessary.

Regards,
Timo

[0] 
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/table/sourceSinks.html#define-a-tablesink



On 12/19/17 at 9:08 PM, Ghassan Yammine wrote:
Hello,

I’m new to Flink and I need some help.  I’d like to use the SQL API for 
processing an incoming stream that has the following characteristics:

  *   Each stream record has a key
  *   The record can be updated
  *   The record is of the form: reviewId -> (productId, rating)

For the above stream, I want to compute the average rating for each product ID. 
The key is the reviewId.
With the SQL API, I get incorrect results.  However, I’ve been able to make it 
work through the use of RichFlatMapFunction and the DataStream API.

Below is the entire code listing, which does not work.  I know I’m missing the 
definition/use of a primary key so that an update on the same key can occur.
However, I’m not sure how to go about doing this.  Any help/comments are 
welcome.

Thank you,

Ghassan


package com.bazaarvoice.flink_poc

import com.bazaarvoice.flink_poc.flink_poc.{ProductId, ReviewId}
import org.apache.flink.api.common.time.Time
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.{DataStream, createTypeInformation, _}
import org.apache.flink.table.api.{StreamQueryConfig, TableEnvironment}
import org.apache.flink.table.api.scala._

package object flink_poc{
  type ProductId = String
  type ReviewId = String
}

case class SummaryReview(reviewId: ReviewId, productId: ProductId, approved: Boolean, rating: Double) extends Serializable {
  override def toString: String = {
    s"$reviewId,  $productId,  ${if (approved) "APPROVED" else "REJECTED"},  $rating"
  }
}

object AverageRatingWithSQL {

  def main(args: Array[String]): Unit = {

    val events = List(
      SummaryReview("review-1", "product-100", approved = true, 1),
      SummaryReview("review-2", "product-100", approved = true, 4),
      SummaryReview("review-3", "product-100", approved = true, 6),
      SummaryReview("review-3", "product-100", approved = true, 3)    // <-- this should override the previous record
    ).toSeq
    // Average rating should be equal to (1+4+3)/3 = 2.666667

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)

    val inputStream: DataStream[SummaryReview] = env.fromCollection(events)

    val tableEnv = TableEnvironment.getTableEnvironment(env)

    tableEnv.registerDataStream("Reviews", inputStream, 'ReviewID, 'ProductID, 'Approved, 'Rating)

    val resultTable = tableEnv.sql(
      "SELECT ProductID, AVG(Rating) FROM Reviews WHERE Approved = true GROUP BY ProductID"
    )

    val typeInfo = createTypeInformation[(ProductId, Double)]
    val outStream = resultTable.toRetractStream(typeInfo)

    outStream.print()

    env.execute("Flink SQL Average rating")

  }
}





