Skip to content

Commit

Permalink
pglock: add support for leveled logs
Browse files Browse the repository at this point in the history
Closes #55
  • Loading branch information
ucirello committed Aug 8, 2023
1 parent ddd19ad commit b2e4951
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 17 deletions.
33 changes: 20 additions & 13 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"database/sql"
"errors"
"fmt"
"io/ioutil"
"io"
"log"
"math/rand"
"net"
Expand Down Expand Up @@ -55,7 +55,7 @@ type Client struct {
tableName string
leaseDuration time.Duration
heartbeatFrequency time.Duration
log Logger
log LevelLogger
owner string
}

Expand Down Expand Up @@ -85,7 +85,7 @@ func newClient(db *sql.DB, opts ...ClientOption) (_ *Client, err error) {
tableName: DefaultTableName,
leaseDuration: DefaultLeaseDuration,
heartbeatFrequency: DefaultHeartbeatFrequency,
log: log.New(ioutil.Discard, "", 0),
log: &flatLogger{log.New(io.Discard, "", 0)},
owner: fmt.Sprintf("pglock-%v", rand.Int()),
}
for _, opt := range opts {
Expand Down Expand Up @@ -185,18 +185,18 @@ func (c *Client) AcquireContext(ctx context.Context, name string, opts ...LockOp
default:
err := c.retry(ctx, func() error { return c.tryAcquire(ctx, l) })
if l.failIfLocked && err == ErrNotAcquired {
c.log.Println("not acquired, exit")
c.log.Debug("not acquired, exit")
return l, err
} else if err == ErrNotAcquired {
c.log.Println("not acquired, wait:", l.leaseDuration)
c.log.Debug("not acquired, wait:", l.leaseDuration)
select {
case <-time.After(l.leaseDuration):
case <-ctx.Done():
return l, err
}
continue
} else if err != nil {
c.log.Println("error:", err)
c.log.Error("error:", err)
return nil, err
}
return l, nil
Expand Down Expand Up @@ -232,9 +232,9 @@ func (c *Client) storeAcquire(ctx context.Context, l *Lock) error {
if err != nil {
return typedError(err, "cannot create transaction for lock acquisition")
}
c.log.Println("storeAcquire in", l.name, rvn, l.data, l.recordVersionNumber)
c.log.Debug("storeAcquire in", l.name, rvn, l.data, l.recordVersionNumber)
defer func() {
c.log.Println("storeAcquire out", l.name, rvn, l.data, l.recordVersionNumber)
c.log.Debug("storeAcquire out", l.name, rvn, l.data, l.recordVersionNumber)
}()
_, err = tx.ExecContext(ctx, `
INSERT INTO `+c.tableName+`
Expand Down Expand Up @@ -361,13 +361,13 @@ func (c *Client) storeRelease(ctx context.Context, l *Lock) error {

func (c *Client) heartbeat(ctx context.Context, l *Lock) {
defer l.heartbeatWG.Done()
c.log.Println("heartbeat started", l.name)
defer c.log.Println("heartbeat stopped", l.name)
c.log.Debug("heartbeat started", l.name)
defer c.log.Debug("heartbeat stopped", l.name)
for {
if err := ctx.Err(); err != nil {
return
} else if err := c.SendHeartbeat(ctx, l); err != nil {
defer c.log.Println("heartbeat missed", err)
defer c.log.Error("heartbeat missed", err)
return
}
select {
Expand Down Expand Up @@ -459,7 +459,7 @@ func (c *Client) GetContext(ctx context.Context, name string) (*Lock, error) {
return err
})
if notExist := (&NotExistError{}); err != nil && errors.As(err, &notExist) {
c.log.Println("missing lock entry:", err)
c.log.Error("missing lock entry:", err)
}
return l, err
}
Expand Down Expand Up @@ -506,7 +506,7 @@ func (c *Client) retry(ctx context.Context, f func() error) error {
if failedPrecondition := (&FailedPreconditionError{}); err == nil || !errors.As(err, &failedPrecondition) {
break
}
c.log.Println("bad transaction, retrying:", err)
c.log.Debug("bad transaction, retrying:", err)
select {
case <-time.After(retryPeriod):
case <-ctx.Done():
Expand Down Expand Up @@ -559,7 +559,14 @@ type ClientOption func(*Client)

// WithLogger injects a logger into the client, so its internals can be
// recorded.
// Deprecated. Use WithLevelLogger instead.
func WithLogger(l Logger) ClientOption {
return func(c *Client) { c.log = &flatLogger{l} }
}

// WithLevelLogger injects a logger into the client, so its internals can be
// recorded.
func WithLevelLogger(l LevelLogger) ClientOption {
return func(c *Client) { c.log = l }
}

Expand Down
6 changes: 3 additions & 3 deletions client_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"database/sql"
"errors"
"fmt"
"io/ioutil"
"io"
"log"
"net"
"testing"
Expand Down Expand Up @@ -56,7 +56,7 @@ func TestTypedError(t *testing.T) {
func TestRetry(t *testing.T) {
t.Run("type check", func(t *testing.T) {
c := &Client{
log: &testLogger{t},
log: &flatLogger{&testLogger{t}},
}
errs := []error{
&FailedPreconditionError{errors.New("failed precondition")},
Expand All @@ -73,7 +73,7 @@ func TestRetry(t *testing.T) {
})
t.Run("max retries", func(t *testing.T) {
c := &Client{
log: log.New(ioutil.Discard, "", 0),
log: &flatLogger{log.New(io.Discard, "", 0)},
}
var retries int
err := c.retry(context.Background(), func() error {
Expand Down
2 changes: 1 addition & 1 deletion client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1262,7 +1262,7 @@ func TestOverflowSequence(t *testing.T) {
name := randStr()
c, err := pglock.New(
db,
pglock.WithLogger(&testLogger{t}),
pglock.WithLevelLogger(&testLevelLogger{t}),
pglock.WithCustomTable(tableName),
)
if err != nil {
Expand Down
14 changes: 14 additions & 0 deletions helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,17 @@ func (t *testLogger) Println(v ...interface{}) {
type discardLogging struct{}

func (t *discardLogging) Println(...interface{}) {}

type testLevelLogger struct {
t testing.TB
}

func (t *testLevelLogger) Debug(msg string, args ...any) {
t.t.Helper()
t.t.Logf("DEBUG: "+msg, args...)
}

func (t *testLevelLogger) Error(msg string, args ...any) {
t.t.Helper()
t.t.Logf("ERROR: "+msg, args...)
}
24 changes: 24 additions & 0 deletions logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,31 @@ limitations under the License.

package pglock

import "fmt"

// Logger is used for internal inspection of the lock client.
type Logger interface {
Println(v ...interface{})
}

// LevelLogger is used for internal inspection of the lock client.
type LevelLogger interface {
Debug(msg string, args ...any)
Error(msg string, args ...any)
}

type flatLogger struct {
l Logger
}

func (fl *flatLogger) Debug(msg string, args ...any) {
fl.l.Println(fmt.Sprintf(msg, args...))
}

func (fl *flatLogger) Info(msg string, args ...any) {
fl.l.Println(fmt.Sprintf(msg, args...))
}

func (fl *flatLogger) Error(msg string, args ...any) {
fl.l.Println(fmt.Sprintf(msg, args...))
}

0 comments on commit b2e4951

Please sign in to comment.