This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 7b23042eb [ISSUE #6306] Fix unexpected state from slave (#6307)
7b23042eb is described below
commit 7b23042eb2a0f445d0e06570f481dd4967eb99c5
Author: fujian-zfj <[email protected]>
AuthorDate: Fri Mar 10 13:39:28 2023 +0800
[ISSUE #6306] Fix unexpected state from slave (#6307)
* typo int readme[ecosystem]
* fix unexpected state from slave
---
.../store/ha/autoswitch/AutoSwitchHAClient.java | 24 +++++++++++-----------
.../ha/autoswitch/AutoSwitchHAConnection.java | 2 +-
2 files changed, 13 insertions(+), 13 deletions(-)
diff --git
a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java
b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java
index b95d3814a..49a59e251 100644
---
a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java
+++
b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java
@@ -332,21 +332,21 @@ public class AutoSwitchHAClient extends ServiceThread
implements HAClient {
}
}
- private boolean reportSlaveOffset(final long offsetToReport) throws
IOException {
+ private boolean reportSlaveOffset(HAConnectionState currentState, final
long offsetToReport) throws IOException {
this.transferHeaderBuffer.position(0);
this.transferHeaderBuffer.limit(TRANSFER_HEADER_SIZE);
- this.transferHeaderBuffer.putInt(this.currentState.ordinal());
+ this.transferHeaderBuffer.putInt(currentState.ordinal());
this.transferHeaderBuffer.putLong(offsetToReport);
this.transferHeaderBuffer.flip();
return this.haWriter.write(this.socketChannel,
this.transferHeaderBuffer);
}
- private boolean reportSlaveMaxOffset() throws IOException {
+ private boolean reportSlaveMaxOffset(HAConnectionState currentState)
throws IOException {
boolean result = true;
final long maxPhyOffset = this.messageStore.getMaxPhyOffset();
if (maxPhyOffset > this.currentReportedOffset) {
this.currentReportedOffset = maxPhyOffset;
- result = reportSlaveOffset(this.currentReportedOffset);
+ result = reportSlaveOffset(currentState,
this.currentReportedOffset);
}
return result;
}
@@ -369,11 +369,11 @@ public class AutoSwitchHAClient extends ServiceThread
implements HAClient {
return this.socketChannel != null;
}
- private boolean transferFromMaster() throws IOException {
+ private boolean transferFromMaster(HAConnectionState currentState) throws
IOException {
boolean result;
if (isTimeToReportOffset()) {
LOGGER.info("Slave report current offset {}",
this.currentReportedOffset);
- result = reportSlaveOffset(this.currentReportedOffset);
+ result = reportSlaveOffset(currentState,
this.currentReportedOffset);
if (!result) {
return false;
}
@@ -386,7 +386,7 @@ public class AutoSwitchHAClient extends ServiceThread
implements HAClient {
return false;
}
- return this.reportSlaveMaxOffset();
+ return this.reportSlaveMaxOffset(currentState);
}
@Override
@@ -415,7 +415,7 @@ public class AutoSwitchHAClient extends ServiceThread
implements HAClient {
handshakeWithMaster();
continue;
case TRANSFER:
- if (!transferFromMaster()) {
+ if (!transferFromMaster(HAConnectionState.TRANSFER)) {
closeMasterAndWait();
continue;
}
@@ -445,7 +445,7 @@ public class AutoSwitchHAClient extends ServiceThread
implements HAClient {
/**
* Compare the master and slave's epoch file, find consistent point, do
truncate.
*/
- private boolean doTruncate(List<EpochEntry> masterEpochEntries, long
masterEndOffset) throws IOException {
+ private boolean doTruncate(List<EpochEntry> masterEpochEntries, long
masterEndOffset, HAConnectionState currentState) throws IOException {
if (this.epochCache.getEntrySize() == 0) {
// If epochMap is empty, means the broker is a new replicas
LOGGER.info("Slave local epochCache is empty, skip truncate log");
@@ -475,7 +475,7 @@ public class AutoSwitchHAClient extends ServiceThread
implements HAClient {
changeCurrentState(HAConnectionState.TRANSFER);
this.currentReportedOffset = truncateOffset;
}
- if (!reportSlaveMaxOffset()) {
+ if (!reportSlaveMaxOffset(currentState)) {
LOGGER.error("AutoSwitchHAClient report max offset to master
failed");
return false;
}
@@ -534,7 +534,7 @@ public class AutoSwitchHAClient extends ServiceThread
implements HAClient {
byteBufferRead.position(readSocketPos);
AutoSwitchHAClient.this.processPosition +=
bodySize;
LOGGER.info("Receive handshake,
masterMaxPosition {}, masterEpochEntries:{}, try truncate log", masterOffset,
epochEntries);
- if (!doTruncate(epochEntries, masterOffset)) {
+ if (!doTruncate(epochEntries, masterOffset,
HAConnectionState.HANDSHAKE)) {
waitForRunning(1000 * 2);
LOGGER.error("AutoSwitchHAClient truncate
log failed in handshake state");
return false;
@@ -573,7 +573,7 @@ public class AutoSwitchHAClient extends ServiceThread
implements HAClient {
haService.updateConfirmOffset(Math.min(confirmOffset,
messageStore.getMaxPhyOffset()));
- if (!reportSlaveMaxOffset()) {
+ if
(!reportSlaveMaxOffset(HAConnectionState.TRANSFER)) {
LOGGER.error("AutoSwitchHAClient report
max offset to master failed");
return false;
}
diff --git
a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java
b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java
index 57f9e9619..8f79b55a9 100644
---
a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java
+++
b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java
@@ -363,7 +363,7 @@ public class AutoSwitchHAConnection implements HAConnection
{
break;
default:
LOGGER.error("Current state illegal {}",
currentState);
- break;
+ return false;
}
if (!slaveState.equals(currentState)) {