Skip to content

Commit

Permalink
wip: multi-worker wrangler dev
Browse files Browse the repository at this point in the history
  • Loading branch information
RamIdeas committed Sep 24, 2024
1 parent e2c3e53 commit d2e2ec8
Show file tree
Hide file tree
Showing 8 changed files with 192 additions and 47 deletions.
3 changes: 2 additions & 1 deletion packages/wrangler/src/api/dev.ts
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,8 @@ export async function unstable_dev(
onReady: (address, port, proxyData) => {
readyResolve({ address, port, proxyData });
},
config: options?.config,
// @ts-expect-error who cares
config: options?.config === undefined ? undefined : [options.config],
env: options?.env,
processEntrypoint,
additionalModules,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import * as MF from "../../dev/miniflare";
import { logger } from "../../logger";
import { RuntimeController } from "./BaseController";
import { castErrorCause } from "./events";
import { ProxyControllerLogger } from "./ProxyController";
import { convertBindingsToCfWorkerInitBindings } from "./utils";
import type { WorkerEntrypointsDefinition } from "../../dev-registry";
import type {
Expand Down Expand Up @@ -146,12 +147,19 @@ export class LocalRuntimeController extends RuntimeController {

async #onBundleComplete(data: BundleCompleteEvent, id: number) {
try {
const { options, internalObjects, entrypointNames } =
// eslint-disable-next-line prefer-const
let { options, internalObjects, entrypointNames } =
await MF.buildMiniflareOptions(
this.#log,
await convertToConfigBundle(data),
this.#proxyToUserWorkerAuthenticationSecret
);
options = {
...options,
log: new ProxyControllerLogger(MF.castLogLevel(logger.loggerLevel), {
prefix: `wrangler-${data.config.name}`,
}),
};
options.liveReload = false; // TODO: set in buildMiniflareOptions once old code path is removed
if (this.#mf === undefined) {
logger.log(chalk.dim("⎔ Starting local server..."));
Expand Down
22 changes: 13 additions & 9 deletions packages/wrangler/src/api/startDevWorker/ProxyController.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { randomUUID } from "node:crypto";
import events from "node:events";
import path from "node:path";
import { LogLevel, Miniflare, Mutex, Response } from "miniflare";
import { fetch } from "undici";
import inspectorProxyWorkerPath from "worker:startDevWorker/InspectorProxyWorker";
import proxyWorkerPath from "worker:startDevWorker/ProxyWorker";
import WebSocket from "ws";
Expand Down Expand Up @@ -106,6 +107,7 @@ export class ProxyController extends Controller<ProxyControllerEventMap> {
},
bindings: {
PROXY_CONTROLLER_AUTH_SECRET: this.secret,
NAME: this.latestConfig.name ?? "",
},

// no need to use file-system, so don't
Expand Down Expand Up @@ -158,7 +160,7 @@ export class ProxyController extends Controller<ProxyControllerEventMap> {
log: new ProxyControllerLogger(castLogLevel(logger.loggerLevel), {
prefix:
// if debugging, log requests with specic ProxyWorker prefix
logger.loggerLevel === "debug" ? "wrangler-ProxyWorker" : "wrangler",
`wrangler-ProxyWorker-${this.latestConfig.name}`,
}),
handleRuntimeStdio,
liveReload: false,
Expand Down Expand Up @@ -286,20 +288,22 @@ export class ProxyController extends Controller<ProxyControllerEventMap> {

try {
await this.runtimeMessageMutex.runWith(async () => {
const { proxyWorker } = await this.ready.promise;
const { proxyWorker, url } = await this.ready.promise;

const ready = await proxyWorker.ready.catch(() => undefined);
if (!ready) {
return;
}

return proxyWorker.dispatchFetch(
`http://dummy/cdn-cgi/ProxyWorker/${message.type}`,
{
headers: { Authorization: this.secret },
cf: { hostMetadata: message },
}
const proxyWorkerUrl = new URL(
`/cdn-cgi/ProxyWorker/${message.type}`,
url
);
return fetch(proxyWorkerUrl, {
method: "POST",
headers: { Authorization: this.secret },
body: JSON.stringify(message),
});
});
} catch (cause) {
if (this._torndown) {
Expand All @@ -317,7 +321,7 @@ export class ProxyController extends Controller<ProxyControllerEventMap> {
error
);

throw error;
// throw error;
}
}
async sendMessageToInspectorProxyWorker(
Expand Down
82 changes: 82 additions & 0 deletions packages/wrangler/src/api/startDevWorker/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
import assert from "node:assert";
import { updateDevEnvRegistry } from "../../dev";
import { serializeWorkerRegistryDefinition } from "../../dev/local";
import { DevEnv } from "./DevEnv";
import type { WorkerDefinition } from "../../dev-registry";
import type { StartDevWorkerInput, Worker } from "./types";

export { DevEnv };
Expand All @@ -12,3 +16,81 @@ export async function startWorker(

return devEnv.startWorker(options);
}

export async function startMultiWorker(
optionsArray: StartDevWorkerInput[],
devEnv0: DevEnv
): Promise<DevEnv[]> {
const workerRegistry = new Map<string, WorkerDefinition>();
let prevRegistry: Record<string, WorkerDefinition> = {};
async function updateWorkerRegistry(
name: string,
definition: WorkerDefinition
) {
workerRegistry.set(name, definition);

if (!devEnvs) {
return;
}

const nextRegistry = Object.fromEntries(workerRegistry);

if (JSON.stringify(prevRegistry) !== JSON.stringify(nextRegistry)) {
prevRegistry = nextRegistry;
await Promise.all(
devEnvs.map(async (devEnv) => {
await updateDevEnvRegistry(
devEnv,
Object.fromEntries(workerRegistry)
);
})
);
}
}

const devEnvs = await Promise.all(
optionsArray.map(async (options, workerIndex) => {
const devEnv = workerIndex === 0 ? devEnv0 : new DevEnv();

devEnv.runtimes.forEach((runtime) => {
runtime.on("reloadComplete", async (reloadEvent) => {
if (!reloadEvent.config.dev?.remote) {
const { url } = await devEnv.proxy.ready.promise;
const { name } = reloadEvent.config;
assert(name); // default value "multi-worker-n" is defined below

const definition = serializeWorkerRegistryDefinition(
url,
name,
reloadEvent.proxyData.internalDurableObjects,
reloadEvent.proxyData.entrypointAddresses
);

if (definition) {
await updateWorkerRegistry(name, definition);
}
}
});
});

await devEnv.config.set({
// name: `multi-worker-${workerIndex + 1}`,
...options,
dev: {
...options.dev,
remote: false,
inspector: { port: 0 },
server: {
...options.dev?.server,
// port: options.dev?.server?.port ?? 0,
// hostname: "localhost",
},
},
});

return devEnv;
})
);

return devEnvs;
}
57 changes: 42 additions & 15 deletions packages/wrangler/src/dev.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import util from "node:util";
import { isWebContainer } from "@webcontainer/env";
import { watch } from "chokidar";
import { render } from "ink";
import { DevEnv } from "./api";
import { DevEnv, startMultiWorker } from "./api";
import {
convertCfWorkerInitBindingstoBindings,
extractBindingsOfType,
Expand Down Expand Up @@ -64,6 +64,7 @@ import type {
} from "./config/environment";
import type { CfModule, CfWorkerInit } from "./deployment-bundle/worker";
import type { WorkerRegistry } from "./dev-registry";
import type { ExperimentalAssetsOptions } from "./experimental-assets";
import type { LoggerLevel } from "./logger";
import type { EnablePagesAssetsServiceBindingOptions } from "./miniflare-cli/types";
import type {
Expand All @@ -76,6 +77,14 @@ import type React from "react";
export function devOptions(yargs: CommonYargsArgv) {
return (
yargs
.option("config", {
alias: "c",
describe:
"Path to .toml configuration file. Set mutliple times for multi-worker support.",
type: "string",
array: true,
requiresArg: true,
})
.positional("script", {
describe: "The path to an entry point for your worker",
type: "string",
Expand Down Expand Up @@ -398,10 +407,16 @@ This is currently not supported 😭, but we think that we'll get it to work soo
() => startDev(args)
);
if (args.experimentalDevEnv) {
assert(devInstance instanceof DevEnv);
await events.once(devInstance, "teardown");
if (devInstance instanceof DevEnv) {
await events.once(devInstance, "teardown");
} else if (Array.isArray(devInstance)) {
await Promise.all(
devInstance.map((worker) => events.once(worker, "teardown"))
);
}
} else {
assert(!(devInstance instanceof DevEnv));
assert(!Array.isArray(devInstance));

configFileWatcher = devInstance.configFileWatcher;
assetsWatcher = devInstance.assetsWatcher;
Expand Down Expand Up @@ -467,7 +482,7 @@ export type StartDevOptions = DevArguments &
onReady?: (ip: string, port: number, proxyData: ProxyData) => void;
};

async function updateDevEnvRegistry(
export async function updateDevEnvRegistry(
devEnv: DevEnv,
registry: WorkerRegistry | undefined
) {
Expand Down Expand Up @@ -547,7 +562,7 @@ export async function startDev(args: StartDevOptions) {
let rerender: (node: React.ReactNode) => void | undefined;
try {
const configPath =
args.config ||
args.config?.[0] ||
(args.script && findWranglerToml(path.dirname(args.script)));
let config = readConfig(configPath, args);

Expand Down Expand Up @@ -654,13 +669,7 @@ export async function startDev(args: StartDevOptions) {
);
});
}

let unregisterHotKeys: () => void;
if (isInteractive() && args.showInteractiveDevSession !== false) {
unregisterHotKeys = registerDevHotKeys(devEnv, args);
}

await devEnv.config.set({
const startWorkerInput: StartDevWorkerInput = {
name: args.name,
config: configPath,
entrypoint: args.script,
Expand Down Expand Up @@ -730,7 +739,7 @@ export async function startDev(args: StartDevOptions) {
if (!accountId) {
unregisterHotKeys?.();
accountId = await requireAuth({});
unregisterHotKeys = registerDevHotKeys(devEnv, args);
unregisterHotKeys = registerDevHotKeys(devEnvs, args);
}

return {
Expand Down Expand Up @@ -790,7 +799,24 @@ export async function startDev(args: StartDevOptions) {
// otherwise config at startup ends up overriding future config changes in the
// ConfigController
assets: args.assets ? assetsOptions : undefined,
} satisfies StartDevWorkerInput);
};

assert(args.config);
logger.log(args.config);
const options = await Promise.all(
args.config.map((configPath1, workerIndex) =>
workerIndex === 0 ? startWorkerInput : { config: configPath1 }
)
);

const devEnvs = await startMultiWorker(options, devEnv);
args.disableDevRegistry = args.config.length === 1;
args.forceLocal ||= args.config.length > 1;

let unregisterHotKeys = () => {};
if (isInteractive() && args.showInteractiveDevSession !== false) {
unregisterHotKeys = registerDevHotKeys(devEnvs, args);
}

void metrics.sendMetricsEvent(
"run dev",
Expand Down Expand Up @@ -1031,7 +1057,8 @@ export async function startApiDev(args: StartDevOptions) {
}

const configPath =
args.config || (args.script && findWranglerToml(path.dirname(args.script)));
args.config?.[0] ||
(args.script && findWranglerToml(path.dirname(args.script)));
const projectRoot = configPath && path.dirname(configPath);
const config = readConfig(configPath, args);

Expand Down
19 changes: 10 additions & 9 deletions packages/wrangler/src/dev/hotkeys.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,41 +5,42 @@ import { openInspector } from "./inspect";
import type { DevEnv } from "../api";

export default function registerDevHotKeys(
devEnv: DevEnv,
devEnvs: DevEnv[],
args: { forceLocal?: boolean }
) {
const primaryDevEnv = devEnvs[0];
const unregisterHotKeys = registerHotKeys([
{
keys: ["b"],
label: "open a browser",
handler: async () => {
const { url } = await devEnv.proxy.ready.promise;
const { url } = await primaryDevEnv.proxy.ready.promise;
await openInBrowser(url.href);
},
},
{
keys: ["d"],
label: "open devtools",
handler: async () => {
const { inspectorUrl } = await devEnv.proxy.ready.promise;
const { inspectorUrl } = await primaryDevEnv.proxy.ready.promise;

// TODO: refactor this function to accept a whole URL (not just .port and assuming .hostname)
await openInspector(
parseInt(inspectorUrl.port),
devEnv.config.latestConfig?.name
primaryDevEnv.config.latestConfig?.name
);
},
},
{
keys: ["l"],
disabled: () => args.forceLocal ?? false,
label: () =>
`turn ${devEnv.config.latestConfig?.dev?.remote ? "on" : "off"} local mode`,
`turn ${primaryDevEnv.config.latestConfig?.dev?.remote ? "on" : "off"} local mode`,
handler: async () => {
await devEnv.config.patch({
await primaryDevEnv.config.patch({
dev: {
...devEnv.config.latestConfig?.dev,
remote: !devEnv.config.latestConfig?.dev?.remote,
...primaryDevEnv.config.latestConfig?.dev,
remote: !primaryDevEnv.config.latestConfig?.dev?.remote,
},
});
},
Expand All @@ -56,7 +57,7 @@ export default function registerDevHotKeys(
label: "to exit",
handler: async () => {
unregisterHotKeys();
await devEnv.teardown();
await Promise.allSettled(devEnvs.map((devEnv) => devEnv.teardown()));
},
},
]);
Expand Down
Loading

0 comments on commit d2e2ec8

Please sign in to comment.