Hi,
I'm using Flink 1.9 and testing MATCH_RECOGNIZE by following [1].
When I ran the query in [2] myself, I got a result different from the one
shown in [2].
The query result shown in [2] is as follows:

 symbol       start_tstamp       end_tstamp          avgPrice
=========  ==================  ==================  ============
ACME       01-APR-11 10:00:00  01-APR-11 10:00:03     14.5
ACME       01-APR-11 10:00:04  01-APR-11 10:00:09     13.5

The result from the attached Maven project (which contains only
a sample program that executes the query in [2]) is as follows:

 ACME,1970-01-01 00:00:01.0,1970-01-01 00:00:04.0,14.5

There's just one entry, not two.
(As you might notice, the time of the first record in the attached maven
project is 1970-01-01 00:00:01 for testing. The other numbers are the same.)

I dug into the internal implementation of CepOperator and got the
following trace:

   1. INPUT : ACME,1000,12.0,1
   2. PARTIAL MATCH : [A*1]
   3.
   4. INPUT : ACME,2000,17.0,2
   5. PARTIAL MATCH : [A*2]
   6.
   7. INPUT : ACME,3000,13.0,1
   8. PARTIAL MATCH : [A*3]
   9. PARTIAL MATCH : [A*1]
   10.
   11. INPUT : ACME,4000,16.0,3
   12. PARTIAL MATCH : [A*4]
   13. PARTIAL MATCH : [A*2]
   14.
   15. *INPUT : ACME,5000,25.0,2*
   16. *COMPLETED MATCH : [A*4, B*1]*
   17.
   18. INPUT : ACME,6000,2.0,1
   19. PARTIAL MATCH : [A*1]
   20.
   21. INPUT : ACME,7000,4.0,1
   22. PARTIAL MATCH : [A*2]
   23. PARTIAL MATCH : [A*1]
   24.
   25. INPUT : ACME,8000,10.0,2
   26. PARTIAL MATCH : [A*3]
   27. PARTIAL MATCH : [A*2]
   28. PARTIAL MATCH : [A*1]
   29.
   30. INPUT : ACME,9000,15.0,2
   31. PARTIAL MATCH : [A*4]
   32. PARTIAL MATCH : [A*3]
   33. PARTIAL MATCH : [A*2]
   34.
   35. INPUT : ACME,10000,25.0,2
   36. PARTIAL MATCH : [A*5]
   37. PARTIAL MATCH : [A*4]
   38.
   39. INPUT : ACME,11000,30.0,1
   40. PARTIAL MATCH : [A*6]
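
For reference, the partial-match counts in the trace above can be reproduced with a toy model in Python. This is only a sketch under assumptions: the pattern is (A+ B) with A defined as AVG(A.price) < 15, as in [2]; the function name `trace` is made up for illustration; and the rule "report a completed match only when it begins at the oldest live partial match, then drop all partial matches" is a simplification chosen to fit the trace, not the actual CepOperator logic.

```python
# Prices of the 11 input rows, in rowtime order (taken from the trace).
PRICES = [12.0, 17.0, 13.0, 16.0, 25.0, 2.0, 4.0, 10.0, 15.0, 25.0, 30.0]
THRESHOLD = 15.0  # assumed from the query in [2]: A AS AVG(A.price) < 15


def trace(prices, threshold=THRESHOLD):
    """Return (price, kind, sizes) per input row.

    kind is "PARTIAL" or "COMPLETED"; sizes lists how many rows each
    surviving partial match (or the completed match) has mapped to A.
    """
    partials = []  # each entry: list of prices mapped to A so far
    events = []
    for price in prices:
        extended, completed = [], []
        for rows in partials:
            candidate = rows + [price]
            if sum(candidate) / len(candidate) < threshold:
                extended.append(candidate)   # the row is taken as another A
            else:
                completed.append(rows)       # the row is taken as B
        if price < threshold:
            extended.append([price])         # a new partial match starts here
        # Simplification (assumption): report a completed match only when it
        # begins at the oldest live partial match, then drop everything else.
        if completed and completed[0] is partials[0]:
            events.append((price, "COMPLETED", [len(completed[0])]))
            partials = []
        else:
            partials = extended
            events.append((price, "PARTIAL", [len(r) for r in partials]))
    return events


for price, kind, sizes in trace(PRICES):
    print(price, kind, sizes)
```

In this model the row with price 25.0 at 5000 cannot open a partial match on its own, because a single-row average of 25.0 is not below the threshold.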


My observation is that, when "ACME,5000,25.0,2" comes in (line 15), we get
a completed match (line 16) but no partial match (which is [A*1] in my
notation) starting from it.
According to the definition of "AFTER MATCH SKIP TO FIRST B", since
"ACME,5000,25.0,2" is B, a new match should start from "ACME,5000,25.0,2".
However, in the above trace a new match starts from the next row instead
(lines 18 and 19).
Therefore, when the last row "ACME,11000,30.0,1" comes in, the average at
that point is about 14.3 (= (2+4+10+15+25+30)/6), which is less than 15,
so "ACME,11000,30.0,1" belongs to A, not B as shown in the example.
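
To double-check the arithmetic, here is a small Python sketch that computes the running average of the prices for the two possible starting rows. The helper name `running_averages` and the variable names are made up for illustration; the prices come from the trace above.

```python
from itertools import accumulate


def running_averages(prices):
    """Average of the first k prices, for k = 1..len(prices)."""
    return [total / k for k, total in enumerate(accumulate(prices), start=1)]


# Rows starting at the one after the completed match (line 18 of the trace).
from_6000 = [2.0, 4.0, 10.0, 15.0, 25.0, 30.0]
# The same rows, but starting at the row that was matched as B (line 15).
from_5000 = [25.0] + from_6000

for label, prices in (("from 6000", from_6000), ("from 5000", from_5000)):
    print(label, [round(a, 2) for a in running_averages(prices)])
```

Starting at 6000, the averages are 2.0, 3.0, 5.33, 7.75, 11.2, 14.33, so the 11000 row still keeps the average below 15. Starting at 5000, they are 25.0, 13.5, 10.33, 10.25, 11.2, 13.5, 15.86: the sixth value, 13.5, equals the avgPrice of the second row in the result from [2], and the seventh exceeds 15. The first value of that series, 25.0, is itself not below 15.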

Is this a bug, or did I miss something conceptually?

P.S. How do you load rows from a local CSV file with rowtime configured? I
don't like the custom table source I implemented for testing in the
attached file.

Best,
Dongwon

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/match_recognize.html
[2]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/match_recognize.html#aggregations

Attachment: cepTest.tar.gz
Description: application/gzip
