✅ The verified answer to this question is available below. Our community-reviewed solutions help you understand the material better.
Im Raft-Paper ist die RPC Funktion AppendEntries folgendermaßen definiert worden:
Im Java Code der Raft-grpc Implementierung ist die Funktion so implementiert worden:
public AppendEntriesResponse handleAppendEntries(AppendEntriesRequest request) {
// Reject AppendEntries when suspended
if (suspended) {
log.debug("Rejecting AppendEntries - node is suspended");
return AppendEntriesResponse.newBuilder()
.setTerm(currentTerm.get())
.setSuccess(false)
.build();
}
lastHeartbeat = System.currentTimeMillis();
if (request.getTerm() > currentTerm.get()) {
becomeFollower(request.getTerm());
}
if (request.getTerm() < currentTerm.get()) {
return AppendEntriesResponse.newBuilder()
.setTerm(currentTerm.get())
.setSuccess(false)
.build();
}
if (state != NodeState.FOLLOWER) {
becomeFollower(request.getTerm());
}
currentLeader = request.getLeaderId();
startElectionTimer();
if (request.getEntriesCount() == 0) {
logEvent(RaftEvent.EventType.HEARTBEAT_RECEIVED,
"Heartbeat from leader " + request.getLeaderId());
}
// Check prevLogIndex/prevLogTerm consistency (logical indices)
int prevLogIndex = request.getPrevLogIndex();
if (prevLogIndex >= 0) {
// Check if we have the entry at prevLogIndex
int prevLogTerm = getLogTermAt(prevLogIndex);
if (prevLogTerm == 0 || prevLogTerm != request.getPrevLogTerm()) {
log.debug("Log consistency check failed at index {}: expected term {}, got {}",
prevLogIndex, request.getPrevLogTerm(), prevLogTerm);
return AppendEntriesResponse.newBuilder()
.setTerm(currentTerm.get())
.setSuccess(false)
.build();
}
}
// Process new entries (logical indices)
int newEntryLogicalIndex = prevLogIndex + 1;
int entriesAdded = 0;
List<LogEntry> entriesToPersist = new ArrayList<>();
for (GrpcLogEntry entry : request.getEntriesList()) {
int physicalIndex = logicalToPhysical(newEntryLogicalIndex);
// Check for conflicts
if (physicalIndex >= 0 && physicalIndex < raftLog.size()) {
if (raftLog.get(physicalIndex).getTerm() != entry.getTerm()) {
// Conflict detected - delete conflicting entries
synchronized (raftLog) {
raftLog.subList(physicalIndex, raftLog.size()).clear();
// Persist log deletion (CRITICAL for consistency)
persistenceService.deleteLogEntriesFrom(config.getNodeId(), newEntryLogicalIndex);
}
}
}
// Append if we don't have this entry
physicalIndex = logicalToPhysical(newEntryLogicalIndex);
if (physicalIndex >= raftLog.size()) {
// Convert gRPC entry back to internal LogEntry
LogEntry logEntry = convertFromGrpcLogEntry(entry);
synchronized (raftLog) {
raftLog.add(logEntry);
}
entriesToPersist.add(logEntry);
entriesAdded++;
}
newEntryLogicalIndex++;
}
// Persist all new entries in batch (CRITICAL for durability)
if (!entriesToPersist.isEmpty()) {
int startLogicalIndex = prevLogIndex + 1;
persistenceService.appendLogEntries(config.getNodeId(), startLogicalIndex, entriesToPersist);
}
if (entriesAdded > 0) {
logEvent(RaftEvent.EventType.LOG_REPLICATED,
"Replicated " + entriesAdded + " entries from leader " + request.getLeaderId());
}
// Update commit index based on leader's commit index (logical indices)
if (request.getLeaderCommit() > commitIndex.get()) {
commitIndex.set(Math.min(request.getLeaderCommit(), getLastLogIndex()));
applyCommittedEntries();
}
return AppendEntriesResponse.newBuilder()
.setTerm(currentTerm.get())
.setSuccess(true)
.setMatchIndex(getLastLogIndex()) // Logical index of last entry
.build();
}
Mit den folgenden gRPC Messages:
message AppendEntriesRequest {
int32 term = 1;
string leaderId = 2;
int32 prevLogIndex = 3;
int32 prevLogTerm = 4;
repeated GrpcLogEntry entries = 5;
int32 leaderCommit = 6;
}
message AppendEntriesResponse {
int32 term = 1;
bool success = 2;
int32 matchIndex = 3;
}
Welche Aussagen treffen dabei zu?