[ https://issues.apache.org/jira/browse/FLINK-35674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17859566#comment-17859566 ]
Thorne commented on FLINK-35674:
--------------------------------

I will fix it.

> MySQL connector cause blocking when searching for binlog timestamps
> -------------------------------------------------------------------
>
>                 Key: FLINK-35674
>                 URL: https://issues.apache.org/jira/browse/FLINK-35674
>             Project: Flink
>          Issue Type: Bug
>          Components: Flink CDC
>    Affects Versions: cdc-3.1.1
>         Environment: flink-cdc-3.1.x
>            Reporter: Thorne
>            Priority: Blocker
>             Fix For: cdc-3.2.0
>
>         Attachments: A7AE0D63-365D-4572-B63D-96DF5F096BF9.png,
> BF180441-9C61-40eb-B07C-A11F8BCEC2D0.png,
> FBA32597-8783-4678-B391-E450148C1B30.png
>
>
> When a task starts multiple MySQL connector sources in timestamp startup
> mode at the same time, the search for the binlog timestamp can block, and
> the source may then never receive any data.
>
> 1. I have four tables (products, orders, orders_copy, shipments) to capture
> in one task. For these four tables I generated a lot of binlog entries, for
> example 10 million.
> 2. I started the task in timestamp mode, and the products table did not get
> any records.
> !FBA32597-8783-4678-B391-E450148C1B30.png|width=550,height=264!
> 3. I started it in timestamp mode, but the orders_copy table did not get any
> records.
> !BF180441-9C61-40eb-B07C-A11F8BCEC2D0.png|width=557,height=230!
> 4. I debugged the code and found some problems:
> {code:java}
> // Class: org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils
> private static String searchBinlogName(
>         BinaryLogClient client, long targetMs, List<String> binlogFiles)
>         throws IOException, InterruptedException {
>     int startIdx = 0;
>     int endIdx = binlogFiles.size() - 1;
>     while (startIdx <= endIdx) {
>         int mid = startIdx + (endIdx - startIdx) / 2;
>         long midTs = getBinlogTimestamp(client, binlogFiles.get(mid));
>         if (midTs < targetMs) {
>             startIdx = mid + 1;
>         } else if (targetMs < midTs) {
>             endIdx = mid - 1;
>         } else {
>             return binlogFiles.get(mid);
>         }
>     }
>     return endIdx < 0 ? binlogFiles.get(0) : binlogFiles.get(endIdx);
> }
>
> private static long getBinlogTimestamp(BinaryLogClient client, String binlogFile)
>         throws IOException, InterruptedException {
>     ArrayBlockingQueue<Long> binlogTimestamps = new ArrayBlockingQueue<>(1);
>     BinaryLogClient.EventListener eventListener =
>             event -> {
>                 EventData data = event.getData();
>                 if (data instanceof RotateEventData) {
>                     // We skip RotateEventData because it does not contain the timestamp we are
>                     // interested in.
>                     return;
>                 }
>                 EventHeaderV4 header = event.getHeader();
>                 long timestamp = header.getTimestamp();
>                 if (timestamp > 0) {
>                     binlogTimestamps.offer(timestamp);
>                     try {
>                         client.disconnect();
>                     } catch (IOException e) {
>                         throw new RuntimeException(e);
>                     }
>                 }
>             };
>     try {
>         client.registerEventListener(eventListener);
>         client.setBinlogFilename(binlogFile);
>         client.setBinlogPosition(0);
>         LOG.info("begin parse binlog: {}", binlogFile);
>         client.connect();
>     } finally {
>         client.unregisterEventListener(eventListener);
>     }
>     return binlogTimestamps.take();
> }
> {code}
> 5. The function binlogTimestamps.take() blocks until the queue has a record,
> so if no timestamp is ever offered to the queue, the call never returns.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
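
To illustrate the last point about binlogTimestamps.take(), here is a minimal standalone sketch of how a bounded wait on the same kind of queue behaves when nothing is ever offered. It is not the connector's code and not necessarily the fix that was applied for FLINK-35674; the class name BinlogTimestampWait, the helper awaitTimestamp, and the -1 sentinel are all hypothetical and exist only for this example.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; it only uses java.util.concurrent, no Flink or Debezium APIs.
final class BinlogTimestampWait {

    /**
     * Waits up to timeoutMs milliseconds for a timestamp to appear in the queue.
     * Returns the timestamp, or -1 (a sentinel assumed for this sketch) if nothing
     * arrived in time or the waiting thread was interrupted.
     */
    static long awaitTimestamp(BlockingQueue<Long> queue, long timeoutMs) {
        try {
            // Unlike take(), poll(timeout, unit) gives up and returns null
            // once the timeout elapses on an empty queue.
            Long ts = queue.poll(timeoutMs, TimeUnit.MILLISECONDS);
            return ts == null ? -1L : ts;
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe it.
            Thread.currentThread().interrupt();
            return -1L;
        }
    }

    public static void main(String[] args) {
        // Nothing is ever offered here; take() on this queue would wait forever.
        BlockingQueue<Long> empty = new ArrayBlockingQueue<>(1);
        System.out.println("empty queue  -> " + awaitTimestamp(empty, 100));

        // A queue that already holds a value returns it immediately.
        BlockingQueue<Long> filled = new ArrayBlockingQueue<>(1);
        filled.offer(1719000000000L);
        System.out.println("filled queue -> " + awaitTimestamp(filled, 100));
    }
}
```

With a bounded wait, the caller gets a chance to log the binlog file name, retry, or fail the task instead of hanging silently. Which of those is appropriate for the connector is a separate design decision.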