Skip to content

Commit

Permalink
feat: add execution analytics (#4532)
Browse files Browse the repository at this point in the history
- also ensure JobID is called job_id in all events for easier sql

---------

Co-authored-by: frrist <[email protected]>
  • Loading branch information
frrist and frrist authored Sep 28, 2024
1 parent 7e8967a commit 9a64576
Show file tree
Hide file tree
Showing 8 changed files with 374 additions and 178 deletions.
17 changes: 17 additions & 0 deletions pkg/analytics/execution_compute_message.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package analytics

const ComputeMessageExecutionEventType = "bacalhau.execution_v1.compute_message"

type ExecutionComputeMessage struct {
JobID string `json:"job_id,omitempty"`
ExecutionID string `json:"execution_id,omitempty"`
ComputeMessage string `json:"compute_message,omitempty"`
}

func NewComputeMessageExecutionEvent(jobID string, executionID string, computeMessage string) *Event {
return NewEvent(ComputeMessageExecutionEventType, ExecutionComputeMessage{
JobID: jobID,
ExecutionID: executionID,
ComputeMessage: computeMessage,
})
}
117 changes: 117 additions & 0 deletions pkg/analytics/executions.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package analytics

import (
"time"

"github.com/bacalhau-project/bacalhau/pkg/models"
)

const TerminalExecutionEventType = "bacalhau.execution_v1.terminal"
const CreatedExecutionEventType = "bacalhau.execution_v1.create"

type ExecutionEvent struct {
JobID string `json:"job_id,omitempty"`
ExecutionID string `json:"execution_id,omitempty"`
EvalID string `json:"evaluation_id,omitempty"`

NameSet bool `json:"name_set,omitempty"`
NodeNameHash string `json:"node_name_hash,omitempty"`
NamespaceHash string `json:"namespace_hash,omitempty"`

Resources map[string]Resource `json:"resources,omitempty"`

DesiredState string `json:"desired_state,omitempty"`
ComputeState string `json:"compute_state,omitempty"`

PublishedResultType string `json:"publisher_type,omitempty"`

RunResultStdoutTruncated bool `json:"run_result_stdout_truncated,omitempty"`
RunResultStderrTruncated bool `json:"run_result_stderr_truncated,omitempty"`
RunResultExitCode int `json:"run_result_exit_code,omitempty"`

PreviousExecution string `json:"previous_execution,omitempty"`
NextExecution string `json:"next_execution,omitempty"`
FollowupEvalID string `json:"followup_eval_id,omitempty"`

Revision uint64 `json:"revision,omitempty"`
CreateTime time.Time `json:"create_time,omitempty"`
ModifyTime time.Time `json:"modify_time,omitempty"`
}

func NewCreatedExecutionEvent(e models.Execution) *Event {
return NewEvent(CreatedExecutionEventType, newExecutionEvent(e))
}

func NewTerminalExecutionEvent(e models.Execution) *Event {
return NewEvent(TerminalExecutionEventType, newExecutionEvent(e))
}

func newExecutionEvent(e models.Execution) ExecutionEvent {
resources := make(map[string]Resource, len(e.AllocatedResources.Tasks))
for taskName, taskResources := range e.AllocatedResources.Tasks {
gpuTypes := make([]GPUInfo, len(taskResources.GPUs))
for i, gpu := range taskResources.GPUs {
gpuTypes[i] = GPUInfo{
Name: gpu.Name,
Vendor: string(gpu.Vendor),
}
}
// we hash the taskName here for privacy
resources[hashString(taskName)] = Resource{
CPUUnits: taskResources.CPU,
MemoryBytes: taskResources.Memory,
DiskBytes: taskResources.Disk,
GPUCount: taskResources.GPU,
GPUTypes: gpuTypes,
}
}

var (
stdoutTruncated bool
stderrTruncated bool
exitCode int
)
if e.RunOutput != nil {
stdoutTruncated = e.RunOutput.StdoutTruncated
stderrTruncated = e.RunOutput.StderrTruncated
exitCode = e.RunOutput.ExitCode
}

return ExecutionEvent{
// ID fields.
JobID: e.JobID,
ExecutionID: e.ID,
EvalID: e.EvalID,

// name fields.
NameSet: e.Name == "",
NodeNameHash: hashString(e.NodeID),
NamespaceHash: hashString(e.Namespace),

// resources of tasks in execution.
// NB: currently this isn't populated when creating executions.
Resources: resources,

// states.
DesiredState: e.DesiredState.StateType.String(),
ComputeState: e.ComputeState.StateType.String(),

// publisher if any.
PublishedResultType: e.PublishedResult.Type,

// run results if any.
RunResultStdoutTruncated: stdoutTruncated,
RunResultStderrTruncated: stderrTruncated,
RunResultExitCode: exitCode,

// IDs of related models.
PreviousExecution: e.PreviousExecution,
NextExecution: e.NextExecution,
FollowupEvalID: e.FollowupEvalID,

// versioning and time.
Revision: e.Revision,
CreateTime: time.Unix(0, e.CreateTime).UTC(),
ModifyTime: time.Unix(0, e.ModifyTime).UTC(),
}
}
108 changes: 108 additions & 0 deletions pkg/analytics/job_submit.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package analytics

import (
"time"

"github.com/bacalhau-project/bacalhau/pkg/models"
)

// SubmitJobEventType is the event type for a job that has been submitted to an orchestrator.
const SubmitJobEventType = "bacalhau.job_v1.submit"

type SubmitJobEvent struct {
JobID string `json:"job_id"`

NameSet bool `json:"name_set"`
NamespaceHash string `json:"namespace_hash"`

Type string `json:"type"`
Count int `json:"count"`
LabelsCount int `json:"labels_count"`
MetaCount int `json:"meta_count"`

Version uint64 `json:"version"`
Revision uint64 `json:"revision"`
CreateTime time.Time `json:"create_time"`
ModifyTime time.Time `json:"modify_time"`

TaskNameHash string `json:"task_name_hash"`
TaskEngineType string `json:"task_engine_type"`
TaskPublisherType string `json:"task_publisher_type"`
TaskEnvVarCount int `json:"task_env_var_count"`
TaskMetaCount int `json:"task_meta_count"`
TaskInputSourceTypes []string `json:"task_input_source_types"`
TaskResultPathCount int `json:"task_result_path_count"`

Resources Resource `json:"resources,omitempty"`

TaskNetworkType string `json:"task_network_type"`
TaskDomainsCount int `json:"task_domains_count"`
TaskExecutionTimeout int64 `json:"task_execution_timeout"`
TaskQueueTimeout int64 `json:"task_queue_timeout"`
TaskTotalTimeout int64 `json:"task_total_timeout"`

Warnings []string `json:"warnings"`
Error string `json:"error"`
}

func NewSubmitJobEvent(j models.Job, warnings ...string) SubmitJobEvent {
t := j.Task()
taskInputTypes := make([]string, len(t.InputSources))
for i, s := range t.InputSources {
taskInputTypes[i] = s.Source.Type
}
// if we can't parse the resources use zero
var resource Resource
taskResources, err := t.ResourcesConfig.ToResources()
if err != nil {
resource = Resource{
CPUUnits: 0,
MemoryBytes: 0,
DiskBytes: 0,
GPUCount: 0,
GPUTypes: nil,
}
} else {
gpuTypes := make([]GPUInfo, len(taskResources.GPUs))
for i, gpu := range taskResources.GPUs {
gpuTypes[i] = GPUInfo{
Name: gpu.Name,
Vendor: string(gpu.Vendor),
}
}
resource = Resource{
CPUUnits: taskResources.CPU,
MemoryBytes: taskResources.Memory,
DiskBytes: taskResources.Disk,
GPUCount: taskResources.GPU,
GPUTypes: gpuTypes,
}
}
return SubmitJobEvent{
JobID: j.ID,
NameSet: j.ID != j.Name,
NamespaceHash: hashString(j.Namespace),
Type: j.Type,
Count: j.Count,
LabelsCount: len(j.Labels),
MetaCount: len(j.Meta),
Version: j.Version,
Revision: j.Revision,
CreateTime: time.Unix(0, j.CreateTime).UTC(),
ModifyTime: time.Unix(0, j.ModifyTime).UTC(),
TaskNameHash: hashString(t.Name),
TaskEngineType: t.Engine.Type,
TaskPublisherType: t.Publisher.Type,
TaskEnvVarCount: len(t.Env),
TaskMetaCount: len(t.Meta),
TaskInputSourceTypes: taskInputTypes,
TaskResultPathCount: len(t.ResultPaths),
Resources: resource,
TaskNetworkType: t.Network.Type.String(),
TaskDomainsCount: len(t.Network.Domains),
TaskExecutionTimeout: t.Timeouts.ExecutionTimeout,
TaskQueueTimeout: t.Timeouts.QueueTimeout,
TaskTotalTimeout: t.Timeouts.TotalTimeout,
Warnings: warnings,
}
}
109 changes: 109 additions & 0 deletions pkg/analytics/job_terminal.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package analytics

import (
"time"

"github.com/bacalhau-project/bacalhau/pkg/models"
)

// TerminalJobEventType is the event type for a job that has reached a terminal state.
const TerminalJobEventType = "bacalhau.job_v1.terminal"

type JobTerminalEvent struct {
JobID string `json:"job_id"`

NameSet bool `json:"name_set"`
NamespaceHash string `json:"namespace_hash"`

Type string `json:"type"`
Count int `json:"count"`
LabelsCount int `json:"labels_count"`
MetaCount int `json:"meta_count"`

State string `json:"state"`

Version uint64 `json:"version"`
Revision uint64 `json:"revision"`
CreateTime time.Time `json:"create_time"`
ModifyTime time.Time `json:"modify_time"`

TaskNameHash string `json:"task_name_hash"`
TaskEngineType string `json:"task_engine_type"`
TaskPublisherType string `json:"task_publisher_type"`
TaskEnvVarCount int `json:"task_env_var_count"`
TaskMetaCount int `json:"task_meta_count"`
TaskInputSourceTypes []string `json:"task_input_source_types"`
TaskResultPathCount int `json:"task_result_path_count"`

Resources Resource `json:"resources,omitempty"`

TaskNetworkType string `json:"task_network_type"`
TaskDomainsCount int `json:"task_domains_count"`
TaskExecutionTimeout int64 `json:"task_execution_timeout"`
TaskQueueTimeout int64 `json:"task_queue_timeout"`
TaskTotalTimeout int64 `json:"task_total_timeout"`
}

func NewJobTerminalEvent(j models.Job) *Event {
t := j.Task()
taskInputTypes := make([]string, len(t.InputSources))
for i, s := range t.InputSources {
taskInputTypes[i] = s.Source.Type
}
// if we can't parse the resources use zero
var resource Resource
taskResources, err := t.ResourcesConfig.ToResources()
if err != nil {
resource = Resource{
CPUUnits: 0,
MemoryBytes: 0,
DiskBytes: 0,
GPUCount: 0,
GPUTypes: nil,
}
} else {
gpuTypes := make([]GPUInfo, len(taskResources.GPUs))
for i, gpu := range taskResources.GPUs {
gpuTypes[i] = GPUInfo{
Name: gpu.Name,
Vendor: string(gpu.Vendor),
}
}
resource = Resource{
CPUUnits: taskResources.CPU,
MemoryBytes: taskResources.Memory,
DiskBytes: taskResources.Disk,
GPUCount: taskResources.GPU,
GPUTypes: gpuTypes,
}
}
terminalJobEvent := JobTerminalEvent{
JobID: j.ID,
NameSet: j.ID != j.Name,
NamespaceHash: hashString(j.Namespace),
Type: j.Type,
Count: j.Count,
LabelsCount: len(j.Labels),
MetaCount: len(j.Meta),
State: j.State.StateType.String(),
Version: j.Version,
Revision: j.Revision,
CreateTime: time.Unix(0, j.CreateTime).UTC(),
ModifyTime: time.Unix(0, j.ModifyTime).UTC(),
TaskNameHash: hashString(t.Name),
TaskEngineType: t.Engine.Type,
TaskPublisherType: t.Publisher.Type,
TaskEnvVarCount: len(t.Env),
TaskMetaCount: len(t.Meta),
TaskInputSourceTypes: taskInputTypes,
TaskResultPathCount: len(t.ResultPaths),
Resources: resource,
TaskNetworkType: t.Network.Type.String(),
TaskDomainsCount: len(t.Network.Domains),
TaskExecutionTimeout: t.Timeouts.ExecutionTimeout,
TaskQueueTimeout: t.Timeouts.QueueTimeout,
TaskTotalTimeout: t.Timeouts.TotalTimeout,
}

return NewEvent(TerminalJobEventType, terminalJobEvent)
}
Loading

0 comments on commit 9a64576

Please sign in to comment.