Skip to content

Commit

Permalink
Merge pull request #787 from Netflix/dev
Browse files Browse the repository at this point in the history
Update v0.6 branch with latest fixes/improvements
  • Loading branch information
smukil committed Jan 20, 2021
2 parents 6fd8ee4 + b56cf6a commit 82f86e4
Show file tree
Hide file tree
Showing 8 changed files with 101 additions and 65 deletions.
2 changes: 1 addition & 1 deletion CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ Contributions will be accepted to the [dev](https://github.com/Netflix/dynomite/

c. Push to your branch (git push origin my_branch)

d. Initiate a pull request on github ( http:https://help.github.com/send-pull-requests/ )
d. Initiate a pull request on GitHub ( https://help.github.com/en/articles/creating-a-pull-request/ )

e. Done :)

Expand Down
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

# Dynomite

[![Build Status](https://secure.travis-ci.org/Netflix/dynomite.png)](http:https://travis-ci.org/Netflix/dynomite)
[![Build Status](https://travis-ci.com/Netflix/dynomite.svg)](https://travis-ci.com/Netflix/dynomite)
[![Dev chat at https://gitter.im/Netflix/dynomite](https://badges.gitter.im/Netflix/dynomite.svg)](https://gitter.im/Netflix/dynomite?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge)
[![Apache V2 License](http:https://img.shields.io/badge/license-Apache%20V2-blue.svg)](https://github.com/Netflix/dynomite/blob/dev/LICENSE)

Expand Down Expand Up @@ -59,7 +59,7 @@ To build Dynomite in _debug mode_:

## Configuration

Dynomite can be configured through a YAML file specified by the -c or --conf-file command-line argument on process start. The configuration files parses and understands the following keys:
Dynomite can be configured through a YAML 1.1 file (note that YAML 1.1 is not JSON compatible) specified by the -c or --conf-file command-line argument on process start. The configuration file parses and understands the following keys:

+ **env**: Specify environment of a node. Currently supports aws and network (for physical datacenter).
+ **datacenter**: The name of the datacenter. Please refer to [architecture document](https://github.com/Netflix/dynomite/wiki/Architecture).
Expand Down
2 changes: 2 additions & 0 deletions src/dyn_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,8 @@ static void client_close(struct context *ctx, struct conn *conn) {
*/
static rstatus_t client_handle_response(struct context *ctx, struct conn *conn, msgid_t reqid,
struct msg *rsp) {


// now the handler owns the response.
ASSERT(conn->type == CONN_CLIENT);
// Fetch the original request
Expand Down
24 changes: 12 additions & 12 deletions src/dyn_crypto.c
Original file line number Diff line number Diff line change
Expand Up @@ -312,17 +312,17 @@ rstatus_t dyn_aes_encrypt(const unsigned char *msg, size_t msg_len,
// aes_iv)) {
if (!EVP_EncryptInit_ex(aes_encrypt_ctx, aes_cipher, NULL, arg_aes_key,
arg_aes_key)) {
loga_hexdump(
msg, msg_len,
log_hexdump(
LOG_DEBUG, msg, msg_len,
"Bad data in EVP_EncryptInit_ex, crypto data with %ld bytes of data",
msg_len);
return DN_ERROR;
}

if (!EVP_EncryptUpdate(aes_encrypt_ctx, mbuf->start, &block_len,
(unsigned char *)msg, (int)msg_len)) {
loga_hexdump(
msg, msg_len,
log_hexdump(
LOG_DEBUG, msg, msg_len,
"Bad data in EVP_EncryptUpdate, crypto data with %ld bytes of data",
msg_len);
return DN_ERROR;
Expand All @@ -331,8 +331,8 @@ rstatus_t dyn_aes_encrypt(const unsigned char *msg, size_t msg_len,

if (!EVP_EncryptFinal_ex(aes_encrypt_ctx, mbuf->start + enc_msg_len,
&block_len)) {
loga_hexdump(
msg, msg_len,
log_hexdump(
LOG_DEBUG, msg, msg_len,
"Bad data in EVP_EncryptFinal_ex, crypto data with %ld bytes of data",
msg_len);
return DN_ERROR;
Expand Down Expand Up @@ -366,17 +366,17 @@ rstatus_t dyn_aes_decrypt(unsigned char *enc_msg, size_t enc_msg_len,
// aes_iv)) {
if (!EVP_DecryptInit_ex(aes_decrypt_ctx, aes_cipher, NULL, arg_aes_key,
arg_aes_key)) {
loga_hexdump(
enc_msg, enc_msg_len,
log_hexdump(
LOG_DEBUG, enc_msg, enc_msg_len,
"Bad data in EVP_DecryptInit_ex, crypto data with %ld bytes of data",
enc_msg_len);
return DN_ERROR;
}

if (!EVP_DecryptUpdate(aes_decrypt_ctx, mbuf->pos, (int *)&block_len,
enc_msg, (int)enc_msg_len)) {
loga_hexdump(
enc_msg, enc_msg_len,
log_hexdump(
LOG_DEBUG, enc_msg, enc_msg_len,
"Bad data in EVP_DecryptUpdate, crypto data with %ld bytes of data",
enc_msg_len);
return DN_ERROR;
Expand All @@ -385,8 +385,8 @@ rstatus_t dyn_aes_decrypt(unsigned char *enc_msg, size_t enc_msg_len,

if (!EVP_DecryptFinal_ex(aes_decrypt_ctx, mbuf->pos + dec_len,
(int *)&block_len)) {
loga_hexdump(
enc_msg, enc_msg_len,
log_hexdump(
LOG_DEBUG, enc_msg, enc_msg_len,
"Bad data in EVP_DecryptFinal_ex, crypto data with %ld bytes of data",
enc_msg_len);
return DN_ERROR;
Expand Down
2 changes: 1 addition & 1 deletion src/dyn_log.c
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ int log_init(int level, char *name) {
if (name == NULL || !strlen(name)) {
l->fd = STDERR_FILENO;
} else {
l->fd = open(name, O_WRONLY | O_APPEND | O_CREAT, 0644);
l->fd = open(name, O_WRONLY | O_APPEND | O_CREAT, 0666);
if (l->fd < 0) {
log_stderr("opening log file '%s' failed: %s", name, strerror(errno));
return -1;
Expand Down
2 changes: 2 additions & 0 deletions src/dyn_mbuf.c
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ static uint64_t mbuf_alloc_count = 0;

/* Report the running total of mbuf allocations made by this process. */
uint64_t mbuf_alloc_get_count(void) {
  return mbuf_alloc_count;
}

/* Return the configured size, in bytes, of a single mbuf chunk.
 * Note: declared with (void) — an empty parameter list () in C leaves the
 * argument types unspecified and disables compile-time checking (CERT DCL20-C). */
size_t mbuf_chunk_sz(void) { return mbuf_chunk_size; }

static struct mbuf *_mbuf_get(void) {
struct mbuf *mbuf;
uint8_t *buf;
Expand Down
2 changes: 2 additions & 0 deletions src/dyn_mbuf.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ static inline bool mbuf_full(struct mbuf *mbuf) {
return mbuf->last == mbuf->end ? true : false;
}

/* Size in bytes of a single mbuf chunk (the value passed to mbuf_init). */
size_t mbuf_chunk_sz(void);

void mbuf_init(size_t mbuf_chunk_size);
void mbuf_deinit(void);
struct mbuf *mbuf_get(void);
Expand Down
128 changes: 79 additions & 49 deletions src/proto/dyn_redis.c
Original file line number Diff line number Diff line change
Expand Up @@ -3053,7 +3053,19 @@ static rstatus_t redis_copy_bulk(struct msg *dst, struct msg *src, bool log) {
log_notice("dumping mbuf");
mbuf_dump(mbuf);
}
if (mbuf_length(mbuf) <= len) { /* steal this buf from src to dst */

size_t remaining_mbuf_len = mbuf_length(mbuf);
if (remaining_mbuf_len <= len &&
remaining_mbuf_len == mbuf_chunk_sz()) {
// Steal the entire buffer from src to dst if we need the whole buffer
//
// We only copy if it's the entire buffer because of the way our
// encryption/decryption works. Sending out multiple partially filled
// mbufs will fail to decrypt on the receiving side. This is because
// in the source side we encrypt each mbuf separately but if all the
// partial mbufs can fit into one full mbuf on the receiving side,
// it will do so and hence fail to decrypt.
// TODO: Change this when the whole broken crypto scheme is changed.
nbuf = STAILQ_NEXT(mbuf, next);
mbuf_remove(&src->mhdr, mbuf);
if (dst != NULL) {
Expand Down Expand Up @@ -3419,33 +3431,90 @@ static rstatus_t redis_fragment_argx(struct msg *r, struct server_pool *pool,
r->nfrag = 0;
r->frag_owner = r;

for (i = 0; i < array_n(r->keys); i++) { /* for each key */
struct msg *sub_msg;
// Calculate number of tokens per participating peer.
// We need to know the total number of tokens before proceeding with
// crafting each peer's command since we're trying to fit as much as
// possible into one MBUF and not split them for convenience.
// TODO: This is the case because of our broken crypto scheme. Change once
// that is fixed.
for (i = 0; i < array_n(r->keys); i++) {
struct keypos *kpos = array_get(r->keys, i);
// use hash-tagged start and end for forwarding.
uint32_t idx = dnode_peer_idx_for_key_on_rack(
pool, rack, kpos->tag_start, kpos->tag_end - kpos->tag_start);

if (sub_msgs[idx] == NULL) {
sub_msgs[idx] = msg_get(r->owner, r->is_request, __FUNCTION__);
if (sub_msgs[idx] == NULL) {
dn_free(sub_msgs);
return DN_ENOMEM;
}

// Every 'sub_msg' is counted as one fragment.
r->nfrag++;
}

// One token for the key
sub_msgs[idx]->ntokens++;

// One token for the value (eg: for MSET)
if (key_step != 1) sub_msgs[idx]->ntokens++;
r->frag_seq[i] = sub_msgs[idx];

loga("frag_seq[%d]: %x || idx: %d sub_msgs[idx]: %x", i,
r->frag_seq[i], idx, sub_msgs[idx]);

}

for (i = 0; i < array_n(r->keys); i++) { /* for each key */
struct msg *sub_msg;
struct keypos *kpos = array_get(r->keys, i);
// use hash-tagged start and end for forwarding.
uint32_t idx = dnode_peer_idx_for_key_on_rack(
pool, rack, kpos->tag_start, kpos->tag_end - kpos->tag_start);

// We already created the 'sub_msg' in the previous loop.
ASSERT(sub_msgs[idx] != NULL);
sub_msg = sub_msgs[idx];

if (STAILQ_EMPTY(&sub_msg->mhdr)) {
if (r->type == MSG_REQ_REDIS_MGET) {
status = msg_prepend_format(sub_msg, "*%d\r\n$4\r\nmget\r\n",
sub_msg->ntokens + 1);
} else if (r->type == MSG_REQ_REDIS_DEL) {
status = msg_prepend_format(sub_msg, "*%d\r\n$3\r\ndel\r\n",
sub_msg->ntokens + 1);
} else if (r->type == MSG_REQ_REDIS_EXISTS) {
status = msg_prepend_format(sub_msg, "*%d\r\n$6\r\nexists\r\n",
sub_msg->ntokens + 1);
} else if (r->type == MSG_REQ_REDIS_MSET) {
status = msg_prepend_format(sub_msg, "*%d\r\n$4\r\nmset\r\n",
sub_msg->ntokens + 1);
} else {
NOT_REACHED();
}
if (status != DN_OK) {
dn_free(sub_msgs);
return status;
}

sub_msg->type = r->type;
sub_msg->frag_id = r->frag_id;
sub_msg->frag_owner = r->frag_owner;
sub_msg->is_read = r->is_read;

log_info("Fragment %d) %s", i, print_obj(sub_msg));
TAILQ_INSERT_TAIL(frag_msgq, sub_msg, m_tqe);
}
r->frag_seq[i] = sub_msg = sub_msgs[idx];

sub_msg->ntokens++;
status = redis_append_key(sub_msg, kpos);
if (status != DN_OK) {
dn_free(sub_msgs);
return status;
}

if (key_step == 1) { /* mget,del */
if (key_step == 1) { // mget,del
continue;
} else { /* mset */
status = redis_copy_bulk(NULL, r, false); /* eat key */
} else { // mset
status = redis_copy_bulk(NULL, r, false); // eat key
if (status != DN_OK) {
dn_free(sub_msgs);
return status;
Expand All @@ -3456,47 +3525,8 @@ static rstatus_t redis_fragment_argx(struct msg *r, struct server_pool *pool,
dn_free(sub_msgs);
return status;
}

sub_msg->ntokens++;
}
}

log_info("Fragmenting %s", print_obj(r));
for (i = 0; i < total_peers; i++) { /* prepend mget header, and forward it */
struct msg *sub_msg = sub_msgs[i];
if (sub_msg == NULL) {
continue;
}

if (r->type == MSG_REQ_REDIS_MGET) {
status = msg_prepend_format(sub_msg, "*%d\r\n$4\r\nmget\r\n",
sub_msg->ntokens + 1);
} else if (r->type == MSG_REQ_REDIS_DEL) {
status = msg_prepend_format(sub_msg, "*%d\r\n$3\r\ndel\r\n",
sub_msg->ntokens + 1);
} else if (r->type == MSG_REQ_REDIS_EXISTS) {
status = msg_prepend_format(sub_msg, "*%d\r\n$6\r\nexists\r\n",
sub_msg->ntokens + 1);
} else if (r->type == MSG_REQ_REDIS_MSET) {
status = msg_prepend_format(sub_msg, "*%d\r\n$4\r\nmset\r\n",
sub_msg->ntokens + 1);
} else {
NOT_REACHED();
}
if (status != DN_OK) {
dn_free(sub_msgs);
return status;
}

sub_msg->type = r->type;
sub_msg->frag_id = r->frag_id;
sub_msg->frag_owner = r->frag_owner;

log_info("Fragment %d) %s", i, print_obj(sub_msg));
TAILQ_INSERT_TAIL(frag_msgq, sub_msg, m_tqe);
r->nfrag++;
}

dn_free(sub_msgs);
return DN_OK;
}
Expand Down

0 comments on commit 82f86e4

Please sign in to comment.