Skip to content

Commit

Permalink
Merge pull request #80 from gadget-inc/move_binary_client_timeout_to_go
Browse files Browse the repository at this point in the history
Move binary client timeout to Go
  • Loading branch information
angelini authored Mar 19, 2024
2 parents ecd6db9 + 934f54f commit 15d7564
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 6 deletions.
28 changes: 27 additions & 1 deletion js/spec/binary-client.spec.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import crypto from "crypto";
import * as fs from "fs";
import { Client } from "pg";
import { encodeContent } from "../src";
Expand Down Expand Up @@ -131,6 +132,29 @@ describe("binary client operations", () => {
expect(JSON.stringify(result)).toMatch('{"count":0}');
expect(result.count).toStrictEqual(0);
});

it("can timeout while rebuilding the file system", async () => {
const project = 1337n;

await grpcClient.newProject(project, []);

const stream = grpcClient.updateObjects(project);
for (let i = 0; i < 1000; i++) {
const content = encodeContent(crypto.randomBytes(512 * 1024).toString("hex"));
await stream.send({
path: `hello-${i}.txt`,
mode: 0o755n,
content: content,
size: BigInt(content.length),
deleted: false,
});
}
await stream.complete();

const dir = tmpdir();
const rebuildPromise = binaryClient.rebuild(project, null, dir, { timeout: 1 });
await expect(rebuildPromise).rejects.toThrow(/context deadline exceeded/);
}, 20_000);
});

describe("Gadget file match tests", () => {
Expand All @@ -140,15 +164,17 @@ describe("Gadget file match tests", () => {

await grpcClient.newProject(project, []);

const stream = grpcClient.updateObjects(project);
for (const path of paths) {
await grpcClient.updateObject(project, {
await stream.send({
path,
mode: 0o755n,
content: encodedContent,
size: BigInt(encodedContent.length),
deleted: false,
});
}
await stream.complete();

const dir = tmpdir();
const result = await binaryClient.rebuild(project, null, dir, { matchInclude: include, matchExclude: exclude });
Expand Down
6 changes: 5 additions & 1 deletion js/src/binary-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -148,10 +148,12 @@ export class DateiLagerBinaryClient {
? {
update: options.timeout,
rebuild: options.timeout,
gc: options.timeout,
}
: {
update: 0,
rebuild: 0,
gc: 0,
...options.timeout,
},
tracing: options.tracing ?? false,
Expand Down Expand Up @@ -365,6 +367,9 @@ export class DateiLagerBinaryClient {
baseArgs.push("--headless-host", this._options.headlessHost);
}

const timeout = options?.timeout ?? this._options.timeout[method];
baseArgs.push("--timeout", String(timeout));

if (this._options.tracing) {
const carrier = {};
propagation.inject(context.active(), carrier);
Expand All @@ -375,7 +380,6 @@ export class DateiLagerBinaryClient {
const subprocess = execa(this._options.command, baseArgs.concat(args), {
cwd,
cleanup: false, // don't terminate this subprocess process eagerly when the parent process is terminated, which is execa's default behaviour. we use graceful shutdown gadget-side to give running operations a chance to complete, and we don't want to terminate them prematurely
timeout: options?.timeout ?? this._options.timeout[method],
env: { DL_TOKEN: await this._options.token() },
});

Expand Down
22 changes: 18 additions & 4 deletions pkg/cli/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"flag"
"fmt"
"os"
"strings"
"time"

Expand Down Expand Up @@ -33,9 +34,12 @@ func NewClientCommand() *cobra.Command {
otelContext string
host string
port uint16
timeout uint16
headlessHost string
)

var cancel context.CancelFunc

cmd := &cobra.Command{
Use: "client",
Short: "DateiLager client",
Expand All @@ -57,6 +61,12 @@ func NewClientCommand() *cobra.Command {

ctx := cmd.Context()

fmt.Fprintf(os.Stderr, "timeout: %v\n", timeout)
if timeout != 0 {
ctx, cancel = context.WithTimeout(cmd.Context(), time.Duration(timeout)*time.Second)
fmt.Fprintf(os.Stderr, "duration: %v\n", time.Duration(timeout)*time.Second)
}

if tracing {
shutdownTelemetry = telemetry.Init(ctx, telemetry.Client)
}
Expand Down Expand Up @@ -88,6 +98,12 @@ func NewClientCommand() *cobra.Command {

return nil
},
PersistentPostRunE: func(cmd *cobra.Command, _ []string) error {
if cancel != nil {
cancel()
}
return nil
},
}

flags := cmd.PersistentFlags()
Expand All @@ -99,6 +115,7 @@ func NewClientCommand() *cobra.Command {
flags.StringVar(&otelContext, "otel-context", "", "Open Telemetry context")
flags.StringVar(&host, "host", "", "GRPC server hostname")
flags.Uint16Var(&port, "port", 5051, "GRPC server port")
flags.Uint16Var(&timeout, "timeout", 0, "GRPC client timeout")
flags.StringVar(&headlessHost, "headless-host", "", "Alternative headless hostname to use for round robin connections")

cmd.AddCommand(NewCmdGet())
Expand All @@ -115,11 +132,8 @@ func NewClientCommand() *cobra.Command {
}

func ClientExecute() {
ctx, cancel := context.WithTimeout(context.Background(), 200*time.Second)
defer cancel()

ctx := context.Background()
cmd := NewClientCommand()

err := cmd.ExecuteContext(ctx)

client := client.FromContext(cmd.Context())
Expand Down

0 comments on commit 15d7564

Please sign in to comment.