Skip to content

Commit

Permalink
Merge pull request #4966 from zeeshanakram3/opentelemetry_fixes
Browse files Browse the repository at this point in the history
Opentelemetry fixes
  • Loading branch information
mnaamani authored Nov 28, 2023
2 parents 9c5383d + a91634a commit 2094c4f
Show file tree
Hide file tree
Showing 9 changed files with 50 additions and 17 deletions.
32 changes: 28 additions & 4 deletions distributor-node/src/services/parsers/ConfigParserService.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import { ValidationError, ValidationService } from '../validation/ValidationService'
import { Config } from '../../types'
import fs from 'fs'
import { JSONSchema4, JSONSchema4TypeName } from 'json-schema'
import _ from 'lodash'
import path from 'path'
import YAML from 'yaml'
import _ from 'lodash'
import configSchema, { bytesizeUnits } from '../../schemas/configSchema'
import { JSONSchema4, JSONSchema4TypeName } from 'json-schema'
import { Config } from '../../types'
import { ValidationError, ValidationService } from '../validation/ValidationService'

const MIN_CACHE_SIZE = '20G'
const MIN_MAX_CACHED_ITEM_SIZE = '1M'
Expand Down Expand Up @@ -131,6 +131,26 @@ export class ConfigParserService {
return String(packageJSON.version)
}

private setEnvVarsFromInputConfig(inputConfig: Record<string, unknown>) {
this.populateEnvVars(inputConfig)
}

private populateEnvVars(config: Record<string, unknown>, prefix = 'JOYSTREAM_DISTRIBUTOR__', path = '') {
Object.entries(config).forEach(([key, value]) => {
// Transform camelCase to snake_case and then to uppercase
const envKey = `${prefix}${path}${this.toSnakeCase(key)}`.toUpperCase()
if (typeof value === 'object' && value !== null && !Array.isArray(value)) {
this.populateEnvVars(value as Record<string, unknown>, prefix, `${path}${this.toSnakeCase(key)}__`)
} else {
process.env[envKey] = String(value)
}
})
}

private toSnakeCase(str: string): string {
return str.replace(/[\w]([A-Z])/g, (m) => `${m[0]}_${m[1]}`).toLowerCase()
}

