Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Redis read-only replica support #1019

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
170 changes: 129 additions & 41 deletions cachedb/redis.c
Original file line number Diff line number Diff line change
Expand Up @@ -52,18 +52,33 @@
#include "hiredis/hiredis.h"

struct redis_moddata {
redisContext** ctxs; /* thread-specific redis contexts */
int numctxs; /* number of ctx entries */
const char* server_host; /* server's IP address or host name */
int server_port; /* server's TCP port */
const char* server_path; /* server's unix path, or "", NULL if unused */
const char* server_password; /* server's AUTH password, or "", NULL if unused */
struct timeval timeout; /* timeout for connection setup and commands */
int logical_db; /* the redis logical database to use */
/* thread-specific redis contexts */
redisContext** ctxs;
redisContext** replica_ctxs;
/* number of ctx entries */
int numctxs;
/* server's IP address or host name */
const char* server_host;
const char* replica_server_host;
/* server's TCP port */
int server_port;
int replica_server_port;
/* server's unix path, or "", NULL if unused */
const char* server_path;
const char* replica_server_path;
/* server's AUTH password, or "", NULL if unused */
const char* server_password;
const char* replica_server_password;
/* timeout for connection setup and commands */
struct timeval timeout;
struct timeval replica_timeout;
/* the redis logical database to use */
int logical_db;
int replica_logical_db;
};

static redisReply* redis_command(struct module_env*, struct cachedb_env*,
const char*, const uint8_t*, size_t);
const char*, const uint8_t*, size_t, int);

static void
moddata_clean(struct redis_moddata** moddata) {
Expand All @@ -77,21 +92,28 @@ moddata_clean(struct redis_moddata** moddata) {
}
free((*moddata)->ctxs);
}
if((*moddata)->replica_ctxs) {
int i;
for(i = 0; i < (*moddata)->numctxs; i++) {
if((*moddata)->replica_ctxs[i])
redisFree((*moddata)->replica_ctxs[i]);
}
free((*moddata)->replica_ctxs);
}
free(*moddata);
*moddata = NULL;
}

