From 1a4739c2db236029fa171067500714f74dedc5d2 Mon Sep 17 00:00:00 2001 From: umang01-hash Date: Fri, 27 Sep 2024 15:50:50 +0530 Subject: [PATCH 1/4] add custom spans for tracing in clickhouse datasource --- pkg/gofr/datasource/clickhouse/clickhouse.go | 49 ++++++++++++++++--- .../datasource/clickhouse/clickhouse_test.go | 24 ++++++--- pkg/gofr/external_db.go | 5 ++ 3 files changed, 63 insertions(+), 15 deletions(-) diff --git a/pkg/gofr/datasource/clickhouse/clickhouse.go b/pkg/gofr/datasource/clickhouse/clickhouse.go index 9ab49d2e5..c60b3c23a 100644 --- a/pkg/gofr/datasource/clickhouse/clickhouse.go +++ b/pkg/gofr/datasource/clickhouse/clickhouse.go @@ -3,9 +3,12 @@ package clickhouse import ( "context" "errors" + "fmt" "strings" "time" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" "github.com/ClickHouse/clickhouse-go/v2" @@ -120,9 +123,13 @@ func pushDBMetrics(conn Conn, metrics Metrics) { // Exec should be used for DDL and simple statements. // It should not be used for larger inserts or query iterations. func (c *client) Exec(ctx context.Context, query string, args ...any) error { - defer c.sendOperationStats(time.Now(), "Exec", query, args...) + tracedCtx, span := c.addTraces(ctx, "exec", query) + + err := c.conn.Exec(tracedCtx, query, args...) - return c.conn.Exec(ctx, query, args...) + defer c.sendOperationStats(time.Now(), "Exec", query, "exec", span, args...) + + return err } // Select method allows a set of response rows to be marshaled into a slice of structs with a single invocation.. @@ -139,20 +146,29 @@ func (c *client) Exec(ctx context.Context, query string, args ...any) error { // // err = ctx.Clickhouse.Select(ctx, &user, "SELECT * FROM users") . func (c *client) Select(ctx context.Context, dest any, query string, args ...any) error { - defer c.sendOperationStats(time.Now(), "Select", query, args...) + tracedCtx, span := c.addTraces(ctx, "select", query) + + err := c.conn.Select(tracedCtx, dest, query, args...) - return c.conn.Select(ctx, dest, query, args...) + defer c.sendOperationStats(time.Now(), "Select", query, "select", span, args...) + + return err } // AsyncInsert allows the user to specify whether the client should wait for the server to complete the insert or // respond once the data has been received. func (c *client) AsyncInsert(ctx context.Context, query string, wait bool, args ...any) error { - defer c.sendOperationStats(time.Now(), "AsyncInsert", query, args...) + tracedCtx, span := c.addTraces(ctx, "async-insert", query) + + err := c.conn.AsyncInsert(tracedCtx, query, wait, args...) - return c.conn.AsyncInsert(ctx, query, wait, args...) + defer c.sendOperationStats(time.Now(), "AsyncInsert", query, "async-insert", span, args...) + + return err } -func (c *client) sendOperationStats(start time.Time, methodType, query string, args ...interface{}) { +func (c *client) sendOperationStats(start time.Time, methodType, query string, method string, + span trace.Span, args ...interface{}) { duration := time.Since(start).Milliseconds() c.logger.Debug(&Log{ @@ -162,6 +178,11 @@ func (c *client) sendOperationStats(start time.Time, methodType, query string, a Args: args, }) + if span != nil { + defer span.End() + span.SetAttributes(attribute.Int64(fmt.Sprintf("clickhouse.%v.duration", method), duration)) + } + c.metrics.RecordHistogram(context.Background(), "app_clickhouse_stats", float64(duration), "hosts", c.config.Hosts, "database", c.config.Database, "type", getOperationType(query)) } @@ -198,3 +219,17 @@ func (c *client) HealthCheck(ctx context.Context) (any, error) { return &h, nil } + +func (c *client) addTraces(ctx context.Context, method, query string) (context.Context, trace.Span) { + if c.tracer != nil { + contextWithTrace, span := c.tracer.Start(ctx, fmt.Sprintf("clickhouse-%v", method)) + + span.SetAttributes( + attribute.String("clickhouse.query", query), + ) + + return contextWithTrace, span + } + + return ctx, nil +} diff --git a/pkg/gofr/datasource/clickhouse/clickhouse_test.go b/pkg/gofr/datasource/clickhouse/clickhouse_test.go index 4bd4861d7..05ad2fedf 100644 --- a/pkg/gofr/datasource/clickhouse/clickhouse_test.go +++ b/pkg/gofr/datasource/clickhouse/clickhouse_test.go @@ -14,7 +14,7 @@ import ( "go.uber.org/mock/gomock" ) -func getClickHouseTestConnection(t *testing.T) (*MockConn, *MockMetrics, client) { +func getClickHouseTestConnection(t *testing.T) (*MockConn, *MockMetrics, *MockLogger, client) { t.Helper() ctrl := gomock.NewController(t) @@ -30,12 +30,12 @@ func getClickHouseTestConnection(t *testing.T) (*MockConn, *MockMetrics, client) Database: "test", }, logger: mockLogger, metrics: mockMetric} - return mockConn, mockMetric, c + return mockConn, mockMetric, mockLogger, c } func Test_ClickHouse_ConnectAndMetricRegistrationAndPingFailure(t *testing.T) { logs := stderrOutputForFunc(func() { - _, mockMetric, _ := getClickHouseTestConnection(t) + _, mockMetric, _, _ := getClickHouseTestConnection(t) mockLogger := NewMockLogger(gomock.NewController(t)) cl := New(Config{ @@ -53,6 +53,8 @@ func Test_ClickHouse_ConnectAndMetricRegistrationAndPingFailure(t *testing.T) { mockMetric.EXPECT().NewGauge("app_clickhouse_idle_connections", "Number of idle Clickhouse connections.") mockMetric.EXPECT().SetGauge("app_clickhouse_open_connections", gomock.Any()).AnyTimes() mockMetric.EXPECT().SetGauge("app_clickhouse_idle_connections", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Logf("connecting to clickhouse db at %v to database %v", "localhost:8000", "test") + mockLogger.EXPECT().Errorf("ping failed with error %v", gomock.Any()) cl.Connect() @@ -78,7 +80,7 @@ func stderrOutputForFunc(f func()) string { } func Test_ClickHouse_HealthUP(t *testing.T) { - mockConn, _, c := getClickHouseTestConnection(t) + mockConn, _, _, c := getClickHouseTestConnection(t) mockConn.EXPECT().Ping(gomock.Any()).Return(nil) @@ -88,7 +90,7 @@ func Test_ClickHouse_HealthUP(t *testing.T) { } func Test_ClickHouse_HealthDOWN(t *testing.T) { - mockConn, _, c := getClickHouseTestConnection(t) + mockConn, _, _, c := getClickHouseTestConnection(t) mockConn.EXPECT().Ping(gomock.Any()).Return(sql.ErrConnDone) @@ -100,13 +102,15 @@ func Test_ClickHouse_HealthDOWN(t *testing.T) { } func Test_ClickHouse_Exec(t *testing.T) { - mockConn, mockMetric, c := getClickHouseTestConnection(t) + mockConn, mockMetric, mockLogger, c := getClickHouseTestConnection(t) ctx := context.Background() mockConn.EXPECT().Exec(ctx, "INSERT INTO users (id, name, age) VALUES (?, ?, ?)", "8f165e2d-feef-416c-95f6-913ce3172e15", "gofr", "10").Return(nil) + mockLogger.EXPECT().Debug(gomock.Any()) + mockMetric.EXPECT().RecordHistogram(ctx, "app_clickhouse_stats", float64(0), "hosts", c.config.Hosts, "database", c.config.Database, "type", "INSERT") @@ -116,7 +120,7 @@ func Test_ClickHouse_Exec(t *testing.T) { } func Test_ClickHouse_Select(t *testing.T) { - mockConn, mockMetric, c := getClickHouseTestConnection(t) + mockConn, mockMetric, mockLogger, c := getClickHouseTestConnection(t) type User struct { ID string `ch:"id"` @@ -130,6 +134,8 @@ func Test_ClickHouse_Select(t *testing.T) { mockConn.EXPECT().Select(ctx, &user, "SELECT * FROM users").Return(nil) + mockLogger.EXPECT().Debug(gomock.Any()) + mockMetric.EXPECT().RecordHistogram(ctx, "app_clickhouse_stats", float64(0), "hosts", c.config.Hosts, "database", c.config.Database, "type", "SELECT") @@ -139,7 +145,7 @@ func Test_ClickHouse_Select(t *testing.T) { } func Test_ClickHouse_AsyncInsert(t *testing.T) { - mockConn, mockMetric, c := getClickHouseTestConnection(t) + mockConn, mockMetric, mockLogger, c := getClickHouseTestConnection(t) ctx := context.Background() @@ -149,6 +155,8 @@ func Test_ClickHouse_AsyncInsert(t *testing.T) { mockMetric.EXPECT().RecordHistogram(ctx, "app_clickhouse_stats", float64(0), "hosts", c.config.Hosts, "database", c.config.Database, "type", "INSERT") + mockLogger.EXPECT().Debug(gomock.Any()) + err := c.AsyncInsert(ctx, "INSERT INTO users (id, name, age) VALUES (?, ?, ?)", true, "8f165e2d-feef-416c-95f6-913ce3172e15", "user", "10") diff --git a/pkg/gofr/external_db.go b/pkg/gofr/external_db.go index f5c64b4b5..ba20f4bbf 100644 --- a/pkg/gofr/external_db.go +++ b/pkg/gofr/external_db.go @@ -1,6 +1,7 @@ package gofr import ( + "go.opentelemetry.io/otel" "gofr.dev/pkg/gofr/container" "gofr.dev/pkg/gofr/datasource/file" ) @@ -42,6 +43,10 @@ func (a *App) AddClickhouse(db container.ClickhouseProvider) { db.UseLogger(a.Logger()) db.UseMetrics(a.Metrics()) + tracer := otel.GetTracerProvider().Tracer("gofr-clickhouse") + + db.UseTracer(tracer) + db.Connect() a.container.Clickhouse = db From 8133fadc21a9884e850b526d92465c13ae82a98f Mon Sep 17 00:00:00 2001 From: umang01-hash Date: Fri, 27 Sep 2024 15:56:38 +0530 Subject: [PATCH 2/4] fix tests --- .../datasource/clickhouse/clickhouse_test.go | 51 +++++++++---------- 1 file changed, 25 insertions(+), 26 deletions(-) diff --git a/pkg/gofr/datasource/clickhouse/clickhouse_test.go b/pkg/gofr/datasource/clickhouse/clickhouse_test.go index 05ad2fedf..51f6cf38f 100644 --- a/pkg/gofr/datasource/clickhouse/clickhouse_test.go +++ b/pkg/gofr/datasource/clickhouse/clickhouse_test.go @@ -34,34 +34,33 @@ func getClickHouseTestConnection(t *testing.T) (*MockConn, *MockMetrics, *MockLo } func Test_ClickHouse_ConnectAndMetricRegistrationAndPingFailure(t *testing.T) { - logs := stderrOutputForFunc(func() { - _, mockMetric, _, _ := getClickHouseTestConnection(t) - mockLogger := NewMockLogger(gomock.NewController(t)) - - cl := New(Config{ - Hosts: "localhost:8000", - Username: "user", - Password: "pass", - Database: "test", - }) - - cl.UseLogger(mockLogger) - cl.UseMetrics(mockMetric) - - mockMetric.EXPECT().NewHistogram("app_clickhouse_stats", "Response time of Clickhouse queries in milliseconds.", gomock.Any()) - mockMetric.EXPECT().NewGauge("app_clickhouse_open_connections", "Number of open Clickhouse connections.") - mockMetric.EXPECT().NewGauge("app_clickhouse_idle_connections", "Number of idle Clickhouse connections.") - mockMetric.EXPECT().SetGauge("app_clickhouse_open_connections", gomock.Any()).AnyTimes() - mockMetric.EXPECT().SetGauge("app_clickhouse_idle_connections", gomock.Any()).AnyTimes() - mockLogger.EXPECT().Logf("connecting to clickhouse db at %v to database %v", "localhost:8000", "test") - mockLogger.EXPECT().Errorf("ping failed with error %v", gomock.Any()) - - cl.Connect() - - time.Sleep(100 * time.Millisecond) + _, mockMetric, _, _ := getClickHouseTestConnection(t) + mockLogger := NewMockLogger(gomock.NewController(t)) + + cl := New(Config{ + Hosts: "localhost:8000", + Username: "user", + Password: "pass", + Database: "test", }) - assert.Contains(t, logs, "ping failed with error dial tcp [::1]:8000: connect: connection refused") + cl.UseLogger(mockLogger) + cl.UseMetrics(mockMetric) + + mockMetric.EXPECT().NewHistogram("app_clickhouse_stats", "Response time of Clickhouse queries in milliseconds.", gomock.Any()) + mockMetric.EXPECT().NewGauge("app_clickhouse_open_connections", "Number of open Clickhouse connections.") + mockMetric.EXPECT().NewGauge("app_clickhouse_idle_connections", "Number of idle Clickhouse connections.") + mockMetric.EXPECT().SetGauge("app_clickhouse_open_connections", gomock.Any()).AnyTimes() + mockMetric.EXPECT().SetGauge("app_clickhouse_idle_connections", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Logf("connecting to clickhouse db at %v to database %v", "localhost:8000", "test") + mockLogger.EXPECT().Errorf("ping failed with error %v", gomock.Any()) + + cl.Connect() + + time.Sleep(100 * time.Millisecond) + + assert.True(t, mockLogger.ctrl.Satisfied()) + assert.True(t, mockMetric.ctrl.Satisfied()) } func stderrOutputForFunc(f func()) string { From 101da140e100396e306e5774b50db6fca6897fe5 Mon Sep 17 00:00:00 2001 From: umang01-hash Date: Fri, 27 Sep 2024 15:57:37 +0530 Subject: [PATCH 3/4] rename method from addTraces to addTrace --- pkg/gofr/datasource/clickhouse/clickhouse.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/gofr/datasource/clickhouse/clickhouse.go b/pkg/gofr/datasource/clickhouse/clickhouse.go index c60b3c23a..4f24ee5ac 100644 --- a/pkg/gofr/datasource/clickhouse/clickhouse.go +++ b/pkg/gofr/datasource/clickhouse/clickhouse.go @@ -123,7 +123,7 @@ func pushDBMetrics(conn Conn, metrics Metrics) { // Exec should be used for DDL and simple statements. // It should not be used for larger inserts or query iterations. func (c *client) Exec(ctx context.Context, query string, args ...any) error { - tracedCtx, span := c.addTraces(ctx, "exec", query) + tracedCtx, span := c.addTrace(ctx, "exec", query) err := c.conn.Exec(tracedCtx, query, args...) @@ -146,7 +146,7 @@ func (c *client) Exec(ctx context.Context, query string, args ...any) error { // // err = ctx.Clickhouse.Select(ctx, &user, "SELECT * FROM users") . func (c *client) Select(ctx context.Context, dest any, query string, args ...any) error { - tracedCtx, span := c.addTraces(ctx, "select", query) + tracedCtx, span := c.addTrace(ctx, "select", query) err := c.conn.Select(tracedCtx, dest, query, args...) @@ -158,7 +158,7 @@ func (c *client) Select(ctx context.Context, dest any, query string, args ...any // AsyncInsert allows the user to specify whether the client should wait for the server to complete the insert or // respond once the data has been received. func (c *client) AsyncInsert(ctx context.Context, query string, wait bool, args ...any) error { - tracedCtx, span := c.addTraces(ctx, "async-insert", query) + tracedCtx, span := c.addTrace(ctx, "async-insert", query) err := c.conn.AsyncInsert(tracedCtx, query, wait, args...) @@ -220,7 +220,7 @@ func (c *client) HealthCheck(ctx context.Context) (any, error) { return &h, nil } -func (c *client) addTraces(ctx context.Context, method, query string) (context.Context, trace.Span) { +func (c *client) addTrace(ctx context.Context, method, query string) (context.Context, trace.Span) { if c.tracer != nil { contextWithTrace, span := c.tracer.Start(ctx, fmt.Sprintf("clickhouse-%v", method)) From 937f938b88e00caa5f18d7faedd5065c979013e8 Mon Sep 17 00:00:00 2001 From: umang01-hash Date: Fri, 27 Sep 2024 16:03:28 +0530 Subject: [PATCH 4/4] fix failing test --- pkg/gofr/external_db_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/gofr/external_db_test.go b/pkg/gofr/external_db_test.go index ae5a4952c..b2f2adb06 100644 --- a/pkg/gofr/external_db_test.go +++ b/pkg/gofr/external_db_test.go @@ -4,6 +4,7 @@ import ( "testing" "github.com/stretchr/testify/assert" + "go.opentelemetry.io/otel" "go.uber.org/mock/gomock" "gofr.dev/pkg/gofr/container" @@ -78,6 +79,7 @@ func TestApp_AddClickhouse(t *testing.T) { mock.EXPECT().UseLogger(app.Logger()) mock.EXPECT().UseMetrics(app.Metrics()) + mock.EXPECT().UseTracer(otel.GetTracerProvider().Tracer("gofr-clickhouse")) mock.EXPECT().Connect() app.AddClickhouse(mock)