Skip to content

Commit

Permalink
workflow examples: remove use of deprecated functions
Browse files Browse the repository at this point in the history
Signed-off-by: Fabian Martinez <[email protected]>
  • Loading branch information
famarting committed Oct 18, 2024
1 parent 9bc7d82 commit 92810f6
Show file tree
Hide file tree
Showing 8 changed files with 47 additions and 161 deletions.
2 changes: 1 addition & 1 deletion examples/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ require (
github.com/golang/protobuf v1.5.4 // indirect
github.com/kr/pretty v0.3.1 // indirect
github.com/marusama/semaphore/v2 v2.5.0 // indirect
github.com/microsoft/durabletask-go v0.5.0 // indirect
github.com/microsoft/durabletask-go v0.5.1-0.20241014200046-fac9dd959f4d // indirect
github.com/xhit/go-str2duration/v2 v2.1.0 // indirect
go.opentelemetry.io/otel v1.27.0 // indirect
go.opentelemetry.io/otel/metric v1.27.0 // indirect
Expand Down
4 changes: 2 additions & 2 deletions examples/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/marusama/semaphore/v2 v2.5.0 h1:o/1QJD9DBYOWRnDhPwDVAXQn6mQYD0gZaS1Tpx6DJGM=
github.com/marusama/semaphore/v2 v2.5.0/go.mod h1:z9nMiNUekt/LTpTUQdpp+4sJeYqUGpwMHfW0Z8V8fnQ=
github.com/microsoft/durabletask-go v0.5.0 h1:4DWBgg05wnkV/VwakaiPqZ4cARvATP74ZQJFcXVMC18=
github.com/microsoft/durabletask-go v0.5.0/go.mod h1:goe2gmMgLptCijMDQ7JsekaR86KjPUG64V9JDXvKBhE=
github.com/microsoft/durabletask-go v0.5.1-0.20241014200046-fac9dd959f4d h1:Phnx8/wPd9BM6RPIjlqNl8kAaFjtU+Sdw9CzmZd8Wsw=
github.com/microsoft/durabletask-go v0.5.1-0.20241014200046-fac9dd959f4d/go.mod h1:goe2gmMgLptCijMDQ7JsekaR86KjPUG64V9JDXvKBhE=
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU=
github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE=
Expand Down
2 changes: 1 addition & 1 deletion examples/service/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ client: ## Runs the uncompiled example client code
custom-grpc-client: ## Runs the uncompiled example custom grpc client code
dapr run --app-id custom-grpc-client \
-d ./config \
--dapr-http-max-request-size 41 \
--max-body-size 4Mi \
--log-level debug \
go run ./custom-grpc-client/main.go

Expand Down
2 changes: 1 addition & 1 deletion examples/service/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ expected_stdout_lines:
```bash
dapr run --app-id custom-grpc-client \
-d ./config \
--dapr-http-max-request-size 41 \
--max-body-size 4Mi \
--log-level debug \
go run ./custom-grpc-client/main.go
```
Expand Down
182 changes: 30 additions & 152 deletions examples/workflow/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,11 @@ import (
"log"
"time"

"github.com/dapr/go-sdk/client"
"github.com/dapr/go-sdk/workflow"
)

var stage = 0

const (
workflowComponent = "dapr"
)

func main() {
w, err := workflow.NewWorker()
if err != nil {
Expand All @@ -54,70 +49,49 @@ func main() {
}
fmt.Println("runner started")

daprClient, err := client.NewClient()
wfClient, err := workflow.NewClient()
if err != nil {
log.Fatalf("failed to intialise client: %v", err)
}
defer daprClient.Close()
defer wfClient.Close()
ctx := context.Background()

// Start workflow test
respStart, err := daprClient.StartWorkflowBeta1(ctx, &client.StartWorkflowRequest{
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
WorkflowComponent: workflowComponent,
WorkflowName: "TestWorkflow",
Options: nil,
Input: 1,
SendRawInput: false,
})
instanceID, err := wfClient.ScheduleNewWorkflow(ctx, "TestWorkflow", workflow.WithInstanceID("a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9"), workflow.WithInput(1))
if err != nil {
log.Fatalf("failed to start workflow: %v", err)
}
fmt.Printf("workflow started with id: %v\n", respStart.InstanceID)
fmt.Printf("workflow started with id: %v\n", instanceID)

// Pause workflow test
err = daprClient.PauseWorkflowBeta1(ctx, &client.PauseWorkflowRequest{
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
WorkflowComponent: workflowComponent,
})

err = wfClient.SuspendWorkflow(ctx, instanceID, "")
if err != nil {
log.Fatalf("failed to pause workflow: %v", err)
}

respGet, err := daprClient.GetWorkflowBeta1(ctx, &client.GetWorkflowRequest{
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
WorkflowComponent: workflowComponent,
})
respFetch, err := wfClient.FetchWorkflowMetadata(ctx, instanceID, workflow.WithFetchPayloads(true))
if err != nil {
log.Fatalf("failed to get workflow: %v", err)
log.Fatalf("failed to fetch workflow: %v", err)
}

if respGet.RuntimeStatus != workflow.StatusSuspended.String() {
log.Fatalf("workflow not paused: %v", respGet.RuntimeStatus)
if respFetch.RuntimeStatus != workflow.StatusSuspended {
log.Fatalf("workflow not paused: %v", respFetch.RuntimeStatus)
}

fmt.Printf("workflow paused\n")

// Resume workflow test
err = daprClient.ResumeWorkflowBeta1(ctx, &client.ResumeWorkflowRequest{
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
WorkflowComponent: workflowComponent,
})

err = wfClient.ResumeWorkflow(ctx, instanceID, "")
if err != nil {
log.Fatalf("failed to resume workflow: %v", err)
}

respGet, err = daprClient.GetWorkflowBeta1(ctx, &client.GetWorkflowRequest{
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
WorkflowComponent: workflowComponent,
})
respFetch, err = wfClient.FetchWorkflowMetadata(ctx, instanceID, workflow.WithFetchPayloads(true))
if err != nil {
log.Fatalf("failed to get workflow: %v", err)
}

if respGet.RuntimeStatus != workflow.StatusRunning.String() {
if respFetch.RuntimeStatus != workflow.StatusRunning {
log.Fatalf("workflow not running")
}

Expand All @@ -127,14 +101,7 @@ func main() {

// Raise Event Test

err = daprClient.RaiseEventWorkflowBeta1(ctx, &client.RaiseEventWorkflowRequest{
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
WorkflowComponent: workflowComponent,
EventName: "testEvent",
EventData: "testData",
SendRawData: false,
})

err = wfClient.RaiseEvent(ctx, instanceID, "testEvent", workflow.WithEventPayload("testData"))
if err != nil {
fmt.Printf("failed to raise event: %v", err)
}
Expand All @@ -145,151 +112,62 @@ func main() {

fmt.Printf("stage: %d\n", stage)

respGet, err = daprClient.GetWorkflowBeta1(ctx, &client.GetWorkflowRequest{
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
WorkflowComponent: workflowComponent,
})
respFetch, err = wfClient.FetchWorkflowMetadata(ctx, instanceID, workflow.WithFetchPayloads(true))
if err != nil {
log.Fatalf("failed to get workflow: %v", err)
}

fmt.Printf("workflow status: %v\n", respGet.RuntimeStatus)
fmt.Printf("workflow status: %v\n", respFetch.RuntimeStatus)

// Purge workflow test
err = daprClient.PurgeWorkflowBeta1(ctx, &client.PurgeWorkflowRequest{
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
WorkflowComponent: workflowComponent,
})
err = wfClient.PurgeWorkflow(ctx, instanceID)
if err != nil {
log.Fatalf("failed to purge workflow: %v", err)
}

respGet, err = daprClient.GetWorkflowBeta1(ctx, &client.GetWorkflowRequest{
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
WorkflowComponent: workflowComponent,
})
if err != nil && respGet != nil {
log.Fatal("failed to purge workflow")
respFetch, err = wfClient.FetchWorkflowMetadata(ctx, instanceID, workflow.WithFetchPayloads(true))
if err == nil || respFetch != nil {
log.Fatalf("failed to purge workflow: %v", err)
}

fmt.Println("workflow purged")

fmt.Printf("stage: %d\n", stage)

// Terminate workflow test
respStart, err = daprClient.StartWorkflowBeta1(ctx, &client.StartWorkflowRequest{
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
WorkflowComponent: workflowComponent,
WorkflowName: "TestWorkflow",
Options: nil,
Input: 1,
SendRawInput: false,
})
instanceID, err = wfClient.ScheduleNewWorkflow(ctx, "TestWorkflow", workflow.WithInstanceID("a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9"), workflow.WithInput(1))
if err != nil {
log.Fatalf("failed to start workflow: %v", err)
}

fmt.Printf("workflow started with id: %s\n", respStart.InstanceID)
fmt.Printf("workflow started with id: %s\n", instanceID)

err = daprClient.TerminateWorkflowBeta1(ctx, &client.TerminateWorkflowRequest{
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
WorkflowComponent: workflowComponent,
})
err = wfClient.TerminateWorkflow(ctx, instanceID)
if err != nil {
log.Fatalf("failed to terminate workflow: %v", err)
}

respGet, err = daprClient.GetWorkflowBeta1(ctx, &client.GetWorkflowRequest{
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
WorkflowComponent: workflowComponent,
})
respFetch, err = wfClient.FetchWorkflowMetadata(ctx, instanceID, workflow.WithFetchPayloads(true))
if err != nil {
log.Fatalf("failed to get workflow: %v", err)
}
if respGet.RuntimeStatus != workflow.StatusTerminated.String() {
if respFetch.RuntimeStatus != workflow.StatusTerminated {
log.Fatal("failed to terminate workflow")
}

fmt.Println("workflow terminated")

err = daprClient.PurgeWorkflowBeta1(ctx, &client.PurgeWorkflowRequest{
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
WorkflowComponent: workflowComponent,
})

respGet, err = daprClient.GetWorkflowBeta1(ctx, &client.GetWorkflowRequest{
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
WorkflowComponent: workflowComponent,
})
if err == nil || respGet != nil {
log.Fatalf("failed to purge workflow: %v", err)
}

fmt.Println("workflow purged")

// WFClient
// TODO: Expand client validation

stage = 0
fmt.Println("workflow client test")

wfClient, err := workflow.NewClient()
err = wfClient.PurgeWorkflow(ctx, instanceID)
if err != nil {
log.Fatalf("[wfclient] faield to initialize: %v", err)
}

id, err := wfClient.ScheduleNewWorkflow(ctx, "TestWorkflow", workflow.WithInstanceID("a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9"), workflow.WithInput(1))
if err != nil {
log.Fatalf("[wfclient] failed to start workflow: %v", err)
}

fmt.Printf("[wfclient] started workflow with id: %s\n", id)

metadata, err := wfClient.FetchWorkflowMetadata(ctx, id)
if err != nil {
log.Fatalf("[wfclient] failed to get worfklow: %v", err)
}

fmt.Printf("[wfclient] workflow status: %v\n", metadata.RuntimeStatus.String())

if stage != 1 {
log.Fatalf("Workflow assertion failed while validating the wfclient. Stage 1 expected, current: %d", stage)
}

fmt.Printf("[wfclient] stage: %d\n", stage)

// TODO: WaitForWorkflowStart
// TODO: WaitForWorkflowCompletion

// raise event

if err := wfClient.RaiseEvent(ctx, id, "testEvent", workflow.WithEventPayload("testData")); err != nil {
log.Fatalf("[wfclient] failed to raise event: %v", err)
}

fmt.Println("[wfclient] event raised")

// Sleep to allow the workflow to advance
time.Sleep(time.Second)

if stage != 2 {
log.Fatalf("Workflow assertion failed while validating the wfclient. Stage 2 expected, current: %d", stage)
}

fmt.Printf("[wfclient] stage: %d\n", stage)

// stop workflow
if err := wfClient.TerminateWorkflow(ctx, id); err != nil {
log.Fatalf("[wfclient] failed to terminate workflow: %v", err)
log.Fatalf("failed to purge workflow: %v", err)
}

fmt.Println("[wfclient] workflow terminated")

if err := wfClient.PurgeWorkflow(ctx, id); err != nil {
log.Fatalf("[wfclient] failed to purge workflow: %v", err)
respFetch, err = wfClient.FetchWorkflowMetadata(ctx, instanceID, workflow.WithFetchPayloads(true))
if err == nil || respFetch != nil {
log.Fatalf("failed to purge workflow: %v", err)
}

fmt.Println("[wfclient] workflow purged")
fmt.Println("workflow purged")

// stop workflow runtime
if err := w.Shutdown(); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ require (
github.com/go-chi/chi/v5 v5.1.0
github.com/golang/mock v1.6.0
github.com/google/uuid v1.6.0
github.com/microsoft/durabletask-go v0.5.0
github.com/microsoft/durabletask-go v0.5.1-0.20241014200046-fac9dd959f4d
github.com/stretchr/testify v1.9.0
google.golang.org/grpc v1.65.0
google.golang.org/protobuf v1.34.2
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/marusama/semaphore/v2 v2.5.0 h1:o/1QJD9DBYOWRnDhPwDVAXQn6mQYD0gZaS1Tpx6DJGM=
github.com/marusama/semaphore/v2 v2.5.0/go.mod h1:z9nMiNUekt/LTpTUQdpp+4sJeYqUGpwMHfW0Z8V8fnQ=
github.com/microsoft/durabletask-go v0.5.0 h1:4DWBgg05wnkV/VwakaiPqZ4cARvATP74ZQJFcXVMC18=
github.com/microsoft/durabletask-go v0.5.0/go.mod h1:goe2gmMgLptCijMDQ7JsekaR86KjPUG64V9JDXvKBhE=
github.com/microsoft/durabletask-go v0.5.1-0.20241014200046-fac9dd959f4d h1:Phnx8/wPd9BM6RPIjlqNl8kAaFjtU+Sdw9CzmZd8Wsw=
github.com/microsoft/durabletask-go v0.5.1-0.20241014200046-fac9dd959f4d/go.mod h1:goe2gmMgLptCijMDQ7JsekaR86KjPUG64V9JDXvKBhE=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
Expand Down
10 changes: 9 additions & 1 deletion workflow/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,13 @@ import (
"github.com/microsoft/durabletask-go/api"
"github.com/microsoft/durabletask-go/backend"
durabletaskclient "github.com/microsoft/durabletask-go/client"
"google.golang.org/grpc"

dapr "github.com/dapr/go-sdk/client"
)

type Client struct {
conn *grpc.ClientConn
taskHubClient *durabletaskclient.TaskHubGrpcClient
}

Expand Down Expand Up @@ -113,9 +115,11 @@ func NewClient(opts ...clientOption) (*Client, error) {
return &Client{}, fmt.Errorf("failed to initialise dapr.Client: %v", err)
}

taskHubClient := durabletaskclient.NewTaskHubGrpcClient(daprClient.GrpcClientConn(), backend.DefaultLogger())
conn := daprClient.GrpcClientConn()
taskHubClient := durabletaskclient.NewTaskHubGrpcClient(conn, backend.DefaultLogger())

Check warning on line 119 in workflow/client.go

View check run for this annotation

Codecov / codecov/patch

workflow/client.go#L118-L119

Added lines #L118 - L119 were not covered by tests

return &Client{
conn: conn,

Check warning on line 122 in workflow/client.go

View check run for this annotation

Codecov / codecov/patch

workflow/client.go#L122

Added line #L122 was not covered by tests
taskHubClient: taskHubClient,
}, nil
}
Expand Down Expand Up @@ -211,3 +215,7 @@ func (c *Client) PurgeWorkflow(ctx context.Context, id string) error {
}
return c.taskHubClient.PurgeOrchestrationState(ctx, api.InstanceID(id))
}

func (c *Client) Close() {
_ = c.conn.Close()

Check warning on line 220 in workflow/client.go

View check run for this annotation

Codecov / codecov/patch

workflow/client.go#L219-L220

Added lines #L219 - L220 were not covered by tests
}

0 comments on commit 92810f6

Please sign in to comment.