Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix bugs mainly caused by snapshot #118

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,5 +1,14 @@
*.o
*.gcno
*~
libcraft.so
libcraft.a
*.so
*.a
*.gcda
/tests.c
__pycache__
.hypothesis
*.gcov
CLinkedListQueue/
tests/main_test.c
tests_main

19 changes: 12 additions & 7 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ TEST_DIR = ./tests
LLQUEUE_DIR = $(CONTRIB_DIR)/CLinkedListQueue
VPATH = src

GCOV_OUTPUT = *.gcda *.gcno *.gcov
GCOV_OUTPUT = *.gcda *.gcno *.gcov src/*.gcda src/*.gcno src/*.gcov
GCOV_CCFLAGS = -fprofile-arcs -ftest-coverage
SHELL = /bin/bash
CFLAGS += -Iinclude -Werror -Werror=return-type -Werror=uninitialized -Wcast-align \
override CFLAGS += -Iinclude -Werror -Werror=return-type -Werror=uninitialized -Wcast-align \
-Wno-pointer-sign -fno-omit-frame-pointer -fno-common -fsigned-char \
-Wunused-variable \
$(GCOV_CCFLAGS) -I$(LLQUEUE_DIR) -Iinclude -g -O2 -fPIC
Expand All @@ -18,10 +18,10 @@ ASANFLAGS = -fsanitize=address
SHAREDFLAGS = -dynamiclib
SHAREDEXT = dylib
# We need to include the El Capitan specific /usr/includes, aargh
CFLAGS += -I/Applications/Xcode.app/Contents/Developer/Platforms/MacOSX.platform/Developer/SDKs/MacOSX10.11.sdk/usr/include/
CFLAGS += -I/Applications/Xcode.app/Contents/Developer/Platforms/MacOSX.platform/Developer/SDKs/MacOSX10.12.sdk/usr/include
CFLAGS += $(ASANFLAGS)
CFLAGS += -Wno-nullability-completeness
override CFLAGS += -I/Applications/Xcode.app/Contents/Developer/Platforms/MacOSX.platform/Developer/SDKs/MacOSX10.11.sdk/usr/include/
override CFLAGS += -I/Applications/Xcode.app/Contents/Developer/Platforms/MacOSX.platform/Developer/SDKs/MacOSX10.12.sdk/usr/include
override CFLAGS += $(ASANFLAGS)
override CFLAGS += -Wno-nullability-completeness
else
SHAREDFLAGS = -shared
SHAREDEXT = so
Expand Down Expand Up @@ -96,4 +96,9 @@ clean:
@rm -f $(TEST_DIR)/main_test.c src/*.o $(GCOV_OUTPUT); \
if [ -f "libraft.$(SHAREDEXT)" ]; then rm libraft.$(SHAREDEXT); fi;\
if [ -f libraft.a ]; then rm libraft.a; fi;\
if [ -f tests_main ]; then rm tests_main; fi;
if [ -f tests_main ]; then rm tests_main; fi;\
if [ -f tests.c ]; then rm tests.c; fi;\
if [ -f tests.o ]; then rm tests.o; fi;\
if [ -f tests.cpython* ]; then rm tests.cpython*; fi;\
if [ -d CLinkedListQueue ]; then rm -rf CLinkedListQueue; fi;\
if [ -d .hypothesis ]; then rm -rf .hypothesis; fi;
157 changes: 104 additions & 53 deletions src/raft_server.c
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ int raft_recv_appendentries_response(raft_server_t* me_,
raft_index_t next_idx = raft_node_get_next_idx(node);
assert(0 < next_idx);
/* Stale response -- ignore */
if (r->current_idx < match_idx)
if (match_idx == next_idx - 1)
return 0;
if (r->current_idx < next_idx - 1)
raft_node_set_next_idx(node, min(r->current_idx + 1, raft_get_current_idx(me_)));
Expand Down Expand Up @@ -416,7 +416,7 @@ int raft_recv_appendentries(
}
else if (ae->term < me->current_term)
{
/* 1. Reply false if term < currentTerm (§5.1) */
/* Reply false if term < currentTerm (§5.1) */
__log(me_, node, "AE term %d is less than current term %d",
ae->term, me->current_term);
goto out;
Expand All @@ -427,52 +427,51 @@ int raft_recv_appendentries(

me->timeout_elapsed = 0;

/* Not the first appendentries we've received */
/* NOTE: the log starts at 1 */
if (0 < ae->prev_log_idx)
if (ae->prev_log_idx == me->snapshot_last_idx && me->snapshot_last_term != ae->prev_log_term)
{
raft_entry_t* ety = raft_get_entry_from_idx(me_, ae->prev_log_idx);
/* Should never happen; something is seriously wrong! */
__log(me_, node, "Snapshot AE prev conflicts with committed entry");
e = RAFT_ERR_SHUTDOWN;
goto out;
}

/* Is a snapshot */
if (ae->prev_log_idx == me->snapshot_last_idx)
{
if (me->snapshot_last_term != ae->prev_log_term)
{
/* Should never happen; something is seriously wrong! */
__log(me_, node, "Snapshot AE prev conflicts with committed entry");
e = RAFT_ERR_SHUTDOWN;
goto out;
}
}
/* 2. Reply false if log doesn't contain an entry at prevLogIndex
whose term matches prevLogTerm (§5.3) */
else if (!ety)
{
__log(me_, node, "AE no log at prev_idx %d", ae->prev_log_idx);
goto out;
}
else if (ety->term != ae->prev_log_term)
/* Reject entries that would leave a gap */
if (ae->prev_log_idx > raft_get_current_idx(me_)) {
__log(me_, node, "AE prev_idx is greater than current idx pli:%d ci:%d",
ae->prev_log_idx, raft_get_current_idx(me_));
goto out;
}

/* NOTE: the log starts at 1 */
raft_entry_t* ety = raft_get_entry_from_idx(me_, ae->prev_log_idx);

/* ety is NULL when:
* 1. prev_log_idx is 0
* 2. log at prev_log_idx has been compacted
* 3. prev_log_idx is greater than current index (already replied false)
*/
if (ety && ety->term != ae->prev_log_term)
{
/* log mismatch */
__log(me_, node, "AE term doesn't match prev_term (ie. %d vs %d) ci:%d comi:%d lcomi:%d pli:%d",
ety->term, ae->prev_log_term, raft_get_current_idx(me_),
raft_get_commit_idx(me_), ae->leader_commit, ae->prev_log_idx);
if (ae->prev_log_idx <= raft_get_commit_idx(me_))
{
__log(me_, node, "AE term doesn't match prev_term (ie. %d vs %d) ci:%d comi:%d lcomi:%d pli:%d",
ety->term, ae->prev_log_term, raft_get_current_idx(me_),
raft_get_commit_idx(me_), ae->leader_commit, ae->prev_log_idx);
if (ae->prev_log_idx <= raft_get_commit_idx(me_))
{
/* Should never happen; something is seriously wrong! */
__log(me_, node, "AE prev conflicts with committed entry");
e = RAFT_ERR_SHUTDOWN;
goto out;
}
/* Delete all the following log entries because they don't match */
e = raft_delete_entry_from_idx(me_, ae->prev_log_idx);
/* Should never happen; something is seriously wrong! */
__log(me_, node, "AE prev conflicts with committed entry");
e = RAFT_ERR_SHUTDOWN;
goto out;
}
/* Delete all the following log entries because they don't match */
e = raft_delete_entry_from_idx(me_, ae->prev_log_idx);
goto out;
}

r->success = 1;
r->current_idx = ae->prev_log_idx;

/* 3. If an existing entry conflicts with a new one (same index
/* If an existing entry conflicts with a new one (same index
but different terms), delete the existing entry and all that
follow it (§5.3) */
int i;
Expand All @@ -481,7 +480,15 @@ int raft_recv_appendentries(
raft_entry_t* ety = &ae->entries[i];
raft_index_t ety_index = ae->prev_log_idx + 1 + i;
raft_entry_t* existing_ety = raft_get_entry_from_idx(me_, ety_index);
if (existing_ety && existing_ety->term != ety->term)
if (ety_index <= log_get_base(((raft_server_private_t*)me_)->log))
{
/* Already snapshotted */
}
else if (!existing_ety)
{
break;
}
else if (existing_ety->term != ety->term)
{
if (ety_index <= raft_get_commit_idx(me_))
{
Expand All @@ -497,8 +504,6 @@ int raft_recv_appendentries(
goto out;
break;
}
else if (!existing_ety)
break;
r->current_idx = ety_index;
}

Expand Down Expand Up @@ -898,7 +903,7 @@ int raft_send_appendentries(raft_server_t* me_, raft_node_t* node)
raft_index_t next_idx = raft_node_get_next_idx(node);

/* figure out if the client needs a snapshot sent */
if (0 < me->snapshot_last_idx && next_idx < me->snapshot_last_idx)
if (next_idx <= me->snapshot_last_idx)
{
if (me->cb.send_snapshot)
me->cb.send_snapshot(me_, me->udata, node);
Expand Down Expand Up @@ -948,7 +953,7 @@ int raft_send_appendentries_all(raft_server_t* me_)
continue;

e = raft_send_appendentries(me_, me->nodes[i]);
if (0 != e)
if (0 != e && RAFT_ERR_NEEDS_SNAPSHOT != e)
return e;
}

Expand Down Expand Up @@ -1346,7 +1351,7 @@ int raft_end_snapshot(raft_server_t *me_)
raft_index_t next_idx = raft_node_get_next_idx(node);

/* figure out if the client needs a snapshot sent */
if (0 < me->snapshot_last_idx && next_idx < me->snapshot_last_idx)
if (next_idx <= me->snapshot_last_idx)
{
if (me->cb.send_snapshot)
me->cb.send_snapshot(me_, me->udata, node);
Expand All @@ -1373,15 +1378,59 @@ int raft_begin_load_snapshot(
if (last_included_index < me->last_applied_idx)
return -1;

/* snapshot was unnecessary */
if (last_included_index < raft_get_current_idx(me_))
return -1;
if (last_included_index <= raft_get_current_idx(me_)) {
raft_entry_t* ety = raft_get_entry_from_idx(me_, last_included_index);
/* ety is NULL when:
* 1. last_included_index is 0 (already returned false)
* 2. log at last_included_index has been compacted (stale msg or already loaded snapshot)
* 3. last_included_index is greater than current index (not in the current condition)
*/

if (last_included_term == me->snapshot_last_term && last_included_index == me->snapshot_last_idx)
return RAFT_ERR_SNAPSHOT_ALREADY_LOADED;
if (last_included_index == me->snapshot_last_idx)
{
/* already loaded */
if (me->snapshot_last_term != last_included_term)
{
/* Should never happen; something is seriously wrong! */
__log(me_, NULL, "Load snapshot last_included_index conflicts with committed entry");
return RAFT_ERR_SHUTDOWN;
}
return RAFT_ERR_SNAPSHOT_ALREADY_LOADED;
}
else if (!ety)
{
/* stale msg */
__log(me_, NULL, "Load snapshot log at last_included_index was compacted lii:%d", last_included_index);
return -1;
}
else if (ety->term != last_included_term)
{
/* log mismatch */
__log(me_, NULL, "Load snapshot term doesn't match last_included_term (ie. %d vs %d) ci:%d comi:%d lii:%d",
ety->term, last_included_term, raft_get_current_idx(me_),
raft_get_commit_idx(me_), last_included_index);
if (last_included_index <= raft_get_commit_idx(me_))
{
/* Should never happen; something is seriously wrong! */
__log(me_, NULL, "Load snapshot last_included_index conflicts with committed entry");
return RAFT_ERR_SHUTDOWN;
}
/* Snapshot is necessary */
/* No need to delete mismatch log entries because loading snapshot clears all */
}
else {
/* Snapshot was unnecessary; reply success so that the Leader updates next idx and match idx */
return RAFT_ERR_SNAPSHOT_ALREADY_LOADED;
/* Alternative: load the snapshot anyway. However, matching log entries should be retained,
* and since the Follower can call raft_begin_snapshot itself, there is no need to load the snapshot */
}
}

me->current_term = last_included_term;
me->voted_for = -1;
if (me->current_term < last_included_term) {
int e = raft_set_current_term(me_, last_included_term);
if (0 != e)
return e;
}
raft_set_state((raft_server_t*)me, RAFT_STATE_FOLLOWER);
me->current_leader = NULL;

Expand All @@ -1399,8 +1448,10 @@ int raft_begin_load_snapshot(
{
if (raft_get_nodeid(me_) == raft_node_get_id(me->nodes[i]))
my_node_by_idx = i;
else
raft_node_set_active(me->nodes[i], 0);
else {
raft_node_free(me->nodes[i]);
me->nodes[i] = NULL;
}
}

/* this will be realloc'd by a raft_add_node */
Expand Down
10 changes: 10 additions & 0 deletions src/raft_server_properties.c
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,16 @@ raft_term_t raft_get_last_log_term(raft_server_t* me_)
raft_entry_t* ety = raft_get_entry_from_idx(me_, current_idx);
if (ety)
return ety->term;
else
{
/* ety is NULL when:
* 1. current_idx is 0
* 2. current_idx is compacted (must equal snapshot_last_idx)
* 3. current_idx is greater than the current index (never)
*/
assert(current_idx == raft_get_snapshot_last_idx(me_));
return raft_get_snapshot_last_term(me_);
}
}
return 0;
}
Expand Down
Loading