Skip to content

Commit

Permalink
add support for non connstring auth & workerpool size config
Browse files Browse the repository at this point in the history
  • Loading branch information
gordallott committed Feb 5, 2024
1 parent 4102f33 commit 7587237
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 18 deletions.
54 changes: 39 additions & 15 deletions cmd/export/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"os"
"os/signal"
"strings"
"syscall"

"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
Expand All @@ -29,24 +30,36 @@ var Cmd = &cobra.Command{
Run: export,
}

func auth(ctx context.Context, connectionString string, opts *azidentity.DefaultAzureCredentialOptions) (*azblob.Client, error) {
func authConnectionString(ctx context.Context, connectionString string) (*azblob.Client, error) {
return azblob.NewClientFromConnectionString(connectionString, nil)
}

func authDefualt(ctx context.Context, serviceURL string) (*azblob.Client, error) {
cred, err := azidentity.NewDefaultAzureCredential(nil)
if err != nil {
return nil, fmt.Errorf("error getting default azure credentials: %w", err)
}

return azblob.NewClient(serviceURL, cred, nil)
}

var (
storageURL string
connectionString string
axiomPersonalAPIKey string
axiomPersonalOrg string

axiomURL string

workerPoolSize int
)

func init() {
// TODO: we shouldn't be using personal tokens, API token work will allow us to use api tokens in the future
viper.AutomaticEnv()

flags := Cmd.Flags()
flags.IntVar(&workerPoolSize, "worker-pool-size", 8, "the size of the worker pool used to transfer blobs to axiom (more workers == more blobs sent concurrently)")
flags.StringVar(&axiomPersonalAPIKey, "axiom-personal-token", "", "your full axiom personal API key (or env AXIOM_PERSONAL_TOKEN)")
if err := viper.BindPFlag("AXIOM_PERSONAL_TOKEN", flags.Lookup("axiom-personal-token")); err != nil {
panic(err)
Expand All @@ -66,32 +79,41 @@ func init() {
}
}

func export(cmd *cobra.Command, args []string) {
func ensureValid(ctx context.Context) (*azblob.Client, error) {
axiomPersonalAPIKey = viper.Get("AXIOM_PERSONAL_TOKEN").(string)
if axiomPersonalAPIKey == "" {
fmt.Fprintf(cmd.ErrOrStderr(), "axiom personal token is required")
return
return nil, fmt.Errorf("axiom personal token is required")
}

storageURL = viper.Get("STORAGE_URL").(string)
if storageURL == "" {
return nil, fmt.Errorf("storage url is required")
}

connectionString = viper.Get("CONNECTION_STRING").(string)
if connectionString == "" {
fmt.Fprintf(cmd.ErrOrStderr(), "connection string is required")
return
if strings.TrimSpace(connectionString) != "" {
azclient, err := authConnectionString(ctx, connectionString)
if err != nil {
return nil, fmt.Errorf("can not auth with azure via connection-string: %w", err)
}
return azclient, nil
}

storageURL = viper.Get("STORAGE_URL").(string)
if storageURL == "" {
fmt.Fprintf(cmd.ErrOrStderr(), "storage url is required")
return
azclient, err := authDefualt(ctx, storageURL)
if err != nil {
return nil, fmt.Errorf("can not auth with azure via default credentials: %w", err)
}
return azclient, nil
}

cmd.Println("exporting from azure to axiom")
func export(cmd *cobra.Command, args []string) {
ctx, cancel := context.WithCancel(cmd.Context())
defer cancel()

azclient, err := auth(ctx, connectionString, nil)
azclient, err := ensureValid(ctx)
if err != nil {
fmt.Fprintf(cmd.ErrOrStderr(), "can not auth with azure: %s\n", err)
fmt.Fprintf(cmd.ErrOrStderr(), "error validating: %s", err)
return
}

axiclient, err := axiom.NewClient(
Expand All @@ -103,7 +125,9 @@ func export(cmd *cobra.Command, args []string) {
fmt.Fprintf(cmd.ErrOrStderr(), "can not create axiom client: %s\n", err)
}

poller := poll.NewPoller()
cmd.Println("exporting from azure to axiom")

poller := poll.NewPoller(workerPoolSize)
if err := poller.Start(ctx, azclient, axiclient, monitor.NewStorageAccountMonitor(storageURL)); err != nil {
fmt.Fprintf(cmd.ErrOrStderr(), "can not start poller: %s\n", err)
}
Expand Down
10 changes: 7 additions & 3 deletions pkg/poll/poll.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,16 @@ var logger = log.New(os.Stdout, "poll: ", log.LstdFlags)
// queuing up all the blobs at once

type Poll struct {
wpsize int

cancel context.CancelFunc
stopped <-chan struct{}
}

func NewPoller() *Poll {
return &Poll{}
func NewPoller(workerPoolSize int) *Poll {
return &Poll{
wpsize: workerPoolSize,
}
}

func (p *Poll) Start(ctx context.Context,
Expand Down Expand Up @@ -99,7 +103,7 @@ func (p *Poll) loop(ctx context.Context,
containers[i], containers[j] = containers[j], containers[i]
})

wp := pond.New(8, 16)
wp := pond.New(p.wpsize, p.wpsize*2)
for _, container := range containers {
streamContainer(ctx, wp, azClient, axClient, container)
}
Expand Down

0 comments on commit 7587237

Please sign in to comment.