Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SOL-121561: Added provision and deprovision samples to demonstrate how to use the feature in the Go PubSub+ API #13

Draft
wants to merge 5 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ module SolaceSamples.com/PubSub+Go

go 1.17

require solace.dev/go/messaging v1.6.1
require solace.dev/go/messaging v1.7.0

require solace.dev/go/messaging-trace/opentelemetry v1.0.0

Expand Down
152 changes: 152 additions & 0 deletions patterns/provisioner.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
package main

import (
"fmt"
"os"
"os/signal"
"time"

"solace.dev/go/messaging"
"solace.dev/go/messaging/pkg/solace"
"solace.dev/go/messaging/pkg/solace/config"
)

func getEnv(key, def string) string {
if val, ok := os.LookupEnv(key); ok {
return val
}
return def
}

// Code examples of how to use the Endpoint Provisioner to Provision
// queues on a Solace broker and Deprovision queues from a Solace broker.
//
// Possible Provision errors include:
// [x] Already Exists - when a queue with the same properties already exists on the broker
// [x] Endpoint Property Mismatch - when a queue with the same name but different provision properties already exists on the broker
//
// Possible Deprovision errors include:
// [x] Unknown Queue - when a queue with the provided queue name does not exists on the broker
func main() {
// logging.SetLogLevel(logging.LogLevelInfo)

// Configuration parameters
brokerConfig := config.ServicePropertyMap{
config.TransportLayerPropertyHost: getEnv("SOLACE_HOST", "tcp://localhost:55555,tcp://localhost:55554"),
config.ServicePropertyVPNName: getEnv("SOLACE_VPN", "default"),
config.AuthenticationPropertySchemeBasicPassword: getEnv("SOLACE_PASSWORD", "default"),
config.AuthenticationPropertySchemeBasicUserName: getEnv("SOLACE_USERNAME", "default"),
}

messagingService, err := messaging.NewMessagingServiceBuilder().
FromConfigurationProvider(brokerConfig).
WithProvisionTimeoutMs(10 * time.Millisecond). // set a provision timeout on the session
Build()

if err != nil {
panic(err)
}

// Connect to the messaging serice
if err := messagingService.Connect(); err != nil {
panic(err)
}

fmt.Println("Connected to the broker? ", messagingService.IsConnected())

/////////////////////////////////////////
// PROVISIONING EXAMPLES
/////////////////////////////////////////

//// Start Provision with blocking Example
queueName := "ProvisionedQueueName"
var outcome solace.ProvisionOutcome = messagingService.EndpointProvisioner().
WithDurability(true). // provision a durable queue (this is Default to True irresspective of whether this setter is called)
WithMaxMessageRedelivery(10). // number of times queue messages will be redelivered before moving to the DMQ
WithDiscardNotification(true). // will notify senders about message discards
WithTTLPolicy(true). // respect message TTL on queue
WithQuotaMB(100). // set the queue message quota (in MB)
WithMaxMessageSize(1000000). // set Max message size (in Bytes)
WithExclusiveAccess(true). // provision an Exclusive queue
WithPermission(config.EndpointPermissionDelete). // with the delete permission
Provision(queueName, false)

fmt.Println("\nEndpoint Provision on the broker [Status]: ", outcome.GetStatus())
fmt.Println("Endpoint Provision on the broker [Error]: ", outcome.GetError(), "\n")
//// End Provision with blocking Example

// //// Start ProvisionAsync Example
// queueName := "ProvisionedQueueName"
// outcomeChannel := messagingService.EndpointProvisioner().
// WithDurability(true). // provision a durable queue (this is Default to True irresspective of whether this setter is called)
// WithMaxMessageRedelivery(10). // number of times queue messages will be redelivered before moving to the DMQ
// WithDiscardNotification(true). // will notify senders about message discards
// WithTTLPolicy(true). // respect message TTL on queue
// WithQuotaMB(100). // set the queue message quota (in MB)
// WithMaxMessageSize(1000000). // set Max message size (in Bytes)
// WithExclusiveAccess(true). // provision an Exclusive queue
// WithPermission(config.EndpointPermissionDelete). // with the delete permission
// ProvisionAsync(queueName, false)

// outcome := <-outcomeChannel
// fmt.Println("\nEndpoint Provision Aysnc on the broker [Status]: ", outcome.GetStatus())
// fmt.Println("Endpoint Provision Aysnc on the broker [Error]: ", outcome.GetError(), "\n")
// //// End ProvisionAsync Example

// //// Start ProvisionAsync with callback Example
// queueName := "ProvisionedQueueName"
// provisionCallbackHandler := func(outcome solace.ProvisionOutcome) {
// fmt.Println("\nEndpoint Provision Aysnc With Callback on the broker [Status]: ", outcome.GetStatus())
// fmt.Println("Endpoint Provision Aysnc With Callback on the broker [Error]: ", outcome.GetError(), "\n")
// }
// messagingService.EndpointProvisioner().
// WithDurability(true). // provision a durable queue (this is Default to True irresspective of whether this setter is called)
// WithMaxMessageRedelivery(10). // number of times queue messages will be redelivered before moving to the DMQ
// WithDiscardNotification(true). // will notify senders about message discards
// WithTTLPolicy(true). // respect message TTL on queue
// WithQuotaMB(100). // set the queue message quota (in MB)
// WithMaxMessageSize(1000000). // set Max message size (in Bytes)
// WithExclusiveAccess(true). // provision an Exclusive queue
// WithPermission(config.EndpointPermissionDelete). // with the delete permission
// ProvisionAsyncWithCallback(queueName, false, provisionCallbackHandler)
// //// End ProvisionAsync with callback Example

/////////////////////////////////////////
// DEPROVISIONING EXAMPLES
/////////////////////////////////////////

// //// Start Deprovision with blocking Example
// queueName := "ProvisionedQueueName"
// var deprovError error = messagingService.EndpointProvisioner().Deprovision(queueName, false)
// fmt.Println("\nEndpoint Deprovision on the broker [Error]: ", deprovError)
// //// End Deprovision with blocking

// //// Start DeprovisionAsync
// queueName := "ProvisionedQueueName"
// errorChannel := messagingService.EndpointProvisioner().DeprovisionAsync(queueName, false)
// deprovError := <-errorChannel
// fmt.Println("\nEndpoint Deprovisioner Aysnc on the broker [Error]: ", deprovError)
// //// End DeprovisionAsync Example

// //// Start DeprovisionAsync with callback Example
// queueName := "ProvisionedQueueName"
// deprovisionCallbackHandler := func(deprovError error) {
// fmt.Println("\nEndpoint Deprovisioner Aysnc With Callback on the broker [Error]: ", deprovError)
// }
// messagingService.EndpointProvisioner().DeprovisionAsyncWithCallback(queueName, false, deprovisionCallbackHandler)
// //// End DeprovisionAsync with callback Example

fmt.Println("\n===Interrupt (CTR+C) to handle graceful terminaltion of the messaiging service===")

// Run forever until an interrupt signal is received
// Handle interrupts
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)

