Skip to content

Commit

Permalink
Merge pull request #12 from scylladb/ci-tests
Browse files Browse the repository at this point in the history
Continuous integration
  • Loading branch information
piodul authored Oct 4, 2022
2 parents c318151 + 071e542 commit 7900048
Show file tree
Hide file tree
Showing 7 changed files with 192 additions and 61 deletions.
45 changes: 45 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
name: CI

on:
push:
branches: [ master ]
pull_request:
branches: [ master ]

env:
SCYLLA_SUBNET: 192.168.100.0/24
SCYLLA_SRC_URI: 192.168.100.100
SCYLLA_DST_URI: 192.168.100.200
SCYLLA_IMAGE: scylladb/scylla

jobs:
ci:
runs-on: ubuntu-latest
steps:
- name: git checkout
uses: actions/checkout@v2

- name: set up Go
uses: actions/setup-go@v2
with:
go-version: ^1.18.0
stable: true

- name: go build
run: go build -v ./...

- name: go vet
run: go vet -v ./...

- name: start the Scylla nodes for the test
run: |
sudo sh -c "echo 2097152 >> /proc/sys/fs/aio-max-nr"
docker-compose -f ./ci/docker-compose.yml up -d
until docker-compose -f ./ci/docker-compose.yml exec -T source_node cqlsh -e "select * from system.local" ; do sleep 1; done
until docker-compose -f ./ci/docker-compose.yml exec -T destination_node cqlsh -e "select * from system.local" ; do sleep 1; done
- name: go test
run: go test -v ./...

- name: stop the Scylla nodes
run: docker-compose -f ./ci/docker-compose.yml stop
23 changes: 23 additions & 0 deletions ci/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
version: "3.7"

services:
source_node:
image: scylladb/scylla
command: --seeds ${SCYLLA_SRC_URI} --skip-wait-for-gossip-to-settle 0
networks:
public:
ipv4_address: ${SCYLLA_SRC_URI}
destination_node:
image: scylladb/scylla
command: --seeds ${SCYLLA_DST_URI} --skip-wait-for-gossip-to-settle 0
networks:
public:
ipv4_address: ${SCYLLA_DST_URI}

