Skip to content

Commit

Permalink
fix bug:snapshot opened file leak
Browse files Browse the repository at this point in the history
  • Loading branch information
wenweihu86 committed Jul 5, 2017
1 parent a852b3b commit 2eada10
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ public void run() {

// in lock, 开始心跳,对leader有效
private void startNewHeartbeat() {
LOG.debug("start new heartbeat");
LOG.debug("start new heartbeat, peers={}", peerMap.keySet());
for (final Peer peer : peerMap.values()) {
executorService.submit(new Runnable() {
@Override
Expand Down Expand Up @@ -370,6 +370,7 @@ public void appendEntries(Peer peer) {
lock.unlock();
}

LOG.debug("is need snapshot={}, peer={}", isNeedInstallSnapshot, peer.getServer().getServerId());
if (isNeedInstallSnapshot) {
if (!installSnapshot(peer)) {
return;
Expand Down Expand Up @@ -507,22 +508,27 @@ private long packEntries(long nextIndex, RaftMessage.AppendEntriesRequest.Builde
}

private boolean installSnapshot(Peer peer) {
if (!snapshot.getIsTakeSnapshot().get()) {
if (snapshot.getIsTakeSnapshot().get()) {
LOG.info("already in take snapshot, please send install snapshot request later");
return false;
}
if (!snapshot.getIsInstallSnapshot().compareAndSet(false, true)) {
LOG.info("already in install snapshot");
return false;
}

LOG.info("begin send install snapshot request to server={}", peer.getServer().getServerId());
snapshot.getIsInstallSnapshot().set(true);
boolean isSuccess = true;
TreeMap<String, Snapshot.SnapshotDataFile> snapshotDataFileMap = snapshot.openSnapshotDataFiles();
LOG.info("total snapshot files={}", snapshotDataFileMap.keySet());
try {
boolean isLastRequest = false;
String lastFileName = null;
long lastOffset = 0;
long lastLength = 0;
while (!isLastRequest) {
RaftMessage.InstallSnapshotRequest request
= buildInstallSnapshotRequest(lastFileName, lastOffset, lastLength);
= buildInstallSnapshotRequest(snapshotDataFileMap, lastFileName, lastOffset, lastLength);
if (request == null) {
LOG.warn("snapshot request == null");
isSuccess = false;
Expand Down Expand Up @@ -562,20 +568,21 @@ private boolean installSnapshot(Peer peer) {
}
}
} finally {
snapshot.getIsInstallSnapshot().set(false);
snapshot.closeSnapshotDataFiles(snapshotDataFileMap);
snapshot.getIsInstallSnapshot().compareAndSet(true, false);
}
LOG.info("end send install snapshot request to server={}, success={}",
peer.getServer().getServerId(), isSuccess);
return isSuccess;
}

private RaftMessage.InstallSnapshotRequest buildInstallSnapshotRequest(
TreeMap<String, Snapshot.SnapshotDataFile> snapshotDataFileMap,
String lastFileName, long lastOffset, long lastLength) {
RaftMessage.InstallSnapshotRequest.Builder requestBuilder = RaftMessage.InstallSnapshotRequest.newBuilder();

snapshot.getLock().lock();
try {
TreeMap<String, Snapshot.SnapshotDataFile> snapshotDataFileMap = snapshot.getSnapshotDataFileMap();
if (lastFileName == null) {
lastFileName = snapshotDataFileMap.firstKey();
lastOffset = 0;
Expand Down Expand Up @@ -845,7 +852,7 @@ public NodeState getState() {
return state;
}

public Map<Integer, Peer> getPeerMap() {
public ConcurrentMap<Integer, Peer> getPeerMap() {
return peerMap;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public RaftMessage.AddPeersResponse addPeers(RaftMessage.AddPeersRequest request
final Peer peer = new Peer(server);
peer.setNextIndex(1);
requestPeers.add(peer);
raftNode.getPeerMap().put(server.getServerId(), peer);
raftNode.getPeerMap().putIfAbsent(server.getServerId(), peer);
raftNode.getExecutorService().submit(new Runnable() {
@Override
public void run() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,9 @@ public long append(List<RaftMessage.LogEntry> entries) {
segment.getStartIndex(), segment.getEndIndex());
String newFullFileName = logDataDir + File.separator + newFileName;
File newFile = new File(newFullFileName);
newFile.createNewFile();
String oldFullFileName = logDataDir + File.separator + segment.getFileName();
File oldFile = new File(oldFullFileName);
oldFile.renameTo(newFile);
FileUtils.moveFile(oldFile, newFile);
segment.setFileName(newFileName);
segment.setRandomAccessFile(RaftFileUtils.openFile(logDataDir, newFileName, "r"));
}
Expand Down Expand Up @@ -152,7 +151,7 @@ public long append(List<RaftMessage.LogEntry> entries) {
}
totalSize += entrySize;
} catch (IOException ex) {
throw new RuntimeException("meet exception, msg=" + ex.getMessage());
throw new RuntimeException("append raft log exception, msg=" + ex.getMessage());
}
}
return newLastLogIndex;
Expand All @@ -165,6 +164,9 @@ public void truncatePrefix(long newFirstIndex) {
long oldFirstIndex = getFirstLogIndex();
while (!startLogIndexSegmentMap.isEmpty()) {
Segment segment = startLogIndexSegmentMap.firstEntry().getValue();
if (segment.isCanWrite()) {
break;
}
if (newFirstIndex > segment.getEndIndex()) {
File oldFile = new File(logDataDir + File.separator + segment.getFileName());
try {
Expand All @@ -183,7 +185,7 @@ public void truncatePrefix(long newFirstIndex) {
if (startLogIndexSegmentMap.size() == 0) {
newActualFirstIndex = newFirstIndex;
} else {
newActualFirstIndex = getFirstLogIndex();
newActualFirstIndex = startLogIndexSegmentMap.firstKey();
}
updateMetaData(null, null, newActualFirstIndex);
LOG.info("Truncating log from old first index {} to new first index {}",
Expand Down Expand Up @@ -327,6 +329,8 @@ public void updateMetaData(Long currentTerm, Integer votedFor, Long firstLogInde
File file = new File(fileName);
try (RandomAccessFile randomAccessFile = new RandomAccessFile(file, "rw")) {
RaftFileUtils.writeProtoToFile(randomAccessFile, metaData);
LOG.info("new segment meta info, currentTerm={}, votedFor={}, firstLogIndex={}",
metaData.getCurrentTerm(), metaData.getVotedFor(), metaData.getFirstLogIndex());
} catch (IOException ex) {
LOG.warn("meta file not exist, name={}", fileName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import java.nio.file.FileSystems;
import java.nio.file.Path;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
Expand All @@ -32,7 +33,6 @@ public class SnapshotDataFile {
private static final Logger LOG = LoggerFactory.getLogger(Snapshot.class);
private String snapshotDir = RaftOptions.dataDir + File.separator + "snapshot";
private RaftMessage.SnapshotMetaData metaData;
private TreeMap<String, SnapshotDataFile> snapshotDataFileMap;
// 表示是否正在安装snapshot,leader向follower安装,leader和follower同时处于installSnapshot状态
private AtomicBoolean isInstallSnapshot = new AtomicBoolean(false);
// 表示节点自己是否在对状态机做snapshot
Expand All @@ -48,25 +48,18 @@ public Snapshot() {
}

public void reload() {
if (snapshotDataFileMap != null) {
for (SnapshotDataFile file : snapshotDataFileMap.values()) {
RaftFileUtils.closeFile(file.randomAccessFile);
}
}
this.snapshotDataFileMap = readSnapshotDataFiles();
metaData = this.readMetaData();
if (metaData == null) {
if (snapshotDataFileMap.size() > 0) {
LOG.error("No readable metadata file but found snapshot in {}", snapshotDir);
throw new RuntimeException("No readable metadata file but found snapshot");
}
metaData = RaftMessage.SnapshotMetaData.newBuilder().build();
snapshotDataFileMap = new TreeMap<>();
}
}

// 如果是软链接,需要打开实际文件句柄
public TreeMap<String, SnapshotDataFile> readSnapshotDataFiles() {
/**
* 打开snapshot data目录下的文件,
* 如果是软链接,需要打开实际文件句柄
* @return 文件名以及文件句柄map
*/
public TreeMap<String, SnapshotDataFile> openSnapshotDataFiles() {
TreeMap<String, SnapshotDataFile> snapshotDataFileMap = new TreeMap<>();
String snapshotDataDir = snapshotDir + File.separator + "data";
try {
Expand All @@ -88,6 +81,16 @@ public TreeMap<String, SnapshotDataFile> readSnapshotDataFiles() {
return snapshotDataFileMap;
}

public void closeSnapshotDataFiles(TreeMap<String, SnapshotDataFile> snapshotDataFileMap) {
for (Map.Entry<String, SnapshotDataFile> entry : snapshotDataFileMap.entrySet()) {
try {
entry.getValue().randomAccessFile.close();
} catch (IOException ex) {
LOG.warn("close snapshot files exception:", ex);
}
}
}

public RaftMessage.SnapshotMetaData readMetaData() {
String fileName = snapshotDir + File.separator + "metadata";
File file = new File(fileName);
Expand Down Expand Up @@ -147,10 +150,6 @@ public AtomicBoolean getIsTakeSnapshot() {
return isTakeSnapshot;
}

public TreeMap<String, SnapshotDataFile> getSnapshotDataFileMap() {
return snapshotDataFileMap;
}

public Lock getLock() {
return lock;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public void testReadSnapshotDataFiles() throws IOException {
Files.createSymbolicLink(link, target);

Snapshot snapshot = new Snapshot();
TreeMap<String, Snapshot.SnapshotDataFile> snapshotFileMap = snapshot.readSnapshotDataFiles();
TreeMap<String, Snapshot.SnapshotDataFile> snapshotFileMap = snapshot.openSnapshotDataFiles();
System.out.println(snapshotFileMap.keySet());
Assert.assertTrue(snapshotFileMap.size() == 2);
Assert.assertTrue(snapshotFileMap.firstKey().equals("queue1.txt"));
Expand Down

0 comments on commit 2eada10

Please sign in to comment.