Skip to content

Commit

Permalink
Merge pull request #13 from scylladb/improve-saving-progress
Browse files Browse the repository at this point in the history
Improve the progress saving feature
  • Loading branch information
piodul authored Oct 19, 2022
2 parents 7900048 + f4e6700 commit e99bcf1
Show file tree
Hide file tree
Showing 7 changed files with 561 additions and 12 deletions.
14 changes: 14 additions & 0 deletions change.go
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,20 @@ type ChangeConsumer interface {
End() error
}

// ChangeOrEmptyNotificationConsumer is an extension to the ChangeConsumer
// interface.
type ChangeOrEmptyNotificationConsumer interface {
ChangeConsumer

// Invoked upon empty results from the CDC log associated with the stream of
// the ChangeConsumer. This method is called to acknowledge a query window
// has been executed against the stream and the CDC log is to be considered
// completed as of 'ackTime' param passed.
//
// If this method returns an error, the library will stop with an error.
Empty(ctx context.Context, ackTime gocql.UUID) error
}

// MakeChangeConsumerFactoryFromFunc can be used if your processing is very
// simple, and don't need to keep any per-stream state or save any progress.
// The function supplied as an argument will be shared by all consumers created
Expand Down
296 changes: 296 additions & 0 deletions consumer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,296 @@
package scyllacdc

import (
"context"
"log"
"os"
"sync"
"testing"
"time"

"github.com/gocql/gocql"
"github.com/scylladb/scylla-cdc-go/internal/testutils"
)

type recordingConsumer struct {
mu *sync.Mutex
emptyTimestamps []gocql.UUID
}

func (rc *recordingConsumer) CreateChangeConsumer(
ctx context.Context,
input CreateChangeConsumerInput,
) (ChangeConsumer, error) {
return rc, nil
}

func (rc *recordingConsumer) Consume(ctx context.Context, change Change) error {
return nil
}

func (rc *recordingConsumer) End() error {
return nil
}

func (rc *recordingConsumer) Empty(ctx context.Context, ackTime gocql.UUID) error {
rc.mu.Lock()
rc.emptyTimestamps = append(rc.emptyTimestamps, ackTime)
rc.mu.Unlock()
return nil
}

func (rc *recordingConsumer) GetTimestamps() []gocql.UUID {
rc.mu.Lock()
ret := append([]gocql.UUID{}, rc.emptyTimestamps...)
rc.mu.Unlock()
return ret
}

func TestConsumerCallsEmptyCallback(t *testing.T) {
consumer := &recordingConsumer{mu: &sync.Mutex{}}

adv := AdvancedReaderConfig{
ChangeAgeLimit: -time.Millisecond,
PostNonEmptyQueryDelay: 100 * time.Millisecond,
PostEmptyQueryDelay: 100 * time.Millisecond,
PostFailedQueryDelay: 100 * time.Millisecond,
QueryTimeWindowSize: 100 * time.Millisecond,
ConfidenceWindowSize: time.Millisecond,
}

// Configure a session
address := testutils.GetSourceClusterContactPoint()
keyspaceName := testutils.CreateUniqueKeyspace(t, address)
cluster := gocql.NewCluster(address)
cluster.Keyspace = keyspaceName
cluster.PoolConfig.HostSelectionPolicy = gocql.TokenAwareHostPolicy(gocql.RoundRobinHostPolicy())
session, err := cluster.CreateSession()
if err != nil {
t.Fatal(err)
}
defer session.Close()

execQuery(t, session, "CREATE TABLE tbl (pk int PRIMARY KEY, v int) WITH cdc = {'enabled': true}")

cfg := &ReaderConfig{
Session: session,
ChangeConsumerFactory: consumer,
TableNames: []string{keyspaceName + ".tbl"},
Advanced: adv,
Logger: log.New(os.Stderr, "", log.Ldate|log.Lmicroseconds|log.Lshortfile),
}

startTime := time.Now()

reader, err := NewReader(context.Background(), cfg)
if err != nil {
t.Fatal(err)
}

errC := make(chan error)
go func() { errC <- reader.Run(context.Background()) }()

time.Sleep(time.Second)

endTime := startTime.Add(5 * time.Second)
reader.StopAt(endTime)
if err := <-errC; err != nil {
t.Fatal(err)
}

// All timestamps should be roughly between startTime and endTime
// To adjust for different clock on the scylla node, allow the time
// to exceed one second
acceptableStart := startTime.Add(-time.Second)
acceptableEnd := endTime.Add(time.Second)

timestamps := consumer.GetTimestamps()

if len(timestamps) == 0 {
t.Fatal("no empty event timestamps recorded")
}

for _, tstp := range timestamps {
early := !acceptableStart.Before(tstp.Time())
late := !tstp.Time().Before(acceptableEnd)
if early || late {
t.Errorf("timestamp of empty event %s not in expected range %s, %s",
tstp.Time(), acceptableStart, acceptableEnd)
}
}
}

