[ 
https://issues.apache.org/jira/browse/FLINK-35674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17859566#comment-17859566
 ] 

Thorne commented on FLINK-35674:
--------------------------------

I will fix it.

> MySQL connector cause blocking when searching for binlog timestamps
> -------------------------------------------------------------------
>
>                 Key: FLINK-35674
>                 URL: https://issues.apache.org/jira/browse/FLINK-35674
>             Project: Flink
>          Issue Type: Bug
>          Components: Flink CDC
>    Affects Versions: cdc-3.1.1
>         Environment: flink-cdc-3.1.x
>            Reporter: Thorne
>            Priority: Blocker
>             Fix For: cdc-3.2.0
>
>         Attachments: A7AE0D63-365D-4572-B63D-96DF5F096BF9.png, 
> BF180441-9C61-40eb-B07C-A11F8BCEC2D0.png, 
> FBA32597-8783-4678-B391-E450148C1B30.png
>
>
> When a task starts several MySQL connector sources in timestamp startup mode 
> at the same time, the search for the binlog file that matches the timestamp 
> can block. As a result, a source may never receive any data.
>  
> 1、I have four tables (products, orders, orders_copy, shipments) to capture in 
> one task. I generated a lot of binlogs for these four tables, for example 10 million.
> 2、I tried starting it in timestamp mode, and the products table could not get 
> any records.
> !FBA32597-8783-4678-B391-E450148C1B30.png|width=550,height=264!
> 3、I tried starting it in timestamp mode, but the orders_copy table could not 
> get any records.
> !BF180441-9C61-40eb-B07C-A11F8BCEC2D0.png|width=557,height=230!
> 4、I debugged the code and found some problems:
> {code:java}
> // Class: org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils
> private static String searchBinlogName(
>         BinaryLogClient client, long targetMs, List<String> binlogFiles)
>         throws IOException, InterruptedException {
>     int startIdx = 0;
>     int endIdx = binlogFiles.size() - 1;
>     while (startIdx <= endIdx) {
>         int mid = startIdx + (endIdx - startIdx) / 2;
>         long midTs = getBinlogTimestamp(client, binlogFiles.get(mid));
>         if (midTs < targetMs) {
>             startIdx = mid + 1;
>         } else if (targetMs < midTs) {
>             endIdx = mid - 1;
>         } else {
>             return binlogFiles.get(mid);
>         }
>     }
>     return endIdx < 0 ? binlogFiles.get(0) : binlogFiles.get(endIdx);
> }
> private static long getBinlogTimestamp(BinaryLogClient client, String binlogFile)
>         throws IOException, InterruptedException {
>     ArrayBlockingQueue<Long> binlogTimestamps = new ArrayBlockingQueue<>(1);
>     BinaryLogClient.EventListener eventListener =
>             event -> {
>                 EventData data = event.getData();
>                 if (data instanceof RotateEventData) {
>                     // We skip RotateEventData because it does not contain the timestamp
>                     // we are interested in.
>                     return;
>                 }
>                 EventHeaderV4 header = event.getHeader();
>                 long timestamp = header.getTimestamp();
>                 if (timestamp > 0) {
>                     binlogTimestamps.offer(timestamp);
>                     try {
>                         client.disconnect();
>                     } catch (IOException e) {
>                         throw new RuntimeException(e);
>                     }
>                 }
>             };
>     try {
>         client.registerEventListener(eventListener);
>         client.setBinlogFilename(binlogFile);
>         client.setBinlogPosition(0);
>         LOG.info("begin parse binlog: {}", binlogFile);
>         client.connect();
>     } finally {
>         client.unregisterEventListener(eventListener);
>     }
>     return binlogTimestamps.take();
> }{code}
> 5、The call binlogTimestamps.take() blocks until the queue has an element. If 
> the listener never offers a timestamp, the call never returns.
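
The blocking behavior described in step 5 can be reproduced without a MySQL server. The following is a minimal sketch, not the actual fix for FLINK-35674: the class BinlogTimestampWait and the method awaitTimestamp are hypothetical names, and the 100 ms timeout is an arbitrary value chosen for illustration. It shows how a bounded poll on the same kind of queue returns control to the caller when no timestamp arrives, whereas take() would wait indefinitely.

```java
import java.util.OptionalLong;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper, not part of Flink CDC: a bounded wait in place of take(). */
final class BinlogTimestampWait {

    private BinlogTimestampWait() {}

    /**
     * Waits up to timeoutMs for a timestamp to arrive on the queue.
     * Returns an empty OptionalLong on timeout instead of blocking forever.
     */
    static OptionalLong awaitTimestamp(BlockingQueue<Long> timestamps, long timeoutMs)
            throws InterruptedException {
        // poll(timeout, unit) returns null if nothing arrives in time;
        // take() on an empty queue would never return.
        Long ts = timestamps.poll(timeoutMs, TimeUnit.MILLISECONDS);
        return ts == null ? OptionalLong.empty() : OptionalLong.of(ts);
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Long> timestamps = new ArrayBlockingQueue<>(1);

        // No listener ever offers a value: the bounded wait gives up after 100 ms.
        System.out.println(awaitTimestamp(timestamps, 100).isPresent()); // prints false

        // Simulate the event listener delivering a header timestamp.
        timestamps.offer(1718000000000L);
        System.out.println(awaitTimestamp(timestamps, 100).getAsLong()); // prints 1718000000000
    }
}
```

With a bounded wait, the caller can decide what to do when no timestamp shows up, for example log the binlog file name and retry or fail fast. Which of these is appropriate for the connector is a design decision this sketch does not settle.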



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
