Skip to content

Commit

Permalink
Implement RESET in async API: Cancel subscriptions and monitor mode
Browse files Browse the repository at this point in the history
Additionally, accept commands in monitor mode. (For example the RESET
command, but also other commands.)

Apart from being useful but itself, this change makes the async API's
reply queue stay in sync (mapping each reply to the callback given
when the command was sent) when bombed with random commands (fuzzing).
  • Loading branch information
zuiderkwast committed Jul 20, 2023
1 parent 5468136 commit 41cf219
Show file tree
Hide file tree
Showing 5 changed files with 188 additions and 17 deletions.
83 changes: 70 additions & 13 deletions async.c
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,11 @@
#define PUBSUB_REPLY_PATTERN 16
#define PUBSUB_REPLY_SHARDED 32

/* Special negative values for a callback's `pending_replies` fields. */
#define PENDING_REPLY_UNSUBSCRIBE_ALL -1
#define PENDING_REPLY_MONITOR -2
#define PENDING_REPLY_RESET -3

/* Forward declarations of hiredis.c functions */
int __redisAppendCommand(redisContext *c, const char *cmd, size_t len);
void __redisSetError(redisContext *c, int type, const char *str);
Expand Down Expand Up @@ -172,6 +177,7 @@ static redisAsyncContext *redisAsyncInitialize(redisContext *c) {
ac->sub.patterns = patterns;
ac->sub.shard_channels = shard_channels;
ac->sub.pending_commands = 0;
ac->monitor_cb = NULL;

return ac;
oom:
Expand Down Expand Up @@ -420,6 +426,10 @@ static void __redisAsyncFree(redisAsyncContext *ac) {
dictRelease(ac->sub.shard_channels);
}

if (ac->monitor_cb != NULL) {
callbackDecrRefCount(ac, ac->monitor_cb);
}

/* Signal event lib to clean up */
_EL_CLEANUP(ac);

Expand Down Expand Up @@ -584,7 +594,9 @@ static int handlePubsubReply(redisAsyncContext *ac, redisReply *reply,
/* If we've unsubscribed to the last channel, the command is done. */
/* Check if this was the last channel unsubscribed. */
assert(reply->element[2]->type == REDIS_REPLY_INTEGER);
if (cb->pending_replies == -1 && reply->element[2]->integer == 0) {
if (cb->pending_replies == PENDING_REPLY_UNSUBSCRIBE_ALL &&
reply->element[2]->integer == 0)
{
cb->pending_replies = 0;
}

Expand Down Expand Up @@ -621,12 +633,28 @@ static int handlePubsubReply(redisAsyncContext *ac, redisReply *reply,
return REDIS_ERR;
}

/* Handle the effects of the RESET command. */
static void handleReset(redisAsyncContext *ac) {
/* Cancel monitoring mode */
ac->c.flags &= ~REDIS_MONITORING;
if (ac->monitor_cb != NULL) {
callbackDecrRefCount(ac, ac->monitor_cb);
ac->monitor_cb = NULL;
}

/* Cancel subscriptions (finalizers are called if any) */
ac->c.flags &= ~REDIS_SUBSCRIBED;
if (ac->sub.channels) dictEmpty(ac->sub.channels);
if (ac->sub.patterns) dictEmpty(ac->sub.patterns);
if (ac->sub.shard_channels) dictEmpty(ac->sub.shard_channels);
}

void redisProcessCallbacks(redisAsyncContext *ac) {
redisContext *c = &(ac->c);
void *reply = NULL;
redisReply *reply = NULL;
int status;

while((status = redisGetReply(c,&reply)) == REDIS_OK) {
while((status = redisGetReply(c, (void**)&reply)) == REDIS_OK) {
if (reply == NULL) {
/* When the connection is being disconnected and there are
* no more replies, this is the cue to really disconnect. */
Expand Down Expand Up @@ -659,6 +687,15 @@ void redisProcessCallbacks(redisAsyncContext *ac) {
continue;
}

/* Send monitored command to monitor callback */
if ((c->flags & REDIS_MONITORING) &&
reply->type == REDIS_REPLY_STATUS && reply->len > 0 &&
reply->str[0] >= '0' && reply->str[0] <= '9')
{
__redisRunCallback(ac, ac->monitor_cb, reply);
continue;
}

/* Get callback from queue which was added when the command was sent. */
redisCallback *cb = NULL;
if (pubsub_reply_flags & PUBSUB_REPLY_MESSAGE) {
Expand Down Expand Up @@ -694,15 +731,33 @@ void redisProcessCallbacks(redisAsyncContext *ac) {
handlePubsubReply(ac, reply, pubsub_reply_flags, cb);
} else {
/* Regular reply. This includes ERR reply for subscribe commands. */

/* Handle special effects of the command's reply, if any. */
if (cb->pending_replies == PENDING_REPLY_RESET &&
reply->type == REDIS_REPLY_STATUS &&
strncmp(reply->str, "RESET", reply->len) == 0)
{
handleReset(ac);
} else if (cb->pending_replies == PENDING_REPLY_MONITOR &&
reply->type == REDIS_REPLY_STATUS &&
strncmp(reply->str, "OK", reply->len) == 0)
{
/* Set monitor flag and callback, freeing any old callback. */
c->flags |= REDIS_MONITORING;
if (ac->monitor_cb != NULL) {
callbackDecrRefCount(ac, ac->monitor_cb);
}
ac->monitor_cb = cb;
callbackIncrRefCount(ac, cb);
}

/* Invoke callback */
__redisRunCallback(ac, cb, reply);
cb->pending_replies = 0;
}

if (cb != NULL) {
/* If in monitor mode, repush the callback */
if ((c->flags & REDIS_MONITORING) && !(c->flags & REDIS_FREEING)) {
__redisPushCallback(&ac->replies, cb);
} else if (cb->pending_replies != 0) {
if (cb->pending_replies != 0) {
/* The command needs more repies. Put it first in queue. */
__redisUnshiftCallback(&ac->replies, cb);
} else {
Expand Down Expand Up @@ -939,15 +994,17 @@ static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn,
cb->pending_replies++;
}
if (cb->pending_replies == 0) {
/* No channels specified. This is unsubscribe 'all' or an error. */
cb->pending_replies = -1;
/* No channels specified means unsubscribe all. (This can happens
* for SUBSCRIBE, but it is an error and then the value of pending
* replies doesn't matter.) */
cb->pending_replies = PENDING_REPLY_UNSUBSCRIBE_ALL;
}
c->flags |= REDIS_SUBSCRIBED;
ac->sub.pending_commands++;
} else if (strncasecmp(cstr,"monitor\r\n",9) == 0) {
/* Set monitor flag */
c->flags |= REDIS_MONITORING;
cb->pending_replies = -1;
} else if (strncasecmp(cstr, "monitor", clen) == 0) {
cb->pending_replies = PENDING_REPLY_MONITOR;
} else if (strncasecmp(cstr, "reset", clen) == 0) {
cb->pending_replies = PENDING_REPLY_RESET;
}

if (__redisPushCallback(&ac->replies, cb) != REDIS_OK)
Expand Down
7 changes: 5 additions & 2 deletions async.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ typedef struct redisCallback {
void *privdata;
unsigned int refcount; /* Reference counter used when callback is used
* for multiple pubsub channels. */
int pending_replies; /* Number of replies expected; -1 means
* unsubscribe all. */
int pending_replies; /* Number of replies expected; negative values
* are special. */
} redisCallback;

/* List of callbacks for either regular replies or pub/sub */
Expand Down Expand Up @@ -117,6 +117,9 @@ typedef struct redisAsyncContext {

/* Any configured RESP3 PUSH handler */
redisAsyncPushFn *push_cb;

/* Monitor, only when MONITOR has been called. */
redisCallback *monitor_cb;
} redisAsyncContext;

/* Functions that proxy to hiredis */
Expand Down
5 changes: 5 additions & 0 deletions dict.c
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,11 @@ static int _dictClear(dict *ht) {
return DICT_OK; /* never fails */
}

/* Delete all the keys and values */
static void dictEmpty(dict *ht) {
_dictClear(ht);
}

/* Clear & Release the hash table */
static void dictRelease(dict *ht) {
_dictClear(ht);
Expand Down
1 change: 1 addition & 0 deletions dict.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ static int dictExpand(dict *ht, unsigned long size);
static int dictAdd(dict *ht, void *key, void *val);
static int dictReplace(dict *ht, void *key, void *val);
static int dictDelete(dict *ht, const void *key);
static void dictEmpty(dict *ht);
static void dictRelease(dict *ht);
static dictEntry * dictFind(dict *ht, const void *key);
static void dictInitIterator(dictIterator *iter, dict *ht);
Expand Down
109 changes: 107 additions & 2 deletions test.c
Original file line number Diff line number Diff line change
Expand Up @@ -2023,6 +2023,108 @@ static void test_monitor(struct config config) {
/* Verify test checkpoints */
assert(state.checkpoint == 3);
}

/* Reset callback for test_reset() */
static void reset_reset_cb(redisAsyncContext *ac, void *r, void *privdata) {
redisReply *reply = r;
TestState *state = privdata;
assert(reply != NULL && reply->elements == 2);
char *str = reply->element[0]->str;
size_t len = reply->element[0]->len;
assert(strncmp(str, "RESET", len) == 0);
state->checkpoint++;
/* Check that when the RESET callback is called, the context has already
* been reset. Monitor and pubsub have been cancelled. */
assert(!(ac->c.flags & REDIS_SUBSCRIBED));
assert(!(ac->c.flags & REDIS_MONITORING));
event_base_loopbreak(base);
}

/* Ping callback for test_reset() */
static void reset_ping_cb(redisAsyncContext *ac, void *r, void *privdata) {
redisReply *reply = r;
TestState *state = privdata;
assert(reply != NULL && reply->elements == 2);
char *str = reply->element[0]->str;
size_t len = reply->element[0]->len;
assert(strncmp(str, "pong", len) == 0);
state->checkpoint++;
redisAsyncCommandWithFinalizer(ac, reset_reset_cb, finalizer_cb, &state,
"reset");
}

/* Subscribe callback for test_reset() */
static void reset_subscribe_cb(redisAsyncContext *ac, void *r, void *privdata) {
redisReply *reply = r;
TestState *state = privdata;
assert(reply != NULL &&
reply->type == REDIS_REPLY_ARRAY &&
reply->elements > 0);
char *str = reply->element[0]->str;
size_t len = reply->element[0]->len;
assert(strncmp(str, "subscribe", len) == 0);
state->checkpoint++;
redisAsyncCommandWithFinalizer(ac, reset_ping_cb, finalizer_cb,
&state, "ping");
}

/* Monitor callback for test_reset(). */
static void reset_monitor_cb(redisAsyncContext *ac, void *r, void *privdata) {
redisReply *reply = r;
TestState *state = privdata;
assert(reply != NULL && reply->type == REDIS_REPLY_STATUS);
state->checkpoint++;
if (strncmp(reply->str, "OK", reply->len) == 0) {
/* Reply to the MONITOR command */
redisAsyncCommandWithFinalizer(ac, reset_subscribe_cb, finalizer_cb, &state,
"subscribe %s", "ch");
} else {
/* A monitored command starts with a numeric timestamp, e.g.
* +1689801837.986559 [0 127.0.0.1:44308] "ping" */
assert(reply->str[0] >= '0' && reply->str[0] <= '9');
}
}

/* Check that RESET cancels all subscriptions and monitoring (Redis >= 6.2) */
static void test_reset(struct config config) {
test("RESET cancels monitoring and pubsub: ");
/* Setup event dispatcher with a testcase timeout */
base = event_base_new();
struct event *timeout = evtimer_new(base, timeout_cb, NULL);
assert(timeout != NULL);

evtimer_assign(timeout,base,timeout_cb,NULL);
struct timeval timeout_tv = {.tv_sec = 10};
evtimer_add(timeout, &timeout_tv);

/* Connect */
redisOptions options = get_redis_tcp_options(config);
redisAsyncContext *ac = redisAsyncConnectWithOptions(&options);
assert(ac != NULL && ac->err == 0);
redisLibeventAttach(ac,base);

/* Not expecting any push messages in this test */
redisAsyncSetPushCallback(ac,unexpected_push_cb);

/* Start monitor */
TestState state = {.options = &options};
redisAsyncCommandWithFinalizer(ac, reset_monitor_cb, finalizer_cb, &state,
"monitor");

/* Start event dispatching loop */
test_cond(event_base_dispatch(base) == 0);
event_free(timeout);
event_base_free(base);

/* Verify test checkpoint sum.
*
* Replies for monitor, subscribe, ping and reset = 4
* Monitored subscribe and ping = 2
* Finalizer for monitor, subscribe, ping, reset = 4
* Sum: 4 + 2 + 4 = 10 */
assert(state.checkpoint == 10);
}

#endif /* HIREDIS_TEST_ASYNC */

/* tests for async api using polling adapter, requires no extra libraries*/
Expand Down Expand Up @@ -2394,9 +2496,9 @@ int main(int argc, char **argv) {
printf("\nTesting asynchronous API against TCP connection (%s:%d):\n", cfg.tcp.host, cfg.tcp.port);
cfg.type = CONN_TCP;

int major;
int major, minor;
redisContext *c = do_connect(cfg);
get_redis_version(c, &major, NULL);
get_redis_version(c, &major, &minor);
disconnect(c, 0);

test_pubsub_handling(cfg);
Expand All @@ -2406,6 +2508,9 @@ int main(int argc, char **argv) {
test_pubsub_handling_resp3(cfg);
test_command_timeout_during_pubsub(cfg);
}
if (major > 6 || (major == 6 && minor >= 2)) {
test_reset(cfg);
}
#endif /* HIREDIS_TEST_ASYNC */

cfg.type = CONN_TCP;
Expand Down

0 comments on commit 41cf219

Please sign in to comment.