func TestConsumerResumesWithTableBackedProgressReporter(t *testing.T) {
// Makes sure that the table backed progress consumer is able to resume correctly
// when StartGeneration was called, but no SaveProgress has been called
// so far.

// Configure a session
address := testutils.GetSourceClusterContactPoint()
keyspaceName := testutils.CreateUniqueKeyspace(t, address)
cluster := gocql.NewCluster(address)
cluster.Keyspace = keyspaceName
cluster.PoolConfig.HostSelectionPolicy = gocql.TokenAwareHostPolicy(gocql.RoundRobinHostPolicy())
session, err := cluster.CreateSession()
if err != nil {
t.Fatal(err)
}
defer session.Close()

execQuery(t, session, "CREATE TABLE tbl (pk int PRIMARY KEY, v int) WITH cdc = {'enabled': true}")

runWithProgressReporter := func(consumerFactory ChangeConsumerFactory, endTime time.Time, adv AdvancedReaderConfig) {
progressManager, err := NewTableBackedProgressManager(session, "progress", "test")
if err != nil {
t.Fatalf("failed to create progress manager: %v", err)
}

cfg := &ReaderConfig{
Session: session,
ChangeConsumerFactory: consumerFactory,
TableNames: []string{keyspaceName + ".tbl"},
ProgressManager: progressManager,
Advanced: adv,
Logger: log.New(os.Stderr, "", log.Ldate|log.Lmicroseconds|log.Lshortfile),
}

reader, err := NewReader(context.Background(), cfg)
if err != nil {
t.Fatal(err)
}

errC := make(chan error)
go func() { errC <- reader.Run(context.Background()) }()

time.Sleep(500 * time.Millisecond)

reader.StopAt(endTime)
if err := <-errC; err != nil {
t.Fatal(err)
}
}

startTime := time.Now()

adv := AdvancedReaderConfig{
PostNonEmptyQueryDelay: 100 * time.Millisecond,
PostEmptyQueryDelay: 100 * time.Millisecond,
PostFailedQueryDelay: 100 * time.Millisecond,
QueryTimeWindowSize: 100 * time.Millisecond,
ConfidenceWindowSize: time.Millisecond,
}

// Create and start the first consumer which will not call SaveProgress
// Start reading from ~now and stop after two seconds
// We should record that we started now but recorded no progress for
// any stream
adv.ChangeAgeLimit = -time.Millisecond
consumer := &recordingConsumer{mu: &sync.Mutex{}}
runWithProgressReporter(consumer, startTime.Add(2*time.Second), adv)

// Create and start the second consumer
// The progress manager should resume reading from the time
// when the previous run was started, not 1 minute ago
adv.ChangeAgeLimit = 10 * time.Second
consumer = &recordingConsumer{mu: &sync.Mutex{}}
runWithProgressReporter(consumer, startTime.Add(4*time.Second), adv)

// All timestamps should be roughly between startTime and endTime
// To adjust for different clock on the scylla node, allow the time
// to exceed one second
acceptableStart := startTime.Add(-time.Second)
acceptableEnd := startTime.Add(4 * time.Second).Add(time.Second)

timestamps := consumer.GetTimestamps()

if len(timestamps) == 0 {
t.Fatal("no empty event timestamps recorded")
}

for _, tstp := range timestamps {
early := !acceptableStart.Before(tstp.Time())
late := !tstp.Time().Before(acceptableEnd)
if early || late {
t.Errorf("timestamp of empty event %s not in expected range %s, %s",
tstp.Time(), acceptableStart, acceptableEnd)
}
}
}

func TestConsumerHonorsTableTTL(t *testing.T) {
// Make sure that the library doesn't attempt to read earlier than
// the table TTL

// Configure a session
address := testutils.GetSourceClusterContactPoint()
keyspaceName := testutils.CreateUniqueKeyspace(t, address)
cluster := gocql.NewCluster(address)
cluster.Keyspace = keyspaceName
cluster.PoolConfig.HostSelectionPolicy = gocql.TokenAwareHostPolicy(gocql.RoundRobinHostPolicy())
session, err := cluster.CreateSession()
if err != nil {
t.Fatal(err)
}
defer session.Close()

// Create a table with a very short TTL
execQuery(t, session, "CREATE TABLE tbl (pk int PRIMARY KEY, v int) WITH cdc = {'enabled': true, 'ttl': 2}")

startTime := time.Now()
endTime := startTime.Add(2 * time.Second)

adv := AdvancedReaderConfig{
PostNonEmptyQueryDelay: 100 * time.Millisecond,
PostEmptyQueryDelay: 100 * time.Millisecond,
PostFailedQueryDelay: 100 * time.Millisecond,
QueryTimeWindowSize: 500 * time.Millisecond,
ConfidenceWindowSize: time.Millisecond,
ChangeAgeLimit: time.Minute, // should be overriden by the TTL
}

consumer := &recordingConsumer{mu: &sync.Mutex{}}

cfg := &ReaderConfig{
Session: session,
ChangeConsumerFactory: consumer,
TableNames: []string{keyspaceName + ".tbl"},
Advanced: adv,
Logger: log.New(os.Stderr, "", log.Ldate|log.Lmicroseconds|log.Lshortfile),
}

reader, err := NewReader(context.Background(), cfg)
if err != nil {
t.Fatal(err)
}

errC := make(chan error)
go func() { errC <- reader.Run(context.Background()) }()

time.Sleep(500 * time.Millisecond)

reader.StopAt(endTime)
if err := <-errC; err != nil {
t.Fatal(err)
}

// All timestamps should be roughly between startTime-TTL and endTime
// To adjust for different clock on the scylla node, allow the time
// to exceed one second
acceptableStart := startTime.Add(-time.Second).Add(-2 * time.Second)
acceptableEnd := startTime.Add(2 * time.Second).Add(time.Second)

timestamps := consumer.GetTimestamps()

if len(timestamps) == 0 {
t.Fatal("no empty event timestamps recorded")
}

for _, tstp := range timestamps {
early := !acceptableStart.Before(tstp.Time())
late := !tstp.Time().Before(acceptableEnd)
if early || late {
t.Errorf("timestamp of empty event %s not in expected range %s, %s",
tstp.Time(), acceptableStart, acceptableEnd)
}
}
}
9 changes: 9 additions & 0 deletions examples/replicator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,15 @@ func (r *DeltaReplicator) End() error {
return nil
}

