From 342cb9290eccf7c479a59f864988356f06b16730 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Viktor=20S=C3=B6derqvist?= Date: Thu, 13 Jul 2023 10:10:04 +0200 Subject: [PATCH 01/11] Rewrite async callback queue (push/shift) to not do allocations The callback queue in the changed so that each callback is allocated once and freed once. The operations "push" and "shift" don't copy the callback struct between heap and stack, but instead they and just set the 'next' pointer to link callbacks into a list, without doing any allocations. For pubsub channels, one callback can be mapped to multiple channels, and they can be unsubscribed independently, so we use a reference counter to keep track of references to a callback. This prepares for adding finalizer support and to be sure it is called only once, when the callback is freed. The way hiredis kept tack of 'pubsub mode' using `pending_subs` and `unsubscribe_sent` was flawed. It couldn't handle multiple pending subscribe commands with an overlapping but different set of channels and it couldn't handle an error reply to a SUBSCRIBE or UNSUBSCRIBE command. Now we change this so that all commands, even SUBSCRIBE and UNSUBSCRIBE, are added to the reply queue and the 'subscribe' and 'unsubscribe' replies are matched against them in the queue. The channel-to-callback dicts are updated when a 'subscribe' or 'unsubscribe' reply is received, confirming that the client has subscribed or unsubscribed to a channel, rather than when the command is sent. This change makes it possible to handle an error reply for a pubsub command. With this change, there is no need to keep a different replies queue for subscribe commands and regular commands. All are added to the same replies queue. --- async.c | 514 +++++++++++++++++++++++++++----------------------------- async.h | 12 +- 2 files changed, 259 insertions(+), 267 deletions(-) diff --git a/async.c b/async.c index f82f567f8..2fcafc14c 100644 --- a/async.c +++ b/async.c @@ -52,28 +52,36 @@ #define assert(e) (void)(e) #endif +/* Internal macros used by checkPubsubReply(). They have one bit each, although + * only some combinations are used. */ +#define PUBSUB_REPLY_MESSAGE 1 +#define PUBSUB_REPLY_SUBSCRIBE 2 +#define PUBSUB_REPLY_UNSUBSCRIBE 4 +#define PUBSUB_REPLY_REGULAR 8 +#define PUBSUB_REPLY_PATTERN 16 +#define PUBSUB_REPLY_SHARDED 32 + /* Forward declarations of hiredis.c functions */ int __redisAppendCommand(redisContext *c, const char *cmd, size_t len); void __redisSetError(redisContext *c, int type, const char *str); +/* Reference counting for callback struct. */ +static void callbackIncrRefCount(redisCallback *cb) { + cb->refcount++; +} +static void callbackDecrRefCount(redisCallback *cb) { + cb->refcount--; + if (cb->refcount == 0) { + hi_free(cb); + } +} + /* Functions managing dictionary of callbacks for pub/sub. */ static unsigned int callbackHash(const void *key) { return dictGenHashFunction((const unsigned char *)key, sdslen((const sds)key)); } -static void *callbackValDup(void *privdata, const void *src) { - ((void) privdata); - redisCallback *dup; - - dup = hi_malloc(sizeof(*dup)); - if (dup == NULL) - return NULL; - - memcpy(dup,src,sizeof(*dup)); - return dup; -} - static int callbackKeyCompare(void *privdata, const void *key1, const void *key2) { int l1, l2; ((void) privdata); @@ -89,14 +97,20 @@ static void callbackKeyDestructor(void *privdata, void *key) { sdsfree((sds)key); } +static void *callbackValDup(void *privdata, const void *val) { + (void)privdata; + callbackIncrRefCount((redisCallback *)val); + return (void *)val; +} + static void callbackValDestructor(void *privdata, void *val) { - ((void) privdata); - hi_free(val); + (void)privdata; + callbackDecrRefCount((redisCallback *)val); } static dictType callbackDict = { callbackHash, - NULL, + NULL, /* key dup */ callbackValDup, callbackKeyCompare, callbackKeyDestructor, @@ -105,7 +119,7 @@ static dictType callbackDict = { static redisAsyncContext *redisAsyncInitialize(redisContext *c) { redisAsyncContext *ac; - dict *channels = NULL, *patterns = NULL; + dict *channels = NULL, *patterns = NULL, *shard_channels = NULL; channels = dictCreate(&callbackDict,NULL); if (channels == NULL) @@ -115,6 +129,10 @@ static redisAsyncContext *redisAsyncInitialize(redisContext *c) { if (patterns == NULL) goto oom; + shard_channels = dictCreate(&callbackDict,NULL); + if (shard_channels == NULL) + goto oom; + ac = hi_realloc(c,sizeof(redisAsyncContext)); if (ac == NULL) goto oom; @@ -145,16 +163,16 @@ static redisAsyncContext *redisAsyncInitialize(redisContext *c) { ac->replies.head = NULL; ac->replies.tail = NULL; - ac->sub.replies.head = NULL; - ac->sub.replies.tail = NULL; ac->sub.channels = channels; ac->sub.patterns = patterns; - ac->sub.pending_unsubs = 0; + ac->sub.shard_channels = shard_channels; + ac->sub.pending_commands = 0; return ac; oom: if (channels) dictRelease(channels); if (patterns) dictRelease(patterns); + if (shard_channels) dictRelease(shard_channels); return NULL; } @@ -266,18 +284,9 @@ int redisAsyncSetDisconnectCallback(redisAsyncContext *ac, redisDisconnectCallba } /* Helper functions to push/shift callbacks */ -static int __redisPushCallback(redisCallbackList *list, redisCallback *source) { - redisCallback *cb; - - /* Copy callback from stack to heap */ - cb = hi_malloc(sizeof(*cb)); - if (cb == NULL) - return REDIS_ERR_OOM; - - if (source != NULL) { - memcpy(cb,source,sizeof(*cb)); - cb->next = NULL; - } +static int __redisPushCallback(redisCallbackList *list, redisCallback *cb) { + assert(cb != NULL); + cb->next = NULL; /* Store callback in list */ if (list->head == NULL) @@ -288,22 +297,28 @@ static int __redisPushCallback(redisCallbackList *list, redisCallback *source) { return REDIS_OK; } -static int __redisShiftCallback(redisCallbackList *list, redisCallback *target) { +static int __redisShiftCallback(redisCallbackList *list, redisCallback **target) { redisCallback *cb = list->head; if (cb != NULL) { list->head = cb->next; if (cb == list->tail) list->tail = NULL; - /* Copy callback from heap to stack */ if (target != NULL) - memcpy(target,cb,sizeof(*cb)); - hi_free(cb); + *target = cb; return REDIS_OK; } return REDIS_ERR; } +static int __redisUnshiftCallback(redisCallbackList *list, redisCallback *cb) { + assert(cb != NULL); + cb->next = list->head; + list->head = cb; + return REDIS_OK; +} + +/* Runs callback and frees the reply (except if REDIS_NO_AUTO_FREE_REPLIES is set) */ static void __redisRunCallback(redisAsyncContext *ac, redisCallback *cb, redisReply *reply) { redisContext *c = &(ac->c); if (cb->fn != NULL) { @@ -311,6 +326,9 @@ static void __redisRunCallback(redisAsyncContext *ac, redisCallback *cb, redisRe cb->fn(ac,reply,cb->privdata); c->flags &= ~REDIS_IN_CALLBACK; } + if (!(c->flags & REDIS_NO_AUTO_FREE_REPLIES) || cb->fn == NULL) { + if (reply != NULL) c->reader->fn->freeObject(reply); + } } static void __redisRunPushCallback(redisAsyncContext *ac, redisReply *reply) { @@ -361,15 +379,15 @@ static void __redisRunDisconnectCallback(redisAsyncContext *ac, int status) /* Helper function to free the context. */ static void __redisAsyncFree(redisAsyncContext *ac) { redisContext *c = &(ac->c); - redisCallback cb; + redisCallback *cb; dictIterator it; dictEntry *de; /* Execute pending callbacks with NULL reply. */ - while (__redisShiftCallback(&ac->replies,&cb) == REDIS_OK) - __redisRunCallback(ac,&cb,NULL); - while (__redisShiftCallback(&ac->sub.replies,&cb) == REDIS_OK) - __redisRunCallback(ac,&cb,NULL); + while (__redisShiftCallback(&ac->replies,&cb) == REDIS_OK) { + __redisRunCallback(ac,cb,NULL); + callbackDecrRefCount(cb); + } /* Run subscription callbacks with NULL reply */ if (ac->sub.channels) { @@ -388,6 +406,14 @@ static void __redisAsyncFree(redisAsyncContext *ac) { dictRelease(ac->sub.patterns); } + if (ac->sub.shard_channels) { + dictInitIterator(&it,ac->sub.shard_channels); + while ((de = dictNext(&it)) != NULL) + __redisRunCallback(ac,dictGetEntryVal(de),NULL); + + dictRelease(ac->sub.shard_channels); + } + /* Signal event lib to clean up */ _EL_CLEANUP(ac); @@ -467,74 +493,120 @@ void redisAsyncDisconnect(redisAsyncContext *ac) { __redisAsyncDisconnect(ac); } -static int __redisGetSubscribeCallback(redisAsyncContext *ac, redisReply *reply, redisCallback *dstcb) { - redisContext *c = &(ac->c); - dict *callbacks; - redisCallback *cb = NULL; - dictEntry *de; - int pvariant; - char *stype; - sds sname = NULL; +/* Returns a bitwise or of the PUBSUB_REPLY_ macros. 0 means not pubsub. */ +static int checkPubsubReply(redisReply *reply, int expect_push) { /* Match reply with the expected format of a pushed message. * The type and number of elements (3 to 4) are specified at: * https://redis.io/topics/pubsub#format-of-pushed-messages */ - if ((reply->type == REDIS_REPLY_ARRAY && !(c->flags & REDIS_SUPPORTS_PUSH) && reply->elements >= 3) || - reply->type == REDIS_REPLY_PUSH) { - assert(reply->element[0]->type == REDIS_REPLY_STRING); - stype = reply->element[0]->str; - pvariant = (tolower(stype[0]) == 'p') ? 1 : 0; - - if (pvariant) - callbacks = ac->sub.patterns; - else - callbacks = ac->sub.channels; + if (reply->type != (expect_push ? REDIS_REPLY_PUSH : REDIS_REPLY_ARRAY) || + reply->elements < 3 || + reply->elements > 4 || + reply->element[0]->type != REDIS_REPLY_STRING || + reply->element[0]->len < sizeof("message") - 1) + { + return 0; + } - /* Locate the right callback */ - if (reply->element[1]->type == REDIS_REPLY_STRING) { - sname = sdsnewlen(reply->element[1]->str,reply->element[1]->len); - if (sname == NULL) goto oom; + char *str = reply->element[0]->str; + size_t len = reply->element[0]->len; + + if (!strncmp(str, "message", len)) + return PUBSUB_REPLY_MESSAGE | PUBSUB_REPLY_REGULAR; + if (!strncmp(str, "subscribe", len)) + return PUBSUB_REPLY_SUBSCRIBE | PUBSUB_REPLY_REGULAR; + if (!strncmp(str, "unsubscribe", len)) + return PUBSUB_REPLY_UNSUBSCRIBE | PUBSUB_REPLY_REGULAR; + + if (!strncmp(str, "pmessage", len)) + return PUBSUB_REPLY_MESSAGE | PUBSUB_REPLY_PATTERN; + if (!strncmp(str, "psubscribe", len)) + return PUBSUB_REPLY_SUBSCRIBE | PUBSUB_REPLY_PATTERN; + if (!strncmp(str, "punsubscribe", len)) + return PUBSUB_REPLY_UNSUBSCRIBE | PUBSUB_REPLY_PATTERN; + + if (!strncmp(str, "smessage", len)) + return PUBSUB_REPLY_MESSAGE | PUBSUB_REPLY_SHARDED; + if (!strncmp(str, "ssubscribe", len)) + return PUBSUB_REPLY_SUBSCRIBE | PUBSUB_REPLY_SHARDED; + if (!strncmp(str, "sunsubscribe", len)) + return PUBSUB_REPLY_UNSUBSCRIBE | PUBSUB_REPLY_SHARDED; + + return 0; +} - if ((de = dictFind(callbacks,sname)) != NULL) { - cb = dictGetEntryVal(de); - memcpy(dstcb,cb,sizeof(*dstcb)); - } +/* Returns the dict used for callbacks per channel/pattern/shard-channel. */ +static dict *getPubsubCallbackDict(redisAsyncContext *ac, int pubsub_flags) { + if (pubsub_flags & PUBSUB_REPLY_REGULAR) return ac->sub.channels; + if (pubsub_flags & PUBSUB_REPLY_PATTERN) return ac->sub.patterns; + if (pubsub_flags & PUBSUB_REPLY_SHARDED) return ac->sub.shard_channels; + return NULL; +} + +/* Handles a pubsub reply, delegates the reply to the right callback and frees + * the reply. The passed callback `cb` should be the one queued when the + * corresponding command was sent ('subscribe' and 'unsubscribe') and NULL for + * 'message'. */ +static int handlePubsubReply(redisAsyncContext *ac, redisReply *reply, + int pubsub_flags, redisCallback *cb) { + dict *callbacks = getPubsubCallbackDict(ac, pubsub_flags); + sds sname = sdsnewlen(reply->element[1]->str,reply->element[1]->len); + if (sname == NULL) goto oom; + dictEntry *de = dictFind(callbacks, sname); + redisCallback *existcb = (de != NULL) ? dictGetEntryVal(de) : NULL; + + if (pubsub_flags & PUBSUB_REPLY_MESSAGE) { + sdsfree(sname); + __redisRunCallback(ac, existcb, reply); + return REDIS_OK; + } + + /* Subscribe and unsubscribe */ + if (pubsub_flags & PUBSUB_REPLY_SUBSCRIBE) { + /* Add channel subscription and call the callback */ + if (existcb != NULL && cb->fn == NULL) { + /* Don't replace the existing callback */ + sdsfree(sname); + __redisRunCallback(ac, existcb, reply); + } else { + /* Set or replace callback */ + int ret = dictReplace(callbacks, sname, cb); + if (ret == 0) sdsfree(sname); + __redisRunCallback(ac, cb, reply); + } + } else if (pubsub_flags & PUBSUB_REPLY_UNSUBSCRIBE) { + /* If we've unsubscribed to the last channel, the command is done. */ + /* Check if this was the last channel unsubscribed. */ + assert(reply->element[2]->type == REDIS_REPLY_INTEGER); + if (cb->pending_replies == -1 && reply->element[2]->integer == 0) { + cb->pending_replies = 0; } - /* If this is an subscribe reply decrease pending counter. */ - if (strcasecmp(stype+pvariant,"subscribe") == 0) { - assert(cb != NULL); - cb->pending_subs -= 1; - - } else if (strcasecmp(stype+pvariant,"unsubscribe") == 0) { - if (cb == NULL) - ac->sub.pending_unsubs -= 1; - else if (cb->pending_subs == 0) - dictDelete(callbacks,sname); - - /* If this was the last unsubscribe message, revert to - * non-subscribe mode. */ - assert(reply->element[2]->type == REDIS_REPLY_INTEGER); - - /* Unset subscribed flag only when no pipelined pending subscribe - * or pending unsubscribe replies. */ - if (reply->element[2]->integer == 0 - && dictSize(ac->sub.channels) == 0 - && dictSize(ac->sub.patterns) == 0 - && ac->sub.pending_unsubs == 0) { - c->flags &= ~REDIS_SUBSCRIBED; - - /* Move ongoing regular command callbacks. */ - redisCallback cb; - while (__redisShiftCallback(&ac->sub.replies,&cb) == REDIS_OK) { - __redisPushCallback(&ac->replies,&cb); - } - } + if (existcb != NULL) { + /* Invoke the callback used when subscribing. */ + __redisRunCallback(ac, existcb, reply); + } else { + /* We were not subscribed to this channel. We could invoke the + * callback `cb` if passed with the [P|S]UNSUBSCRIBE command here, + * but legacy is to just free the reply. */ + ac->c.reader->fn->freeObject(reply); } + + /* Delete channel subscription. */ + dictDelete(callbacks,sname); sdsfree(sname); - } else { - /* Shift callback for pending command in subscribed context. */ - __redisShiftCallback(&ac->sub.replies,dstcb); + } + + if (cb->pending_replies > 0) cb->pending_replies--; + if (cb->pending_replies == 0) ac->sub.pending_commands--; + + /* Unset subscribed flag only when not subscribed to any channel and no + * pipelined pending subscribe or pending unsubscribe replies. */ + if (ac->sub.pending_commands == 0 + && dictSize(ac->sub.channels) == 0 + && dictSize(ac->sub.patterns) == 0 + && dictSize(ac->sub.shard_channels) == 0) { + ac->c.flags &= ~REDIS_SUBSCRIBED; } return REDIS_OK; oom: @@ -543,30 +615,6 @@ static int __redisGetSubscribeCallback(redisAsyncContext *ac, redisReply *reply, return REDIS_ERR; } -#define redisIsSpontaneousPushReply(r) \ - (redisIsPushReply(r) && !redisIsSubscribeReply(r)) - -static int redisIsSubscribeReply(redisReply *reply) { - char *str; - size_t len, off; - - /* We will always have at least one string with the subscribe/message type */ - if (reply->elements < 1 || reply->element[0]->type != REDIS_REPLY_STRING || - reply->element[0]->len < sizeof("message") - 1) - { - return 0; - } - - /* Get the string/len moving past 'p' if needed */ - off = tolower(reply->element[0]->str[0]) == 'p'; - str = reply->element[0]->str + off; - len = reply->element[0]->len - off; - - return !strncasecmp(str, "subscribe", len) || - !strncasecmp(str, "message", len) || - !strncasecmp(str, "unsubscribe", len); -} - void redisProcessCallbacks(redisAsyncContext *ac) { redisContext *c = &(ac->c); void *reply = NULL; @@ -589,20 +637,30 @@ void redisProcessCallbacks(redisAsyncContext *ac) { /* Keep track of push message support for subscribe handling */ if (redisIsPushReply(reply)) c->flags |= REDIS_SUPPORTS_PUSH; + /* Categorize pubsub reply if we're in subscribed mode. */ + int pubsub_reply_flags = 0; + if (c->flags & REDIS_SUBSCRIBED) { + pubsub_reply_flags = checkPubsubReply(reply, c->flags & REDIS_SUPPORTS_PUSH); + } + /* Send any non-subscribe related PUSH messages to our PUSH handler * while allowing subscribe related PUSH messages to pass through. * This allows existing code to be backward compatible and work in * either RESP2 or RESP3 mode. */ - if (redisIsSpontaneousPushReply(reply)) { + if (redisIsPushReply(reply) && !pubsub_reply_flags) { __redisRunPushCallback(ac, reply); c->reader->fn->freeObject(reply); continue; } - /* Even if the context is subscribed, pending regular - * callbacks will get a reply before pub/sub messages arrive. */ - redisCallback cb = {NULL, NULL, 0, 0, NULL}; - if (__redisShiftCallback(&ac->replies,&cb) != REDIS_OK) { + /* Get callback from queue which was added when the command was sent. */ + redisCallback *cb = NULL; + if (pubsub_reply_flags & PUBSUB_REPLY_MESSAGE) { + /* Pubsub message is the only true out-of-band pubsub reply. There + * is no callback in the queue. (Subscribe and unsubscribe are + * actually in-band replies to their corresponding commands even + * though they are of push type.) */ + } else if (__redisShiftCallback(&ac->replies, &cb) != REDIS_OK) { /* * A spontaneous reply in a not-subscribed context can be the error * reply that is sent when a new connection exceeds the maximum @@ -618,41 +676,38 @@ void redisProcessCallbacks(redisAsyncContext *ac) { * In this case we also want to close the connection, and have the * user wait until the server is ready to take our request. */ - if (((redisReply*)reply)->type == REDIS_REPLY_ERROR) { - c->err = REDIS_ERR_OTHER; - snprintf(c->errstr,sizeof(c->errstr),"%s",((redisReply*)reply)->str); - c->reader->fn->freeObject(reply); - __redisAsyncDisconnect(ac); - return; - } - /* No more regular callbacks and no errors, the context *must* be subscribed. */ - assert(c->flags & REDIS_SUBSCRIBED); - if (c->flags & REDIS_SUBSCRIBED) - __redisGetSubscribeCallback(ac,reply,&cb); + assert(((redisReply*)reply)->type == REDIS_REPLY_ERROR); + c->err = REDIS_ERR_OTHER; + snprintf(c->errstr,sizeof(c->errstr),"%s",((redisReply*)reply)->str); + c->reader->fn->freeObject(reply); + __redisAsyncDisconnect(ac); + return; } - if (cb.fn != NULL) { - __redisRunCallback(ac,&cb,reply); - if (!(c->flags & REDIS_NO_AUTO_FREE_REPLIES)){ - c->reader->fn->freeObject(reply); - } + if (pubsub_reply_flags != 0) { + handlePubsubReply(ac, reply, pubsub_reply_flags, cb); + } else { + /* Regular reply. This includes ERR reply for subscribe commands. */ + __redisRunCallback(ac, cb, reply); + cb->pending_replies = 0; + } - /* Proceed with free'ing when redisAsyncFree() was called. */ - if (c->flags & REDIS_FREEING) { - __redisAsyncFree(ac); - return; + if (cb != NULL) { + /* If in monitor mode, repush the callback */ + if ((c->flags & REDIS_MONITORING) && !(c->flags & REDIS_FREEING)) { + __redisPushCallback(&ac->replies, cb); + } else if (cb->pending_replies != 0) { + /* The command needs more repies. Put it first in queue. */ + __redisUnshiftCallback(&ac->replies, cb); + } else { + callbackDecrRefCount(cb); } - } else { - /* No callback for this reply. This can either be a NULL callback, - * or there were no callbacks to begin with. Either way, don't - * abort with an error, but simply ignore it because the client - * doesn't know what the server will spit out over the wire. */ - c->reader->fn->freeObject(reply); } - /* If in monitor mode, repush the callback */ - if (c->flags & REDIS_MONITORING) { - __redisPushCallback(&ac->replies,&cb); + /* Proceed with free'ing when redisAsyncFree() was called. */ + if (c->flags & REDIS_FREEING) { + __redisAsyncFree(ac); + return; } } @@ -775,12 +830,12 @@ void redisAsyncHandleWrite(redisAsyncContext *ac) { void redisAsyncHandleTimeout(redisAsyncContext *ac) { redisContext *c = &(ac->c); - redisCallback cb; + redisCallback *cb; /* must not be called from a callback */ assert(!(c->flags & REDIS_IN_CALLBACK)); if ((c->flags & REDIS_CONNECTED)) { - if (ac->replies.head == NULL && ac->sub.replies.head == NULL) { + if (ac->replies.head == NULL) { /* Nothing to do - just an idle timeout */ return; } @@ -802,7 +857,8 @@ void redisAsyncHandleTimeout(redisAsyncContext *ac) { } while (__redisShiftCallback(&ac->replies, &cb) == REDIS_OK) { - __redisRunCallback(ac, &cb, NULL); + __redisRunCallback(ac, cb, NULL); + callbackDecrRefCount(cb); } /** @@ -828,133 +884,66 @@ static const char *nextArgument(const char *start, const char **str, size_t *len return p+2+(*len)+2; } +static int isPubsubCommand(const char *cmd, size_t len) { + if (len < strlen("subscribe") || len > strlen("punsubscribe")) + return 0; /* fast path */ + return + strncasecmp(cmd, "subscribe", len) == 0 || + strncasecmp(cmd, "unsubscribe", len) == 0 || + strncasecmp(cmd, "psubscribe", len) == 0 || + strncasecmp(cmd, "punsubscribe", len) == 0 || + strncasecmp(cmd, "ssubscribe", len) == 0 || + strncasecmp(cmd, "sunsubscribe", len) == 0; +} + /* Helper function for the redisAsyncCommand* family of functions. Writes a * formatted command to the output buffer and registers the provided callback * function with the context. */ static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *cmd, size_t len) { redisContext *c = &(ac->c); - redisCallback cb; - struct dict *cbdict; - dictIterator it; - dictEntry *de; - redisCallback *existcb; - int pvariant, hasnext; + redisCallback *cb; const char *cstr, *astr; size_t clen, alen; const char *p; - sds sname; - int ret; /* Don't accept new commands when the connection is about to be closed. */ if (c->flags & (REDIS_DISCONNECTING | REDIS_FREEING)) return REDIS_ERR; /* Setup callback */ - cb.fn = fn; - cb.privdata = privdata; - cb.pending_subs = 1; - cb.unsubscribe_sent = 0; + cb = hi_malloc(sizeof(*cb)); + if (cb == NULL) + goto oom; + cb->fn = fn; + cb->privdata = privdata; + cb->refcount = 1; + cb->pending_replies = 1; /* Most commands have exactly 1 reply. */ /* Find out which command will be appended. */ p = nextArgument(cmd,&cstr,&clen); assert(p != NULL); - hasnext = (p[0] == '$'); - pvariant = (tolower(cstr[0]) == 'p') ? 1 : 0; - cstr += pvariant; - clen -= pvariant; - - if (hasnext && strncasecmp(cstr,"subscribe\r\n",11) == 0) { - c->flags |= REDIS_SUBSCRIBED; - - /* Add every channel/pattern to the list of subscription callbacks. */ - while ((p = nextArgument(p,&astr,&alen)) != NULL) { - sname = sdsnewlen(astr,alen); - if (sname == NULL) - goto oom; - - if (pvariant) - cbdict = ac->sub.patterns; - else - cbdict = ac->sub.channels; - - de = dictFind(cbdict,sname); - - if (de != NULL) { - existcb = dictGetEntryVal(de); - cb.pending_subs = existcb->pending_subs + 1; - } - - ret = dictReplace(cbdict,sname,&cb); - if (ret == 0) sdsfree(sname); + if (isPubsubCommand(cstr, clen)) { + /* The number of replies is the number of channels or patterns. Count + * the arguments. */ + cb->pending_replies = 0; + while ((p = nextArgument(p, &astr, &alen)) != NULL) { + cb->pending_replies++; } - } else if (strncasecmp(cstr,"unsubscribe\r\n",13) == 0) { - /* It is only useful to call (P)UNSUBSCRIBE when the context is - * subscribed to one or more channels or patterns. */ - if (!(c->flags & REDIS_SUBSCRIBED)) return REDIS_ERR; - - if (pvariant) - cbdict = ac->sub.patterns; - else - cbdict = ac->sub.channels; - - if (hasnext) { - /* Send an unsubscribe with specific channels/patterns. - * Bookkeeping the number of expected replies */ - while ((p = nextArgument(p,&astr,&alen)) != NULL) { - sname = sdsnewlen(astr,alen); - if (sname == NULL) - goto oom; - - de = dictFind(cbdict,sname); - if (de != NULL) { - existcb = dictGetEntryVal(de); - if (existcb->unsubscribe_sent == 0) - existcb->unsubscribe_sent = 1; - else - /* Already sent, reply to be ignored */ - ac->sub.pending_unsubs += 1; - } else { - /* Not subscribed to, reply to be ignored */ - ac->sub.pending_unsubs += 1; - } - sdsfree(sname); - } - } else { - /* Send an unsubscribe without specific channels/patterns. - * Bookkeeping the number of expected replies */ - int no_subs = 1; - dictInitIterator(&it,cbdict); - while ((de = dictNext(&it)) != NULL) { - existcb = dictGetEntryVal(de); - if (existcb->unsubscribe_sent == 0) { - existcb->unsubscribe_sent = 1; - no_subs = 0; - } - } - /* Unsubscribing to all channels/patterns, where none is - * subscribed to, results in a single reply to be ignored. */ - if (no_subs == 1) - ac->sub.pending_unsubs += 1; + if (cb->pending_replies == 0) { + /* No channels specified. This is unsubscribe 'all' or an error. */ + cb->pending_replies = -1; } - - /* (P)UNSUBSCRIBE does not have its own response: every channel or - * pattern that is unsubscribed will receive a message. This means we - * should not append a callback function for this command. */ + c->flags |= REDIS_SUBSCRIBED; + ac->sub.pending_commands++; } else if (strncasecmp(cstr,"monitor\r\n",9) == 0) { - /* Set monitor flag and push callback */ + /* Set monitor flag */ c->flags |= REDIS_MONITORING; - if (__redisPushCallback(&ac->replies,&cb) != REDIS_OK) - goto oom; - } else { - if (c->flags & REDIS_SUBSCRIBED) { - if (__redisPushCallback(&ac->sub.replies,&cb) != REDIS_OK) - goto oom; - } else { - if (__redisPushCallback(&ac->replies,&cb) != REDIS_OK) - goto oom; - } + cb->pending_replies = -1; } + if (__redisPushCallback(&ac->replies, cb) != REDIS_OK) + goto oom; + __redisAppendCommand(c,cmd,len); /* Always schedule a write when the write buffer is non-empty */ @@ -964,6 +953,7 @@ static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void oom: __redisSetError(&(ac->c), REDIS_ERR_OOM, "Out of memory"); __redisAsyncCopyError(ac); + callbackDecrRefCount(cb); return REDIS_ERR; } diff --git a/async.h b/async.h index 4f94660b1..f83e1aacc 100644 --- a/async.h +++ b/async.h @@ -45,9 +45,11 @@ typedef void (redisCallbackFn)(struct redisAsyncContext*, void*, void*); typedef struct redisCallback { struct redisCallback *next; /* simple singly linked list */ redisCallbackFn *fn; - int pending_subs; - int unsubscribe_sent; void *privdata; + unsigned int refcount; /* Reference counter used when callback is used + * for multiple pubsub channels. */ + int pending_replies; /* Number of replies expected; -1 means + * unsubscribe all. */ } redisCallback; /* List of callbacks for either regular replies or pub/sub */ @@ -96,7 +98,7 @@ typedef struct redisAsyncContext { redisConnectCallback *onConnect; redisConnectCallbackNC *onConnectNC; - /* Regular command callbacks */ + /* Callback queue for pending replies */ redisCallbackList replies; /* Address used for connect() */ @@ -105,10 +107,10 @@ typedef struct redisAsyncContext { /* Subscription callbacks */ struct { - redisCallbackList replies; struct dict *channels; struct dict *patterns; - int pending_unsubs; + struct dict *shard_channels; + int pending_commands; /* Count pending [P|S][UN]SUBSCRIBE */ } sub; /* Any configured RESP3 PUSH handler */ From 1dff7b412ccbfa3447a033c6fcd70bd8d498c0b8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Viktor=20S=C3=B6derqvist?= Date: Wed, 19 Jul 2023 12:25:57 +0200 Subject: [PATCH 02/11] Add finalizer support to async Redis client * For each time redisAsyncCommandWithFinalizer or one of its variants has been called with a finalizer, the finalizer will be called exactly once. * The finalizer is called when a callback (even if the callback function is NULL) is no longer associated with any command or pubsub channel or pattern. * The finalizer is called after the last call to the callback. The callback is never called after the finalizer has been called. New async API functions added: * redisvAsyncCommandWithFinalizer * redisAsyncCommandWithFinalizer * redisAsyncCommandArgvWithFinalizer * redisAsyncFormattedCommandWithFinalizer Co-Authored-By: Tudor Bosman --- async.c | 77 +++++++++++++++++++++++++++++++++++++++++---------------- async.h | 7 ++++++ test.c | 18 +++++++++----- 3 files changed, 75 insertions(+), 27 deletions(-) diff --git a/async.c b/async.c index 2fcafc14c..15b6f42b3 100644 --- a/async.c +++ b/async.c @@ -66,12 +66,19 @@ int __redisAppendCommand(redisContext *c, const char *cmd, size_t len); void __redisSetError(redisContext *c, int type, const char *str); /* Reference counting for callback struct. */ -static void callbackIncrRefCount(redisCallback *cb) { +static void callbackIncrRefCount(redisAsyncContext *ac, redisCallback *cb) { + (void)ac; cb->refcount++; } -static void callbackDecrRefCount(redisCallback *cb) { +static void callbackDecrRefCount(redisAsyncContext *ac, redisCallback *cb) { cb->refcount--; if (cb->refcount == 0) { + if (cb->finalizer != NULL) { + redisContext *c = &(ac->c); + c->flags |= REDIS_IN_CALLBACK; + cb->finalizer(ac, cb->privdata); + c->flags &= ~REDIS_IN_CALLBACK; + } hi_free(cb); } } @@ -98,14 +105,12 @@ static void callbackKeyDestructor(void *privdata, void *key) { } static void *callbackValDup(void *privdata, const void *val) { - (void)privdata; - callbackIncrRefCount((redisCallback *)val); + callbackIncrRefCount((redisAsyncContext *)privdata, (redisCallback *)val); return (void *)val; } static void callbackValDestructor(void *privdata, void *val) { - (void)privdata; - callbackDecrRefCount((redisCallback *)val); + callbackDecrRefCount((redisAsyncContext *)privdata, (redisCallback *)val); } static dictType callbackDict = { @@ -121,22 +126,22 @@ static redisAsyncContext *redisAsyncInitialize(redisContext *c) { redisAsyncContext *ac; dict *channels = NULL, *patterns = NULL, *shard_channels = NULL; - channels = dictCreate(&callbackDict,NULL); + ac = hi_realloc(c,sizeof(redisAsyncContext)); + if (ac == NULL) + goto oom; + + channels = dictCreate(&callbackDict, ac); if (channels == NULL) goto oom; - patterns = dictCreate(&callbackDict,NULL); + patterns = dictCreate(&callbackDict, ac); if (patterns == NULL) goto oom; - shard_channels = dictCreate(&callbackDict,NULL); + shard_channels = dictCreate(&callbackDict, ac); if (shard_channels == NULL) goto oom; - ac = hi_realloc(c,sizeof(redisAsyncContext)); - if (ac == NULL) - goto oom; - c = &(ac->c); /* The regular connect functions will always set the flag REDIS_CONNECTED. @@ -173,6 +178,7 @@ static redisAsyncContext *redisAsyncInitialize(redisContext *c) { if (channels) dictRelease(channels); if (patterns) dictRelease(patterns); if (shard_channels) dictRelease(shard_channels); + if (ac) hi_free(ac); return NULL; } @@ -386,7 +392,7 @@ static void __redisAsyncFree(redisAsyncContext *ac) { /* Execute pending callbacks with NULL reply. */ while (__redisShiftCallback(&ac->replies,&cb) == REDIS_OK) { __redisRunCallback(ac,cb,NULL); - callbackDecrRefCount(cb); + callbackDecrRefCount(ac, cb); } /* Run subscription callbacks with NULL reply */ @@ -700,7 +706,7 @@ void redisProcessCallbacks(redisAsyncContext *ac) { /* The command needs more repies. Put it first in queue. */ __redisUnshiftCallback(&ac->replies, cb); } else { - callbackDecrRefCount(cb); + callbackDecrRefCount(ac, cb); } } @@ -858,7 +864,7 @@ void redisAsyncHandleTimeout(redisAsyncContext *ac) { while (__redisShiftCallback(&ac->replies, &cb) == REDIS_OK) { __redisRunCallback(ac, cb, NULL); - callbackDecrRefCount(cb); + callbackDecrRefCount(ac, cb); } /** @@ -899,7 +905,9 @@ static int isPubsubCommand(const char *cmd, size_t len) { /* Helper function for the redisAsyncCommand* family of functions. Writes a * formatted command to the output buffer and registers the provided callback * function with the context. */ -static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *cmd, size_t len) { +static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, + redisFinalizerCallback *finalizer, void *privdata, + const char *cmd, size_t len) { redisContext *c = &(ac->c); redisCallback *cb; const char *cstr, *astr; @@ -914,6 +922,7 @@ static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void if (cb == NULL) goto oom; cb->fn = fn; + cb->finalizer = finalizer; cb->privdata = privdata; cb->refcount = 1; cb->pending_replies = 1; /* Most commands have exactly 1 reply. */ @@ -953,11 +962,16 @@ static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void oom: __redisSetError(&(ac->c), REDIS_ERR_OOM, "Out of memory"); __redisAsyncCopyError(ac); - callbackDecrRefCount(cb); + callbackDecrRefCount(ac, cb); return REDIS_ERR; } int redisvAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *format, va_list ap) { + return redisvAsyncCommandWithFinalizer(ac, fn, NULL, privdata, format, ap); +} + +int redisvAsyncCommandWithFinalizer(redisAsyncContext *ac, redisCallbackFn *fn, redisFinalizerCallback *finalizer, + void *privdata, const char *format, va_list ap) { char *cmd; int len; int status; @@ -967,7 +981,7 @@ int redisvAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdat if (len < 0) return REDIS_ERR; - status = __redisAsyncCommand(ac,fn,privdata,cmd,len); + status = __redisAsyncCommand(ac,fn,finalizer,privdata,cmd,len); hi_free(cmd); return status; } @@ -981,20 +995,41 @@ int redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata return status; } +int redisAsyncCommandWithFinalizer(redisAsyncContext *ac, redisCallbackFn *fn, redisFinalizerCallback *finalizer, + void *privdata, const char *format, ...) { + va_list ap; + int status; + va_start(ap,format); + status = redisvAsyncCommandWithFinalizer(ac,fn,finalizer,privdata,format,ap); + va_end(ap); + return status; +} + int redisAsyncCommandArgv(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, int argc, const char **argv, const size_t *argvlen) { + return redisAsyncCommandArgvWithFinalizer(ac, fn, NULL, privdata, argc, argv, argvlen); +} + +int redisAsyncCommandArgvWithFinalizer(redisAsyncContext *ac, redisCallbackFn *fn, redisFinalizerCallback *finalizer, + void *privdata, int argc, const char **argv, const size_t *argvlen) { sds cmd; long long len; int status; len = redisFormatSdsCommandArgv(&cmd,argc,argv,argvlen); if (len < 0) return REDIS_ERR; - status = __redisAsyncCommand(ac,fn,privdata,cmd,len); + status = __redisAsyncCommand(ac,fn,finalizer,privdata,cmd,len); sdsfree(cmd); return status; } int redisAsyncFormattedCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *cmd, size_t len) { - int status = __redisAsyncCommand(ac,fn,privdata,cmd,len); + int status = __redisAsyncCommand(ac,fn,NULL,privdata,cmd,len); + return status; +} + +int redisAsyncFormattedCommandWithFinalizer(redisAsyncContext *ac, redisCallbackFn *fn, redisFinalizerCallback *finalizer, + void *privdata, const char *cmd, size_t len) { + int status = __redisAsyncCommand(ac,fn,finalizer,privdata,cmd,len); return status; } diff --git a/async.h b/async.h index f83e1aacc..483830039 100644 --- a/async.h +++ b/async.h @@ -42,9 +42,11 @@ struct dict; /* dictionary header is included in async.c */ /* Reply callback prototype and container */ typedef void (redisCallbackFn)(struct redisAsyncContext*, void*, void*); +typedef void (redisFinalizerCallback)(struct redisAsyncContext *ac, void *privdata); typedef struct redisCallback { struct redisCallback *next; /* simple singly linked list */ redisCallbackFn *fn; + redisFinalizerCallback *finalizer; void *privdata; unsigned int refcount; /* Reference counter used when callback is used * for multiple pubsub channels. */ @@ -147,6 +149,11 @@ int redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata int redisAsyncCommandArgv(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, int argc, const char **argv, const size_t *argvlen); int redisAsyncFormattedCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *cmd, size_t len); +int redisvAsyncCommandWithFinalizer(redisAsyncContext *ac, redisCallbackFn *fn, redisFinalizerCallback *finalizer, void *privdata, const char *format, va_list ap); +int redisAsyncCommandWithFinalizer(redisAsyncContext *ac, redisCallbackFn *fn, redisFinalizerCallback *finalizer, void *privdata, const char *format, ...); +int redisAsyncCommandArgvWithFinalizer(redisAsyncContext *ac, redisCallbackFn *fn, redisFinalizerCallback *finalizer, void *privdata, int argc, const char **argv, const size_t *argvlen); +int redisAsyncFormattedCommandWithFinalizer(redisAsyncContext *ac, redisCallbackFn *fn, redisFinalizerCallback *finalizer, void *privdata, const char *cmd, size_t len); + #ifdef __cplusplus } #endif diff --git a/test.c b/test.c index 6ac3ea0de..d63f94bcd 100644 --- a/test.c +++ b/test.c @@ -1682,6 +1682,12 @@ void null_cb(redisAsyncContext *ac, void *r, void *privdata) { state->checkpoint++; } +void finalizer_cb(redisAsyncContext *ac, void *privdata) { + TestState *state = privdata; + (void)ac; + state->checkpoint++; +} + static void test_pubsub_handling(struct config config) { test("Subscribe, handle published message and unsubscribe: "); /* Setup event dispatcher with a testcase timeout */ @@ -1701,10 +1707,10 @@ static void test_pubsub_handling(struct config config) { /* Start subscribe */ TestState state = {.options = &options}; - redisAsyncCommand(ac,subscribe_cb,&state,"subscribe mychannel"); + redisAsyncCommandWithFinalizer(ac,subscribe_cb,finalizer_cb,&state,"subscribe mychannel"); /* Make sure non-subscribe commands are handled */ - redisAsyncCommand(ac,array_cb,&state,"PING"); + redisAsyncCommandWithFinalizer(ac,array_cb,finalizer_cb,&state,"PING"); /* Start event dispatching loop */ test_cond(event_base_dispatch(base) == 0); @@ -1712,7 +1718,7 @@ static void test_pubsub_handling(struct config config) { event_base_free(base); /* Verify test checkpoints */ - assert(state.checkpoint == 3); + assert(state.checkpoint == 5); } /* Unexpected push message, will trigger a failure */ @@ -1930,8 +1936,8 @@ static void test_pubsub_multiple_channels(struct config config) { /* Start subscribing to two channels */ TestState state = {.options = &options}; - redisAsyncCommand(ac,subscribe_channel_a_cb,&state,"subscribe A"); - redisAsyncCommand(ac,subscribe_channel_b_cb,&state,"subscribe B"); + redisAsyncCommandWithFinalizer(ac,subscribe_channel_a_cb,finalizer_cb,&state,"subscribe A"); + redisAsyncCommandWithFinalizer(ac,subscribe_channel_b_cb,finalizer_cb,&state,"subscribe B"); /* Start event dispatching loop */ assert(event_base_dispatch(base) == 0); @@ -1939,7 +1945,7 @@ static void test_pubsub_multiple_channels(struct config config) { event_base_free(base); /* Verify test checkpoints */ - test_cond(state.checkpoint == 6); + test_cond(state.checkpoint == 8); } /* Command callback for test_monitor() */ From 546813644da7344d9b5b22be2446d608b9d107f7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Viktor=20S=C3=B6derqvist?= Date: Wed, 19 Jul 2023 12:46:47 +0200 Subject: [PATCH 03/11] Add example-libevent-pubsub Co-Authored-By: Tudor Bosman --- Makefile | 5 +- examples/CMakeLists.txt | 2 + examples/example-libevent-pubsub.c | 171 +++++++++++++++++++++++++++++ 3 files changed, 177 insertions(+), 1 deletion(-) create mode 100644 examples/example-libevent-pubsub.c diff --git a/Makefile b/Makefile index 56e3d59be..64fab6523 100644 --- a/Makefile +++ b/Makefile @@ -4,7 +4,7 @@ # This file is released under the BSD license, see the COPYING file OBJ=alloc.o net.o hiredis.o sds.o async.o read.o sockcompat.o -EXAMPLES=hiredis-example hiredis-example-libevent hiredis-example-libev hiredis-example-glib hiredis-example-push hiredis-example-poll +EXAMPLES=hiredis-example hiredis-example-libevent hiredis-example-libevent-pubsub hiredis-example-libev hiredis-example-glib hiredis-example-push hiredis-example-poll TESTS=hiredis-test LIBNAME=libhiredis PKGCONFNAME=hiredis.pc @@ -185,6 +185,9 @@ $(SSL_OBJ): ssl.c hiredis.h read.h sds.h alloc.h async.h win32.h async_private.h hiredis-example-libevent: examples/example-libevent.c adapters/libevent.h $(STLIBNAME) $(CC) -o examples/$@ $(REAL_CFLAGS) -I. $< -levent $(STLIBNAME) $(REAL_LDFLAGS) +hiredis-example-libevent-pubsub: examples/example-libevent-pubsub.c adapters/libevent.h $(STLIBNAME) + $(CC) -o examples/$@ $(REAL_CFLAGS) -I. $< -levent $(STLIBNAME) $(REAL_LDFLAGS) + hiredis-example-libevent-ssl: examples/example-libevent-ssl.c adapters/libevent.h $(STLIBNAME) $(SSL_STLIBNAME) $(CC) -o examples/$@ $(REAL_CFLAGS) -I. $< -levent $(STLIBNAME) $(SSL_STLIBNAME) $(REAL_LDFLAGS) $(SSL_LDFLAGS) diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index 214898b07..813a9729c 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -23,6 +23,8 @@ FIND_PATH(LIBEVENT event.h) if (LIBEVENT) ADD_EXECUTABLE(example-libevent example-libevent.c) TARGET_LINK_LIBRARIES(example-libevent hiredis event) + ADD_EXECUTABLE(example-libevent-pubsub example-libevent-pubsub.c) + TARGET_LINK_LIBRARIES(example-libevent-pubsub hiredis event) ENDIF() FIND_PATH(LIBHV hv/hv.h) diff --git a/examples/example-libevent-pubsub.c b/examples/example-libevent-pubsub.c new file mode 100644 index 000000000..7820a71f6 --- /dev/null +++ b/examples/example-libevent-pubsub.c @@ -0,0 +1,171 @@ +#include +#include +#include +#include + +#include +#include +#include +#include + +void printReplyInternal(const redisReply* r) { + switch (r->type) { + case REDIS_REPLY_INTEGER: + printf("(integer %lld)", r->integer); + break; + case REDIS_REPLY_DOUBLE: + printf("(double %f)", r->dval); + break; + case REDIS_REPLY_ERROR: + printf("(error %s)", r->str); + break; + case REDIS_REPLY_STATUS: + printf("(status %s)", r->str); + break; + case REDIS_REPLY_STRING: + printf("(string %s)", r->str); + break; + case REDIS_REPLY_VERB: + printf("(verb %s)", r->str); + break; + case REDIS_REPLY_ARRAY: + printf("(array %zu", r->elements); + for (size_t i = 0; i < r->elements; ++i) { + putchar(' '); + printReplyInternal(r->element[i]); + } + putchar(')'); + break; + default: + printf("(?%d)", r->type); + break; + } +} + +void printReply(const redisReply* r) { + printReplyInternal(r); + putchar('\n'); +} + +void getCallback(redisAsyncContext *c, void *r, void *privdata) { + redisReply *reply = r; + if (reply == NULL) { + if (c->errstr) { + printf("errstr: %s\n", c->errstr); + } + return; + } + printf("getCallback: privdata='%s' reply='%s'\n", (char*)privdata, reply->str); +} + +void getFinalizer(redisAsyncContext *c, void *privdata) { + (void)privdata; + printf("Get finalizer called\n"); + redisAsyncDisconnect(c); +} + +void connectCallback(const redisAsyncContext *c, int status) { + if (status != REDIS_OK) { + printf("Error: %s\n", c->errstr); + return; + } + printf("Connected...\n"); +} + +void disconnectCallback(const redisAsyncContext *c, int status) { + if (status != REDIS_OK) { + printf("Error: %s\n", c->errstr); + return; + } + printf("Disconnected...\n"); +} + +typedef struct _SubscribeData { + int break_loop; + int remaining_message_count; +} SubscribeData; + +void subscribeCallback(redisAsyncContext *c, void *r, void *privdata) { + redisReply *reply = r; + SubscribeData *sd = privdata; + if (reply == NULL) { + if (c->errstr) { + printf("errstr: %s\n", c->errstr); + } + return; + } + + printf("Subscribe reply: "); + printReply(reply); + + assert(reply->type == REDIS_REPLY_ARRAY); + assert(reply->elements == 3); + assert(reply->element[0]->type == REDIS_REPLY_STRING); + + if (!strcasecmp(reply->element[0]->str, "message")) { + if (--sd->remaining_message_count == 0) { + redisAsyncCommand(c, NULL, NULL, "UNSUBSCRIBE foo"); + } + } + + if (sd->break_loop) { + sd->break_loop = 0; + redisLibeventEvents* e = c->ev.data; + event_base_loopbreak(e->base); + } +} + +void subscribeFinalizer(redisAsyncContext *c, void *privdata) { + (void)privdata; + printf("Subscribe finalizer called\n"); + redisAsyncDisconnect(c); +} + +int main(void) { +#ifndef _WIN32 + signal(SIGPIPE, SIG_IGN); +#endif + + struct event_base *base = event_base_new(); + redisOptions options = {0}; + REDIS_OPTIONS_SET_TCP(&options, "127.0.0.1", 6379); + struct timeval tv = {0}; + tv.tv_sec = 1; + options.connect_timeout = &tv; + + + redisAsyncContext *sub = redisAsyncConnectWithOptions(&options); + if (sub->err) { + /* Let *c leak for now... */ + printf("Error: %s\n", sub->errstr); + return 1; + } + + redisLibeventAttach(sub,base); + + redisAsyncContext *c = redisAsyncConnectWithOptions(&options); + if (c->err) { + /* Let *c leak for now... */ + printf("Error: %s\n", c->errstr); + return 1; + } + + redisLibeventAttach(c,base); + redisAsyncSetConnectCallback(c,connectCallback); + redisAsyncSetDisconnectCallback(c,disconnectCallback); + + SubscribeData sd; + memset(&sd, 0, sizeof(sd)); + sd.break_loop = 1; + sd.remaining_message_count = 3; + redisAsyncCommandWithFinalizer(sub, subscribeCallback, subscribeFinalizer, &sd, "SUBSCRIBE foo"); + event_base_dispatch(base); + + redisAsyncCommand(c, NULL, NULL, "SET key %b", "hello"); + for (int i = 0; i < 3; ++i) { + redisAsyncCommand(c, NULL, NULL, "PUBLISH foo msg%d", i); + } + redisAsyncCommandWithFinalizer(c, getCallback, getFinalizer, (void *)"get-with-finalizer", "GET key"); + event_base_dispatch(base); + return 0; +} From 41cf219fc6983851bd594496d6e1a7ef0fe1f91b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Viktor=20S=C3=B6derqvist?= Date: Thu, 20 Jul 2023 19:03:36 +0200 Subject: [PATCH 04/11] Implement RESET in async API: Cancel subscriptions and monitor mode Additionally, accept commands in monitor mode. (For example the RESET command, but also other commands.) Apart from being useful but itself, this change makes the async API's reply queue stay in sync (mapping each reply to the callback given when the command was sent) when bombed with random commands (fuzzing). --- async.c | 83 +++++++++++++++++++++++++++++++++++------- async.h | 7 ++-- dict.c | 5 +++ dict.h | 1 + test.c | 109 ++++++++++++++++++++++++++++++++++++++++++++++++++++++-- 5 files changed, 188 insertions(+), 17 deletions(-) diff --git a/async.c b/async.c index 15b6f42b3..d0702c4fb 100644 --- a/async.c +++ b/async.c @@ -61,6 +61,11 @@ #define PUBSUB_REPLY_PATTERN 16 #define PUBSUB_REPLY_SHARDED 32 +/* Special negative values for a callback's `pending_replies` fields. */ +#define PENDING_REPLY_UNSUBSCRIBE_ALL -1 +#define PENDING_REPLY_MONITOR -2 +#define PENDING_REPLY_RESET -3 + /* Forward declarations of hiredis.c functions */ int __redisAppendCommand(redisContext *c, const char *cmd, size_t len); void __redisSetError(redisContext *c, int type, const char *str); @@ -172,6 +177,7 @@ static redisAsyncContext *redisAsyncInitialize(redisContext *c) { ac->sub.patterns = patterns; ac->sub.shard_channels = shard_channels; ac->sub.pending_commands = 0; + ac->monitor_cb = NULL; return ac; oom: @@ -420,6 +426,10 @@ static void __redisAsyncFree(redisAsyncContext *ac) { dictRelease(ac->sub.shard_channels); } + if (ac->monitor_cb != NULL) { + callbackDecrRefCount(ac, ac->monitor_cb); + } + /* Signal event lib to clean up */ _EL_CLEANUP(ac); @@ -584,7 +594,9 @@ static int handlePubsubReply(redisAsyncContext *ac, redisReply *reply, /* If we've unsubscribed to the last channel, the command is done. */ /* Check if this was the last channel unsubscribed. */ assert(reply->element[2]->type == REDIS_REPLY_INTEGER); - if (cb->pending_replies == -1 && reply->element[2]->integer == 0) { + if (cb->pending_replies == PENDING_REPLY_UNSUBSCRIBE_ALL && + reply->element[2]->integer == 0) + { cb->pending_replies = 0; } @@ -621,12 +633,28 @@ static int handlePubsubReply(redisAsyncContext *ac, redisReply *reply, return REDIS_ERR; } +/* Handle the effects of the RESET command. */ +static void handleReset(redisAsyncContext *ac) { + /* Cancel monitoring mode */ + ac->c.flags &= ~REDIS_MONITORING; + if (ac->monitor_cb != NULL) { + callbackDecrRefCount(ac, ac->monitor_cb); + ac->monitor_cb = NULL; + } + + /* Cancel subscriptions (finalizers are called if any) */ + ac->c.flags &= ~REDIS_SUBSCRIBED; + if (ac->sub.channels) dictEmpty(ac->sub.channels); + if (ac->sub.patterns) dictEmpty(ac->sub.patterns); + if (ac->sub.shard_channels) dictEmpty(ac->sub.shard_channels); +} + void redisProcessCallbacks(redisAsyncContext *ac) { redisContext *c = &(ac->c); - void *reply = NULL; + redisReply *reply = NULL; int status; - while((status = redisGetReply(c,&reply)) == REDIS_OK) { + while((status = redisGetReply(c, (void**)&reply)) == REDIS_OK) { if (reply == NULL) { /* When the connection is being disconnected and there are * no more replies, this is the cue to really disconnect. */ @@ -659,6 +687,15 @@ void redisProcessCallbacks(redisAsyncContext *ac) { continue; } + /* Send monitored command to monitor callback */ + if ((c->flags & REDIS_MONITORING) && + reply->type == REDIS_REPLY_STATUS && reply->len > 0 && + reply->str[0] >= '0' && reply->str[0] <= '9') + { + __redisRunCallback(ac, ac->monitor_cb, reply); + continue; + } + /* Get callback from queue which was added when the command was sent. */ redisCallback *cb = NULL; if (pubsub_reply_flags & PUBSUB_REPLY_MESSAGE) { @@ -694,15 +731,33 @@ void redisProcessCallbacks(redisAsyncContext *ac) { handlePubsubReply(ac, reply, pubsub_reply_flags, cb); } else { /* Regular reply. This includes ERR reply for subscribe commands. */ + + /* Handle special effects of the command's reply, if any. */ + if (cb->pending_replies == PENDING_REPLY_RESET && + reply->type == REDIS_REPLY_STATUS && + strncmp(reply->str, "RESET", reply->len) == 0) + { + handleReset(ac); + } else if (cb->pending_replies == PENDING_REPLY_MONITOR && + reply->type == REDIS_REPLY_STATUS && + strncmp(reply->str, "OK", reply->len) == 0) + { + /* Set monitor flag and callback, freeing any old callback. */ + c->flags |= REDIS_MONITORING; + if (ac->monitor_cb != NULL) { + callbackDecrRefCount(ac, ac->monitor_cb); + } + ac->monitor_cb = cb; + callbackIncrRefCount(ac, cb); + } + + /* Invoke callback */ __redisRunCallback(ac, cb, reply); cb->pending_replies = 0; } if (cb != NULL) { - /* If in monitor mode, repush the callback */ - if ((c->flags & REDIS_MONITORING) && !(c->flags & REDIS_FREEING)) { - __redisPushCallback(&ac->replies, cb); - } else if (cb->pending_replies != 0) { + if (cb->pending_replies != 0) { /* The command needs more repies. Put it first in queue. */ __redisUnshiftCallback(&ac->replies, cb); } else { @@ -939,15 +994,17 @@ static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, cb->pending_replies++; } if (cb->pending_replies == 0) { - /* No channels specified. This is unsubscribe 'all' or an error. */ - cb->pending_replies = -1; + /* No channels specified means unsubscribe all. (This can happens + * for SUBSCRIBE, but it is an error and then the value of pending + * replies doesn't matter.) */ + cb->pending_replies = PENDING_REPLY_UNSUBSCRIBE_ALL; } c->flags |= REDIS_SUBSCRIBED; ac->sub.pending_commands++; - } else if (strncasecmp(cstr,"monitor\r\n",9) == 0) { - /* Set monitor flag */ - c->flags |= REDIS_MONITORING; - cb->pending_replies = -1; + } else if (strncasecmp(cstr, "monitor", clen) == 0) { + cb->pending_replies = PENDING_REPLY_MONITOR; + } else if (strncasecmp(cstr, "reset", clen) == 0) { + cb->pending_replies = PENDING_REPLY_RESET; } if (__redisPushCallback(&ac->replies, cb) != REDIS_OK) diff --git a/async.h b/async.h index 483830039..47ff8df69 100644 --- a/async.h +++ b/async.h @@ -50,8 +50,8 @@ typedef struct redisCallback { void *privdata; unsigned int refcount; /* Reference counter used when callback is used * for multiple pubsub channels. */ - int pending_replies; /* Number of replies expected; -1 means - * unsubscribe all. */ + int pending_replies; /* Number of replies expected; negative values + * are special. */ } redisCallback; /* List of callbacks for either regular replies or pub/sub */ @@ -117,6 +117,9 @@ typedef struct redisAsyncContext { /* Any configured RESP3 PUSH handler */ redisAsyncPushFn *push_cb; + + /* Monitor, only when MONITOR has been called. */ + redisCallback *monitor_cb; } redisAsyncContext; /* Functions that proxy to hiredis */ diff --git a/dict.c b/dict.c index ad571818e..e00a311d8 100644 --- a/dict.c +++ b/dict.c @@ -246,6 +246,11 @@ static int _dictClear(dict *ht) { return DICT_OK; /* never fails */ } +/* Delete all the keys and values */ +static void dictEmpty(dict *ht) { + _dictClear(ht); +} + /* Clear & Release the hash table */ static void dictRelease(dict *ht) { _dictClear(ht); diff --git a/dict.h b/dict.h index 6ad0acd8d..af3269264 100644 --- a/dict.h +++ b/dict.h @@ -117,6 +117,7 @@ static int dictExpand(dict *ht, unsigned long size); static int dictAdd(dict *ht, void *key, void *val); static int dictReplace(dict *ht, void *key, void *val); static int dictDelete(dict *ht, const void *key); +static void dictEmpty(dict *ht); static void dictRelease(dict *ht); static dictEntry * dictFind(dict *ht, const void *key); static void dictInitIterator(dictIterator *iter, dict *ht); diff --git a/test.c b/test.c index d63f94bcd..271386db3 100644 --- a/test.c +++ b/test.c @@ -2023,6 +2023,108 @@ static void test_monitor(struct config config) { /* Verify test checkpoints */ assert(state.checkpoint == 3); } + +/* Reset callback for test_reset() */ +static void reset_reset_cb(redisAsyncContext *ac, void *r, void *privdata) { + redisReply *reply = r; + TestState *state = privdata; + assert(reply != NULL && reply->elements == 2); + char *str = reply->element[0]->str; + size_t len = reply->element[0]->len; + assert(strncmp(str, "RESET", len) == 0); + state->checkpoint++; + /* Check that when the RESET callback is called, the context has already + * been reset. Monitor and pubsub have been cancelled. */ + assert(!(ac->c.flags & REDIS_SUBSCRIBED)); + assert(!(ac->c.flags & REDIS_MONITORING)); + event_base_loopbreak(base); +} + +/* Ping callback for test_reset() */ +static void reset_ping_cb(redisAsyncContext *ac, void *r, void *privdata) { + redisReply *reply = r; + TestState *state = privdata; + assert(reply != NULL && reply->elements == 2); + char *str = reply->element[0]->str; + size_t len = reply->element[0]->len; + assert(strncmp(str, "pong", len) == 0); + state->checkpoint++; + redisAsyncCommandWithFinalizer(ac, reset_reset_cb, finalizer_cb, &state, + "reset"); +} + +/* Subscribe callback for test_reset() */ +static void reset_subscribe_cb(redisAsyncContext *ac, void *r, void *privdata) { + redisReply *reply = r; + TestState *state = privdata; + assert(reply != NULL && + reply->type == REDIS_REPLY_ARRAY && + reply->elements > 0); + char *str = reply->element[0]->str; + size_t len = reply->element[0]->len; + assert(strncmp(str, "subscribe", len) == 0); + state->checkpoint++; + redisAsyncCommandWithFinalizer(ac, reset_ping_cb, finalizer_cb, + &state, "ping"); +} + +/* Monitor callback for test_reset(). */ +static void reset_monitor_cb(redisAsyncContext *ac, void *r, void *privdata) { + redisReply *reply = r; + TestState *state = privdata; + assert(reply != NULL && reply->type == REDIS_REPLY_STATUS); + state->checkpoint++; + if (strncmp(reply->str, "OK", reply->len) == 0) { + /* Reply to the MONITOR command */ + redisAsyncCommandWithFinalizer(ac, reset_subscribe_cb, finalizer_cb, &state, + "subscribe %s", "ch"); + } else { + /* A monitored command starts with a numeric timestamp, e.g. + * +1689801837.986559 [0 127.0.0.1:44308] "ping" */ + assert(reply->str[0] >= '0' && reply->str[0] <= '9'); + } +} + +/* Check that RESET cancels all subscriptions and monitoring (Redis >= 6.2) */ +static void test_reset(struct config config) { + test("RESET cancels monitoring and pubsub: "); + /* Setup event dispatcher with a testcase timeout */ + base = event_base_new(); + struct event *timeout = evtimer_new(base, timeout_cb, NULL); + assert(timeout != NULL); + + evtimer_assign(timeout,base,timeout_cb,NULL); + struct timeval timeout_tv = {.tv_sec = 10}; + evtimer_add(timeout, &timeout_tv); + + /* Connect */ + redisOptions options = get_redis_tcp_options(config); + redisAsyncContext *ac = redisAsyncConnectWithOptions(&options); + assert(ac != NULL && ac->err == 0); + redisLibeventAttach(ac,base); + + /* Not expecting any push messages in this test */ + redisAsyncSetPushCallback(ac,unexpected_push_cb); + + /* Start monitor */ + TestState state = {.options = &options}; + redisAsyncCommandWithFinalizer(ac, reset_monitor_cb, finalizer_cb, &state, + "monitor"); + + /* Start event dispatching loop */ + test_cond(event_base_dispatch(base) == 0); + event_free(timeout); + event_base_free(base); + + /* Verify test checkpoint sum. + * + * Replies for monitor, subscribe, ping and reset = 4 + * Monitored subscribe and ping = 2 + * Finalizer for monitor, subscribe, ping, reset = 4 + * Sum: 4 + 2 + 4 = 10 */ + assert(state.checkpoint == 10); +} + #endif /* HIREDIS_TEST_ASYNC */ /* tests for async api using polling adapter, requires no extra libraries*/ @@ -2394,9 +2496,9 @@ int main(int argc, char **argv) { printf("\nTesting asynchronous API against TCP connection (%s:%d):\n", cfg.tcp.host, cfg.tcp.port); cfg.type = CONN_TCP; - int major; + int major, minor; redisContext *c = do_connect(cfg); - get_redis_version(c, &major, NULL); + get_redis_version(c, &major, &minor); disconnect(c, 0); test_pubsub_handling(cfg); @@ -2406,6 +2508,9 @@ int main(int argc, char **argv) { test_pubsub_handling_resp3(cfg); test_command_timeout_during_pubsub(cfg); } + if (major > 6 || (major == 6 && minor >= 2)) { + test_reset(cfg); + } #endif /* HIREDIS_TEST_ASYNC */ cfg.type = CONN_TCP; From 17c391b0b2b6e56986c99b3e9831f2a71d10b475 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Viktor=20S=C3=B6derqvist?= Date: Fri, 21 Jul 2023 14:45:37 +0200 Subject: [PATCH 05/11] Fix pointer errors in test_reset --- async.c | 2 +- test.c | 16 +++++++--------- 2 files changed, 8 insertions(+), 10 deletions(-) diff --git a/async.c b/async.c index d0702c4fb..835199859 100644 --- a/async.c +++ b/async.c @@ -1019,7 +1019,7 @@ static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, oom: __redisSetError(&(ac->c), REDIS_ERR_OOM, "Out of memory"); __redisAsyncCopyError(ac); - callbackDecrRefCount(ac, cb); + if (cb) callbackDecrRefCount(ac, cb); return REDIS_ERR; } diff --git a/test.c b/test.c index 271386db3..199e34b92 100644 --- a/test.c +++ b/test.c @@ -2028,10 +2028,8 @@ static void test_monitor(struct config config) { static void reset_reset_cb(redisAsyncContext *ac, void *r, void *privdata) { redisReply *reply = r; TestState *state = privdata; - assert(reply != NULL && reply->elements == 2); - char *str = reply->element[0]->str; - size_t len = reply->element[0]->len; - assert(strncmp(str, "RESET", len) == 0); + assert(reply != NULL && reply->type == REDIS_REPLY_STATUS); + assert(strncmp(reply->str, "RESET", reply->len) == 0); state->checkpoint++; /* Check that when the RESET callback is called, the context has already * been reset. Monitor and pubsub have been cancelled. */ @@ -2044,12 +2042,13 @@ static void reset_reset_cb(redisAsyncContext *ac, void *r, void *privdata) { static void reset_ping_cb(redisAsyncContext *ac, void *r, void *privdata) { redisReply *reply = r; TestState *state = privdata; + /* Ping returns an array in subscribed mode. */ assert(reply != NULL && reply->elements == 2); char *str = reply->element[0]->str; size_t len = reply->element[0]->len; assert(strncmp(str, "pong", len) == 0); state->checkpoint++; - redisAsyncCommandWithFinalizer(ac, reset_reset_cb, finalizer_cb, &state, + redisAsyncCommandWithFinalizer(ac, reset_reset_cb, finalizer_cb, state, "reset"); } @@ -2064,8 +2063,8 @@ static void reset_subscribe_cb(redisAsyncContext *ac, void *r, void *privdata) { size_t len = reply->element[0]->len; assert(strncmp(str, "subscribe", len) == 0); state->checkpoint++; - redisAsyncCommandWithFinalizer(ac, reset_ping_cb, finalizer_cb, - &state, "ping"); + redisAsyncCommandWithFinalizer(ac, reset_ping_cb, finalizer_cb, state, + "ping"); } /* Monitor callback for test_reset(). */ @@ -2076,7 +2075,7 @@ static void reset_monitor_cb(redisAsyncContext *ac, void *r, void *privdata) { state->checkpoint++; if (strncmp(reply->str, "OK", reply->len) == 0) { /* Reply to the MONITOR command */ - redisAsyncCommandWithFinalizer(ac, reset_subscribe_cb, finalizer_cb, &state, + redisAsyncCommandWithFinalizer(ac, reset_subscribe_cb, finalizer_cb, state, "subscribe %s", "ch"); } else { /* A monitored command starts with a numeric timestamp, e.g. @@ -2438,7 +2437,6 @@ int main(int argc, char **argv) { /* Unix sockets don't exist in Windows */ test_unix_socket = 0; #endif - test_allocator_injection(); test_format_commands(); From cbe49dd651f6b7024d6ab007ed4ed305cc55b844 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Viktor=20S=C3=B6derqvist?= Date: Fri, 21 Jul 2023 16:15:42 +0200 Subject: [PATCH 06/11] Test case fix: Break event loop on server close on QUIT --- test.c | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/test.c b/test.c index 199e34b92..0380f4b53 100644 --- a/test.c +++ b/test.c @@ -1594,6 +1594,14 @@ void async_disconnect(redisAsyncContext *ac) { event_base_loopbreak(base); } +/* Use as disconnect callback e.g. when calling QUIT */ +void disconnectCbExpectServerClose(const struct redisAsyncContext *ac, int status) { + assert(status == REDIS_ERR); + assert(ac->err == REDIS_ERR_EOF); + assert(strcmp(ac->errstr, "Server closed the connection") == 0); + event_base_loopbreak(base); +} + /* Testcase timeout, will trigger a failure */ void timeout_cb(int fd, short event, void *arg) { (void) fd; (void) event; (void) arg; @@ -1982,6 +1990,9 @@ void monitor_cb(redisAsyncContext *ac, void *r, void *privdata) { } else if (state->checkpoint == 3) { /* Response for monitored command 'SET second 2' */ assert(strstr(reply->str,"second") != NULL); + /* To exit the event loop on server disconnect, we need to do so in a + * disconnect callback. */ + redisAsyncSetDisconnectCallback(ac, disconnectCbExpectServerClose); /* Send QUIT to disconnect */ redisAsyncCommand(ac,NULL,NULL,"QUIT"); } From 7081c218e106a17733c960530643e7b5962926c1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Viktor=20S=C3=B6derqvist?= Date: Fri, 21 Jul 2023 16:51:17 +0200 Subject: [PATCH 07/11] Test fix: Disconnect properly in test_reset() --- test.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test.c b/test.c index 0380f4b53..f0514f4ba 100644 --- a/test.c +++ b/test.c @@ -2046,7 +2046,7 @@ static void reset_reset_cb(redisAsyncContext *ac, void *r, void *privdata) { * been reset. Monitor and pubsub have been cancelled. */ assert(!(ac->c.flags & REDIS_SUBSCRIBED)); assert(!(ac->c.flags & REDIS_MONITORING)); - event_base_loopbreak(base); + async_disconnect(ac); } /* Ping callback for test_reset() */ From 7f8a1bb855924174435e1b2451ebf5f757c01f1f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Viktor=20S=C3=B6derqvist?= Date: Fri, 21 Jul 2023 16:56:11 +0200 Subject: [PATCH 08/11] Remove unused fields in async context --- async.h | 4 ---- 1 file changed, 4 deletions(-) diff --git a/async.h b/async.h index 47ff8df69..4b38208fb 100644 --- a/async.h +++ b/async.h @@ -103,10 +103,6 @@ typedef struct redisAsyncContext { /* Callback queue for pending replies */ redisCallbackList replies; - /* Address used for connect() */ - struct sockaddr *saddr; - size_t addrlen; - /* Subscription callbacks */ struct { struct dict *channels; From 63968e9ad69c6a958ebf0cce49c4e6bd7e1008c5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Viktor=20S=C3=B6derqvist?= Date: Wed, 23 Aug 2023 16:20:16 +0200 Subject: [PATCH 09/11] Add docs of the WithFinalizer functions in README --- README.md | 98 +++++++++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 84 insertions(+), 14 deletions(-) diff --git a/README.md b/README.md index 74364b411..388b26bdc 100644 --- a/README.md +++ b/README.md @@ -496,15 +496,35 @@ The `privdata` argument can be used to curry arbitrary data to the callback from the command is initially queued for execution. The functions that can be used to issue commands in an asynchronous context are: + ```c -int redisAsyncCommand( - redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, - const char *format, ...); -int redisAsyncCommandArgv( - redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, - int argc, const char **argv, const size_t *argvlen); -``` -Both functions work like their blocking counterparts. The return value is `REDIS_OK` when the command +int redisAsyncCommand(redisAsyncContext *ac, + redisCallbackFn *fn, + void *privdata, + const char *format, + ...); + +int redisvAsyncCommand(redisAsyncContext *ac, + redisCallbackFn *fn, + void *privdata, + const char *format, + va_list ap); + +int redisAsyncCommandArgv(redisAsyncContext *ac, + redisCallbackFn *fn, + void *privdata, + int argc, + const char **argv, + const size_t *argvlen); + +int redisAsyncFormattedCommand(redisAsyncContext *ac, + redisCallbackFn *fn, + void *privdata, + const char *cmd, + size_t len); +``` + +These functions work like their blocking counterparts. The return value is `REDIS_OK` when the command was successfully added to the output buffer and `REDIS_ERR` otherwise. Example: when the connection is being disconnected per user-request, no new commands may be added to the output buffer and `REDIS_ERR` is returned on calls to the `redisAsyncCommand` family. @@ -514,14 +534,64 @@ for a command is non-`NULL`, the memory is freed immediately following the callb valid for the duration of the callback. All pending callbacks are called with a `NULL` reply when the context encountered an error. - -For every command issued, with the exception of **SUBSCRIBE** and **PSUBSCRIBE**, the callback is -called exactly once. Even if the context object id disconnected or deleted, every pending callback +Even if the context object id disconnected or deleted, every pending callback will be called with a `NULL` reply. -For **SUBSCRIBE** and **PSUBSCRIBE**, the callbacks may be called repeatedly until an `unsubscribe` -message arrives. This will be the last invocation of the callback. In case of error, the callbacks -may receive a final `NULL` reply instead. +For every command issued, with the exception of the **`[S|P][UN]SUBSCRIBE`** +commands, the callback is called exactly once. For **SUBSCRIBE**, **PSUBSCRIBE** +and **SSUBSCRIBE**, the callbacks are called repeatedly for each message +arriving on the channel or pattern, until an `unsubscribe` message arrives for +each subscribed channel or pattern. This will be the last invocation of the +callback. In case of error, the callbacks may receive a final `NULL` reply +instead. + +The callback for the **`[S|P]UNSUBSCRIBE`** commands are never called, except +if the command returns an error. Instead, on successful unsubscribe, the +unsubscribe replies are delivered to the callback provided to the +**`[S|P]SUBSCRIBE`** when the channel or pattern was subscribed. + +#### Sending commands with callback and finalizer + +Most commands get exactly one reply and for these, the callback is called +exactly once. The exception is the **`[S|P][UN]SUBSCRIBE`** functions, as +described above. To be able to free the privdata associated with the callback, a +finalizer can be supplied. The finalizer is called exactly once for each call to +the following functions, after the callback function has been called for the +last time for the command to which it was provided. A finalizer has the +prototype `void(struct redisAsyncContext *ac, void *privdata)`. Apart from the +finalizer, these `WithFinalizer` functions behave exactly like their +counterparts without finalizer. + +```c +int redisAsyncCommandWithFinalizer(redisAsyncContext *ac, + redisCallbackFn *fn, + redisFinalizerCallback *finalizer, + void *privdata, + const char *format, + ...); + +int redisvAsyncCommandWithFinalizer(redisAsyncContext *ac, + redisCallbackFn *fn, + redisFinalizerCallback *finalizer, + void *privdata, + const char *format, + va_list ap); + +int redisAsyncCommandArgvWithFinalizer(redisAsyncContext *ac, + redisCallbackFn *fn, + redisFinalizerCallback *finalizer, + void *privdata, + int argc, + const char **argv, + const size_t *argvlen); + +int redisAsyncFormattedCommandWithFinalizer(redisAsyncContext *ac, + redisCallbackFn *fn, + redisFinalizerCallback *finalizer, + void *privdata, + const char *cmd, + size_t len); +``` ### Disconnecting From 401053a8af8c2cff7ead1f444b794f9445253d5d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Viktor=20S=C3=B6derqvist?= Date: Wed, 23 Aug 2023 16:29:44 +0200 Subject: [PATCH 10/11] Fixup: Spelling --- .github/wordlist.txt | 1 + README.md | 6 +++--- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/.github/wordlist.txt b/.github/wordlist.txt index 244bf5b80..f02831b04 100644 --- a/.github/wordlist.txt +++ b/.github/wordlist.txt @@ -28,6 +28,7 @@ de deallocation ElastiCache extensibility +finalizer FPM getaddrinfo gmail diff --git a/README.md b/README.md index 6b3688f13..8eb09f729 100644 --- a/README.md +++ b/README.md @@ -565,9 +565,9 @@ unsubscribe replies are delivered to the callback provided to the Most commands get exactly one reply and for these, the callback is called exactly once. The exception is the **`[S|P][UN]SUBSCRIBE`** functions, as -described above. To be able to free the privdata associated with the callback, a -finalizer can be supplied. The finalizer is called exactly once for each call to -the following functions, after the callback function has been called for the +described above. To be able to free the `privdata` associated with the callback, +a finalizer can be supplied. The finalizer is called exactly once for each call +to the following functions, after the callback function has been called for the last time for the command to which it was provided. A finalizer has the prototype `void(struct redisAsyncContext *ac, void *privdata)`. Apart from the finalizer, these `WithFinalizer` functions behave exactly like their From 27d36f467f6d1e2cb4a83df4752f847e28e0fe8b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Viktor=20S=C3=B6derqvist?= Date: Wed, 23 Aug 2023 16:32:06 +0200 Subject: [PATCH 11/11] Fixup: Spelling again --- README.md | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index 8eb09f729..560aecf43 100644 --- a/README.md +++ b/README.md @@ -549,12 +549,12 @@ Even if the context object id disconnected or deleted, every pending callback will be called with a `NULL` reply. For every command issued, with the exception of the **`[S|P][UN]SUBSCRIBE`** -commands, the callback is called exactly once. For **SUBSCRIBE**, **PSUBSCRIBE** -and **SSUBSCRIBE**, the callbacks are called repeatedly for each message -arriving on the channel or pattern, until an `unsubscribe` message arrives for -each subscribed channel or pattern. This will be the last invocation of the -callback. In case of error, the callbacks may receive a final `NULL` reply -instead. +commands, the callback is called exactly once. For **`SUBSCRIBE`**, +**`PSUBSCRIBE`** and **`SSUBSCRIBE`**, the callbacks are called repeatedly for +each message arriving on the channel or pattern, until an `unsubscribe` message +arrives for each subscribed channel or pattern. This will be the last invocation +of the callback. In case of error, the callbacks may receive a final `NULL` +reply instead. The callback for the **`[S|P]UNSUBSCRIBE`** commands are never called, except if the command returns an error. Instead, on successful unsubscribe, the