networks:
public:
driver: bridge
ipam:
driver: default
config:
- subnet: ${SCYLLA_SUBNET}
2 changes: 1 addition & 1 deletion examples/printer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func main() {

// React to Ctrl+C signal, and stop gracefully after the first signal
// Second signal exits the process
signalC := make(chan os.Signal)
signalC := make(chan os.Signal, 2)
go func() {
<-signalC
reader.Stop()
Expand Down
2 changes: 1 addition & 1 deletion examples/replicator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func main() {
// done by the replicator ASAP and stop it.
//
// 3rd signal will exit the process immediately with error code 1.
signalC := make(chan os.Signal)
signalC := make(chan os.Signal, 3)
go func() {
<-signalC
now := time.Now()
Expand Down
67 changes: 33 additions & 34 deletions examples/replicator/replicator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,7 @@ import (

"github.com/gocql/gocql"
scyllacdc "github.com/scylladb/scylla-cdc-go"
)

const (
sourceAddress = "127.0.0.1"
destinationAddress = "127.0.0.2"
"github.com/scylladb/scylla-cdc-go/internal/testutils"
)

type schema struct {
Expand All @@ -25,49 +21,49 @@ type schema struct {
}

var udts = []string{
"CREATE TYPE ks.udt_simple (a int, b int, c text)",
"CREATE TYPE udt_simple (a int, b int, c text)",
}

var (
schemaSimple = schema{
"ks.tbl_simple",
"CREATE TABLE ks.tbl_simple (pk text, ck int, v1 int, v2 text, PRIMARY KEY (pk, ck))",
"tbl_simple",
"CREATE TABLE tbl_simple (pk text, ck int, v1 int, v2 text, PRIMARY KEY (pk, ck))",
}
schemaMultipleClusteringKeys = schema{
"ks.tbl_multiple_clustering_keys",
"CREATE TABLE ks.tbl_multiple_clustering_keys (pk text, ck1 int, ck2 int, v int, PRIMARY KEY (pk, ck1, ck2))",
"tbl_multiple_clustering_keys",
"CREATE TABLE tbl_multiple_clustering_keys (pk text, ck1 int, ck2 int, v int, PRIMARY KEY (pk, ck1, ck2))",
}
schemaBlobs = schema{
"ks.tbl_blobs",
"CREATE TABLE ks.tbl_blobs (pk text, ck int, v blob, PRIMARY KEY (pk, ck))",
"tbl_blobs",
"CREATE TABLE tbl_blobs (pk text, ck int, v blob, PRIMARY KEY (pk, ck))",
}
schemaLists = schema{
"ks.tbl_lists",
"CREATE TABLE ks.tbl_lists (pk text, ck int, v list<int>, PRIMARY KEY(pk, ck))",
"tbl_lists",
"CREATE TABLE tbl_lists (pk text, ck int, v list<int>, PRIMARY KEY(pk, ck))",
}
schemaSets = schema{
"ks.tbl_sets",
"CREATE TABLE ks.tbl_sets (pk text, ck int, v set<int>, PRIMARY KEY (pk, ck))",
"tbl_sets",
"CREATE TABLE tbl_sets (pk text, ck int, v set<int>, PRIMARY KEY (pk, ck))",
}
schemaMaps = schema{
"ks.tbl_maps",
"CREATE TABLE ks.tbl_maps (pk text, ck int, v map<int, int>, PRIMARY KEY (pk, ck))",
"tbl_maps",
"CREATE TABLE tbl_maps (pk text, ck int, v map<int, int>, PRIMARY KEY (pk, ck))",
}
schemaTuples = schema{
"ks.tbl_tuples",
"CREATE TABLE ks.tbl_tuples (pk text, ck int, v tuple<int, text>, PRIMARY KEY (pk, ck))",
"tbl_tuples",
"CREATE TABLE tbl_tuples (pk text, ck int, v tuple<int, text>, PRIMARY KEY (pk, ck))",
}
schemaTuplesInTuples = schema{
"ks.tbl_tuples_in_tuples",
"CREATE TABLE ks.tbl_tuples_in_tuples (pk text, ck int, v tuple<tuple<int, text>, int>, PRIMARY KEY (pk, ck))",
"tbl_tuples_in_tuples",
"CREATE TABLE tbl_tuples_in_tuples (pk text, ck int, v tuple<tuple<int, text>, int>, PRIMARY KEY (pk, ck))",
}
schemaTuplesInTuplesInTuples = schema{
"ks.tbl_tuples_in_tuples_in_tuples",
"CREATE TABLE ks.tbl_tuples_in_tuples_in_tuples (pk text, ck int, v tuple<tuple<tuple<int, int>, text>, int>, PRIMARY KEY (pk, ck))",
"tbl_tuples_in_tuples_in_tuples",
"CREATE TABLE tbl_tuples_in_tuples_in_tuples (pk text, ck int, v tuple<tuple<tuple<int, int>, text>, int>, PRIMARY KEY (pk, ck))",
}
schemaUDTs = schema{
"ks.tbl_udts",
"CREATE TABLE ks.tbl_udts (pk text, ck int, v ks.udt_simple, PRIMARY KEY (pk, ck))",
"tbl_udts",
"CREATE TABLE tbl_udts (pk text, ck int, v udt_simple, PRIMARY KEY (pk, ck))",
}
)

Expand Down Expand Up @@ -355,11 +351,14 @@ func TestReplicator(t *testing.T) {
schemas[tc.schema.tableName] = tc.schema.createQuery
}

// TODO: Provide IPs from the env
sourceSession := createSessionAndSetupSchema(t, sourceAddress, true, schemas)
sourceAddress := testutils.GetSourceClusterContactPoint()
destinationAddress := testutils.GetDestinationClusterContactPoint()
keyspaceName := testutils.GetUniqueName("test_keyspace")

sourceSession := createSessionAndSetupSchema(t, sourceAddress, keyspaceName, true, schemas)
defer sourceSession.Close()

destinationSession := createSessionAndSetupSchema(t, destinationAddress, false, schemas)
destinationSession := createSessionAndSetupSchema(t, destinationAddress, keyspaceName, false, schemas)
defer destinationSession.Close()

// Execute all of the queries
Expand All @@ -385,7 +384,7 @@ func TestReplicator(t *testing.T) {

schemaNames := make([]string, 0)
for tbl := range schemas {
schemaNames = append(schemaNames, tbl)
schemaNames = append(schemaNames, fmt.Sprintf("%s.%s", keyspaceName, tbl))
}

logger := log.New(os.Stderr, "", log.Ldate|log.Lmicroseconds|log.Lshortfile)
Expand Down Expand Up @@ -471,16 +470,16 @@ func TestReplicator(t *testing.T) {
}
}

func createSessionAndSetupSchema(t *testing.T, addr string, withCdc bool, schemas map[string]string) *gocql.Session {
func createSessionAndSetupSchema(t *testing.T, addr string, keyspaceName string, withCdc bool, schemas map[string]string) *gocql.Session {
testutils.CreateKeyspace(t, addr, keyspaceName)

cfg := gocql.NewCluster(addr)
cfg.Keyspace = keyspaceName
session, err := cfg.CreateSession()
if err != nil {
t.Fatal(err)
}

execQuery(t, session, "DROP KEYSPACE IF EXISTS ks")
execQuery(t, session, "CREATE KEYSPACE ks WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}")

for _, udt := range udts {
execQuery(t, session, udt)
}
Expand Down
68 changes: 68 additions & 0 deletions internal/testutils/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package testutils

import (
"context"
"fmt"
"os"
"sync/atomic"
"testing"
"time"

"github.com/gocql/gocql"
)

var (
testStartTimestamp time.Time
currentTestNumber uint32 = 0
)

func init() {
testStartTimestamp = time.Now()
}

func GetUniqueName(prefix string) string {
unixNano := testStartTimestamp.UnixNano()
uniqueId := atomic.AddUint32(&currentTestNumber, 1) - 1
return fmt.Sprintf("%s_%d_%d", prefix, unixNano, uniqueId)
}

func GetSourceClusterContactPoint() string {
uri := os.Getenv("SCYLLA_SRC_URI")
if uri == "" {
uri = "127.0.0.1"
}
return uri
}

func GetDestinationClusterContactPoint() string {
uri := os.Getenv("SCYLLA_DST_URI")
if uri == "" {
uri = "127.0.0.2"
}
return uri
}

func CreateKeyspace(t *testing.T, contactPoint string, keyspaceName string) {
cluster := gocql.NewCluster(contactPoint)
session, err := cluster.CreateSession()
if err != nil {
t.Fatalf("failed to create session: %v", err)
}

defer session.Close()
err = session.Query(fmt.Sprintf("CREATE KEYSPACE %s WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}", keyspaceName)).Exec()
if err != nil {
t.Fatalf("failed to create keyspace %s: %v", keyspaceName, err)
}

err = session.AwaitSchemaAgreement(context.Background())
if err != nil {
t.Fatalf("awaiting schema agreement failed: %v", err)
}
}

func CreateUniqueKeyspace(t *testing.T, contactPoint string) string {
keyspaceName := GetUniqueName("test_keyspace")
CreateKeyspace(t, contactPoint, keyspaceName)
return keyspaceName
}
Loading

0 comments on commit 7900048

Please sign in to comment.