Skip to content

Commit

Permalink
support subdir at snapshot dir
Browse files Browse the repository at this point in the history
  • Loading branch information
wenweihu86 committed Jul 3, 2017
1 parent 8867e76 commit 460f425
Show file tree
Hide file tree
Showing 6 changed files with 117 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -507,12 +507,13 @@ private long packEntries(long nextIndex, RaftMessage.AppendEntriesRequest.Builde
}

private boolean installSnapshot(Peer peer) {
if (!snapshot.getIsInSnapshot().compareAndSet(false, true)) {
LOG.info("already in snapshot");
if (!snapshot.getIsTakeSnapshot().get()) {
LOG.info("already in take snapshot, please send install snapshot request later");
return false;
}

LOG.info("begin install snapshot");
LOG.info("begin send install snapshot request to server={}", peer.getServer().getServerId());
snapshot.getIsInstallSnapshot().set(true);
boolean isSuccess = true;
try {
boolean isLastRequest = false;
Expand Down Expand Up @@ -561,9 +562,10 @@ private boolean installSnapshot(Peer peer) {
}
}
} finally {
snapshot.getIsInSnapshot().compareAndSet(true, false);
snapshot.getIsInstallSnapshot().set(false);
}
LOG.info("install snapshot success={}", isSuccess);
LOG.info("end send install snapshot request to server={}, success={}",
peer.getServer().getServerId(), isSuccess);
return isSuccess;
}

Expand Down Expand Up @@ -662,11 +664,12 @@ public void stepDown(long newTerm) {
}

