Hoang Tri Tam created FLINK-29827:
-------------------------------------

             Summary: [Connector][AsyncSinkWriter] Checkpointed states block 
writer from sending records
                 Key: FLINK-29827
                 URL: https://issues.apache.org/jira/browse/FLINK-29827
             Project: Flink
          Issue Type: Bug
          Components: Connectors / Common
    Affects Versions: 1.15.2
            Reporter: Hoang Tri Tam


Hi everyone,

Recently we discovered an issue which blocks Sink operators from sending 
records to client's endpoint.

To *reproduce* the issue, we started our Flink app from an existing savepoint
in which some Sink operators hold buffered records. For instance, the app
uses KinesisStreamSink with a parallelism of 4. Two of the subtasks have no
buffered records; the other two start with restored state containing records
left over from the previous run.

{*}Behavior{*}: at runtime, we sent records (say 200) to this sink in
rebalance mode, but only 100 of them (50%) were dispatched by the sink
operators.

After {*}investigation{*}, we found that AsyncSinkWriter invokes
submitRequestEntries() to send records to their destination. This invocation
happens when a registered callback fires, when flush(true) (a forced flush)
is called, or when the buffer is full (either in size or in record count).
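
The three triggers described above can be sketched with a small toy model. This
is not Flink's AsyncSinkWriter: the class ToyBufferingWriter and its methods
are hypothetical names made up for illustration, the byte-size limit is
omitted, and the timer callback is invoked by hand.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: NOT Flink's AsyncSinkWriter. All names here are hypothetical.
class ToyBufferingWriter {
    private final int maxBatchSize;
    private final List<String> buffer = new ArrayList<>();
    private final List<String> sent = new ArrayList<>();

    ToyBufferingWriter(int maxBatchSize) {
        this.maxBatchSize = maxBatchSize;
    }

    // Trigger: the buffer reaches its limit (record count only in this toy).
    void write(String record) {
        buffer.add(record);
        if (buffer.size() >= maxBatchSize) {
            submit();
        }
    }

    // Trigger: a forced flush; a non-forced call does nothing in this toy.
    void flush(boolean forced) {
        if (forced) {
            submit();
        }
    }

    // Trigger: a previously registered callback fires (called by hand here).
    void onTimerCallback() {
        submit();
    }

    // Stand-in for submitRequestEntries(): hand the whole buffer over.
    private void submit() {
        sent.addAll(buffer);
        buffer.clear();
    }

    int sentCount() {
        return sent.size();
    }

    int bufferedCount() {
        return buffer.size();
    }

    public static void main(String[] args) {
        ToyBufferingWriter w = new ToyBufferingWriter(3);
        w.write("a");
        w.write("b");
        System.out.println("after 2 writes: sent=" + w.sentCount());
        w.write("c"); // buffer is now full
        System.out.println("after buffer full: sent=" + w.sentCount());
        w.write("d");
        w.onTimerCallback();
        System.out.println("after callback: sent=" + w.sentCount());
        w.write("e");
        w.flush(true);
        System.out.println("after forced flush: sent=" + w.sentCount());
    }
}
```

If none of the three triggers fires, records simply stay in the buffer, which
is the situation this report is about.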

Our case falls under the first scenario: the _callback is not registered when
the writer starts with existing buffered records_ initialized from a
savepoint. Hence those operators held their records until their buffers
became full, while the other operators kept sending as usual.
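
A minimal simulation of this scenario is sketched below. It assumes, as our
reading of the behavior suggests, that the time-based callback is armed only
when a record enters an empty buffer, so a buffer pre-filled from a savepoint
never arms it. ToyRestoredWriter, tick() and the registerTimerOnRestore flag
are hypothetical and only illustrate the idea; they are not Flink APIs, and
the flag is one possible remedy, not a confirmed fix.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy model only: NOT Flink's AsyncSinkWriter. All names here are hypothetical.
class ToyRestoredWriter {
    private final Deque<String> buffer = new ArrayDeque<>();
    private final int maxBatchSize;
    private boolean timerRegistered = false;
    private int sent = 0;

    ToyRestoredWriter(int restoredRecords, int maxBatchSize, boolean registerTimerOnRestore) {
        this.maxBatchSize = maxBatchSize;
        // Restored entries go straight into the buffer, bypassing write().
        for (int i = 0; i < restoredRecords; i++) {
            buffer.add("restored-" + i);
        }
        // Possible remedy (assumption): arm the timer when restored state is non-empty.
        if (registerTimerOnRestore && !buffer.isEmpty()) {
            timerRegistered = true;
        }
    }

    void write(String record) {
        // Assumed behavior: the timer is armed only when a record enters an empty buffer.
        if (buffer.isEmpty() && !timerRegistered) {
            timerRegistered = true;
        }
        buffer.add(record);
        if (buffer.size() >= maxBatchSize) {
            submit();
        }
    }

    // Simulates the time-in-buffer limit elapsing.
    void tick() {
        if (timerRegistered) {
            timerRegistered = false;
            submit();
        }
    }

    private void submit() {
        sent += buffer.size();
        buffer.clear();
    }

    int sentCount() {
        return sent;
    }

    int bufferedCount() {
        return buffer.size();
    }

    public static void main(String[] args) {
        ToyRestoredWriter fresh = new ToyRestoredWriter(0, 100, false);
        ToyRestoredWriter stuck = new ToyRestoredWriter(2, 100, false);
        ToyRestoredWriter armed = new ToyRestoredWriter(2, 100, true);
        for (ToyRestoredWriter w : new ToyRestoredWriter[] {fresh, stuck, armed}) {
            w.write("new-record");
            w.tick();
            System.out.println("sent=" + w.sentCount() + " buffered=" + w.bufferedCount());
        }
    }
}
```

In this toy, the fresh writer sends its record on the first tick, the restored
writer keeps all three records buffered, and the variant that arms the timer
on restore sends all three.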

Impacted {*}scope{*}: Flink 1.15.2 or later versions, for any Sink whose
writer extends AsyncSinkWriter.

We currently treat this as abnormal behavior in Flink, but please let me
know if it is intended by design.

Thanks in advance.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
