This repository has been archived by the owner on Jun 1, 2021. It is now read-only.

RC for first release #10

Merged
merged 8 commits on Oct 6, 2016
21 changes: 16 additions & 5 deletions README.md
@@ -1,10 +1,11 @@
Eventuate chaos testing utilities
=================================

This is very early work in progress on chaos testing utilities for [Eventuate][eventuate] and [Apache
Cassandra](http://cassandra.apache.org/). They support running Cassandra clusters and Eventuate applications with
[Docker][docker] and using [blockade][blockade] to easily generate failures like stopping and restarting of containers
and introducing network failures such as partitions, packet loss and slow connections.
This repository contains some chaos testing utilities for [Eventuate][eventuate], [Apache
Cassandra](http://cassandra.apache.org/) and [Level-DB](https://github.com/google/leveldb). They support running
Cassandra clusters and Eventuate applications with [Docker][docker] and using [blockade][blockade] to easily generate
failures like stopping and restarting of containers and introducing network failures such as partitions, packet loss and
slow connections.

This repository can be seen as a toolkit or collection of utilities which you can use to test your Eventuate
applications. Moreover, we are going to describe an exemplary test setup that gives an introduction to these tools and
@@ -29,7 +30,7 @@ Prerequisites
#### Linux

- [Docker][docker] (tested with docker >= 1.6)
- [blockade][blockade] (currently a fork of the original [dcm-oss/blockade](https://github.com/dcm-oss/blockade))
- [blockade][blockade] (`>= 0.2.0`, currently a fork of the original [dcm-oss/blockade](https://github.com/dcm-oss/blockade))

##### Initial setup

@@ -60,6 +61,7 @@ These steps only have to be taken once for the initial bootstrapping.

#### Mac OS

- [Docker Toolbox][toolbox]
- [Vagrant][vagrant] (tested with 1.7.4)
- [VirtualBox](https://www.virtualbox.org/)

@@ -81,6 +83,14 @@ cd /vagrant
```


##### Docker Toolbox

Although Mac OS recently got a *native* [Docker for Mac](https://docs.docker.com/docker-for-mac/) implementation, you still
have to use the [Docker Toolbox][toolbox] (which interfaces with VirtualBox) to use blockade and therefore *eventuate-chaos*
itself. This is because blockade uses the Linux `iptables` and `tc` tools to establish and simulate network
interference.
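
To illustrate why a Linux environment is required, here is a minimal, hypothetical sketch of the *kind* of commands blockade relies on; the exact invocations, interface name and container address below are illustrative assumptions and are not taken from blockade's source:

```python
# Hypothetical illustration of the Linux tooling blockade builds on; requires
# root privileges and a Linux host (e.g. the Vagrant/VirtualBox VM on Mac OS).
import subprocess

def slow_down(interface='eth0', delay_ms=100):
    # 'tc netem' injects artificial latency on the given network interface
    subprocess.check_call(['tc', 'qdisc', 'add', 'dev', interface,
                           'root', 'netem', 'delay', '%dms' % delay_ms])

def drop_traffic_from(address='172.17.0.3'):
    # 'iptables' silently drops every packet arriving from the given address,
    # simulating a network partition between containers
    subprocess.check_call(['iptables', '-A', 'INPUT', '-s', address,
                           '-j', 'DROP'])
```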


Example test setup
------------------

@@ -407,3 +417,4 @@ start`, `blockade stop`, `blockade restart`, `blockade up` and `blockade destroy`
[blockade]: https://github.com/kongo2002/blockade
[vagrant]: https://www.vagrantup.com/
[eventuate]: https://github.com/RBMHTechnology/eventuate
[toolbox]: https://docs.docker.com/docker-for-mac/docker-toolbox/
3 changes: 2 additions & 1 deletion Vagrantfile
@@ -11,7 +11,7 @@ fi

# get setuptools
apt-get update
apt-get install python-setuptools
apt-get install -y python-setuptools

# get blockade
if [ -d "/blockade" ]; then
@@ -28,6 +28,7 @@ python setup.py develop

# pull required docker images
docker pull cassandra:2.2.3
docker pull cassandra:3.7
docker pull tonistiigi/dnsdock

docker build -t eventuate-chaos/sbt /vagrant
13 changes: 8 additions & 5 deletions build.sbt
@@ -6,13 +6,16 @@ version := "0.1-SNAPSHOT"

scalaVersion := "2.11.7"

resolvers += "OJO Snapshots" at "https://oss.jfrog.org/oss-snapshot-local"
resolvers += "Eventuate Releases" at "https://dl.bintray.com/rbmhtechnology/maven"

val eventuateVersion = "0.8-M2"

libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-actor" % "2.4-M2",
"com.typesafe.akka" %% "akka-remote" % "2.4-M2" % "test",
"com.rbmhtechnology" %% "eventuate" % "0.5-SNAPSHOT" % "test",
"org.slf4j" % "slf4j-log4j12" % "1.7.9" % "test"
"com.rbmhtechnology" %% "eventuate-core" % eventuateVersion,
"com.rbmhtechnology" %% "eventuate-log-leveldb" % eventuateVersion,
"com.rbmhtechnology" %% "eventuate-log-cassandra" % eventuateVersion,
"com.rbmhtechnology" %% "eventuate-crdt" % eventuateVersion,
"org.slf4j" % "slf4j-log4j12" % "1.7.9" % "test"
)

val runNobootcp =
58 changes: 37 additions & 21 deletions crdt-counter-partitions.py
@@ -6,90 +6,106 @@
import random
import sys

import blockade.cli
import interact


HOST = '127.0.0.1'
MAX_VALUE = 100


def get_counter(host, port):
value = interact.request(host, port, 'get')
if value:
return int(value)
return None


def check_counters(nodes):
if len(nodes) < 1:
raise ValueError('no nodes given')

counters = []
for node, port in nodes.items():
counter = interact.request(HOST, port, 'get')
counter = get_counter(HOST, port)
equal = all(x == counter for x in counters)

if not equal:
print('Counter of node "%s" [%s] does not match with other counters: %s' %
(node, counter, str(counters)))
(node, counter, counters))
return None
counters.append(counter)

# all counters are the same -> return the first one for reference
return int(counters[0])
return counters[0]


class CounterOperation(interact.Operation):
def dump_logs(nodes):
if len(nodes) < 1:
raise ValueError('no nodes given')

def __init__(self):
self.state = {'counter': 0}
for _, port in nodes.items():
interact.request(HOST, port, 'dump')

def init(self, host, nodes):
# get first counter value
counter = int(interact.request(host, nodes.values()[0], 'get'))
self.state['counter'] = counter

return self.state
class CounterOperation(interact.Operation):

def operation(self, node, idx, state):
def __init__(self):
self.counter = 0

def operation(self, node, idx):
op = random.choice(['inc', 'dec'])
value = random.randint(1, MAX_VALUE)

if op == 'inc':
state['counter'] += value
self.counter += value
else:
state['counter'] -= value
self.counter -= value

return '%s %d' % (op, value)

def get_counter(self):
return self.state['counter']
return self.counter


if __name__ == '__main__':
SETTLE_TIMEOUT = 30
PARSER = argparse.ArgumentParser(description='start CRDT-counter chaos test')
PARSER.add_argument('-i', '--iterations', type=int, default=30, help='number of failure/partition iterations')
PARSER.add_argument('--interval', type=float, default=0.1, help='delay between requests')
PARSER.add_argument('-l', '--locations', type=int, default=3, help='number of locations')
PARSER.add_argument('-d', '--delay', type=int, default=10, help='delay between each random partition')
PARSER.add_argument('--settle', type=int, default=60, help='number of seconds to wait for settling')
PARSER.add_argument('-r', '--runners', type=int, default=3, help='number of request runners')
PARSER.add_argument('--restarts', default=0.2, type=float, help='restart probability (in %%)')

ARGS = PARSER.parse_args()

# we are making some assumptions in here:
# every location is named 'location<id>' and its TCP port (8080)
# is mapped to the host port '10000+<id>'
NODES = dict(('location%d' % idx, 10000+idx) for idx in xrange(1, ARGS.locations+1))
OP = CounterOperation()
OPS = [CounterOperation() for _ in xrange(ARGS.runners)]

interact.wait_to_be_running(HOST, NODES)

if not interact.requests_with_chaos(OP, HOST, NODES, ARGS.iterations, ARGS.interval, SETTLE_TIMEOUT, ARGS.delay):
INITIAL_COUNTER = check_counters(NODES)
if INITIAL_COUNTER is None:
sys.exit(1)

EXPECTED_VALUE = OP.get_counter()
if not interact.requests_with_chaos(OPS, HOST, NODES, ARGS.iterations, ARGS.interval, ARGS.settle, ARGS.delay, ARGS.restarts):
sys.exit(1)

DIFF = sum(op.get_counter() for op in OPS)
COUNTER_VALUE = check_counters(NODES)

if COUNTER_VALUE is None:
sys.exit(1)
else:
print('All %d nodes converged to the counter value: %d' % (len(NODES), COUNTER_VALUE))

EXPECTED_VALUE = INITIAL_COUNTER + DIFF
if COUNTER_VALUE != EXPECTED_VALUE:
Contributor:
Does this make sense at all, given that requests that fail are also considered in the DIFF?

Contributor (Author):
There are test scenarios that may succeed without failures (e.g. LevelDB-based tests).

Contributor:
Only LevelDB without restarts, right?

print('Expected counter value: %d; actual %d' % (EXPECTED_VALUE, COUNTER_VALUE))
sys.exit(1)
sys.exit(0)

print('Counter value (%d) matches up correctly' % EXPECTED_VALUE)
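
As a side note on the review comments above: `DIFF` sums the deltas each runner *attempted* to apply, so the final `EXPECTED_VALUE` check only holds when every request actually reaches a node. A minimal, hypothetical sketch of the issue (not part of this change):

```python
# Each runner updates its local counter before the request is sent, so a
# request dropped during a partition still inflates the expected sum while
# the cluster never sees the corresponding update.
deltas = []

def run_operation(send_request):
    value = 10
    deltas.append(value)              # local bookkeeping happens unconditionally
    return send_request('inc %d' % value)

run_operation(lambda msg: False)      # simulate a request lost to a partition
print(sum(deltas))                    # prints 10, although no node was updated
```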

100 changes: 57 additions & 43 deletions interact.py
@@ -23,10 +23,9 @@ class Operation(object):
def init(self, host, nodes):
pass

def operation(self, node, iteration, state):
def operation(self, node, iteration):
'''
This method will be called on very iteration of a worker request. You
may modify the passed 'state' object.
This method will be called on every iteration of a worker request.
'''
raise NotImplementedError("you have to implement 'Operation.operation'")

@@ -42,19 +41,15 @@ def __init__(self, host, nodes, operation, operations=None, interval=0.5):
self.is_cancelled = False
self.operation = operation
self.iterations = 0
self.state = None

def run(self):
# initialize operation's state (if given)
self.state = self.operation.init(self.host, self.nodes)

while (self.operations is None or self.operations > 0) and not self.is_cancelled:
if self.operations:
self.operations -= 1
# initialize operation's state (if necessary)
self.operation.init(self.host, self.nodes)

while (self.operations is None or self.iterations < self.operations) and not self.is_cancelled:
node = random.choice(self.nodes.keys())
port = self.nodes[node]
request(self.host, port, self.operation.operation(node, self.iterations, self.state))
request(self.host, port, self.operation.operation(node, self.iterations))

self.iterations += 1
time.sleep(self.interval)
@@ -64,27 +59,30 @@ def cancel(self):


def request(ip, port, message):
# connect
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(1.0)
sock.connect((ip, port))

# request
sock.send(message)
data = []

try:
while True:
received = sock.recv(BUFFER_SIZE)
if not received:
break
data.append(received)
except socket.timeout:
pass
finally:
# disconnect
sock.close()
return ''.join(data)
# connect
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(1.0)
sock.connect((ip, port))

try:
# request
sock.send(message)
data = []

while True:
received = sock.recv(BUFFER_SIZE)
if not received:
break
data.append(received)
except socket.timeout:
pass
finally:
# disconnect
sock.close()
return ''.join(data)
except socket.error:
return None


def is_healthy(ip, port, message, verbose=True):
@@ -117,29 +115,37 @@ def _print_partitions(partitions):
print('Partition %d: %s' % (idx+1, ', '.join(part)))


def requests_with_chaos(operation, host, nodes, iterations, interval, settle=SETTLE_TIMEOUT, failure_delay=FAILURE_DELAY):
def requests_with_chaos(operations, host, nodes, iterations, interval, settle=SETTLE_TIMEOUT, failure_delay=FAILURE_DELAY, restarts=0.2):
print('Chaos iterations: %d' % iterations)
print('Request interval: %.3f sec' % interval)
print('Operation runners: %d' % len(operations))
print('Failure delay: %d sec' % failure_delay)

restart_prob = max(min(restarts, 1.0), 0.0)
print('Restart probability: %.2f%%' % (restart_prob * 100.0))

print('Nodes:')
for node in nodes.keys():
print(' ' + node)

# wait for system to be ready and initialized
wait_to_be_running(host, nodes)

workers = [RequestWorker(host, nodes, op, interval=interval) for op in operations]

def for_workers(func):
for worker in workers:
func(worker)
try:
worker = RequestWorker(host, nodes, operation, interval=interval)
worker.start()
for_workers(RequestWorker.start)

# initialize blockade interface
cfg = blockade.cli.load_config('blockade.yml')
blk = blockade.cli.get_blockade(cfg)

def random_network(node):
def random_network():
failure = random.choice([blk.fast, blk.flaky, blk.slow])
failure([node], None)
return failure(None, None, select_random=True)

delay = failure_delay / 2

@@ -150,13 +156,21 @@ def random_network(node):
print('-' * 25)
time.sleep(delay)

# trigger random network failure (slow, flaky...)
random_network(random.choice(nodes.keys()))
# we either schedule a restart of a
# running node or we trigger a network failure
if random.random() < restart_prob:
restarted = list(blk.restart(None, blk.state_factory.load(), select_random=True))
print('Restarting %s...' % (', '.join(restarted)))
print('-' * 25)
else:
# random network failure (slow, flaky...)
random_network()

time.sleep(delay)

except (KeyboardInterrupt, blockade.errors.BlockadeError) as err:
worker.cancel()
worker.join()
for_workers(RequestWorker.cancel)
for_workers(RequestWorker.join)

if err is KeyboardInterrupt:
blk.join()
@@ -166,16 +180,16 @@
else:
raise

worker.cancel()
worker.join()
for_workers(RequestWorker.cancel)
for_workers(RequestWorker.join)

print('Joining cluster - waiting %d seconds to settle...' % settle)
blk.join()
blk.fast(None, None)

time.sleep(settle)

print('Processed %d requests in the meantime' % worker.iterations)
print('Processed %d requests in the meantime' % sum(w.iterations for w in workers))
return True
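
For orientation, a minimal, hypothetical sketch of how a test script wires these pieces together (mirroring `crdt-counter-partitions.py` above); `PingOperation`, the `'ping'` message and the port mapping are made-up examples, not part of this repository:

```python
# Hypothetical test script using the interact module shown above
# (Python 2, matching the rest of the repository).
import interact

class PingOperation(interact.Operation):
    def operation(self, node, iteration):
        # return the message the RequestWorker sends to the randomly chosen node
        return 'ping'

if __name__ == '__main__':
    HOST = '127.0.0.1'
    # assumes each location's TCP port is published on the Docker host
    NODES = {'location1': 10001, 'location2': 10002, 'location3': 10003}
    OPS = [PingOperation() for _ in xrange(3)]

    interact.wait_to_be_running(HOST, NODES)
    if not interact.requests_with_chaos(OPS, HOST, NODES, iterations=10,
                                        interval=0.1, settle=30,
                                        failure_delay=10, restarts=0.2):
        raise SystemExit(1)
```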

