Hi Yik,

If I understand you correctly, you would like to avoid the deletions in your stream?

You could filter out the deletions manually in the DataStream API before writing the results to Kafka. Semantically, the deletions are required to produce a correct result because the runtime is not aware of a key for idempotent updates.
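A minimal sketch of such a filter, assuming a Flink version where `toRetractStream` is available on the Scala `StreamTableEnvironment`. The object and method names (`DropRetractions`, `additionsOnly`) are made up for illustration:

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.Table
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.types.Row

// Hypothetical helper, not part of Flink.
object DropRetractions {

  // Converts an updating table into a stream that carries only the
  // "add" messages (+I / +U); retractions (-U / -D) are dropped.
  def additionsOnly(tableEnv: StreamTableEnvironment, result: Table): DataStream[Row] =
    tableEnv
      .toRetractStream[Row](result) // DataStream[(Boolean, Row)]
      .filter(_._1)                 // true = add message, false = retract message
      .map(_._2)                    // keep only the row itself
}
```

It could be called as `DropRetractions.additionsOnly(tableEnv, tableEnv.sqlQuery(QUERY))`, followed by a Kafka sink and `streamEnv.execute()`. Note that this only removes the `-U` rows: for the four operations in your example, the intermediate `+U` row with the single timestamp `1546273800000` would still pass the filter.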

To simplify the query, you could also look into implementing your own aggregate function that combines the Top 2 and the ListAgg into one operation.
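As a rough sketch of what the core of such a function could look like, here is the accumulator logic in plain Scala. `Last2Acc` is a hypothetical name; in Flink it would sit behind a user-defined `AggregateFunction` whose `accumulate` method calls `add` and whose `getValue` method returns `render`. That wiring is not shown here.

```scala
// Hypothetical accumulator: remembers the two largest timestamps seen so far.
final class Last2Acc {
  private var newest: Option[Long] = None
  private var previous: Option[Long] = None

  // Fold one more timestamp into the accumulator.
  def add(ts: Long): Unit = newest match {
    case Some(n) if ts < n =>
      // Older than the newest value: it can only replace `previous`.
      if (previous.forall(ts > _)) previous = Some(ts)
    case _ =>
      // First value, or at least as new as the current newest value.
      previous = newest
      newest = Some(ts)
  }

  // Render like the LISTAGG output in the question:
  // older timestamp first, comma-separated.
  def render: String = (previous.toList ++ newest.toList).mkString(",")
}
```

With a single aggregation per account, the query would reduce to a plain `GROUP BY accountId` over `transactions`, so each incoming transaction should update the result once instead of passing through the Top 2 and the ListAgg separately.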

Regards,
Timo

On 28.02.21 09:55, Yik San Chan wrote:
I define a `Transaction` class:

```scala
case class Transaction(accountId: Long, amount: Long, timestamp: Long)
```

The `TransactionSource` simply emits a `Transaction` at some time interval. Now I want to compute the last 2 transaction timestamps for each account id; see the code below:

```scala
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment, _}
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.walkthrough.common.entity.Transaction
import org.apache.flink.walkthrough.common.source.TransactionSource

object LastNJob {

   final val QUERY =
     """
       |WITH last_n AS (
       |    SELECT accountId, `timestamp`
       |    FROM (
       |        SELECT *,
       |            ROW_NUMBER() OVER (PARTITION BY accountId ORDER BY `timestamp` DESC) AS row_num
       |        FROM transactions
       |    )
       |    WHERE row_num <= 2
       |)
       |SELECT accountId, LISTAGG(CAST(`timestamp` AS STRING)) last2_timestamp
       |FROM last_n
       |GROUP BY accountId
       |""".stripMargin

   def main(args: Array[String]): Unit = {
     val settings: EnvironmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
     val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
     val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(streamEnv, settings)

     val txnStream: DataStream[Transaction] = streamEnv
       .addSource(new TransactionSource)
       .name("transactions")

     tableEnv.createTemporaryView("transactions", txnStream)

     tableEnv.executeSql(QUERY).print()
   }
}
```

When I run the program, I get:

```
+----+----------------------+--------------------------------+
| op |            accountId |                last2_timestamp |
+----+----------------------+--------------------------------+
| +I |                    1 |                  1546272000000 |
| +I |                    2 |                  1546272360000 |
| +I |                    3 |                  1546272720000 |
| +I |                    4 |                  1546273080000 |
| +I |                    5 |                  1546273440000 |
| -U |                    1 |                  1546272000000 |
| +U |                    1 |    1546272000000,1546273800000 |
| -U |                    2 |                  1546272360000 |
| +U |                    2 |    1546272360000,1546274160000 |
| -U |                    3 |                  1546272720000 |
| +U |                    3 |    1546272720000,1546274520000 |
| -U |                    4 |                  1546273080000 |
| +U |                    4 |    1546273080000,1546274880000 |
| -U |                    5 |                  1546273440000 |
| +U |                    5 |    1546273440000,1546275240000 |
| -U |                    1 |    1546272000000,1546273800000 |
| +U |                    1 |                  1546273800000 |
| -U |                    1 |                  1546273800000 |
| +U |                    1 |    1546273800000,1546275600000 |
(to continue)
```

Let's focus on the last transaction (from above) of accountId=1. When there is a new transaction from account 1 that happens at timestamp=1546275600000, there are 4 operations in total.

```
+----+----------------------+--------------------------------+
| op |            accountId |                last2_timestamp |
+----+----------------------+--------------------------------+
| -U |                    1 |    1546272000000,1546273800000 |
| +U |                    1 |                  1546273800000 |
| -U |                    1 |                  1546273800000 |
| +U |                    1 |    1546273800000,1546275600000 |
```

However, I only want to emit the "new status" below to my downstream (say, another Kafka topic) via some sort of merging:

```
+----------------------+--------------------------------+
|            accountId |                last2_timestamp |
+----------------------+--------------------------------+
|                    1 |    1546273800000,1546275600000 |
```

That way, my downstream can consume literally "the last 2 transaction timestamps of each account":
```
+----------------------+--------------------------------+
|            accountId |                last2_timestamp |
+----------------------+--------------------------------+
|                    1 |                  1546272000000 |
|                    1 |    1546272000000,1546273800000 |
|                    1 |    1546273800000,1546275600000 |
(to continue)
```

What is the right way to do this?
