simenliuxing created FLINK-32778:
------------------------------------

             Summary: Stream join data output sequence is inconsistent with 
input sequence
                 Key: FLINK-32778
                 URL: https://issues.apache.org/jira/browse/FLINK-32778
             Project: Flink
          Issue Type: Improvement
          Components: Table SQL / Runtime
    Affects Versions: 1.16.1
            Reporter: simenliuxing
             Fix For: 1.7.3


-- flink version:1.16.1
-- parallelism.default: 1

CREATE TABLE s1(
    id string,
    gk bigint,
    price int
 )WITH(
    'connector' = 'kafka'
    ,'properties.bootstrap.servers' = 'xx:9092'
    ,'properties.group.id' = 'xx-xx'
    ,'scan.startup.mode' = 'earliest-offset'
    ,'value.format' = 'json'
    ,'topic' = 'topic1'
 );

CREATE TABLE s2(
    id string,
    name string
 )WITH(
    'connector' = 'kafka'
    ,'properties.bootstrap.servers' = 'xx:9092'
    ,'properties.group.id' = 'xx-xx'
    ,'scan.startup.mode' = 'earliest-offset'
    ,'value.format' = 'json'
    ,'topic' = 'topic2'
 );

create table sink(
    id string,
    name string,
    gk bigint,
    price int
 )with(
    'connector'='print'
 );

create view v1 as select
    id,
    gk,
    last_value(price) price
from s1
group by id,gk;

insert into sink
select
    v1.id,
    s2.name,
    v1.gk,
    v1.price
from v1
left join s2 on v1.id=s2.id;

1.Enter two pieces of data into the topic1 topic:
{"id":"1","gk":758,"price":100}
{"id":"1","gk":1818,"price":200}

The output is as follows:
+I[1, null, 758, 100]
+I[1, null, 1818, 200]

2.Enter two pieces of data into the topic2 topic:
{"id":1,"name":"z3"}

The output is as follows:
-D[1, null, 1818, 200]
-D[1, null, 758, 100]
+I[1, z3, 1818, 200]
+I[1, z3, 758, 100]

My doubt is that the output should be in the order of input , like below:
-D[1, null, 758, 100]
-D[1, null, 1818, 200]
+I[1, z3, 758, 100]
+I[1, z3, 1818, 200]

3.When I re-run the above sql, the results are output in the order of input
+I[1, z3, 758, 100]
+I[1, z3, 1818, 200]

Is there a way to control this uncertainty?

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to