diff --git a/js/spec/binary-client.spec.ts b/js/spec/binary-client.spec.ts index d7c0444..128121c 100644 --- a/js/spec/binary-client.spec.ts +++ b/js/spec/binary-client.spec.ts @@ -1,3 +1,4 @@ +import crypto from "crypto"; import * as fs from "fs"; import { Client } from "pg"; import { encodeContent } from "../src"; @@ -131,6 +132,29 @@ describe("binary client operations", () => { expect(JSON.stringify(result)).toMatch('{"count":0}'); expect(result.count).toStrictEqual(0); }); + + it("can timeout while rebuilding the file system", async () => { + const project = 1337n; + + await grpcClient.newProject(project, []); + + const stream = grpcClient.updateObjects(project); + for (let i = 0; i < 1000; i++) { + const content = encodeContent(crypto.randomBytes(512 * 1024).toString("hex")); + await stream.send({ + path: `hello-${i}.txt`, + mode: 0o755n, + content: content, + size: BigInt(content.length), + deleted: false, + }); + } + await stream.complete(); + + const dir = tmpdir(); + const rebuildPromise = binaryClient.rebuild(project, null, dir, { timeout: 1 }); + await expect(rebuildPromise).rejects.toThrow(/context deadline exceeded/); + }, 20_000); }); describe("Gadget file match tests", () => { @@ -140,8 +164,9 @@ describe("Gadget file match tests", () => { await grpcClient.newProject(project, []); + const stream = grpcClient.updateObjects(project); for (const path of paths) { - await grpcClient.updateObject(project, { + await stream.send({ path, mode: 0o755n, content: encodedContent, @@ -149,6 +174,7 @@ describe("Gadget file match tests", () => { deleted: false, }); } + await stream.complete(); const dir = tmpdir(); const result = await binaryClient.rebuild(project, null, dir, { matchInclude: include, matchExclude: exclude }); diff --git a/js/src/binary-client.ts b/js/src/binary-client.ts index 4600635..8a4ddbf 100644 --- a/js/src/binary-client.ts +++ b/js/src/binary-client.ts @@ -148,10 +148,12 @@ export class DateiLagerBinaryClient { ? { update: options.timeout, rebuild: options.timeout, + gc: options.timeout, } : { update: 0, rebuild: 0, + gc: 0, ...options.timeout, }, tracing: options.tracing ?? false, @@ -365,6 +367,9 @@ export class DateiLagerBinaryClient { baseArgs.push("--headless-host", this._options.headlessHost); } + const timeout = options?.timeout ?? this._options.timeout[method]; + baseArgs.push("--timeout", String(timeout)); + if (this._options.tracing) { const carrier = {}; propagation.inject(context.active(), carrier); @@ -375,7 +380,6 @@ export class DateiLagerBinaryClient { const subprocess = execa(this._options.command, baseArgs.concat(args), { cwd, cleanup: false, // don't terminate this subprocess process eagerly when the parent process is terminated, which is execa's default behaviour. we use graceful shutdown gadget-side to give running operations a chance to complete, and we don't want to terminate them prematurely - timeout: options?.timeout ?? this._options.timeout[method], env: { DL_TOKEN: await this._options.token() }, }); diff --git a/pkg/cli/client.go b/pkg/cli/client.go index 1644a5d..7da1125 100644 --- a/pkg/cli/client.go +++ b/pkg/cli/client.go @@ -5,6 +5,7 @@ import ( "encoding/json" "flag" "fmt" + "os" "strings" "time" @@ -33,9 +34,12 @@ func NewClientCommand() *cobra.Command { otelContext string host string port uint16 + timeout uint16 headlessHost string ) + var cancel context.CancelFunc + cmd := &cobra.Command{ Use: "client", Short: "DateiLager client", @@ -57,6 +61,12 @@ func NewClientCommand() *cobra.Command { ctx := cmd.Context() + fmt.Fprintf(os.Stderr, "timeout: %v\n", timeout) + if timeout != 0 { + ctx, cancel = context.WithTimeout(cmd.Context(), time.Duration(timeout)*time.Second) + fmt.Fprintf(os.Stderr, "duration: %v\n", time.Duration(timeout)*time.Second) + } + if tracing { shutdownTelemetry = telemetry.Init(ctx, telemetry.Client) } @@ -88,6 +98,12 @@ func NewClientCommand() *cobra.Command { return nil }, + PersistentPostRunE: func(cmd *cobra.Command, _ []string) error { + if cancel != nil { + cancel() + } + return nil + }, } flags := cmd.PersistentFlags() @@ -99,6 +115,7 @@ func NewClientCommand() *cobra.Command { flags.StringVar(&otelContext, "otel-context", "", "Open Telemetry context") flags.StringVar(&host, "host", "", "GRPC server hostname") flags.Uint16Var(&port, "port", 5051, "GRPC server port") + flags.Uint16Var(&timeout, "timeout", 0, "GRPC client timeout") flags.StringVar(&headlessHost, "headless-host", "", "Alternative headless hostname to use for round robin connections") cmd.AddCommand(NewCmdGet()) @@ -115,11 +132,8 @@ func NewClientCommand() *cobra.Command { } func ClientExecute() { - ctx, cancel := context.WithTimeout(context.Background(), 200*time.Second) - defer cancel() - + ctx := context.Background() cmd := NewClientCommand() - err := cmd.ExecuteContext(ctx) client := client.FromContext(cmd.Context())