ajian2002 commented on PR #121:
URL: 
https://github.com/apache/flink-table-store/pull/121#issuecomment-1135419375
   > You can try this test case if your're interested.
   > 
   > ```java
   > @Test
   > public void myTest() throws Exception {
   >     String ddl3 =
   >             "CREATE TABLE IF NOT EXISTS T5 ( dt STRING, hr INT, price INT, 
PRIMARY KEY (dt, hr) NOT ENFORCED ) WITH ( 'merge-engine' = 'aggregation', 
'price.aggregate-function' = 'sum' );";
   >     String tmpPath;
   >     try {
   >         tmpPath = TEMPORARY_FOLDER.newFolder().toURI().toString();
   >     } catch (Exception e) {
   >         throw new RuntimeException(e);
   >     }
   >     String ddl4 =
   >             "CREATE TABLE IF NOT EXISTS A ( dt STRING, hr INT, price INT ) 
WITH ( 'connector' = 'filesystem', 'path' = '"
   >                     + tmpPath
   >                     + "', 'format' = 'avro' );";
   >     String ddl5 =
   >             "CREATE TABLE IF NOT EXISTS P ( dt STRING, hr INT, price INT ) 
WITH ( 'connector' = 'print' );";
   >     bEnv.executeSql(ddl3).await();
   >     bEnv.executeSql(ddl4).await();
   >     sEnv.executeSql(ddl3).await();
   >     sEnv.executeSql(ddl4).await();
   >     sEnv.executeSql(ddl5).await();
   >     bEnv.executeSql(
   >                     "INSERT INTO A VALUES ('20220101', 8, 100), 
('20220101', 8, 300), ('20220101', 8, 200), ('20220101', 8, 400), ('20220101', 
9, 100)")
   >             .await();
   >     sEnv.executeSql(
   >                     "INSERT INTO T5 SELECT dt, hr, price FROM ("
   >                             + "  SELECT dt, hr, price, ROW_NUMBER() OVER 
(PARTITION BY dt, hr ORDER BY price desc) AS rn FROM A"
   >                             + ") WHERE rn <= 2")
   >             .await();
   >     sEnv.executeSql(
   >                     "INSERT INTO P SELECT dt, hr, price FROM ("
   >                             + "  SELECT dt, hr, price, ROW_NUMBER() OVER 
(PARTITION BY dt, hr ORDER BY price desc) AS rn FROM A"
   >                             + ") WHERE rn <= 2")
   >             .await();
   >     List<Row> result = iteratorToList(bEnv.from("T5").execute().collect());
   >     System.out.println(result);
   > }
   > ```
   > 
   > This SQL script calculates the sum of the top 2 largest price in each 
hour. I've also created a print sink so that you can see what's going into the 
sink.
   > 
   > ```
   > 1> +I[20220101, 8, 100]
   > 1> +I[20220101, 8, 300] # insert price 100 and 300
   > 1> -D[20220101, 8, 100]
   > 1> +I[20220101, 8, 200] # price 200 comes, so 100 should be removed out of 
the result
   > 1> -D[20220101, 8, 200]
   > 1> +I[20220101, 8, 400] # price 400 comes, so 200 should be removed out of 
the result
   > 1> +I[20220101, 9, 100] # 100 in another hour, not affected
   > ```
   > 
   > The expected result of our aggregate function should be `[+I[20220101, 8, 
700], +I[20220101, 9, 100]]` but sadly current implementation prints out 
`[+I[20220101, 8, 1300], +I[20220101, 9, 100]]`.
   > 
   > Sorry that my previous comments on row kinds are sort of misleading. Flink 
does have 4 row kinds but in Table Store we only consider key-value pairs 
instead of rows. Key-value pairs only have two value kinds (provided by 
`KeyValue#valueKind`): `ValueKind.ADD` means updating the corresponding key 
with the value (you can think of it as `Map#compute` in Java) and 
`ValueKind.DELETE` means removing the corresponding key (like `Map#remove` in 
Java). Although each key and each value is represented by a `RowData`, their 
row kind are meaningless and are always `INSERT`. Only their value kinds are 
important.
   > 
   > To connect table store with Flink, we parse `RowData` from Flink into 
`KeyValue` according to both its row kind and the merge function used in 
`flink-table-store-connector` module. As the number of merge functions are 
growing we should consider extracting a common method to complete this parsing. 
But that is out of the scope of this PR.
   > 
   > For this PR I suggest that we only support `INSERT` row kind and leave the 
support for other row kind later. All in all each PR should be as small as 
possible and only concentrate on one thing.
   
   
   sorry i didn't understand what you meant
   Your test executes SQL
   > "Insert a value ('20220101', 8, 100), ('20220101', 8, 300), ('20220101', 
8, 200), ('20220101', 8, 400), ('20220101' , 9, 100)"
   
   But I didn't find where to execute `-D[20220101, 8, 200]`


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to