Skip to content

Commit

Permalink
Merge branch 'RedisLabs:master' into master
Browse files Browse the repository at this point in the history
  • Loading branch information
LLiuJJ committed Mar 1, 2023
2 parents 5fb6c65 + 90587b9 commit 8454b27
Show file tree
Hide file tree
Showing 15 changed files with 279 additions and 128 deletions.
18 changes: 11 additions & 7 deletions .github/workflows/daily.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ on:
description: 'git branch or sha to use'
required: false
default: 'master'
pytest_args:
description: 'additional pytest args (e.g. run tests matching a regex n times: -k lua --repeat 3)'
required: false
default: ''

jobs:
tests-with-elle:
Expand All @@ -40,7 +44,7 @@ jobs:
- name: Build
run: |
mkdir build && cd build
cmake .. -DPYTEST_OPTS="--redis-executable=redis/src/redis-server --elle-cli=elle-cli/target/elle-cli-0.1.5-standalone.jar --elle-threads=3 -v"
cmake .. -DPYTEST_OPTS="--redis-executable=redis/src/redis-server --elle-cli=elle-cli/target/elle-cli-0.1.5-standalone.jar --elle-threads=3 -v ${{github.event.inputs.pytest_args}}"
make
- name: Checkout Redis
uses: actions/checkout@v2
Expand Down Expand Up @@ -92,7 +96,7 @@ jobs:
- name: Build
run: |
mkdir build && cd build
cmake .. -DSANITIZER=address -DPYTEST_OPTS="--redis-executable=redis/src/redis-server -v"
cmake .. -DSANITIZER=address -DPYTEST_OPTS="--redis-executable=redis/src/redis-server -v ${{github.event.inputs.pytest_args}}"
make
- name: Checkout Redis
uses: actions/checkout@v2
Expand Down Expand Up @@ -136,7 +140,7 @@ jobs:
- name: Build
run: |
mkdir build && cd build
cmake .. -DSANITIZER=undefined -DPYTEST_OPTS="--redis-executable=redis/src/redis-server -v"
cmake .. -DSANITIZER=undefined -DPYTEST_OPTS="--redis-executable=redis/src/redis-server -v ${{github.event.inputs.pytest_args}}"
make
- name: Checkout Redis
uses: actions/checkout@v2
Expand Down Expand Up @@ -180,7 +184,7 @@ jobs:
- name: Build
run: |
mkdir build && cd build
cmake .. -DPYTEST_OPTS="--redis-executable=redis/src/redis-server -v"
cmake .. -DPYTEST_OPTS="--redis-executable=redis/src/redis-server -v ${{github.event.inputs.pytest_args}}"
make
- name: Checkout Redis
uses: actions/checkout@v2
Expand Down Expand Up @@ -225,7 +229,7 @@ jobs:
- name: Build
run: |
mkdir build && cd build
cmake .. -DSANITIZER=address -DBUILD_TLS=1 -DPYTEST_OPTS="--redis-executable=redis/src/redis-server --tls -v"
cmake .. -DSANITIZER=address -DBUILD_TLS=1 -DPYTEST_OPTS="--redis-executable=redis/src/redis-server --tls -v ${{github.event.inputs.pytest_args}}"
make
- name: Checkout Redis
uses: actions/checkout@v2
Expand Down Expand Up @@ -257,7 +261,7 @@ jobs:
- name: Build
run: |
mkdir build && cd build
cmake .. -DSANITIZER=address -DBUILD_TLS=1 -DPYTEST_OPTS="--redis-executable=redis/src/redis-server --tls -v"
cmake .. -DSANITIZER=address -DBUILD_TLS=1 -DPYTEST_OPTS="--redis-executable=redis/src/redis-server --tls -v ${{github.event.inputs.pytest_args}}"
make
- name: Checkout Redis
uses: actions/checkout@v2
Expand Down Expand Up @@ -303,7 +307,7 @@ jobs:
- name: Build
run: |
mkdir build && cd build
cmake .. -DPYTEST_OPTS="--redis-executable=redis/src/redis-server -v"
cmake .. -DPYTEST_OPTS="--redis-executable=redis/src/redis-server -v ${{github.event.inputs.pytest_args}}"
make -j 4
- name: Checkout Redis
uses: actions/checkout@v2
Expand Down
56 changes: 28 additions & 28 deletions src/common.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,37 +29,23 @@ const char *getStateStr(RedisRaftCtx *rr)

