Skip to content

Commit

Permalink
Merge branch 'release/v0.16.8'
Browse files Browse the repository at this point in the history
  • Loading branch information
holtwick committed Feb 8, 2024
2 parents 9e061f3 + 2658964 commit e5a1c37
Show file tree
Hide file tree
Showing 10 changed files with 194 additions and 392 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "zeed",
"type": "module",
"version": "0.16.7",
"version": "0.16.8",
"description": "🌱 Simple foundation library",
"author": {
"name": "Dirk Holtwick",
Expand Down
2 changes: 0 additions & 2 deletions src/common/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
// (C)opyright 2021-07-15 Dirk Holtwick, holtwick.it. All rights reserved.

export * from './assert'
export * from './bin'
export * from './crypto'
Expand Down
56 changes: 56 additions & 0 deletions src/common/msg/pipe.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Experiment to replace "Channel" by "Pipe".
// Different naming just to avoid confusion.
// Goal: Simplify things by removing event handling etc.

export interface Pipe<PipeObjectData = object, PipeRawData = PipeObjectData> {
/** Function to post raw message */
post: (data: PipeObjectData) => Promise<void> | void

/** Listener to receive raw message */
on: (fn: (data: PipeObjectData) => Promise<void> | void) => void

/** Custom function to serialize data */
serialize?: (data: PipeObjectData) => Promise<PipeRawData> | PipeRawData

/** Custom function to deserialize data */
deserialize?: (data: PipeRawData) => Promise<PipeObjectData> | PipeObjectData
}

// export interface Pipe<PipeObjectData = object, PipeRawData = PipeObjectData> {
// /** Function to post raw message */
// post: (data: PipeObjectData) => void

// /** Listener to receive raw message */
// on: (fn: (data: PipeObjectData) => void) => void

// /** Custom function to serialize data */
// serialize?: (data: PipeObjectData) => PipeRawData

// /** Custom function to deserialize data */
// deserialize?: (data: PipeRawData) => PipeObjectData
// }

// describe('types.spec', () => {
// it.skip('should pipe', async () => {
// const p1: Pipe<object, string> = {
// post(s) { }, // todo
// on: (fn) => { }, // todo
// serialize(o) {
// return JSON.stringify(o)
// },
// deserialize(s) {
// return JSON.parse(s)
// },
// }

// async function echo(pipe: Pipe<any, any>, o: object) {
// let resolve: any
// pipe.on(fn => resolve = fn)
// pipe.post(o)
// return new Promise(resolve)
// }

// const x = await echo(p1, { a: 1 })
// expect(x).toMatchInlineSnapshot()
// })
// })
77 changes: 69 additions & 8 deletions src/common/msg/rpc.spec.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import { MessageChannel } from 'node:worker_threads'
import { decodeJson, encodeJson } from '../bin'
import { createLocalChannelPair } from '../msg/channel'
import { sleep } from '../exec/promise'
import { useRPC, useRPCHub } from './rpc'

let bobCount = 0
Expand All @@ -25,23 +27,19 @@ const Alice = {
type BobFunctions = typeof Bob
type AliceFunctions = typeof Alice

describe('rpc', () => {
describe('rpc async', () => {
beforeEach(() => {
bobCount = 0
})

it('basic', async () => {
const channel = new MessageChannel()

// const serialize = (data: any) => JSON.stringify(data)
// const deserialize = (data: any) => JSON.parse(data)

const serialize = (data: any) => encodeJson(data)
const deserialize = (data: any) => decodeJson(data)

const bob = useRPC<BobFunctions, AliceFunctions>(Bob, {
post: data => channel.port1.postMessage(data),

on: data => channel.port1.on('message', data),
serialize,
deserialize,
Expand All @@ -62,7 +60,7 @@ describe('rpc', () => {
expect(await alice.hi('Alice')).toEqual('Hi Alice, I am Bob')

// one-way event
expect(alice.bump()).toBeUndefined()
expect(void alice.bump()).toBeUndefined()

expect(Bob.getCount()).toBe(0)
await new Promise(resolve => setTimeout(resolve, 100))
Expand All @@ -79,7 +77,6 @@ describe('rpc', () => {

const bobHub = useRPCHub({
post: data => channel.port1.postMessage(data),

on: data => channel.port1.on('message', data),
serialize,
deserialize,
Expand All @@ -102,7 +99,7 @@ describe('rpc', () => {
expect(await alice.hi('Alice')).toEqual('Hi Alice, I am Bob')

// one-way event
expect(alice.bump()).toBeUndefined()
expect(void alice.bump()).toBeUndefined()

expect(Bob.getCount()).toBe(0)
await new Promise(resolve => setTimeout(resolve, 100))
Expand All @@ -111,4 +108,68 @@ describe('rpc', () => {
channel.port1.close()
channel.port2.close()
})

it('timeout async', async (done) => {
const [f1, f2] = createLocalChannelPair()

const rpc1 = useRPC({
async echo(v: string, s = 5) {
// console.log('echo 1', v)
await sleep(s)
return `${v}_1`
},
}, {
post: (data) => {
// console.log(1, 'post', data)
f1.postMessage(data)
},
on: data => f1.on('message', (msg) => {
// console.log(1, msg)
void data(msg.data)
}),
timeout: 10,
})

const rpc2 = useRPC({
async echo(v: string, s = 5) {
// console.log('echo 2', v)
await sleep(s)
return `${v}_2`
},
}, {
post: (data) => {
// console.log(2, 'post', data)
f2.postMessage(data)
},
on: data => f2.on('message', (msg) => {
// console.log(2, msg)
void data(msg.data)
}),
timeout: 10,
})

const r = await rpc1.echo('abc')
expect(r).toMatchInlineSnapshot(`"abc_2"`)

try {
const r2 = await rpc1.echo('abc', 50)
expect(false).toBe(true)
}
catch (err) {
expect(err).toMatchInlineSnapshot(`[Error: rpc timeout on calling "echo"]`)
}
})

it('async', async (done) => {
function echo(n: number): any {
return n
}

async function echoAsync(n: number) {
return n
}

expect(await echo(1)).toBe(1)
expect(await echoAsync(2)).toBe(2)
})
})
104 changes: 68 additions & 36 deletions src/common/msg/rpc.ts
Original file line number Diff line number Diff line change
@@ -1,23 +1,23 @@
// From https://github.com/antfu/birpc/blob/main/src/index.ts MIT

import { createPromise } from '../exec/promise'
import type { LoggerInterface } from '../log/log-base'
import type { Pipe } from './pipe'

export type ArgumentsType<T> = T extends (...args: infer A) => any ? A : never
export type ReturnType<T> = T extends (...args: any) => infer R ? R : never

export interface RPCOptionsBasic {
export interface RPCOptionsBasic extends Pipe {
/** No return values expected */
onlyEvents?: boolean
/** Function to post raw message */
post: (data: any) => void
/** Listener to receive raw message */
on: (fn: (data: any) => void | Promise<void>) => void
/** Custom function to serialize data */
serialize?: (data: any) => any
/** Custom function to deserialize data */
deserialize?: (data: any) => any
/** Maximum timeout for waiting for response, in milliseconds */
timeout?: number
/** Custom logger */
log?: LoggerInterface
/** Custom error handler */
onError?: (error: Error, functionName: string, args: any[]) => boolean | void
/** Custom error handler for timeouts */
onTimeoutError?: (functionName: string, args: any[]) => boolean | void
}

export interface RPCOptions<Remote> extends RPCOptionsBasic {
Expand Down Expand Up @@ -49,34 +49,44 @@ enum RPCMode {

type RPCMessage = [
RPCMode,
any, // args
number | undefined | null, // id
string | undefined | null, // method
number,
string | any,
...any,
]

const defaultSerialize = (i: any) => i
const defaultDeserialize = defaultSerialize

function setupRPCBasic(options: RPCOptionsBasic, functions: any, eventNames: string[] = []) {
const {
log,
post,
on,
serialize = defaultSerialize,
deserialize = defaultDeserialize,
log,
timeout = 60e3,
onError,
onTimeoutError,
onlyEvents = false,
} = options

const rpcPromiseMap = new Map<number, { resolve: (...args: any) => any, reject: (...args: any) => any }>()
const rpcPromiseMap = new Map<number, {
resolve: (...args: any) => any
reject: (...args: any) => any
timeoutId: Parameters<typeof clearTimeout>[0]
}>()

on(async (data) => {
try {
const msg = deserialize(data) as RPCMessage
const [mode, args, id, method] = msg
const msg = await deserialize(data) as RPCMessage
const mode = msg?.[0]
const id = mode === RPCMode.event ? 0 : msg?.[1]
const [method, ...args] = msg.slice(mode === RPCMode.event ? 1 : 2)
if (mode === RPCMode.request || mode === RPCMode.event) {
let result, error: any
if (method != null) {
try {
const fn = functions[method]
const fn = functions[method] as Function
result = await fn(...args)
}
catch (e) {
Expand All @@ -86,21 +96,25 @@ function setupRPCBasic(options: RPCOptionsBasic, functions: any, eventNames: str
else {
error = 'Method implementation missing'
}
if (error)
if (error) {
log?.warn('error', msg, error)
if (mode === RPCMode.request && id) {
if (error)
post(serialize([RPCMode.reject, error, id]))
else
post(serialize([RPCMode.resolve, result, id]))
onError?.(error, method ?? '', args)
}
if (id > 0) {
const data = await serialize(error
? [RPCMode.reject, id, error]
: [RPCMode.resolve, id, result])
await post(data)
}
}
else if (id) {
const promise = rpcPromiseMap.get(id)
if (promise != null) {
clearTimeout(promise.timeoutId)
if (mode === RPCMode.reject)
promise.reject(args)
else promise.resolve(args)
promise.reject(method)
else
promise.resolve(method)
}
rpcPromiseMap.delete(id)
}
Expand All @@ -112,19 +126,36 @@ function setupRPCBasic(options: RPCOptionsBasic, functions: any, eventNames: str

const proxyHandler = {
get(_: any, method: string) {
const sendEvent = (...args: any[]) => {
post(serialize([RPCMode.event, args, null, method]))
}
if (options.onlyEvents || eventNames.includes(method)) {
const sendEvent = async (...args: any[]) => await post(await serialize([RPCMode.event, method, ...args]))

if (onlyEvents || eventNames.includes(method)) {
sendEvent.asEvent = sendEvent
return sendEvent
}
const sendCall = (...args: any[]) => {
return new Promise((resolve, reject) => {
const id = rpcCounter++
rpcPromiseMap.set(id, { resolve, reject })
post(serialize([RPCMode.request, args, id, method]))
})

const sendCall = async (...args: any[]) => {
const [promise, resolve, reject] = createPromise()
const id = rpcCounter++

let timeoutId
if (timeout >= 0) {
timeoutId = setTimeout(() => {
try {
// Custom onTimeoutError handler can throw its own error too
onTimeoutError?.(method, args)
throw new Error(`rpc timeout on calling "${method}"`)
}
catch (e) {
reject(e)
}
rpcPromiseMap.delete(id)
}, timeout).unref?.()
}

rpcPromiseMap.set(id, { resolve, reject, timeoutId })
const data = await serialize([RPCMode.request, id, method, ...args])
await post(data)
return promise
}
sendCall.asEvent = sendEvent
return sendCall
Expand All @@ -139,6 +170,7 @@ export function useRPC<LocalFunctions, RemoteFunctions = LocalFunctions>(
options: RPCOptions<RemoteFunctions>,
): RPCReturn<RemoteFunctions> {
const { eventNames = [] } = options

const { proxyHandler } = setupRPCBasic(options, functions, eventNames as any)

return new Proxy({}, proxyHandler)
Expand All @@ -165,7 +197,7 @@ export function useRPCHub(options: RPCOptionsBasic) {
}
}

export type UseRPCHubType = ReturnType<typeof useRPCHub>
export type UseRPCAsyncHubType = ReturnType<typeof useRPCHub>

// Syntax test case
// const hub: UseRPCHubType = {} as any
Expand Down
2 changes: 0 additions & 2 deletions src/common/pipes/index.ts

This file was deleted.

Loading

0 comments on commit e5a1c37

Please sign in to comment.