static redisContext*
redis_connect(const struct redis_moddata* moddata)
redis_connect(const char* host, int port, const char* path,
const char* password, int logical_db, const struct timeval timeout)
{
redisContext* ctx;

if(moddata->server_path && moddata->server_path[0]!=0) {
ctx = redisConnectUnixWithTimeout(moddata->server_path,
moddata->timeout);
if(path && path[0]!=0) {
ctx = redisConnectUnixWithTimeout(path, timeout);
} else {
ctx = redisConnectWithTimeout(moddata->server_host,
moddata->server_port, moddata->timeout);
ctx = redisConnectWithTimeout(host, port, timeout);
}
if(!ctx || ctx->err) {
const char *errstr = "out of memory";
Expand All @@ -100,32 +122,39 @@ redis_connect(const struct redis_moddata* moddata)
log_err("failed to connect to redis server: %s", errstr);
goto fail;
}
if(redisSetTimeout(ctx, moddata->timeout) != REDIS_OK) {
if(redisSetTimeout(ctx, timeout) != REDIS_OK) {
log_err("failed to set redis timeout");
goto fail;
}
if(moddata->server_password && moddata->server_password[0]!=0) {
if(password && password[0]!=0) {
redisReply* rep;
rep = redisCommand(ctx, "AUTH %s", moddata->server_password);
rep = redisCommand(ctx, "AUTH %s", password);
if(!rep || rep->type == REDIS_REPLY_ERROR) {
log_err("failed to authenticate with password");
freeReplyObject(rep);
goto fail;
}
freeReplyObject(rep);
}
if(moddata->logical_db > 0) {
if(logical_db > 0) {
redisReply* rep;
rep = redisCommand(ctx, "SELECT %d", moddata->logical_db);
rep = redisCommand(ctx, "SELECT %d", logical_db);
if(!rep || rep->type == REDIS_REPLY_ERROR) {
log_err("failed to set logical database (%d)",
moddata->logical_db);
logical_db);
freeReplyObject(rep);
goto fail;
}
freeReplyObject(rep);
}
verbose(VERB_OPS, "Connection to Redis established");
if(verbosity >= VERB_OPS) {
char port_str[6+1];
port_str[0] = ' ';
(void)snprintf(port_str+1, sizeof(port_str-1), "%d", port);
verbose(VERB_OPS, "Connection to Redis established (%s%s)",
path&&path[0]!=0?path:host,
path&&path[0]!=0?"":port_str);
}
return ctx;

fail:
Expand All @@ -148,35 +177,75 @@ redis_init(struct module_env* env, struct cachedb_env* cachedb_env)
goto fail;
}
moddata->numctxs = env->cfg->num_threads;
moddata->ctxs = calloc(env->cfg->num_threads, sizeof(redisContext*));
if(!moddata->ctxs) {
log_err("out of memory");
goto fail;
}
/* note: server_host is a shallow reference to configured string.
* we don't have to free it in this module. */
/* note: server_host and similar string configuration options are
* shallow references to configured strings; we don't have to free them
* in this module. */
moddata->server_host = env->cfg->redis_server_host;
moddata->replica_server_host = env->cfg->redis_replica_server_host;
moddata->server_port = env->cfg->redis_server_port;
moddata->replica_server_port = env->cfg->redis_replica_server_port;
moddata->server_path = env->cfg->redis_server_path;
moddata->replica_server_path = env->cfg->redis_replica_server_path;
moddata->server_password = env->cfg->redis_server_password;
moddata->replica_server_password = env->cfg->redis_replica_server_password;
moddata->timeout.tv_sec = env->cfg->redis_timeout / 1000;
moddata->timeout.tv_usec = (env->cfg->redis_timeout % 1000) * 1000;
moddata->replica_timeout.tv_sec = env->cfg->redis_replica_timeout / 1000;
moddata->replica_timeout.tv_usec = (env->cfg->redis_replica_timeout % 1000) * 1000;
moddata->logical_db = env->cfg->redis_logical_db;
moddata->replica_logical_db = env->cfg->redis_replica_logical_db;

moddata->ctxs = calloc(env->cfg->num_threads, sizeof(redisContext*));
if(!moddata->ctxs) {
log_err("out of memory");
goto fail;
}
if((moddata->replica_server_host && moddata->replica_server_host[0]!=0)
|| (moddata->replica_server_path && moddata->replica_server_path[0]!=0)) {
/* There is a replica configured, allocate ctxs */
moddata->replica_ctxs = calloc(env->cfg->num_threads, sizeof(redisContext*));
if(!moddata->replica_ctxs) {
log_err("out of memory");
goto fail;
}
}
for(i = 0; i < moddata->numctxs; i++) {
redisContext* ctx = redis_connect(moddata);
redisContext* ctx = redis_connect(
moddata->server_host,
moddata->server_port,
moddata->server_path,
moddata->server_password,
moddata->logical_db,
moddata->timeout);
if(!ctx) {
log_err("redis_init: failed to init redis");
goto fail;
}
moddata->ctxs[i] = ctx;
}
if(moddata->replica_ctxs) {
for(i = 0; i < moddata->numctxs; i++) {
redisContext* ctx = redis_connect(
moddata->replica_server_host,
moddata->replica_server_port,
moddata->replica_server_path,
moddata->replica_server_password,
moddata->replica_logical_db,
moddata->replica_timeout);
if(!ctx) {
log_err("redis_init: failed to init redis");
goto fail;
}
moddata->replica_ctxs[i] = ctx;
}
}
cachedb_env->backend_data = moddata;
if(env->cfg->redis_expire_records) {
redisReply* rep = NULL;
int redis_reply_type = 0;
/** check if setex command is supported */
rep = redis_command(env, cachedb_env,
"SETEX __UNBOUND_REDIS_CHECK__ 1 none", NULL, 0);
"SETEX __UNBOUND_REDIS_CHECK__ 1 none", NULL, 0, 1);
if(!rep) {
/** init failed, no response from redis server*/
log_err("redis_init: failed to init redis, the "
Expand Down Expand Up @@ -229,9 +298,9 @@ redis_deinit(struct module_env* env, struct cachedb_env* cachedb_env)
*/
static redisReply*
redis_command(struct module_env* env, struct cachedb_env* cachedb_env,
const char* command, const uint8_t* data, size_t data_len)
const char* command, const uint8_t* data, size_t data_len, int write)
{
redisContext* ctx;
redisContext* ctx, **ctx_selector;
redisReply* rep;
struct redis_moddata* d = (struct redis_moddata*)
cachedb_env->backend_data;
Expand All @@ -242,17 +311,36 @@ redis_command(struct module_env* env, struct cachedb_env* cachedb_env,
* assumption throughout the unbound architecture, so we simply assert
* it. */
log_assert(env->alloc->thread_num < d->numctxs);
ctx = d->ctxs[env->alloc->thread_num];

ctx_selector = !write && d->replica_ctxs
?d->replica_ctxs
:d->ctxs;
ctx = ctx_selector[env->alloc->thread_num];

/* If we've not established a connection to the server or we've closed
* it on a failure, try to re-establish a new one. Failures will be
* logged in redis_connect(). */
if(!ctx) {
ctx = redis_connect(d);
d->ctxs[env->alloc->thread_num] = ctx;
if(d->replica_ctxs) {
ctx = redis_connect(
d->replica_server_host,
d->replica_server_port,
d->replica_server_path,
d->replica_server_password,
d->replica_logical_db,
d->replica_timeout);
} else {
ctx = redis_connect(
d->server_host,
d->server_port,
d->server_path,
d->server_password,
d->logical_db,
d->timeout);
}
ctx_selector[env->alloc->thread_num] = ctx;
}
if(!ctx)
return NULL;
if(!ctx) return NULL;

/* Send the command and get a reply, synchronously. */
rep = (redisReply*)redisCommand(ctx, command, data, data_len);
Expand All @@ -262,7 +350,7 @@ redis_command(struct module_env* env, struct cachedb_env* cachedb_env,
log_err("redis_command: failed to receive a reply, "
"closing connection: %s", ctx->errstr);
redisFree(ctx);
d->ctxs[env->alloc->thread_num] = NULL;
ctx_selector[env->alloc->thread_num] = NULL;
return NULL;
}

Expand Down Expand Up @@ -292,7 +380,7 @@ redis_lookup(struct module_env* env, struct cachedb_env* cachedb_env,
return 0;
}

rep = redis_command(env, cachedb_env, cmdbuf, NULL, 0);
rep = redis_command(env, cachedb_env, cmdbuf, NULL, 0, 0);
if(!rep)
return 0;
switch(rep->type) {
Expand Down Expand Up @@ -357,7 +445,7 @@ redis_store(struct module_env* env, struct cachedb_env* cachedb_env,
return;
}

rep = redis_command(env, cachedb_env, cmdbuf, data, data_len);
rep = redis_command(env, cachedb_env, cmdbuf, data, data_len, 1);
if(rep) {
verbose(VERB_ALGO, "redis_store set completed");
if(rep->type != REDIS_REPLY_STATUS &&
Expand Down
25 changes: 13 additions & 12 deletions configure
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#! /bin/sh
# Guess values for system-dependent variables and create Makefiles.
# Generated by GNU Autoconf 2.69 for unbound 1.19.0.
# Generated by GNU Autoconf 2.69 for unbound 1.19.1.
#
# Report bugs to <[email protected] or https://github.com/NLnetLabs/unbound/issues>.
#
Expand Down Expand Up @@ -591,8 +591,8 @@ MAKEFLAGS=
# Identity of this package.
PACKAGE_NAME='unbound'
PACKAGE_TARNAME='unbound'
PACKAGE_VERSION='1.19.0'
PACKAGE_STRING='unbound 1.19.0'
PACKAGE_VERSION='1.19.1'
PACKAGE_STRING='unbound 1.19.1'
PACKAGE_BUGREPORT='[email protected] or https://github.com/NLnetLabs/unbound/issues'
PACKAGE_URL=''

Expand Down Expand Up @@ -1477,7 +1477,7 @@ if test "$ac_init_help" = "long"; then
# Omit some internal or obsolete options to make the list less imposing.
# This message is too long to be a string in the A/UX 3.1 sh.
cat <<_ACEOF
\`configure' configures unbound 1.19.0 to adapt to many kinds of systems.
\`configure' configures unbound 1.19.1 to adapt to many kinds of systems.

Usage: $0 [OPTION]... [VAR=VALUE]...

Expand Down Expand Up @@ -1543,7 +1543,7 @@ fi

if test -n "$ac_init_help"; then
case $ac_init_help in
short | recursive ) echo "Configuration of unbound 1.19.0:";;
short | recursive ) echo "Configuration of unbound 1.19.1:";;
esac
cat <<\_ACEOF

Expand Down Expand Up @@ -1785,7 +1785,7 @@ fi
test -n "$ac_init_help" && exit $ac_status
if $ac_init_version; then
cat <<\_ACEOF
unbound configure 1.19.0
unbound configure 1.19.1
generated by GNU Autoconf 2.69

Copyright (C) 2012 Free Software Foundation, Inc.
Expand Down Expand Up @@ -2494,7 +2494,7 @@ cat >config.log <<_ACEOF
This file contains any messages produced by compilers while
running configure, to aid debugging if configure makes a mistake.

It was created by unbound $as_me 1.19.0, which was
It was created by unbound $as_me 1.19.1, which was
generated by GNU Autoconf 2.69. Invocation command line was

$ $0 $@
Expand Down Expand Up @@ -2846,11 +2846,11 @@ UNBOUND_VERSION_MAJOR=1

UNBOUND_VERSION_MINOR=19

UNBOUND_VERSION_MICRO=0
UNBOUND_VERSION_MICRO=1


LIBUNBOUND_CURRENT=9
LIBUNBOUND_REVISION=23
LIBUNBOUND_REVISION=24
LIBUNBOUND_AGE=1
# 1.0.0 had 0:12:0
# 1.0.1 had 0:13:0
Expand Down Expand Up @@ -2941,6 +2941,7 @@ LIBUNBOUND_AGE=1
# 1.17.1 had 9:21:1
# 1.18.0 had 9:22:1
# 1.19.0 had 9:23:1
# 1.19.1 had 9:24:1

# Current -- the number of the binary API that we're implementing
# Revision -- which iteration of the implementation of the binary
Expand Down Expand Up @@ -21894,7 +21895,7 @@ _ACEOF



version=1.19.0
version=1.19.1

date=`date +'%b %e, %Y'`

Expand Down Expand Up @@ -22413,7 +22414,7 @@ cat >>$CONFIG_STATUS <<\_ACEOF || ac_write_fail=1
# report actual input values of CONFIG_FILES etc. instead of their
# values after options handling.
ac_log="
This file was extended by unbound $as_me 1.19.0, which was
This file was extended by unbound $as_me 1.19.1, which was
generated by GNU Autoconf 2.69. Invocation command line was

CONFIG_FILES = $CONFIG_FILES
Expand Down Expand Up @@ -22479,7 +22480,7 @@ _ACEOF
cat >>$CONFIG_STATUS <<_ACEOF || ac_write_fail=1
ac_cs_config="`$as_echo "$ac_configure_args" | sed 's/^ //; s/[\\""\`\$]/\\\\&/g'`"
ac_cs_version="\\
unbound config.status 1.19.0
unbound config.status 1.19.1
configured by $0, generated by GNU Autoconf 2.69,
with options \\"\$ac_cs_config\\"

Expand Down
Loading