Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add custom spans for tracing in clickhouse datasource #1060

Merged
merged 7 commits into from
Oct 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 42 additions & 7 deletions pkg/gofr/datasource/clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,12 @@ package clickhouse
import (
"context"
"errors"
"fmt"
"strings"
"time"

"go.opentelemetry.io/otel/attribute"

"go.opentelemetry.io/otel/trace"

"github.com/ClickHouse/clickhouse-go/v2"
Expand Down Expand Up @@ -120,9 +123,13 @@ func pushDBMetrics(conn Conn, metrics Metrics) {
// Exec should be used for DDL and simple statements.
// It should not be used for larger inserts or query iterations.
func (c *client) Exec(ctx context.Context, query string, args ...any) error {
defer c.sendOperationStats(time.Now(), "Exec", query, args...)
tracedCtx, span := c.addTrace(ctx, "exec", query)

err := c.conn.Exec(tracedCtx, query, args...)

return c.conn.Exec(ctx, query, args...)
defer c.sendOperationStats(time.Now(), "Exec", query, "exec", span, args...)

return err
}

// Select method allows a set of response rows to be marshaled into a slice of structs with a single invocation..
Expand All @@ -139,20 +146,29 @@ func (c *client) Exec(ctx context.Context, query string, args ...any) error {
//
// err = ctx.Clickhouse.Select(ctx, &user, "SELECT * FROM users") .
func (c *client) Select(ctx context.Context, dest any, query string, args ...any) error {
defer c.sendOperationStats(time.Now(), "Select", query, args...)
tracedCtx, span := c.addTrace(ctx, "select", query)

err := c.conn.Select(tracedCtx, dest, query, args...)

return c.conn.Select(ctx, dest, query, args...)
defer c.sendOperationStats(time.Now(), "Select", query, "select", span, args...)

return err
}

// AsyncInsert allows the user to specify whether the client should wait for the server to complete the insert or
// respond once the data has been received.
func (c *client) AsyncInsert(ctx context.Context, query string, wait bool, args ...any) error {
defer c.sendOperationStats(time.Now(), "AsyncInsert", query, args...)
tracedCtx, span := c.addTrace(ctx, "async-insert", query)

err := c.conn.AsyncInsert(tracedCtx, query, wait, args...)

return c.conn.AsyncInsert(ctx, query, wait, args...)
defer c.sendOperationStats(time.Now(), "AsyncInsert", query, "async-insert", span, args...)

return err
}

func (c *client) sendOperationStats(start time.Time, methodType, query string, args ...interface{}) {
func (c *client) sendOperationStats(start time.Time, methodType, query string, method string,
span trace.Span, args ...interface{}) {
duration := time.Since(start).Milliseconds()

c.logger.Debug(&Log{
Expand All @@ -162,6 +178,11 @@ func (c *client) sendOperationStats(start time.Time, methodType, query string, a
Args: args,
})

if span != nil {
defer span.End()
span.SetAttributes(attribute.Int64(fmt.Sprintf("clickhouse.%v.duration", method), duration))
}

c.metrics.RecordHistogram(context.Background(), "app_clickhouse_stats", float64(duration), "hosts", c.config.Hosts,
"database", c.config.Database, "type", getOperationType(query))
}
Expand Down Expand Up @@ -198,3 +219,17 @@ func (c *client) HealthCheck(ctx context.Context) (any, error) {

return &h, nil
}

func (c *client) addTrace(ctx context.Context, method, query string) (context.Context, trace.Span) {
if c.tracer != nil {
contextWithTrace, span := c.tracer.Start(ctx, fmt.Sprintf("clickhouse-%v", method))

span.SetAttributes(
attribute.String("clickhouse.query", query),
)

return contextWithTrace, span
}

return ctx, nil
}
61 changes: 34 additions & 27 deletions pkg/gofr/datasource/clickhouse/clickhouse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
"go.uber.org/mock/gomock"
)

func getClickHouseTestConnection(t *testing.T) (*MockConn, *MockMetrics, client) {
func getClickHouseTestConnection(t *testing.T) (*MockConn, *MockMetrics, *MockLogger, client) {
t.Helper()

ctrl := gomock.NewController(t)
Expand All @@ -30,36 +30,37 @@ func getClickHouseTestConnection(t *testing.T) (*MockConn, *MockMetrics, client)
Database: "test",
}, logger: mockLogger, metrics: mockMetric}

return mockConn, mockMetric, c
return mockConn, mockMetric, mockLogger, c
}

func Test_ClickHouse_ConnectAndMetricRegistrationAndPingFailure(t *testing.T) {
logs := stderrOutputForFunc(func() {
_, mockMetric, _ := getClickHouseTestConnection(t)
mockLogger := NewMockLogger(gomock.NewController(t))
_, mockMetric, _, _ := getClickHouseTestConnection(t)
mockLogger := NewMockLogger(gomock.NewController(t))

cl := New(Config{
Hosts: "localhost:8000",
Username: "user",
Password: "pass",
Database: "test",
})
cl := New(Config{
Hosts: "localhost:8000",
Username: "user",
Password: "pass",
Database: "test",
})

cl.UseLogger(mockLogger)
cl.UseMetrics(mockMetric)
cl.UseLogger(mockLogger)
cl.UseMetrics(mockMetric)

mockMetric.EXPECT().NewHistogram("app_clickhouse_stats", "Response time of Clickhouse queries in milliseconds.", gomock.Any())
mockMetric.EXPECT().NewGauge("app_clickhouse_open_connections", "Number of open Clickhouse connections.")
mockMetric.EXPECT().NewGauge("app_clickhouse_idle_connections", "Number of idle Clickhouse connections.")
mockMetric.EXPECT().SetGauge("app_clickhouse_open_connections", gomock.Any()).AnyTimes()
mockMetric.EXPECT().SetGauge("app_clickhouse_idle_connections", gomock.Any()).AnyTimes()
mockMetric.EXPECT().NewHistogram("app_clickhouse_stats", "Response time of Clickhouse queries in milliseconds.", gomock.Any())
mockMetric.EXPECT().NewGauge("app_clickhouse_open_connections", "Number of open Clickhouse connections.")
mockMetric.EXPECT().NewGauge("app_clickhouse_idle_connections", "Number of idle Clickhouse connections.")
mockMetric.EXPECT().SetGauge("app_clickhouse_open_connections", gomock.Any()).AnyTimes()
mockMetric.EXPECT().SetGauge("app_clickhouse_idle_connections", gomock.Any()).AnyTimes()
mockLogger.EXPECT().Logf("connecting to clickhouse db at %v to database %v", "localhost:8000", "test")
mockLogger.EXPECT().Errorf("ping failed with error %v", gomock.Any())

cl.Connect()
cl.Connect()

time.Sleep(100 * time.Millisecond)
})
time.Sleep(100 * time.Millisecond)

assert.Contains(t, logs, "ping failed with error dial tcp [::1]:8000: connect: connection refused")
assert.True(t, mockLogger.ctrl.Satisfied())
assert.True(t, mockMetric.ctrl.Satisfied())
}

func stderrOutputForFunc(f func()) string {
Expand All @@ -78,7 +79,7 @@ func stderrOutputForFunc(f func()) string {
}

func Test_ClickHouse_HealthUP(t *testing.T) {
mockConn, _, c := getClickHouseTestConnection(t)
mockConn, _, _, c := getClickHouseTestConnection(t)

mockConn.EXPECT().Ping(gomock.Any()).Return(nil)

Expand All @@ -88,7 +89,7 @@ func Test_ClickHouse_HealthUP(t *testing.T) {
}

func Test_ClickHouse_HealthDOWN(t *testing.T) {
mockConn, _, c := getClickHouseTestConnection(t)
mockConn, _, _, c := getClickHouseTestConnection(t)

mockConn.EXPECT().Ping(gomock.Any()).Return(sql.ErrConnDone)

Expand All @@ -100,13 +101,15 @@ func Test_ClickHouse_HealthDOWN(t *testing.T) {
}

func Test_ClickHouse_Exec(t *testing.T) {
mockConn, mockMetric, c := getClickHouseTestConnection(t)
mockConn, mockMetric, mockLogger, c := getClickHouseTestConnection(t)

ctx := context.Background()

mockConn.EXPECT().Exec(ctx, "INSERT INTO users (id, name, age) VALUES (?, ?, ?)",
"8f165e2d-feef-416c-95f6-913ce3172e15", "gofr", "10").Return(nil)

mockLogger.EXPECT().Debug(gomock.Any())

mockMetric.EXPECT().RecordHistogram(ctx, "app_clickhouse_stats", float64(0), "hosts", c.config.Hosts,
"database", c.config.Database, "type", "INSERT")

Expand All @@ -116,7 +119,7 @@ func Test_ClickHouse_Exec(t *testing.T) {
}

func Test_ClickHouse_Select(t *testing.T) {
mockConn, mockMetric, c := getClickHouseTestConnection(t)
mockConn, mockMetric, mockLogger, c := getClickHouseTestConnection(t)

type User struct {
ID string `ch:"id"`
Expand All @@ -130,6 +133,8 @@ func Test_ClickHouse_Select(t *testing.T) {

mockConn.EXPECT().Select(ctx, &user, "SELECT * FROM users").Return(nil)

mockLogger.EXPECT().Debug(gomock.Any())

mockMetric.EXPECT().RecordHistogram(ctx, "app_clickhouse_stats", float64(0), "hosts", c.config.Hosts,
"database", c.config.Database, "type", "SELECT")

Expand All @@ -139,7 +144,7 @@ func Test_ClickHouse_Select(t *testing.T) {
}

func Test_ClickHouse_AsyncInsert(t *testing.T) {
mockConn, mockMetric, c := getClickHouseTestConnection(t)
mockConn, mockMetric, mockLogger, c := getClickHouseTestConnection(t)

ctx := context.Background()

Expand All @@ -149,6 +154,8 @@ func Test_ClickHouse_AsyncInsert(t *testing.T) {
mockMetric.EXPECT().RecordHistogram(ctx, "app_clickhouse_stats", float64(0), "hosts", c.config.Hosts,
"database", c.config.Database, "type", "INSERT")

mockLogger.EXPECT().Debug(gomock.Any())

err := c.AsyncInsert(ctx, "INSERT INTO users (id, name, age) VALUES (?, ?, ?)", true,
"8f165e2d-feef-416c-95f6-913ce3172e15", "user", "10")

Expand Down
5 changes: 5 additions & 0 deletions pkg/gofr/external_db.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package gofr

import (
"go.opentelemetry.io/otel"
"gofr.dev/pkg/gofr/container"
"gofr.dev/pkg/gofr/datasource/file"
)
Expand Down Expand Up @@ -52,6 +53,10 @@ func (a *App) AddClickhouse(db container.ClickhouseProvider) {
db.UseLogger(a.Logger())
db.UseMetrics(a.Metrics())

tracer := otel.GetTracerProvider().Tracer("gofr-clickhouse")

db.UseTracer(tracer)

db.Connect()

a.container.Clickhouse = db
Expand Down
2 changes: 2 additions & 0 deletions pkg/gofr/external_db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"testing"

"github.com/stretchr/testify/assert"
"go.opentelemetry.io/otel"
"go.uber.org/mock/gomock"

"gofr.dev/pkg/gofr/container"
Expand Down Expand Up @@ -78,6 +79,7 @@ func TestApp_AddClickhouse(t *testing.T) {

mock.EXPECT().UseLogger(app.Logger())
mock.EXPECT().UseMetrics(app.Metrics())
mock.EXPECT().UseTracer(otel.GetTracerProvider().Tracer("gofr-clickhouse"))
mock.EXPECT().Connect()

app.AddClickhouse(mock)
Expand Down
Loading