public parse(): Config {
const { configPath } = this
let inputConfig: Record<string, unknown> = {}
Expand All @@ -149,6 +169,10 @@ export class ConfigParserService {
// Override config with env variables
this.mergeEnvConfigWith(inputConfig)

// Export the JSON/YML config as env vars too so that the pieces of code that does not have
// access to the config object (e.g. opentelemetry module) can read the config values.
this.setEnvVarsFromInputConfig(inputConfig)

// Validate the config
const configJson = this.validator.validate('Config', inputConfig)

Expand Down
1 change: 1 addition & 0 deletions entrypoints/distributor.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

# docker entrypoint fot distributor node, to allow running with telemetry
if [[ "$TELEMETRY_ENABLED" = "yes" ]] && [[ $1 = "start" ]]; then
export OTEL_APPLICATION=distributor-node
node --require @joystream/opentelemetry /joystream/distributor-node/bin/run $*
else
/joystream/distributor-node/bin/run $*
Expand Down
1 change: 1 addition & 0 deletions entrypoints/graphql-server.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

# docker entrypoint fot graphql-server, to allow running with telemetry
if [[ "$TELEMETRY_ENABLED" = "yes" ]]; then
export OTEL_APPLICATION=query-node
yarn workspace query-node-root query-node:start:prod:with-instrumentation $*
else
yarn workspace query-node-root query-node:start:prod:pm2 $*
Expand Down
1 change: 1 addition & 0 deletions entrypoints/storage.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

# docker entrypoint fot storage node, to allow running with telemetry
if [[ "$TELEMETRY_ENABLED" = "yes" ]] && [[ $1 = "server" ]]; then
export OTEL_APPLICATION=storage-node
node --require @joystream/opentelemetry /joystream/storage-node/bin/run $*
else
/joystream/storage-node/bin/run $*
Expand Down
4 changes: 4 additions & 0 deletions opentelemetry/.env
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,7 @@ OTEL_APPLICATION=
OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:8200
OTEL_RESOURCE_ATTRIBUTES="service.name=test-service,deployment.environment=development"
OTEL_METRICS_EXPORTER="otlp"

# Optional env vars to configure the opentelemetry exporters
OTEL_MAX_QUEUE_SIZE=8192 # 4 times of default queue size
OTEL_MAX_EXPORT_BATCH_SIZE=1024 # 2 times of default batch size
6 changes: 1 addition & 5 deletions opentelemetry/index.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,12 @@
import { DiagConsoleLogger, DiagLogLevel, diag } from '@opentelemetry/api'
import { NodeSDK } from '@opentelemetry/sdk-node'
import { config } from 'dotenv'
import path from 'path'
import 'dotenv/config'
import { DefaultInstrumentation, DistributorNodeInstrumentation, StorageNodeInstrumentation } from './instrumentations'

// For troubleshooting, set the log level to DiagLogLevel.DEBUG
diag.setLogger(new DiagConsoleLogger(), DiagLogLevel.INFO)

async function addInstrumentation() {
// Load env variables
config({ path: path.join(__dirname, '../.env') })

const applicationName = process.env.OTEL_APPLICATION

let instrumentation: NodeSDK
Expand Down
4 changes: 2 additions & 2 deletions opentelemetry/instrumentations/default.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import { BatchSpanProcessor } from '@opentelemetry/sdk-trace-node'

export const DefaultInstrumentation = new NodeSDK({
spanProcessor: new BatchSpanProcessor(new OTLPTraceExporter(), {
maxQueueSize: 8192 /* 4 times of default queue size */,
maxExportBatchSize: 1024 /* 2 times of default batch size */,
maxQueueSize: parseInt(process.env.OTEL_MAX_QUEUE_SIZE || '8192'),
maxExportBatchSize: parseInt(process.env.OTEL_MAX_EXPORT_BATCH_SIZE || '1024'),
}),
metricReader: new PeriodicExportingMetricReader({
exporter: new OTLPMetricExporter(),
Expand Down
14 changes: 10 additions & 4 deletions opentelemetry/instrumentations/distributor-node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,21 @@ import { FsInstrumentation } from '@opentelemetry/instrumentation-fs'
import { HttpInstrumentation } from '@opentelemetry/instrumentation-http'
import { PeriodicExportingMetricReader } from '@opentelemetry/sdk-metrics'
import { NodeSDK } from '@opentelemetry/sdk-node'
import { BatchSpanProcessor } from '@opentelemetry/sdk-trace-node'
import { BatchSpanProcessor, Span } from '@opentelemetry/sdk-trace-node'
import { ClientRequest, ServerResponse } from 'http'

/** Opentelemetry Instrumentation for Joystream Distributor Node */

class CustomSpanProcessor extends BatchSpanProcessor {
onStart(span: Span) {
span.setAttribute('nodeId', process.env.JOYSTREAM_DISTRIBUTOR__ID)
}
}

export const DistributorNodeInstrumentation = new NodeSDK({
spanProcessor: new BatchSpanProcessor(new OTLPTraceExporter(), {
maxQueueSize: 8192 /* 4 times of default queue size */,
maxExportBatchSize: 1024 /* 2 times of default batch size */,
spanProcessor: new CustomSpanProcessor(new OTLPTraceExporter(), {
maxQueueSize: parseInt(process.env.OTEL_MAX_QUEUE_SIZE || '8192'),
maxExportBatchSize: parseInt(process.env.OTEL_MAX_EXPORT_BATCH_SIZE || '1024'),
}),
metricReader: new PeriodicExportingMetricReader({
exporter: new OTLPMetricExporter(),
Expand Down
4 changes: 2 additions & 2 deletions opentelemetry/instrumentations/storage-node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ import { ClientRequest, ServerResponse } from 'http'

export const StorageNodeInstrumentation = new NodeSDK({
spanProcessor: new BatchSpanProcessor(new OTLPTraceExporter(), {
maxQueueSize: 8192 /* 4 times of default queue size */,
maxExportBatchSize: 1024 /* 2 times of default batch size */,
maxQueueSize: parseInt(process.env.OTEL_MAX_QUEUE_SIZE || '8192'),
maxExportBatchSize: parseInt(process.env.OTEL_MAX_EXPORT_BATCH_SIZE || '1024'),
}),
metricReader: new PeriodicExportingMetricReader({
exporter: new OTLPMetricExporter(),
Expand Down

0 comments on commit 2094c4f

Please sign in to comment.