public void takeSnapshot() {
if (!snapshot.getIsInSnapshot().compareAndSet(false, true)) {
LOG.info("already in snapshot, ignore takeSnapshot");
if (snapshot.getIsInstallSnapshot().get()) {
LOG.info("already in install snapshot, ignore take snapshot");
return;
}

snapshot.getIsTakeSnapshot().compareAndSet(false, true);
try {
long localLastAppliedIndex;
long lastAppliedTerm = 0;
Expand Down Expand Up @@ -738,7 +741,7 @@ public void takeSnapshot() {
}
}
} finally {
snapshot.getIsInSnapshot().compareAndSet(true, false);
snapshot.getIsTakeSnapshot().compareAndSet(true, false);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,22 +190,23 @@ public RaftMessage.InstallSnapshotResponse installSnapshot(RaftMessage.InstallSn
raftNode.getLock().unlock();
}

if (!raftNode.getSnapshot().getIsInSnapshot().compareAndSet(false, true)) {
LOG.warn("alreay in snapshot");
if (raftNode.getSnapshot().getIsTakeSnapshot().get()) {
LOG.warn("alreay in take snapshot, do not handle install snapshot request now");
return responseBuilder.build();
}

raftNode.getSnapshot().getIsInstallSnapshot().set(true);
RandomAccessFile randomAccessFile = null;
raftNode.getSnapshot().getLock().lock();
try {
// write snapshot data to local
String tmpSnapshotDir = raftNode.getSnapshot().getSnapshotDir() + ".tmp";
File file = new File(tmpSnapshotDir);
if (file.exists() && request.getIsFirst()) {
file.delete();
file.mkdir();
}
if (request.getIsFirst()) {
if (file.exists()) {
file.delete();
}
file.mkdir();
LOG.info("begin accept install snapshot request from serverId={}", request.getServerId());
raftNode.getSnapshot().updateMetaData(tmpSnapshotDir,
request.getSnapshotMetaData().getLastIncludedIndex(),
Expand All @@ -221,6 +222,10 @@ public RaftMessage.InstallSnapshotResponse installSnapshot(RaftMessage.InstallSn

String currentDataFileName = currentDataDirName + File.separator + request.getFileName();
File currentDataFile = new File(currentDataFileName);
// 文件名可能是个相对路径,比如topic/0/message.txt
if (!currentDataFile.getParentFile().exists()) {
currentDataFile.getParentFile().mkdirs();
}
if (!currentDataFile.exists()) {
currentDataFile.createNewFile();
}
Expand All @@ -247,7 +252,6 @@ public RaftMessage.InstallSnapshotResponse installSnapshot(RaftMessage.InstallSn
} finally {
RaftFileUtils.closeFile(randomAccessFile);
raftNode.getSnapshot().getLock().unlock();
raftNode.getSnapshot().getIsInSnapshot().compareAndSet(true, false);
}

if (request.getIsLast() && responseBuilder.getResCode() == RaftMessage.ResCode.RES_CODE_SUCCESS) {
Expand Down Expand Up @@ -275,6 +279,10 @@ public RaftMessage.InstallSnapshotResponse installSnapshot(RaftMessage.InstallSn
LOG.info("end accept install snapshot request from serverId={}", request.getServerId());
}

if (request.getIsLast()) {
raftNode.getSnapshot().getIsInstallSnapshot().set(false);
}

return responseBuilder.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,16 +263,16 @@ public void loadSegmentData(Segment segment) {
}

public void readSegments() {
List<String> fileNames = RaftFileUtils.getSortedFilesInDirectory(logDataDir);
for (String fileName: fileNames) {
String[] splitArray = fileName.split("-");
if (splitArray.length != 2) {
LOG.warn("segment filename[{}] is not valid", fileName);
continue;
}
Segment segment = new Segment();
segment.setFileName(fileName);
try {
try {
List<String> fileNames = RaftFileUtils.getSortedFilesInDirectory(logDataDir, logDataDir);
for (String fileName : fileNames) {
String[] splitArray = fileName.split("-");
if (splitArray.length != 2) {
LOG.warn("segment filename[{}] is not valid", fileName);
continue;
}
Segment segment = new Segment();
segment.setFileName(fileName);
if (splitArray[0].equals("open")) {
segment.setCanWrite(true);
segment.setStartIndex(Long.valueOf(splitArray[1]));
Expand All @@ -290,11 +290,10 @@ public void readSegments() {
segment.setRandomAccessFile(RaftFileUtils.openFile(logDataDir, fileName, "rw"));
segment.setFileSize(segment.getRandomAccessFile().length());
startLogIndexSegmentMap.put(segment.getStartIndex(), segment);
} catch (IOException ioException) {
LOG.warn("open segment file error, file={}, msg={}",
fileName, ioException.getMessage());
throw new RuntimeException("open segment file error");
}
} catch(IOException ioException){
LOG.warn("readSegments exception:", ioException);
throw new RuntimeException("open segment file error");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,10 @@ public class SnapshotDataFile {
private String snapshotDir = RaftOptions.dataDir + File.separator + "snapshot";
private RaftMessage.SnapshotMetaData metaData;
private TreeMap<String, SnapshotDataFile> snapshotDataFileMap;
private AtomicBoolean isInSnapshot = new AtomicBoolean(false);
// 表示是否正在安装snapshot,leader向follower安装,leader和follower同时处于installSnapshot状态
private AtomicBoolean isInstallSnapshot = new AtomicBoolean(false);
// 表示节点自己是否在对状态机做snapshot
private AtomicBoolean isTakeSnapshot = new AtomicBoolean(false);
private Lock lock = new ReentrantLock();

public Snapshot() {
Expand Down Expand Up @@ -70,18 +73,18 @@ public TreeMap<String, SnapshotDataFile> readSnapshotDataFiles() {
Path snapshotDataPath = FileSystems.getDefault().getPath(snapshotDataDir);
snapshotDataPath = snapshotDataPath.toRealPath();
snapshotDataDir = snapshotDataPath.toString();
List<String> fileNames = RaftFileUtils.getSortedFilesInDirectory(snapshotDataDir, snapshotDataDir);
for (String fileName : fileNames) {
RandomAccessFile randomAccessFile = RaftFileUtils.openFile(snapshotDataDir, fileName, "r");
SnapshotDataFile snapshotFile = new SnapshotDataFile();
snapshotFile.fileName = fileName;
snapshotFile.randomAccessFile = randomAccessFile;
snapshotDataFileMap.put(fileName, snapshotFile);
}
} catch (IOException ex) {
LOG.warn("readSnapshotDataFiles exception:", ex);
throw new RuntimeException(ex);
}
List<String> fileNames = RaftFileUtils.getSortedFilesInDirectory(snapshotDataDir);
for (String fileName : fileNames) {
RandomAccessFile randomAccessFile = RaftFileUtils.openFile(snapshotDataDir, fileName, "r");
SnapshotDataFile snapshotFile = new SnapshotDataFile();
snapshotFile.fileName = fileName;
snapshotFile.randomAccessFile = randomAccessFile;
snapshotDataFileMap.put(fileName, snapshotFile);
}
return snapshotDataFileMap;
}

Expand Down Expand Up @@ -136,16 +139,16 @@ public String getSnapshotDir() {
return snapshotDir;
}

public TreeMap<String, SnapshotDataFile> getSnapshotDataFileMap() {
return snapshotDataFileMap;
public AtomicBoolean getIsInstallSnapshot() {
return isInstallSnapshot;
}

public AtomicBoolean getIsInSnapshot() {
return isInSnapshot;
public AtomicBoolean getIsTakeSnapshot() {
return isTakeSnapshot;
}

public void setIsInSnapshot(AtomicBoolean isInSnapshot) {
this.isInSnapshot = isInSnapshot;
public TreeMap<String, SnapshotDataFile> getSnapshotDataFileMap() {
return snapshotDataFileMap;
}

public Lock getLock() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,7 @@

import java.io.*;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.*;
import java.util.zip.CRC32;

/**
Expand All @@ -20,27 +17,28 @@ public class RaftFileUtils {

private static final Logger LOG = LoggerFactory.getLogger(RaftFileUtils.class);

public static List<String> getSortedFilesInDirectory(String dirName) {
public static List<String> getSortedFilesInDirectory(
String dirName, String rootDirName) throws IOException {
List<String> fileList = new ArrayList<>();
File rootDir = new File(rootDirName);
File dir = new File(dirName);
if (!rootDir.isDirectory() || !dir.isDirectory()) {
return fileList;
}
String rootPath = rootDir.getCanonicalPath();
if (!rootPath.endsWith("/")) {
rootPath = rootPath + "/";
}
File[] files = dir.listFiles();
Arrays.sort(files, new Comparator<File>() {
@Override
public int compare(File o1, File o2) {
if (o1.isDirectory() && o2.isFile()) {
return -1;
}
if (o1.isFile() && o2.isDirectory()) {
return 1;
}
return o2.getName().compareTo(o1.getName());
}
});

List<String> fileNames = new ArrayList<>(files.length);
for (File file : files) {
fileNames.add(file.getName());
if (file.isDirectory()) {
fileList.addAll(getSortedFilesInDirectory(file.getCanonicalPath(), rootPath));
} else {
fileList.add(file.getCanonicalPath().substring(rootPath.length()));
}
}
return fileNames;
Collections.sort(fileList);
return fileList;
}

public static RandomAccessFile openFile(String dir, String fileName, String mode) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package com.github.wenweihu86.raft.util;

import org.apache.commons.io.FileUtils;
import org.junit.Assert;
import org.junit.Test;

import java.io.File;
import java.io.IOException;
import java.util.List;

/**
* Created by wenweihu86 on 2017/7/3.
*/
public class RaftUtilsTest {

@Test
public void testGetSortedFilesInDirectory() throws IOException {
File queue0 = new File("./data/message/example-topic/0");
queue0.mkdirs();
File segmentLog00 = new File("./data/message/example-topic/0/segment0");
segmentLog00.createNewFile();
File segmentLog01 = new File("./data/message/example-topic/0/segment1");
segmentLog01.createNewFile();

File queue1 = new File("./data/message/example-topic/1");
queue1.mkdirs();
File segmentLog12 = new File("./data/message/example-topic/1/segment2");
segmentLog12.createNewFile();
File segmentLog13 = new File("./data/message/example-topic/1/segment3");
segmentLog13.createNewFile();

List<String> files = RaftFileUtils.getSortedFilesInDirectory(
"./data/message", "./data/message");
System.out.println(files);
Assert.assertTrue(files.size() == 4);
Assert.assertTrue(files.contains("example-topic/0/segment0"));

File dataDir = new File("./data");
FileUtils.deleteDirectory(dataDir);
}
}

0 comments on commit 460f425

Please sign in to comment.