/* Convert a Raft library error code to an error reply.
*/
void replyRaftError(RedisModuleCtx *ctx, int error)
void replyRaftError(RedisModuleCtx *ctx, const char *msg, int err)
{
char buf[128];
char buf[256] = {0};
RedisRaftCtx *rr = &redis_raft;

switch (error) {
case RAFT_ERR_NOT_LEADER:
RedisModule_ReplyWithError(ctx, "ERR not leader");
break;
case RAFT_ERR_SHUTDOWN:
LOG_WARNING("Raft requires immediate shutdown!");
enterRedisModuleCall();
RedisModule_Call(ctx, "SHUTDOWN", "");
exitRedisModuleCall();
break;
case RAFT_ERR_ONE_VOTING_CHANGE_ONLY:
RedisModule_ReplyWithError(ctx, "ERR a voting change is already in progress");
break;
case RAFT_ERR_NOMEM:
RedisModule_ReplyWithError(ctx, "OOM Raft out of memory");
break;
case RAFT_ERR_LEADER_TRANSFER_IN_PROGRESS:
RedisModule_ReplyWithError(ctx, "ERR transfer already in progress");
break;
case RAFT_ERR_INVALID_NODEID:
RedisModule_ReplyWithError(ctx, "ERR invalid node id");
break;
default:
snprintf(buf, sizeof(buf), "ERR Raft error %d", error);
RedisModule_ReplyWithError(ctx, buf);
break;
if (err == RAFT_ERR_SHUTDOWN) {
LOG_WARNING("Raft requires immediate shutdown!");
shutdownServer(rr);
}

if (msg) {
snprintf(buf, sizeof(buf), "ERR %s (%s)", msg, raft_get_error_str(err));
} else {
snprintf(buf, sizeof(buf), "ERR %s", raft_get_error_str(err));
}

RedisModule_ReplyWithError(ctx, buf);
}

/* Create a -MOVED reply. */
Expand Down Expand Up @@ -281,3 +267,17 @@ void raftNodeIdToString(char *output, const char *dbid, raft_node_id_t raft_id)
{
snprintf(output, RAFT_SHARDGROUP_NODEID_LEN + 1, "%.32s%08x", dbid, raft_id);
}

/* Try to shut down gracefully first, on failure abort() */
void shutdownServer(RedisRaftCtx *rr)
{
RedisModuleCallReply *r = RedisModule_Call(rr->ctx, "SHUTDOWN", "cE", "NOW");

size_t len;
const char *err = RedisModule_CallReplyStringPtr(r, &len);

LOG_WARNING("Shutdown failure: %.*s", (int) len, err);
(void) err;

abort();
}
2 changes: 2 additions & 0 deletions src/config.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ static const char *trace_names[] = {
"raftlib",
"raftlog",
"generic",
"migration",
"all",
};

Expand All @@ -37,6 +38,7 @@ static const int trace_flags[] = {
TRACE_RAFTLIB,
TRACE_RAFTLOG,
TRACE_GENERIC,
TRACE_MIGRATION,
TRACE_ALL,
};

Expand Down
5 changes: 2 additions & 3 deletions src/migrate.c
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ int cmdRaftImport(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)

int e = raft_recv_entry(rr->raft, entry, NULL);
if (e != 0) {
replyRaftError(req->ctx, e);
replyRaftError(req->ctx, NULL, e);
entryDetachRaftReq(rr, entry);
raft_entry_release(entry);
goto fail;
Expand Down Expand Up @@ -215,8 +215,7 @@ static void raftAppendRaftUnlockDeleteEntry(RedisRaftCtx *rr, RaftReq *req)

int e = raft_recv_entry(rr->raft, entry, NULL);
if (e != 0) {
RedisModule_ReplyWithError(req->ctx, "ERR Unable to unlock/delete migrated keys, try again");
replyRaftError(req->ctx, e);
replyRaftError(req->ctx, "Unable to unlock/delete migrated keys, try again", e);
entryDetachRaftReq(rr, entry);
raft_entry_release(entry);
goto error;
Expand Down
32 changes: 19 additions & 13 deletions src/raft.c
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,7 @@ void shutdownAfterRemoval(RedisRaftCtx *rr)
archiveSnapshot(rr);
}

RedisModuleCallReply *r = RedisModule_Call(rr->ctx, "SHUTDOWN", "cE", "NOW");

size_t len;
const char *err = RedisModule_CallReplyStringPtr(r, &len);
LOG_WARNING("Shutdown failure: %.*s", (int) len, err);

abort();
shutdownServer(rr);
}

RaftReq *entryDetachRaftReq(RedisRaftCtx *rr, raft_entry_t *entry)
Expand Down Expand Up @@ -308,15 +302,17 @@ static void *getClientSession(RedisRaftCtx *rr, RaftRedisCommandArray *cmds)
unsigned long long id = cmds->client_id;
int nokey;

void *client_session = RedisModule_DictGetC(rr->client_session_dict, &id, sizeof(id), &nokey);
ClientSession *client_session = RedisModule_DictGetC(rr->client_session_dict, &id, sizeof(id), &nokey);

if (nokey) {
RaftRedisCommand *c = cmds->commands[0];
size_t cmd_len;
const char *cmd = RedisModule_StringPtrLen(c->argv[0], &cmd_len);

if (cmd_len == 5 && strncasecmp("watch", cmd, cmd_len) == 0) {
client_session = RedisModule_Alloc(sizeof(void *));
client_session = RedisModule_Alloc(sizeof(ClientSession));
client_session->client_id = id;
client_session->local = false;
RedisModule_DictSetC(rr->client_session_dict, &id, sizeof(id), client_session);
}
}
Expand Down Expand Up @@ -366,7 +362,10 @@ void RaftExecuteCommandArray(RedisRaftCtx *rr,
return;
}

void *client_session = getClientSession(rr, cmds);
ClientSession *client_session = getClientSession(rr, cmds);
if (client_session && reply_ctx) {
client_session->local = true;
}
(void) client_session;

for (int i = 0; i < cmds->len; i++) {
Expand Down Expand Up @@ -492,7 +491,7 @@ static void lockKeys(RedisRaftCtx *rr, raft_entry_t *entry)
if (RedisModule_KeyExists(rr->ctx, keys[i])) {
size_t str_len;
const char *str = RedisModule_StringPtrLen(keys[i], &str_len);
LOG_VERBOSE("locking %.*s", (int) str_len, str);
MIGRATION_TRACE("Locking key: %.*s", (int) str_len, str);

RedisModule_DictSet(rr->locked_keys, keys[i], NULL);
}
Expand Down Expand Up @@ -534,6 +533,11 @@ static void unlockDeleteKeys(RedisRaftCtx *rr, raft_entry_t *entry)
RedisModule_FreeCallReply(reply);

for (size_t i = 0; i < num_keys; i++) {
size_t str_len;
const char *str = RedisModule_StringPtrLen(keys[i], &str_len);

MIGRATION_TRACE("Unlocking key: %.*s", (int) str_len, str);

RedisModule_DictDel(rr->locked_keys, keys[i], NULL);
RedisModule_FreeString(rr->ctx, keys[i]);
}
Expand Down Expand Up @@ -577,10 +581,12 @@ static void handleEndClientSession(RedisRaftCtx *rr, raft_entry_t *entry)

static void clearClientSessions(RedisRaftCtx *rr)
{
void *client_session;
ClientSession *client_session;
RedisModuleDictIter *iter = RedisModule_DictIteratorStartC(rr->client_session_dict, "^", NULL, 0);

while (RedisModule_DictNextC(iter, NULL, (void **) &client_session) != NULL) {
if (client_session->local) {
RedisModule_DeauthenticateAndCloseClient(rr->ctx, client_session->client_id);
}
freeClientSession(client_session);
}
RedisModule_DictIteratorStop(iter);
Expand Down
15 changes: 8 additions & 7 deletions src/redisraft.c
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ static int cmdRaftNode(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
int e = raft_recv_entry(rr->raft, entry, NULL);
if (e != 0) {
entryDetachRaftReq(rr, entry);
replyRaftError(req->ctx, e);
replyRaftError(req->ctx, NULL, e);
raft_entry_release(entry);
RaftReqFree(req);
return REDISMODULE_OK;
Expand Down Expand Up @@ -192,7 +192,7 @@ static int cmdRaftNode(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
int e = raft_recv_entry(rr->raft, entry, NULL);
if (e != 0) {
entryDetachRaftReq(rr, entry);
replyRaftError(req->ctx, e);
replyRaftError(req->ctx, NULL, e);
raft_entry_release(entry);
RaftReqFree(req);
return REDISMODULE_OK;
Expand Down Expand Up @@ -243,7 +243,7 @@ static int cmdRaftTransferLeader(RedisModuleCtx *ctx, RedisModuleString **argv,

int err = raft_transfer_leader(rr->raft, target, 0);
if (err != 0) {
replyRaftError(ctx, err);
replyRaftError(ctx, NULL, err);
return REDISMODULE_OK;
}

Expand Down Expand Up @@ -528,7 +528,7 @@ static void handleMigrateCommand(RedisRaftCtx *rr, RedisModuleCtx *ctx, RaftRedi
entryAttachRaftReq(rr, entry, req);
int e = raft_recv_entry(rr->raft, entry, NULL);
if (e != 0) {
replyRaftError(req->ctx, e);
replyRaftError(req->ctx, NULL, e);
RaftReqFree(req);
entryDetachRaftReq(rr, entry);
}
Expand Down Expand Up @@ -569,7 +569,7 @@ static void appendEndClientSession(RedisRaftCtx *rr, RaftReq *req, unsigned long

int e = raft_recv_entry(rr->raft, entry, NULL);
if (req && e != 0) {
replyRaftError(req->ctx, e);
replyRaftError(req->ctx, NULL, e);
RaftReqFree(req);
entryDetachRaftReq(rr, entry);
}
Expand Down Expand Up @@ -751,7 +751,8 @@ static void handleRedisCommandAppend(RedisRaftCtx *rr,

int rc = raft_recv_read_request(rr->raft, handleReadOnlyCommand, req);
if (rc != 0) {
replyRaftError(ctx, rc);
replyRaftError(ctx, NULL, rc);
RaftReqFree(req);
}
return;
}
Expand All @@ -767,7 +768,7 @@ static void handleRedisCommandAppend(RedisRaftCtx *rr,

int e = raft_recv_entry(rr->raft, entry, NULL);
if (e != 0) {
replyRaftError(ctx, e);
replyRaftError(ctx, NULL, e);
entryDetachRaftReq(rr, entry);
raft_entry_release(entry);
RaftReqFree(req);
Expand Down
26 changes: 18 additions & 8 deletions src/redisraft.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,14 @@ extern RedisModuleCtx *redisraft_log_ctx;
#define LOG_LEVEL_WARNING 3
#define LOG_LEVEL_COUNT (LOG_LEVEL_WARNING + 1)

#define TRACE_OFF 0
#define TRACE_NODE 1
#define TRACE_CONN 2
#define TRACE_RAFTLIB 4
#define TRACE_RAFTLOG 8
#define TRACE_GENERIC 16
#define TRACE_ALL ((TRACE_GENERIC * 2) - 1)
#define TRACE_OFF 0
#define TRACE_NODE (1 << 0)
#define TRACE_CONN (1 << 1)
#define TRACE_RAFTLIB (1 << 2)
#define TRACE_RAFTLOG (1 << 3)
#define TRACE_GENERIC (1 << 4)
#define TRACE_MIGRATION (1 << 5)
#define TRACE_ALL ((TRACE_MIGRATION * 2) - 1)

#define LOG(level, fmt, ...) \
do { \
Expand Down Expand Up @@ -123,6 +124,9 @@ extern RedisModuleCtx *redisraft_log_ctx;
#define NODE_LOG_NOTICE(node, fmt, ...) NODE_LOG(LOG_LEVEL_NOTICE, node, fmt, ##__VA_ARGS__)
#define NODE_LOG_WARNING(node, fmt, ...) NODE_LOG(LOG_LEVEL_WARNING, node, fmt, ##__VA_ARGS__)

#define MIGRATION_TRACE(fmt, ...) \
TRACE_MODULE(MIGRATION, "<migration> " fmt, ##__VA_ARGS__)

/* -------------------- Connections -------------------- */

/* Longest length of a NodeAddr string, including null terminator */
Expand Down Expand Up @@ -756,6 +760,11 @@ typedef struct ClientState {
bool watched;
} ClientState;

typedef struct ClientSession {
unsigned long long client_id;
bool local;
} ClientSession;

/* common.c */
void joinLinkIdleCallback(Connection *conn);
void joinLinkFreeCallback(void *privdata);
Expand All @@ -768,8 +777,9 @@ RRStatus checkRaftState(RedisRaftCtx *rr, RedisModuleCtx *ctx);
bool parseMovedReply(const char *str, NodeAddr *addr);
void raftNodeToString(char *output, const char *dbid, raft_node_t *raft_node);
void raftNodeIdToString(char *output, const char *dbid, raft_node_id_t raft_id);
void shutdownServer(RedisRaftCtx *rr);
/* common.c - common reply function */
void replyRaftError(RedisModuleCtx *ctx, int error);
void replyRaftError(RedisModuleCtx *ctx, const char *msg, int error);
void replyRedirect(RedisModuleCtx *ctx, unsigned int slot, NodeAddr *addr);
void replyAsk(RedisModuleCtx *ctx, unsigned int slot, NodeAddr *addr);
void replyCrossSlot(RedisModuleCtx *ctx);
Expand Down
Loading

0 comments on commit 8454b27

Please sign in to comment.