Hello,
I’m new to Flink and I need some help. I’d like to use the SQL API for
processing an incoming stream that has the following characteristics:
* Each stream record has a key
* The record can be updated
* The record is of the form: reviewId -> (productId, rating)
For the above stream, I want to compute the average rating for each product ID.
The key is the reviewId
With the SQL API, I get incorrect results. However, I’ve been able to make it
work through the use of RichFlatMapFunction and the Datastream API.
Below is the entire code listing, which does not work. I know I’m missing the
definition/use of a primary key so that an update on the same key can occur.
However, I’m not sure how to go about doing this. Any help/comments are
welcome.
Thank you,
Ghassan
package com.bazaarvoice.flink_poc
import com.bazaarvoice.flink_poc.flink_poc.{ProductId, ReviewId}
import org.apache.flink.api.common.time.Time
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.{DataStream, createTypeInformation,
_}
import org.apache.flink.table.api.{StreamQueryConfig, TableEnvironment}
import org.apache.flink.table.api.scala._
/** Type aliases shared by the review-stream examples in this package. */
package object flink_poc {

  /** Identifier of a review; a plain `String` alias. */
  type ReviewId = String

  /** Identifier of a product; a plain `String` alias. */
  type ProductId = String
}
/**
 * One review record of the input stream.
 *
 * @param reviewId  identifier of the review
 * @param productId identifier of the product the review refers to
 * @param approved  whether the review is approved; rendered as APPROVED / REJECTED by `toString`
 * @param rating    rating value carried by the review
 */
case class SummaryReview(
    reviewId: ReviewId,
    productId: ProductId,
    approved: Boolean,
    rating: Double
) extends Serializable {

  /** Renders the record as "reviewId, productId, APPROVED|REJECTED, rating". */
  override def toString: String = {
    val status = if (approved) "APPROVED" else "REJECTED"
    // Kept on a single line: a single-quoted interpolated string cannot span lines.
    s"$reviewId, $productId, $status, $rating"
  }
}
/**
 * Demo job: feeds a small in-memory list of reviews into a Flink stream, registers it
 * as the table `Reviews`, runs a SQL query computing the average rating per product
 * over approved reviews, and prints the resulting retract stream.
 *
 * NOTE: as written, the query aggregates every row of `Reviews`. Nothing in this job
 * declares ReviewID as a key, so the second "review-3" record is averaged in as a
 * fourth rating instead of replacing the first one. For the sample data the query
 * therefore yields (1+4+6+3)/4 = 3.5 rather than the intended (1+4+3)/3 = 2.666667.
 *
 * NOTE(review): getting per-ReviewID update semantics presumably requires reducing
 * `Reviews` to the latest row per ReviewID before the outer aggregation (for example a
 * nested `GROUP BY ReviewID` using a user-defined "last value" aggregate) -- confirm
 * against the Table API version in use.
 */
object AverageRatingWithSQL {

  /**
   * Builds and executes the job.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // The last record reuses the key "review-3"; it is meant to override the previous record.
    val events = List(
      SummaryReview("review-1", "product-100", approved = true, 1),
      SummaryReview("review-2", "product-100", approved = true, 4),
      SummaryReview("review-3", "product-100", approved = true, 6),
      SummaryReview("review-3", "product-100", approved = true, 3)
    )
    // Intended average rating: (1+4+3)/3 = 2.666667 (see the object-level NOTE for what
    // the query below actually computes).

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)

    val inputStream: DataStream[SummaryReview] = env.fromCollection(events)

    val tableEnv = TableEnvironment.getTableEnvironment(env)
    tableEnv.registerDataStream("Reviews", inputStream, 'ReviewID, 'ProductID, 'Approved, 'Rating)

    // Single-line literal: a single-quoted string cannot span lines.
    val resultTable = tableEnv.sql(
      "SELECT ProductID, AVG(Rating) FROM Reviews WHERE Approved = true GROUP BY ProductID"
    )

    val typeInfo = createTypeInformation[(ProductId, Double)]
    val outStream = resultTable.toRetractStream(typeInfo)
    outStream.print()

    env.execute("Flink SQL Average rating")
  }
}