From 9a6457658aaf07ef0bbe15313efc11068cc0e2bc Mon Sep 17 00:00:00 2001 From: Forrest <6546409+frrist@users.noreply.github.com> Date: Sat, 28 Sep 2024 00:56:51 -0700 Subject: [PATCH] feat: add execution analytics (#4532) - also ensure JobID is called job_id in all events for easier sql --------- Co-authored-by: frrist --- pkg/analytics/execution_compute_message.go | 17 ++ pkg/analytics/executions.go | 117 ++++++++++++++ pkg/analytics/job_submit.go | 108 +++++++++++++ pkg/analytics/job_terminal.go | 109 +++++++++++++ pkg/analytics/models.go | 177 --------------------- pkg/analytics/resources.go | 14 ++ pkg/jobstore/boltdb/store.go | 8 + pkg/orchestrator/endpoint.go | 2 +- 8 files changed, 374 insertions(+), 178 deletions(-) create mode 100644 pkg/analytics/execution_compute_message.go create mode 100644 pkg/analytics/executions.go create mode 100644 pkg/analytics/job_submit.go create mode 100644 pkg/analytics/job_terminal.go create mode 100644 pkg/analytics/resources.go diff --git a/pkg/analytics/execution_compute_message.go b/pkg/analytics/execution_compute_message.go new file mode 100644 index 0000000000..6f0aaa6040 --- /dev/null +++ b/pkg/analytics/execution_compute_message.go @@ -0,0 +1,17 @@ +package analytics + +const ComputeMessageExecutionEventType = "bacalhau.execution_v1.compute_message" + +type ExecutionComputeMessage struct { + JobID string `json:"job_id,omitempty"` + ExecutionID string `json:"execution_id,omitempty"` + ComputeMessage string `json:"compute_message,omitempty"` +} + +func NewComputeMessageExecutionEvent(jobID string, executionID string, computeMessage string) *Event { + return NewEvent(ComputeMessageExecutionEventType, ExecutionComputeMessage{ + JobID: jobID, + ExecutionID: executionID, + ComputeMessage: computeMessage, + }) +} diff --git a/pkg/analytics/executions.go b/pkg/analytics/executions.go new file mode 100644 index 0000000000..ea16289a18 --- /dev/null +++ b/pkg/analytics/executions.go @@ -0,0 +1,117 @@ +package analytics + +import ( + "time" + + "github.com/bacalhau-project/bacalhau/pkg/models" +) + +const TerminalExecutionEventType = "bacalhau.execution_v1.terminal" +const CreatedExecutionEventType = "bacalhau.execution_v1.create" + +type ExecutionEvent struct { + JobID string `json:"job_id,omitempty"` + ExecutionID string `json:"execution_id,omitempty"` + EvalID string `json:"evaluation_id,omitempty"` + + NameSet bool `json:"name_set,omitempty"` + NodeNameHash string `json:"node_name_hash,omitempty"` + NamespaceHash string `json:"namespace_hash,omitempty"` + + Resources map[string]Resource `json:"resources,omitempty"` + + DesiredState string `json:"desired_state,omitempty"` + ComputeState string `json:"compute_state,omitempty"` + + PublishedResultType string `json:"publisher_type,omitempty"` + + RunResultStdoutTruncated bool `json:"run_result_stdout_truncated,omitempty"` + RunResultStderrTruncated bool `json:"run_result_stderr_truncated,omitempty"` + RunResultExitCode int `json:"run_result_exit_code,omitempty"` + + PreviousExecution string `json:"previous_execution,omitempty"` + NextExecution string `json:"next_execution,omitempty"` + FollowupEvalID string `json:"followup_eval_id,omitempty"` + + Revision uint64 `json:"revision,omitempty"` + CreateTime time.Time `json:"create_time,omitempty"` + ModifyTime time.Time `json:"modify_time,omitempty"` +} + +func NewCreatedExecutionEvent(e models.Execution) *Event { + return NewEvent(CreatedExecutionEventType, newExecutionEvent(e)) +} + +func NewTerminalExecutionEvent(e models.Execution) *Event { + return NewEvent(TerminalExecutionEventType, newExecutionEvent(e)) +} + +func newExecutionEvent(e models.Execution) ExecutionEvent { + resources := make(map[string]Resource, len(e.AllocatedResources.Tasks)) + for taskName, taskResources := range e.AllocatedResources.Tasks { + gpuTypes := make([]GPUInfo, len(taskResources.GPUs)) + for i, gpu := range taskResources.GPUs { + gpuTypes[i] = GPUInfo{ + Name: gpu.Name, + Vendor: string(gpu.Vendor), + } + } + // we hash the taskName here for privacy + resources[hashString(taskName)] = Resource{ + CPUUnits: taskResources.CPU, + MemoryBytes: taskResources.Memory, + DiskBytes: taskResources.Disk, + GPUCount: taskResources.GPU, + GPUTypes: gpuTypes, + } + } + + var ( + stdoutTruncated bool + stderrTruncated bool + exitCode int + ) + if e.RunOutput != nil { + stdoutTruncated = e.RunOutput.StdoutTruncated + stderrTruncated = e.RunOutput.StderrTruncated + exitCode = e.RunOutput.ExitCode + } + + return ExecutionEvent{ + // ID fields. + JobID: e.JobID, + ExecutionID: e.ID, + EvalID: e.EvalID, + + // name fields. + NameSet: e.Name == "", + NodeNameHash: hashString(e.NodeID), + NamespaceHash: hashString(e.Namespace), + + // resources of tasks in execution. + // NB: currently this isn't populated when creating executions. + Resources: resources, + + // states. + DesiredState: e.DesiredState.StateType.String(), + ComputeState: e.ComputeState.StateType.String(), + + // publisher if any. + PublishedResultType: e.PublishedResult.Type, + + // run results if any. + RunResultStdoutTruncated: stdoutTruncated, + RunResultStderrTruncated: stderrTruncated, + RunResultExitCode: exitCode, + + // IDs of related models. + PreviousExecution: e.PreviousExecution, + NextExecution: e.NextExecution, + FollowupEvalID: e.FollowupEvalID, + + // versioning and time. + Revision: e.Revision, + CreateTime: time.Unix(0, e.CreateTime).UTC(), + ModifyTime: time.Unix(0, e.ModifyTime).UTC(), + } +} diff --git a/pkg/analytics/job_submit.go b/pkg/analytics/job_submit.go new file mode 100644 index 0000000000..20ac49b891 --- /dev/null +++ b/pkg/analytics/job_submit.go @@ -0,0 +1,108 @@ +package analytics + +import ( + "time" + + "github.com/bacalhau-project/bacalhau/pkg/models" +) + +// SubmitJobEventType is the event type for a job that has been submitted to an orchestrator. +const SubmitJobEventType = "bacalhau.job_v1.submit" + +type SubmitJobEvent struct { + JobID string `json:"job_id"` + + NameSet bool `json:"name_set"` + NamespaceHash string `json:"namespace_hash"` + + Type string `json:"type"` + Count int `json:"count"` + LabelsCount int `json:"labels_count"` + MetaCount int `json:"meta_count"` + + Version uint64 `json:"version"` + Revision uint64 `json:"revision"` + CreateTime time.Time `json:"create_time"` + ModifyTime time.Time `json:"modify_time"` + + TaskNameHash string `json:"task_name_hash"` + TaskEngineType string `json:"task_engine_type"` + TaskPublisherType string `json:"task_publisher_type"` + TaskEnvVarCount int `json:"task_env_var_count"` + TaskMetaCount int `json:"task_meta_count"` + TaskInputSourceTypes []string `json:"task_input_source_types"` + TaskResultPathCount int `json:"task_result_path_count"` + + Resources Resource `json:"resources,omitempty"` + + TaskNetworkType string `json:"task_network_type"` + TaskDomainsCount int `json:"task_domains_count"` + TaskExecutionTimeout int64 `json:"task_execution_timeout"` + TaskQueueTimeout int64 `json:"task_queue_timeout"` + TaskTotalTimeout int64 `json:"task_total_timeout"` + + Warnings []string `json:"warnings"` + Error string `json:"error"` +} + +func NewSubmitJobEvent(j models.Job, warnings ...string) SubmitJobEvent { + t := j.Task() + taskInputTypes := make([]string, len(t.InputSources)) + for i, s := range t.InputSources { + taskInputTypes[i] = s.Source.Type + } + // if we can't parse the resources use zero + var resource Resource + taskResources, err := t.ResourcesConfig.ToResources() + if err != nil { + resource = Resource{ + CPUUnits: 0, + MemoryBytes: 0, + DiskBytes: 0, + GPUCount: 0, + GPUTypes: nil, + } + } else { + gpuTypes := make([]GPUInfo, len(taskResources.GPUs)) + for i, gpu := range taskResources.GPUs { + gpuTypes[i] = GPUInfo{ + Name: gpu.Name, + Vendor: string(gpu.Vendor), + } + } + resource = Resource{ + CPUUnits: taskResources.CPU, + MemoryBytes: taskResources.Memory, + DiskBytes: taskResources.Disk, + GPUCount: taskResources.GPU, + GPUTypes: gpuTypes, + } + } + return SubmitJobEvent{ + JobID: j.ID, + NameSet: j.ID != j.Name, + NamespaceHash: hashString(j.Namespace), + Type: j.Type, + Count: j.Count, + LabelsCount: len(j.Labels), + MetaCount: len(j.Meta), + Version: j.Version, + Revision: j.Revision, + CreateTime: time.Unix(0, j.CreateTime).UTC(), + ModifyTime: time.Unix(0, j.ModifyTime).UTC(), + TaskNameHash: hashString(t.Name), + TaskEngineType: t.Engine.Type, + TaskPublisherType: t.Publisher.Type, + TaskEnvVarCount: len(t.Env), + TaskMetaCount: len(t.Meta), + TaskInputSourceTypes: taskInputTypes, + TaskResultPathCount: len(t.ResultPaths), + Resources: resource, + TaskNetworkType: t.Network.Type.String(), + TaskDomainsCount: len(t.Network.Domains), + TaskExecutionTimeout: t.Timeouts.ExecutionTimeout, + TaskQueueTimeout: t.Timeouts.QueueTimeout, + TaskTotalTimeout: t.Timeouts.TotalTimeout, + Warnings: warnings, + } +} diff --git a/pkg/analytics/job_terminal.go b/pkg/analytics/job_terminal.go new file mode 100644 index 0000000000..492c31384a --- /dev/null +++ b/pkg/analytics/job_terminal.go @@ -0,0 +1,109 @@ +package analytics + +import ( + "time" + + "github.com/bacalhau-project/bacalhau/pkg/models" +) + +// TerminalJobEventType is the event type for a job that has reached a terminal state. +const TerminalJobEventType = "bacalhau.job_v1.terminal" + +type JobTerminalEvent struct { + JobID string `json:"job_id"` + + NameSet bool `json:"name_set"` + NamespaceHash string `json:"namespace_hash"` + + Type string `json:"type"` + Count int `json:"count"` + LabelsCount int `json:"labels_count"` + MetaCount int `json:"meta_count"` + + State string `json:"state"` + + Version uint64 `json:"version"` + Revision uint64 `json:"revision"` + CreateTime time.Time `json:"create_time"` + ModifyTime time.Time `json:"modify_time"` + + TaskNameHash string `json:"task_name_hash"` + TaskEngineType string `json:"task_engine_type"` + TaskPublisherType string `json:"task_publisher_type"` + TaskEnvVarCount int `json:"task_env_var_count"` + TaskMetaCount int `json:"task_meta_count"` + TaskInputSourceTypes []string `json:"task_input_source_types"` + TaskResultPathCount int `json:"task_result_path_count"` + + Resources Resource `json:"resources,omitempty"` + + TaskNetworkType string `json:"task_network_type"` + TaskDomainsCount int `json:"task_domains_count"` + TaskExecutionTimeout int64 `json:"task_execution_timeout"` + TaskQueueTimeout int64 `json:"task_queue_timeout"` + TaskTotalTimeout int64 `json:"task_total_timeout"` +} + +func NewJobTerminalEvent(j models.Job) *Event { + t := j.Task() + taskInputTypes := make([]string, len(t.InputSources)) + for i, s := range t.InputSources { + taskInputTypes[i] = s.Source.Type + } + // if we can't parse the resources use zero + var resource Resource + taskResources, err := t.ResourcesConfig.ToResources() + if err != nil { + resource = Resource{ + CPUUnits: 0, + MemoryBytes: 0, + DiskBytes: 0, + GPUCount: 0, + GPUTypes: nil, + } + } else { + gpuTypes := make([]GPUInfo, len(taskResources.GPUs)) + for i, gpu := range taskResources.GPUs { + gpuTypes[i] = GPUInfo{ + Name: gpu.Name, + Vendor: string(gpu.Vendor), + } + } + resource = Resource{ + CPUUnits: taskResources.CPU, + MemoryBytes: taskResources.Memory, + DiskBytes: taskResources.Disk, + GPUCount: taskResources.GPU, + GPUTypes: gpuTypes, + } + } + terminalJobEvent := JobTerminalEvent{ + JobID: j.ID, + NameSet: j.ID != j.Name, + NamespaceHash: hashString(j.Namespace), + Type: j.Type, + Count: j.Count, + LabelsCount: len(j.Labels), + MetaCount: len(j.Meta), + State: j.State.StateType.String(), + Version: j.Version, + Revision: j.Revision, + CreateTime: time.Unix(0, j.CreateTime).UTC(), + ModifyTime: time.Unix(0, j.ModifyTime).UTC(), + TaskNameHash: hashString(t.Name), + TaskEngineType: t.Engine.Type, + TaskPublisherType: t.Publisher.Type, + TaskEnvVarCount: len(t.Env), + TaskMetaCount: len(t.Meta), + TaskInputSourceTypes: taskInputTypes, + TaskResultPathCount: len(t.ResultPaths), + Resources: resource, + TaskNetworkType: t.Network.Type.String(), + TaskDomainsCount: len(t.Network.Domains), + TaskExecutionTimeout: t.Timeouts.ExecutionTimeout, + TaskQueueTimeout: t.Timeouts.QueueTimeout, + TaskTotalTimeout: t.Timeouts.TotalTimeout, + } + + return NewEvent(TerminalJobEventType, terminalJobEvent) +} diff --git a/pkg/analytics/models.go b/pkg/analytics/models.go index 0362392400..19bdd92cbf 100644 --- a/pkg/analytics/models.go +++ b/pkg/analytics/models.go @@ -7,19 +7,10 @@ import ( "time" otellog "go.opentelemetry.io/otel/log" - - "github.com/bacalhau-project/bacalhau/pkg/models" ) type EventType string -const ( - // SubmitJobEventType is the event type for a job that has been submitted to an orchestrator. - SubmitJobEventType = "bacalhau.job_v1.submit" - // TerminalJobEventType is the event type for a job that has reached a terminal state. - TerminalJobEventType = "bacalhau.job_v1.terminal" -) - type Event struct { Type string Properties any @@ -56,171 +47,3 @@ func hashString(in string) string { hash.Write([]byte(in)) return hex.EncodeToString(hash.Sum(nil)) } - -type JobTerminalEvent struct { - ID string `json:"id"` - NameSet bool `json:"name_set"` - NamespaceHash string `json:"namespace_hash"` - Type string `json:"type"` - Count int `json:"count"` - LabelsCount int `json:"labels_count"` - MetaCount int `json:"meta_count"` - State string `json:"state"` - StateMessage string `json:"state_message"` - Version uint64 `json:"version"` - Revision uint64 `json:"revision"` - CreateTime int64 `json:"create_time"` - ModifyTime int64 `json:"modify_time"` - - TaskNameHash string `json:"task_name_hash"` - TaskEngineType string `json:"task_engine_type"` - TaskPublisherType string `json:"task_publisher_type"` - TaskEnvVarCount int `json:"task_env_var_count"` - TaskMetaCount int `json:"task_meta_count"` - TaskInputSourceTypes []string `json:"task_input_source_types"` - TaskResultPathCount int `json:"task_result_path_count"` - TaskCPUUnits float64 `json:"task_cpu_units"` - TaskMemoryBytes uint64 `json:"task_memory_bytes"` - TaskDiskBytes uint64 `json:"task_disk_bytes"` - TaskGPUCount uint64 `json:"task_gpu_count"` - TaskNetworkType string `json:"task_network_type"` - TaskDomainsCount int `json:"task_domains_count"` - TaskExecutionTimeout int64 `json:"task_execution_timeout"` - TaskQueueTimeout int64 `json:"task_queue_timeout"` - TaskTotalTimeout int64 `json:"task_total_timeout"` -} - -func NewJobTerminalEvent(j models.Job) *Event { - t := j.Task() - taskInputTypes := make([]string, len(t.InputSources)) - for i, s := range t.InputSources { - taskInputTypes[i] = s.Source.Type - } - // if we can't parse the resources use zero - taskResources, err := t.ResourcesConfig.ToResources() - if err != nil { - taskResources = &models.Resources{ - CPU: 0, - Memory: 0, - Disk: 0, - GPU: 0, - GPUs: nil, - } - } - terminalJobEvent := JobTerminalEvent{ - ID: j.ID, - NameSet: j.ID != j.Name, - NamespaceHash: hashString(j.Namespace), - Type: j.Type, - Count: j.Count, - LabelsCount: len(j.Labels), - MetaCount: len(j.Meta), - State: j.State.StateType.String(), - StateMessage: j.State.Message, - Version: j.Version, - Revision: j.Revision, - CreateTime: j.CreateTime, - ModifyTime: j.ModifyTime, - TaskNameHash: hashString(t.Name), - TaskEngineType: t.Engine.Type, - TaskPublisherType: t.Publisher.Type, - TaskEnvVarCount: len(t.Env), - TaskMetaCount: len(t.Meta), - TaskInputSourceTypes: taskInputTypes, - TaskResultPathCount: len(t.ResultPaths), - TaskCPUUnits: taskResources.CPU, - TaskMemoryBytes: taskResources.Memory, - TaskDiskBytes: taskResources.Disk, - TaskGPUCount: taskResources.GPU, - TaskNetworkType: t.Network.Type.String(), - TaskDomainsCount: len(t.Network.Domains), - TaskExecutionTimeout: t.Timeouts.ExecutionTimeout, - TaskQueueTimeout: t.Timeouts.QueueTimeout, - TaskTotalTimeout: t.Timeouts.TotalTimeout, - } - - return NewEvent(TerminalJobEventType, terminalJobEvent) -} - -type SubmitJobEvent struct { - ID string `json:"id"` - NameSet bool `json:"name_set"` - NamespaceHash string `json:"namespace_hash"` - Type string `json:"type"` - Count int `json:"count"` - LabelsCount int `json:"labels_count"` - MetaCount int `json:"meta_count"` - Version uint64 `json:"version"` - Revision uint64 `json:"revision"` - CreateTime int64 `json:"create_time"` - ModifyTime int64 `json:"modify_time"` - - TaskNameHash string `json:"task_name_hash"` - TaskEngineType string `json:"task_engine_type"` - TaskPublisherType string `json:"task_publisher_type"` - TaskEnvVarCount int `json:"task_env_var_count"` - TaskMetaCount int `json:"task_meta_count"` - TaskInputSourceTypes []string `json:"task_input_source_types"` - TaskResultPathCount int `json:"task_result_path_count"` - TaskCPUUnits float64 `json:"task_cpu_units"` - TaskMemoryBytes uint64 `json:"task_memory_bytes"` - TaskDiskBytes uint64 `json:"task_disk_bytes"` - TaskGPUCount uint64 `json:"task_gpu_count"` - TaskNetworkType string `json:"task_network_type"` - TaskDomainsCount int `json:"task_domains_count"` - TaskExecutionTimeout int64 `json:"task_execution_timeout"` - TaskQueueTimeout int64 `json:"task_queue_timeout"` - TaskTotalTimeout int64 `json:"task_total_timeout"` - - Warnings []string `json:"warnings"` - Error string `json:"error"` -} - -func NewSubmitJobEvent(j models.Job, warnings ...string) SubmitJobEvent { - t := j.Task() - taskInputTypes := make([]string, len(t.InputSources)) - for i, s := range t.InputSources { - taskInputTypes[i] = s.Source.Type - } - // if we can't parse the resources use zero - taskResources, err := t.ResourcesConfig.ToResources() - if err != nil { - taskResources = &models.Resources{ - CPU: 0, - Memory: 0, - Disk: 0, - GPU: 0, - GPUs: nil, - } - } - return SubmitJobEvent{ - ID: j.ID, - NameSet: j.ID != j.Name, - NamespaceHash: hashString(j.Namespace), - Type: j.Type, - Count: j.Count, - LabelsCount: len(j.Labels), - MetaCount: len(j.Meta), - Version: j.Version, - Revision: j.Revision, - CreateTime: j.CreateTime, - ModifyTime: j.ModifyTime, - TaskNameHash: hashString(t.Name), - TaskEngineType: t.Engine.Type, - TaskPublisherType: t.Publisher.Type, - TaskEnvVarCount: len(t.Env), - TaskMetaCount: len(t.Meta), - TaskInputSourceTypes: taskInputTypes, - TaskResultPathCount: len(t.ResultPaths), - TaskCPUUnits: taskResources.CPU, - TaskMemoryBytes: taskResources.Memory, - TaskDiskBytes: taskResources.Disk, - TaskGPUCount: taskResources.GPU, - TaskNetworkType: t.Network.Type.String(), - TaskDomainsCount: len(t.Network.Domains), - TaskExecutionTimeout: t.Timeouts.ExecutionTimeout, - TaskQueueTimeout: t.Timeouts.QueueTimeout, - TaskTotalTimeout: t.Timeouts.TotalTimeout, - Warnings: warnings, - } -} diff --git a/pkg/analytics/resources.go b/pkg/analytics/resources.go new file mode 100644 index 0000000000..e2fcfb1e87 --- /dev/null +++ b/pkg/analytics/resources.go @@ -0,0 +1,14 @@ +package analytics + +type Resource struct { + CPUUnits float64 `json:"cpu_units,omitempty"` + MemoryBytes uint64 `json:"memory_bytes,omitempty"` + DiskBytes uint64 `json:"disk_bytes,omitempty"` + GPUCount uint64 `json:"gpu_count,omitempty"` + GPUTypes []GPUInfo `json:"gpu_types,omitempty"` +} + +type GPUInfo struct { + Name string `json:"name,omitempty"` + Vendor string `json:"vendor,omitempty"` +} diff --git a/pkg/jobstore/boltdb/store.go b/pkg/jobstore/boltdb/store.go index c4205257f7..2a55903313 100644 --- a/pkg/jobstore/boltdb/store.go +++ b/pkg/jobstore/boltdb/store.go @@ -1160,6 +1160,7 @@ func (b *BoltJobStore) createExecution(tx *bolt.Tx, execution models.Execution) } } + analytics.EmitEvent(context.TODO(), analytics.NewCreatedExecutionEvent(execution)) return nil } @@ -1221,6 +1222,13 @@ func (b *BoltJobStore) updateExecution(tx *bolt.Tx, request jobstore.UpdateExecu } } + if newExecution.IsTerminalState() { + analytics.EmitEvent(context.TODO(), analytics.NewTerminalExecutionEvent(newExecution)) + } + if newExecution.IsDiscarded() { + analytics.EmitEvent(context.TODO(), analytics.NewComputeMessageExecutionEvent(newExecution.JobID, newExecution.ID, newExecution.ComputeState.Message)) + } + return nil } diff --git a/pkg/orchestrator/endpoint.go b/pkg/orchestrator/endpoint.go index 39707a4dc9..dfe1a5da90 100644 --- a/pkg/orchestrator/endpoint.go +++ b/pkg/orchestrator/endpoint.go @@ -74,7 +74,7 @@ func (e *BaseEndpoint) SubmitJob(ctx context.Context, request *SubmitJobRequest) submitEvent.Error = err.Error() return nil, err } - submitEvent.ID = job.ID + submitEvent.JobID = job.ID var translationEvent models.Event