diff --git a/go.mod b/go.mod index fba71e52e1..e89a8b6131 100644 --- a/go.mod +++ b/go.mod @@ -71,7 +71,7 @@ require ( github.com/spf13/pflag v1.0.5 github.com/spf13/viper v1.19.0 github.com/stretchr/testify v1.9.0 - github.com/superfly/fly-go v0.1.26 + github.com/superfly/fly-go v0.1.28-0.20240826092218-a776f16963c3 github.com/superfly/graphql v0.2.4 github.com/superfly/lfsc-go v0.1.1 github.com/superfly/macaroon v0.2.14-0.20240702184853-b8ac52a1fc77 diff --git a/go.sum b/go.sum index 1401065ea8..a2d8cee895 100644 --- a/go.sum +++ b/go.sum @@ -625,8 +625,8 @@ github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsT github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8= github.com/subosito/gotenv v1.6.0/go.mod h1:Dk4QP5c2W3ibzajGcXpNraDfq2IrhjMIvMSWPKKo0FU= -github.com/superfly/fly-go v0.1.26 h1:X0d6aeNNCQAyHgOigbIqTwloBRXJEW5pRFjwT53GY4g= -github.com/superfly/fly-go v0.1.26/go.mod h1:JQke/BwoZqrWurqYkypSlcSo7bIUgCI3eVnqMC6AUj0= +github.com/superfly/fly-go v0.1.28-0.20240826092218-a776f16963c3 h1:/Ynr9pIsvjhvzSGGc0UKHj1+ZNtzFmhgCE78zwVKu7Y= +github.com/superfly/fly-go v0.1.28-0.20240826092218-a776f16963c3/go.mod h1:JQke/BwoZqrWurqYkypSlcSo7bIUgCI3eVnqMC6AUj0= github.com/superfly/graphql v0.2.4 h1:Av8hSk4x8WvKJ6MTnEwrLknSVSGPc7DWpgT3z/kt3PU= github.com/superfly/graphql v0.2.4/go.mod h1:CVfDl31srm8HnJ9udwLu6hFNUW/P6GUM2dKcG1YQ8jc= github.com/superfly/lfsc-go v0.1.1 h1:dGjLgt81D09cG+aR9lJZIdmonjZSR5zYCi7s54+ZU2Q= diff --git a/internal/command/deploy/deploy.go b/internal/command/deploy/deploy.go index 21c58dfea2..6e9434a64c 100644 --- a/internal/command/deploy/deploy.go +++ b/internal/command/deploy/deploy.go @@ -226,6 +226,17 @@ func New() *Command { Description: "Path to a deploy manifest file to use for deployment.", Hidden: true, }, + flag.Bool{ + Name: "delegate", + Description: "Delegate deployment to a remote deployer", + Hidden: true, + }, + flag.Bool{ + Name: "watch", + Description: "Watch the delegated remote deployment instead of exiting immediately", + Default: false, + Hidden: true, + }, ) return cmd @@ -612,6 +623,14 @@ func deployToMachines( return nil } + if flag.GetBool(ctx, "delegate") { + // create a manifest and deploy it to a remote deployer + fmt.Fprintln(io.Out, "Generating deploy manifest...") + manifest := NewManifest(app.Name, cfg, args) + + return deployRemotely(ctx, manifest) + } + md, err := NewMachineDeployment(ctx, args) if err != nil { sentry.CaptureExceptionWithAppInfo(ctx, err, "deploy", app) diff --git a/internal/command/deploy/machines.go b/internal/command/deploy/machines.go index ced6149ed4..01c57024af 100644 --- a/internal/command/deploy/machines.go +++ b/internal/command/deploy/machines.go @@ -75,39 +75,41 @@ type MachineDeploymentArgs struct { RestartMaxRetries int DeployRetries int BuildID string + FromManifestID *string } func argsFromManifest(manifest *DeployManifest, app *fly.AppCompact) MachineDeploymentArgs { return MachineDeploymentArgs{ AppCompact: app, - DeploymentImage: manifest.DeploymentImage, - Strategy: manifest.Strategy, - EnvFromFlags: manifest.EnvFromFlags, - PrimaryRegionFlag: manifest.PrimaryRegionFlag, - SkipSmokeChecks: manifest.SkipSmokeChecks, - SkipHealthChecks: manifest.SkipHealthChecks, - SkipDNSChecks: manifest.SkipDNSChecks, - SkipReleaseCommand: manifest.SkipReleaseCommand, - MaxUnavailable: manifest.MaxUnavailable, - RestartOnly: manifest.RestartOnly, - WaitTimeout: manifest.WaitTimeout, - StopSignal: manifest.StopSignal, - LeaseTimeout: manifest.LeaseTimeout, - ReleaseCmdTimeout: manifest.ReleaseCmdTimeout, - Guest: manifest.Guest, - IncreasedAvailability: manifest.IncreasedAvailability, - UpdateOnly: manifest.UpdateOnly, - Files: manifest.Files, - ExcludeRegions: manifest.ExcludeRegions, - OnlyRegions: manifest.OnlyRegions, - ExcludeMachines: manifest.ExcludeMachines, - OnlyMachines: manifest.OnlyMachines, - ProcessGroups: manifest.ProcessGroups, - MaxConcurrent: manifest.MaxConcurrent, - VolumeInitialSize: manifest.VolumeInitialSize, - RestartPolicy: manifest.RestartPolicy, - RestartMaxRetries: manifest.RestartMaxRetries, - DeployRetries: manifest.DeployRetries, + DeploymentImage: manifest.Args.DeploymentImage, + Strategy: manifest.Args.Strategy, + EnvFromFlags: manifest.Args.EnvFromFlags, + PrimaryRegionFlag: manifest.Args.PrimaryRegionFlag, + SkipSmokeChecks: manifest.Args.SkipSmokeChecks, + SkipHealthChecks: manifest.Args.SkipHealthChecks, + SkipDNSChecks: manifest.Args.SkipDNSChecks, + SkipReleaseCommand: manifest.Args.SkipReleaseCommand, + MaxUnavailable: manifest.Args.MaxUnavailable, + RestartOnly: manifest.Args.RestartOnly, + WaitTimeout: manifest.Args.WaitTimeout, + StopSignal: manifest.Args.StopSignal, + LeaseTimeout: manifest.Args.LeaseTimeout, + ReleaseCmdTimeout: manifest.Args.ReleaseCmdTimeout, + Guest: manifest.Args.Guest, + IncreasedAvailability: manifest.Args.IncreasedAvailability, + UpdateOnly: manifest.Args.UpdateOnly, + Files: manifest.Args.Files, + ExcludeRegions: manifest.Args.ExcludeRegions, + OnlyRegions: manifest.Args.OnlyRegions, + ExcludeMachines: manifest.Args.ExcludeMachines, + OnlyMachines: manifest.Args.OnlyMachines, + ProcessGroups: manifest.Args.ProcessGroups, + MaxConcurrent: manifest.Args.MaxConcurrent, + VolumeInitialSize: manifest.Args.VolumeInitialSize, + RestartPolicy: manifest.Args.RestartPolicy, + RestartMaxRetries: manifest.Args.RestartMaxRetries, + DeployRetries: manifest.Args.DeployRetries, + FromManifestID: fly.StringPointer(manifest.ID), } } @@ -150,6 +152,7 @@ type machineDeployment struct { volumeInitialSize int deployRetries int buildID string + FromManifestID *string } func NewMachineDeployment(ctx context.Context, args MachineDeploymentArgs) (_ MachineDeployment, err error) { @@ -277,6 +280,7 @@ func NewMachineDeployment(ctx context.Context, args MachineDeploymentArgs) (_ Ma processGroups: args.ProcessGroups, deployRetries: args.DeployRetries, buildID: args.BuildID, + FromManifestID: args.FromManifestID, } if err := md.setStrategy(); err != nil { tracing.RecordError(span, err, "failed to set strategy") diff --git a/internal/command/deploy/machines_deploymachinesapp.go b/internal/command/deploy/machines_deploymachinesapp.go index a698a8e981..b1f4eb2272 100644 --- a/internal/command/deploy/machines_deploymachinesapp.go +++ b/internal/command/deploy/machines_deploymachinesapp.go @@ -63,6 +63,7 @@ func (md *machineDeployment) DeployMachinesApp(ctx context.Context) error { var status string metadata := &fly.ReleaseMetadata{ + ManifestID: md.FromManifestID, PostDeploymentInfo: fly.PostDeploymentInfo{ FlyctlVersion: buildinfo.Info().Version.String(), }, diff --git a/internal/command/deploy/manifest.go b/internal/command/deploy/manifest.go index a71eda6e7b..f47f2916a9 100644 --- a/internal/command/deploy/manifest.go +++ b/internal/command/deploy/manifest.go @@ -12,14 +12,21 @@ import ( fly "github.com/superfly/fly-go" "github.com/superfly/flyctl/internal/appconfig" + "github.com/superfly/flyctl/internal/buildinfo" "github.com/superfly/flyctl/internal/flyutil" "github.com/superfly/flyctl/internal/sentry" "github.com/superfly/flyctl/iostreams" ) type DeployManifest struct { - AppName string - Config *appconfig.Config `json:"config"` + ID string `json:"id,omitempty"` + FlyVersion string `json:"fly_version,omitempty"` + AppName string `json:"app_name"` + Config *appconfig.Config `json:"config"` + Args *ManifestArgs `json:"args,omitempty"` +} + +type ManifestArgs struct { DeploymentImage string `json:"deployment_image,omitempty"` Strategy string `json:"strategy,omitempty"` EnvFromFlags []string `json:"env_from_flags,omitempty"` @@ -53,36 +60,38 @@ type DeployManifest struct { func NewManifest(AppName string, config *appconfig.Config, args MachineDeploymentArgs) *DeployManifest { return &DeployManifest{ - AppName: AppName, - Config: config, - DeploymentImage: args.DeploymentImage, - Strategy: args.Strategy, - EnvFromFlags: args.EnvFromFlags, - PrimaryRegionFlag: args.PrimaryRegionFlag, - SkipSmokeChecks: args.SkipSmokeChecks, - SkipHealthChecks: args.SkipHealthChecks, - SkipDNSChecks: args.SkipDNSChecks, - SkipReleaseCommand: args.SkipReleaseCommand, - MaxUnavailable: args.MaxUnavailable, - RestartOnly: args.RestartOnly, - WaitTimeout: args.WaitTimeout, - StopSignal: args.StopSignal, - LeaseTimeout: args.LeaseTimeout, - ReleaseCmdTimeout: args.ReleaseCmdTimeout, - Guest: args.Guest, - IncreasedAvailability: args.IncreasedAvailability, - UpdateOnly: args.UpdateOnly, - Files: args.Files, - ExcludeRegions: args.ExcludeRegions, - OnlyRegions: args.OnlyRegions, - ExcludeMachines: args.ExcludeMachines, - OnlyMachines: args.OnlyMachines, - ProcessGroups: args.ProcessGroups, - MaxConcurrent: args.MaxConcurrent, - VolumeInitialSize: args.VolumeInitialSize, - RestartPolicy: args.RestartPolicy, - RestartMaxRetries: args.RestartMaxRetries, - DeployRetries: args.DeployRetries, + AppName: AppName, + Config: config, + ID: fmt.Sprintf("manifest_%d", time.Now().UnixNano()), + FlyVersion: buildinfo.Info().Version.String(), + Args: &ManifestArgs{DeploymentImage: args.DeploymentImage, + Strategy: args.Strategy, + EnvFromFlags: args.EnvFromFlags, + PrimaryRegionFlag: args.PrimaryRegionFlag, + SkipSmokeChecks: args.SkipSmokeChecks, + SkipHealthChecks: args.SkipHealthChecks, + SkipDNSChecks: args.SkipDNSChecks, + SkipReleaseCommand: args.SkipReleaseCommand, + MaxUnavailable: args.MaxUnavailable, + RestartOnly: args.RestartOnly, + WaitTimeout: args.WaitTimeout, + StopSignal: args.StopSignal, + LeaseTimeout: args.LeaseTimeout, + ReleaseCmdTimeout: args.ReleaseCmdTimeout, + Guest: args.Guest, + IncreasedAvailability: args.IncreasedAvailability, + UpdateOnly: args.UpdateOnly, + Files: args.Files, + ExcludeRegions: args.ExcludeRegions, + OnlyRegions: args.OnlyRegions, + ExcludeMachines: args.ExcludeMachines, + OnlyMachines: args.OnlyMachines, + ProcessGroups: args.ProcessGroups, + MaxConcurrent: args.MaxConcurrent, + VolumeInitialSize: args.VolumeInitialSize, + RestartPolicy: args.RestartPolicy, + RestartMaxRetries: args.RestartMaxRetries, + DeployRetries: args.DeployRetries}, } } diff --git a/internal/command/deploy/remote.go b/internal/command/deploy/remote.go new file mode 100644 index 0000000000..728ef1529f --- /dev/null +++ b/internal/command/deploy/remote.go @@ -0,0 +1,563 @@ +package deploy + +import ( + "context" + "errors" + "fmt" + "io" + "os" + "strings" + "time" + + "github.com/azazeal/pause" + "github.com/cenkalti/backoff" + "github.com/superfly/fly-go" + "github.com/superfly/fly-go/flaps" + "github.com/superfly/flyctl/gql" + "github.com/superfly/flyctl/internal/config" + "github.com/superfly/flyctl/internal/flag" + "github.com/superfly/flyctl/internal/flapsutil" + "github.com/superfly/flyctl/internal/flyutil" + "github.com/superfly/flyctl/internal/haikunator" + "github.com/superfly/flyctl/internal/logger" + "github.com/superfly/flyctl/internal/render" + "github.com/superfly/flyctl/iostreams" + "github.com/superfly/flyctl/logs" + "golang.org/x/sync/errgroup" +) + +type Deployer struct { + app *fly.App + machine *fly.Machine + flaps flapsutil.FlapsClient +} + +// Exec a remote "curl --unix-socket /path/to/socket -X GET http://localhost/ready" to check if the deployer is ready +// The deployer should return a 200 status code if it is ready +func (d *Deployer) Ready(ctx context.Context) (bool, error) { + var io = iostreams.FromContext(ctx) + + fmt.Fprintln(io.Out, "Waiting for remote deployer to be ready") + + ctx, cancel := context.WithTimeout(ctx, 2*time.Minute) + defer cancel() + + b := backoff.NewExponentialBackOff() + b.InitialInterval = 500 * time.Millisecond + b.MaxElapsedTime = 5 * time.Second + + cmd := "curl --unix-socket /var/run/fly/deployer.sock -X GET http://localhost/ready" + + err := backoff.Retry(func() error { + res, err := d.flaps.Exec(ctx, d.machine.ID, &fly.MachineExecRequest{ + Cmd: cmd, + }) + if err != nil { + return err + } + if res.ExitCode != 0 { + return fmt.Errorf("remote deployer not ready") + } + if !strings.Contains(res.StdOut, "OK") { + return fmt.Errorf("remote deployer not ready: %s", res.StdOut) + } + return nil // Successful readiness check + }, b) + if err != nil { + return false, err + } + return true, nil +} + +func (d *Deployer) Done(ctx context.Context) (<-chan struct{}, error) { + var ( + done = make(chan struct{}) + io = iostreams.FromContext(ctx) + logger = logger.FromContext(ctx) + ) + + go func() { + defer close(done) + + ticker := time.NewTicker(4 * time.Second) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + // Context was canceled or timed out + return + case <-ticker.C: + machine, err := d.flaps.Get(ctx, d.machine.ID) + if err != nil { + logger.Warnf("Error getting machine %s from API: %v\n", d.machine.ID, err) + return + } + // render.JSON(io.Out, machine) + if exitEvent := machine.GetLatestEventOfTypeAfterType("exit", "start"); exitEvent != nil { + fmt.Fprintf(io.Out, "Machine exited with status %s\n", exitEvent.Status) + logger.Debugf("Machine %s exited with status %s\n", d.machine.ID, exitEvent.Status) + return + } + } + } + }() + return done, nil +} + +func deployRemotely(ctx context.Context, manifest *DeployManifest) error { + var ( + client = flyutil.ClientFromContext(ctx) + io = iostreams.FromContext(ctx) + ) + + org, err := client.GetOrganizationByApp(ctx, manifest.AppName) + if err != nil { + return err + } + + region := os.Getenv("FLY_REMOTE_BUILDER_REGION") + + // convert manifest to base64 so that we can pipe it to `fly deploy --manifest -` + manifestBase64, err := manifest.ToBase64() + if err != nil { + return err + } + + deployer, err := EnsureDeployer(ctx, org, manifest.AppName, region, manifestBase64) + if err != nil { + return err + } + + cmd := `bash -c "curl -s --unix-socket /var/run/fly/deployer.sock -X POST http://localhost/deploy"` + fmt.Fprintln(io.Out, "Executing deploy command on remote deployer") + + res, err := deployer.flaps.Exec(ctx, deployer.machine.ID, &fly.MachineExecRequest{ + Cmd: cmd, + }) + if err != nil { + return err + } + + if res.ExitCode != 0 { + if res.StdErr != "" { + fmt.Fprint(io.ErrOut, res.StdErr) + } + return fmt.Errorf("remote deploy failed with exit code %d", res.ExitCode) + } + if res.StdOut != "" { + fmt.Fprint(io.Out, res.StdOut) + } + if res.StdErr != "" { + fmt.Fprint(io.ErrOut, res.StdErr) + } + + if flag.GetBool(ctx, "watch") { + opts := &logs.LogOptions{ + AppName: deployer.app.Name, + VMID: deployer.machine.ID, + } + + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + var eg *errgroup.Group + eg, ctx = errgroup.WithContext(ctx) + + var streams []<-chan logs.LogEntry + if opts.NoTail { + streams = []<-chan logs.LogEntry{ + poll(ctx, eg, client, opts), + } + } else { + pollingCtx, cancelPolling := context.WithCancel(ctx) + streams = []<-chan logs.LogEntry{ + poll(pollingCtx, eg, client, opts), + nats(ctx, eg, client, opts, cancelPolling), + } + } + + // Handle log streaming in another goroutine + eg.Go(func() error { + return printStreams(ctx, streams...) + }) + + eg.Go(func() error { + done, err := deployer.Done(ctx) + if err != nil { + return err + } + + // Wait for either the machine to exit or the context to be canceled + select { + case <-done: + fmt.Fprintln(io.Out, "Tada! 🎉") + cancel() + case <-ctx.Done(): + } + return nil + }) + + // Wait for all goroutines to finish + if waitErr := eg.Wait(); waitErr != nil && !errors.Is(waitErr, context.Canceled) { + return waitErr + } + return nil + } + + return nil +} + +func EnsureDeployer(ctx context.Context, org *fly.Organization, appName, region, manifest string) (*Deployer, error) { + var deployer *Deployer + + deployer, err := findExistingDeployer(ctx, org, appName, region, manifest) + + switch { + case err == nil: + return deployer, nil + case !strings.Contains(err.Error(), "no deployer found"): + return nil, err + default: + // continue to create a new deployer + } + + deployer, err = createDeployer(ctx, org, appName, region, manifest) + if err != nil { + return nil, err + } + + switch ready, err := deployer.Ready(ctx); { + case err != nil: + return nil, err + case !ready: + return nil, fmt.Errorf("remote deployer not ready") + } + return deployer, nil +} + +// findExistingDeployer finds an existing deployer app in the org by getting all apps and filtering by the app role and the app name with "fly-deployer-*" +func findExistingDeployer(ctx context.Context, org *fly.Organization, appName, region, manifest string) (*Deployer, error) { + var ( + client = flyutil.ClientFromContext(ctx) + io = iostreams.FromContext(ctx) + ) + + deployer, err := client.GetDeployerAppByOrg(ctx, org.ID) + if err != nil { + return nil, err + } + flapsClient, err := flapsutil.NewClientWithOptions(ctx, flaps.NewClientOpts{ + AppName: deployer.Name, + OrgSlug: org.Slug, + }) + if err != nil { + return nil, err + } + + machines, err := flapsClient.ListActive(ctx) + if err != nil { + return nil, err + } + if len(machines) > 0 { + return nil, fmt.Errorf("a deployment is already in progress") + } + + fmt.Fprintln(io.Out, "Refreshing deploy token") + + token, err := getDeployToken(ctx, appName, org.ID) + if err != nil { + return nil, fmt.Errorf("failed getting deploy token: %w", err) + } + secrets := map[string]string{ + "FLY_API_TOKEN": token, + } + if _, err := client.SetSecrets(ctx, deployer.Name, secrets); err != nil { + return nil, fmt.Errorf("failed setting deploy token: %w", err) + } + + machine, err := createDeployerMachine(ctx, flapsClient, org.Slug, appName, region, manifest, org.PaidPlan) + if err != nil { + return nil, err + } + return &Deployer{ + app: deployer, + machine: machine, + flaps: flapsClient, + }, nil +} + +func createDeployer(ctx context.Context, org *fly.Organization, appName, region, manifest string) (*Deployer, error) { + var ( + io = iostreams.FromContext(ctx) + client = flyutil.ClientFromContext(ctx) + ) + + var ( + appRole = "fly-deployer" + deployerName = fmt.Sprintf("%s-%s", appRole, haikunator.Haikunator().Build()) + ) + + flapsClient, err := flapsutil.NewClientWithOptions(ctx, flaps.NewClientOpts{ + AppName: deployerName, + OrgSlug: org.Slug, + }) + if err != nil { + return nil, err + } + ctx = flapsutil.NewContextWithClient(ctx, flapsClient) + + deployerApp, err := client.CreateApp(ctx, fly.CreateAppInput{ + OrganizationID: org.ID, + Name: deployerName, + AppRoleID: appRole, + Machines: true, + PreferredRegion: fly.StringPointer(region), + }) + if err != nil { + return nil, err + } + defer func() { + if err != nil { + _ = client.DeleteApp(ctx, deployerName) + } + }() + + if err := flapsClient.WaitForApp(ctx, deployerApp.Name); err != nil { + return nil, err + } + + fmt.Fprintln(io.Out, "Setting deploy token") + + token, err := getDeployToken(ctx, appName, org.ID) + if err != nil { + return nil, fmt.Errorf("failed getting deploy token: %w", err) + } + secrets := map[string]string{ + "FLY_API_TOKEN": token, + } + + if _, err := client.SetSecrets(ctx, deployerApp.Name, secrets); err != nil { + return nil, fmt.Errorf("failed setting deploy token: %w", err) + } + + machine, err := createDeployerMachine(ctx, flapsClient, org.Slug, appName, region, manifest, org.PaidPlan) + if err != nil { + return nil, err + } + return &Deployer{ + app: deployerApp, + machine: machine, + flaps: flapsClient, + }, nil +} + +func createDeployerMachine(ctx context.Context, flapsClient flapsutil.FlapsClient, orgSlug, appName, region, manifest string, paidPlan bool) (*fly.Machine, error) { + guest := fly.MachineGuest{ + CPUKind: "shared", + CPUs: 4, + MemoryMB: 4096, + } + if paidPlan { + guest.CPUKind = "shared" + guest.CPUs = 8 + guest.MemoryMB = 8192 + } + + envVars := map[string]string{ + "ALLOW_ORG_SLUG": orgSlug, + "FLY_DEPLOY_APP": appName, + } + + var image = "docker.io/codebaker/fly-deployer:3d10c78" + + if os.Getenv("FLY_DEPLOYER_IMAGE") != "" { + image = os.Getenv("FLY_DEPLOYER_IMAGE") + } + + fmt.Fprintf(iostreams.FromContext(ctx).Out, "Using deployer image: %s\n", image) + + machineInput := fly.LaunchMachineInput{ + Region: region, + Config: &fly.MachineConfig{ + Env: envVars, + Guest: &guest, + Image: image, + Files: []*fly.File{ + { + GuestPath: "/app/manifest.json", + RawValue: &manifest, + }, + }, + Restart: &fly.MachineRestart{ + Policy: "on-failure", + MaxRetries: 3, + }, + AutoDestroy: true, // we want the machine to be destroyed after a successful deploy + }, + } + + machine, err := flapsClient.Launch(ctx, machineInput) + if err != nil { + return nil, err + } + + var state = "started" + + if err := flapsClient.Wait(ctx, machine, state, 60*time.Second); err != nil { + return nil, err + } + return machine, nil +} + +func getDeployToken(ctx context.Context, appName, orgID string) (string, error) { + const ( + tokenName = "remote deploy token" + profile = "deploy" + expiry = time.Minute * 300 + ) + + client := flyutil.ClientFromContext(ctx) + + app, err := client.GetAppCompact(ctx, appName) + if err != nil { + return "", err + } + + // tokens, err := client.GetAppLimitedAccessTokens(ctx, app.Name) + // if err != nil { + // return "", fmt.Errorf("failed getting existing tokens: %w", err) + // } + + // for _, token := range tokens { + // if token.Name != tokenName { + // continue + // } + + // disClient := flyio.DischargeClient(tp.WithBearerAuthentication( + // flyio.LocationAuthentication, + // config.FromContext(ctx).Tokens.UserTokenOnly().All(), + // )) + + // // logger.FromContext(ctx).Warnf("fetching discharge tokens for %s", token.Token) + + // tk, err := disClient.FetchDischargeTokens(ctx, token.Token) + // if err != nil { + // return "", fmt.Errorf("failed fetching discharge tokens: %w", err) + // } + // return tk, nil + // } + + // no existing token found + + options := &gql.LimitedAccessTokenOptions{ + "app_id": app.ID, + } + + resp, err := gql.CreateLimitedAccessToken( + ctx, + client.GenqClient(), + tokenName, + orgID, + profile, + options, + expiry.String(), + ) + if err != nil { + return "", fmt.Errorf("failed creating token: %w", err) + } + return resp.CreateLimitedAccessToken.LimitedAccessToken.TokenHeader, nil +} + +func printStreams(ctx context.Context, streams ...<-chan logs.LogEntry) error { + var eg *errgroup.Group + eg, ctx = errgroup.WithContext(ctx) + + out := iostreams.FromContext(ctx).Out + json := config.FromContext(ctx).JSONOutput + + for _, stream := range streams { + stream := stream + + eg.Go(func() error { + return printStream(ctx, out, stream, json) + }) + } + return eg.Wait() +} + +func printStream(ctx context.Context, w io.Writer, stream <-chan logs.LogEntry, json bool) error { + for { + select { + case <-ctx.Done(): + return ctx.Err() + case entry, ok := <-stream: + if !ok { + return nil + } + + var err error + if json { + err = render.JSON(w, entry) + } else { + err = render.LogEntry(w, entry, + render.HideAllocID(), + render.RemoveNewlines(), + render.HideRegion(), + ) + } + + if err != nil { + return err + } + } + } +} + +func nats(ctx context.Context, eg *errgroup.Group, client flyutil.Client, opts *logs.LogOptions, cancelPolling context.CancelFunc) <-chan logs.LogEntry { + c := make(chan logs.LogEntry) + + eg.Go(func() error { + defer close(c) + + stream, err := logs.NewNatsStream(ctx, client, opts) + if err != nil { + logger := logger.FromContext(ctx) + + logger.Debugf("could not connect to wireguard tunnel: %v\n", err) + logger.Debug("falling back to log polling...") + + return nil + } + + // we wait for 2 seconds before canceling the polling context so that + // we get a few records + pause.For(ctx, 2*time.Second) + cancelPolling() + + for entry := range stream.Stream(ctx, opts) { + c <- entry + } + + return nil + }) + + return c +} + +func poll(ctx context.Context, eg *errgroup.Group, client flyutil.Client, opts *logs.LogOptions) <-chan logs.LogEntry { + c := make(chan logs.LogEntry) + + eg.Go(func() (err error) { + defer close(c) + + if err = logs.Poll(ctx, c, client, opts); errors.Is(err, context.Canceled) { + // if the parent context is cancelled then the errorgroup will return + // context.Canceled because nats and/or printStreams will return it. + err = nil + } + return + }) + + return c +} diff --git a/test/preflight/fly_deploy_test.go b/test/preflight/fly_deploy_test.go index 5584314a99..e0181d9d83 100644 --- a/test/preflight/fly_deploy_test.go +++ b/test/preflight/fly_deploy_test.go @@ -335,7 +335,7 @@ func TestDeployManifest(t *testing.T) { f.Fly("deploy --export-manifest %s", manifestPath) manifest := f.ReadFile("manifest.json") - require.Contains(t, manifest, `"AppName": "`+appName+`"`) + require.Contains(t, manifest, `"app_name": "`+appName+`"`) require.Contains(t, manifest, `"primary_region": "`+f.PrimaryRegion()+`"`) require.Contains(t, manifest, `"internal_port": 80`) require.Contains(t, manifest, `"increased_availability": true`)