// Block until a signal is received.
<-c

// Disconnect the Message Service
messagingService.Disconnect()
fmt.Println("Messaging Service Disconnected? ", !messagingService.IsConnected())
}
4 changes: 3 additions & 1 deletion patterns/request-reply/direct_replier_blocking.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,9 @@ func main() {

// Run forever until an interrupt signal is received
for requestReplyReceiver.IsRunning() {
// have receiver push request messages to request message handler
// The ReceiveMessage() function waits until the specified timeout to receive a message or waits
// forever if timeout value is negative. If a timeout occurs, a solace.TimeoutError is returned.
// Reference: https://pkg.go.dev/solace.dev/go/[email protected]/pkg/solace#RequestReplyMessageReceiver
message, replier, regErr := requestReplyReceiver.ReceiveMessage(-1)
if regErr != nil {
panic(regErr)
Expand Down
3 changes: 3 additions & 0 deletions patterns/request-reply/direct_requestor_blocking.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,9 @@ func main() {
// Publish to the given topic
// Block until reply message is received
replyTimeout := 5 * time.Second
// The PublishAwaitResponse() function waits until the specified replyTimeout to receive a published message's reply or waits
// indefinitely if replyTimeout value is negative.
// Reference: https://pkg.go.dev/solace.dev/go/[email protected]/pkg/solace#RequestReplyMessagePublisher
messageReply, publishErr := requestReplyPublisher.PublishAwaitResponse(message, topic, replyTimeout, config.MessagePropertyMap{
config.MessagePropertyCorrelationID: fmt.Sprint(msgSeqNum),
})
Expand Down