func (r *DeltaReplicator) Empty(ctx context.Context, ackTime gocql.UUID) error {
log.Printf("Streams [%s]: saw no changes up to %s", r.streamID, ackTime.Time())
r.reporter.Update(ackTime)
return nil
}

// Make sure that DeltaReplicator supports the ChangeOrEmptyNotificationConsumer interface
var _ scyllacdc.ChangeOrEmptyNotificationConsumer = (*DeltaReplicator)(nil)

func (r *DeltaReplicator) processUpdate(ctx context.Context, timestamp int64, c *scyllacdc.ChangeRow) error {
return r.processInsertOrUpdate(ctx, timestamp, false, c)
}
Expand Down
53 changes: 53 additions & 0 deletions progress.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,30 @@ type ProgressManager interface {
SaveProgress(ctx context.Context, gen time.Time, table string, streamID StreamID, progress Progress) error
}

// ProgressManagerWithStartTime is an extension to the ProgressManager interface.
type ProgressManagerWithStartTime interface {
ProgressManager

// GetApplicationReadStartTime returns the timestamp from which
// the application started reading data. The library uses this timestamp
// as a lower bound to determine where it should start reading. For example,
// if there is no generation saved or there is no progress information
// saved for a stream, reading will be restarted from the given timestamp
// (or higher if the generation timestamp is higher).
//
// If this function returns a zero timeuuid, the library will start reading
// from `time.Now() - AdvancedReaderConfig.ChangeAgeLimit`.
// If this function returns an error, the library will stop with an error.
GetApplicationReadStartTime(ctx context.Context) (time.Time, error)

// SaveApplicationReadStartTime stores information about the timestamp
// from which the application originally started reading data.
// It is called by the library if there was no start timestamp saved.
//
// If this function returns an error, the library will stop with an error.
SaveApplicationReadStartTime(ctx context.Context, startTime time.Time) error
}

// ProgressReporter is a helper object for the ChangeConsumer. It allows
// the consumer to save its progress.
type ProgressReporter struct {
Expand Down Expand Up @@ -242,4 +266,33 @@ func (tbpm *TableBackedProgressManager) SaveProgress(ctx context.Context, gen ti
).Exec()
}

// SaveApplicationReadStartTime is needed to implement the ProgressManagerWithStartTime interface.
func (tbpm *TableBackedProgressManager) SaveApplicationReadStartTime(ctx context.Context, startTime time.Time) error {
// Store information about the timestamp in the `last_timestamp` column,
// in the special partition with "zero generation".
return tbpm.session.Query(
fmt.Sprintf(
"INSERT INTO %s (generation, application_name, table_name, stream_id, last_timestamp) "+
"VALUES (?, ?, ?, ?, ?)",
tbpm.progressTableName,
),
time.Time{}, tbpm.applicationName, "", []byte{}, gocql.MinTimeUUID(startTime),
).Exec()
}

// GetApplicationReadStartTime is needed to implement the ProgressManagerWithStartTime interface.
func (tbpm *TableBackedProgressManager) GetApplicationReadStartTime(ctx context.Context) (time.Time, error) {
// Retrieve the information from the special column
var timestamp gocql.UUID
err := tbpm.session.Query(
fmt.Sprintf("SELECT last_timestamp FROM %s WHERE generation = ? AND application_name = ? AND table_name = ? AND stream_id = ?", tbpm.progressTableName),
time.Time{}, tbpm.applicationName, "", []byte{},
).Scan(&timestamp)
if err != nil && err != gocql.ErrNotFound {
return time.Time{}, err
}
return timestamp.Time(), nil
}

var _ ProgressManager = (*TableBackedProgressManager)(nil)
var _ ProgressManagerWithStartTime = (*TableBackedProgressManager)(nil)
Loading

0 comments on commit e99bcf1

Please sign in to comment.