Skip to content

Commit

Permalink
Merge pull request #181 from m-ld/reactive-observable-query
Browse files Browse the repository at this point in the history
Reactive observable query
  • Loading branch information
gsvarovsky authored Dec 10, 2023
2 parents 6762585 + 5335673 commit d708319
Show file tree
Hide file tree
Showing 8 changed files with 670 additions and 397 deletions.
2 changes: 1 addition & 1 deletion doc/includes/live-code-setup.script.html
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@
content: content
.replaceAll('https://js.m-ld.org', window.location.origin)
.replaceAll('https://m-ld.org', window.location.origin.includes('localhost') ?
'https://edge.m-ld.org' : window.location.origin.replaceAll('.js', ''))
'https://edge.m-ld.org' : window.location.origin.replaceAll('js.', ''))
});
}
}
Expand Down
2 changes: 1 addition & 1 deletion doc/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ They include the core (`index.mjs`), bundled [remotes](#remotes) (`mqtt.mjs`, `a

Some example starter projects available:

- The [TodoMVC App](https://github.com/m-ld/m-ld-web-starter) shows one way to build a multi-collaborator application for browsers.
- The [TodoMVC App](https://github.com/m-ld/m-ld-todomvc-vanillajs) shows one way to build a multi-collaborator application for browsers.
- The [Node.js Starter Project](https://github.com/m-ld/m-ld-nodejs-starter) uses Node processes to initialise two clones, and an MQTT broker for messaging.

### Data Persistence
Expand Down
822 changes: 434 additions & 388 deletions package-lock.json

Large diffs are not rendered by default.

6 changes: 4 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@m-ld/m-ld",
"version": "0.10.0-edge.12",
"version": "0.10.0",
"description": "m-ld native Javascript",
"main": "ext/index.js",
"types": "ext/index.d.ts",
Expand All @@ -18,7 +18,8 @@
"./ext/security": "./ext/security/index.js",
"./ext/shacl": "./ext/shacl/index.js",
"./ext/statutes": "./ext/statutes/index.js",
"./ext/tseq": "./ext/tseq/index.js"
"./ext/tseq": "./ext/tseq/index.js",
"./ext/rx": "./ext/rx/index.js"
},
"files": [
"ext"
Expand Down Expand Up @@ -117,6 +118,7 @@
"cuid": "^2.1.8",
"fast-array-diff": "^1.1.0",
"fflate": "^0.8.0",
"immer": "^10.0.3",
"json-rql": "^0.6.2",
"loglevel": "^1.7.1",
"lru-cache": "^6.0.0",
Expand Down
9 changes: 5 additions & 4 deletions src/engine/api-support.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import { defer, firstValueFrom, Observable } from 'rxjs';
import { defer, firstValueFrom, merge, Observable } from 'rxjs';
import { map, toArray } from 'rxjs/operators';
import { inflate } from './util';
import { Consumable, each, flow } from 'rx-flowable';
import { SubjectGraph } from './SubjectGraph';
import { GraphSubject, GraphSubjects, ReadResult } from '../api';
Expand All @@ -20,8 +19,10 @@ export function liveRollup<R extends { [key: string]: unknown }>(
partial[k] = k === key ? value : liveValues[k].value);
return partial as R;
}
const values = defer(() => inflate(Object.keys(liveValues), (key: keyof R) =>
liveValues[key].pipe(map(value => get(key, value)))));
const values = defer(() => merge(
...Object.keys(liveValues)
.map(key => liveValues[key].pipe(map(value => get(key, value)))))
);
return Object.defineProperties(values, { value: { get } }) as LiveValue<R>;
}

Expand Down
4 changes: 3 additions & 1 deletion src/engine/dataset/DatasetEngine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -686,7 +686,9 @@ export class DatasetEngine extends AbstractMeld implements CloneEngine, MeldLoca
const values = stateRollup.pipe(
skipWhile(() => this.messageService == null),
map(toStatus),
distinctUntilChanged<MeldStatus>(matchStatus));
distinctUntilChanged<MeldStatus>(matchStatus),
takeUntilComplete(this.live)
);
const becomes = async (match?: Partial<MeldStatus>) => firstValueFrom(
values.pipe(filter(status => matchStatus(status, match)),
defaultIfEmpty(undefined)));
Expand Down
82 changes: 82 additions & 0 deletions src/rx/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
import { asapScheduler, Observable, observeOn } from 'rxjs';
import type { GraphSubject, Iri, MeldClone, MeldReadState, MeldUpdate } from '..';
import { updateSubject } from '..';
import { enablePatches, produceWithPatches } from 'immer';
import { takeUntilComplete } from '../engine/util';

enablePatches();

type Eventually<T> = T | undefined | Promise<T | undefined>;

/**
* 'Watches' the given read/follow procedures on the given m-ld clone, by
* creating an observable that emit when the initial read or update emit a
* non-null value.
* @param meld the clone to attach the observable to
* @param readValue reads an initial value; or null if not available
* @param updateValue provides an updated value, or null if not available. If
* this param is omitted, `readValue` is called for every update.
* @category Reactive
*/
export function watchQuery<T>(
meld: MeldClone,
readValue: (state: MeldReadState) => Eventually<T>,
updateValue: (update: MeldUpdate, state: MeldReadState) => Eventually<T> =
(_, state) => readValue(state)
): Observable<T> {
return new Observable<T>(subs => {
subs.add(meld.read(
async state => {
try {
const value = await readValue(state);
!subs.closed && value != null && subs.next(value);
} catch (e) {
subs.error(e);
}
},
async (update, state) => {
try {
const value = await updateValue(update, state);
!subs.closed && value != null && subs.next(value);
} catch (e) {
subs.error(e);
}
}
));
}).pipe(
takeUntilComplete(meld.status),
// TODO: workaround: live lock throws due to overlapping states
observeOn(asapScheduler)
);
}

/**
* Shorthand for following the state of a specific subject in the m-ld clone.
* Will emit an initial state as soon as the subject exists, and every time the
* subject changes.
* @param meld the clone to attach the observable to
* @param id the subject identity IRI
* @category Reactive
*/
export function watchSubject(
meld: MeldClone,
id: Iri
): Observable<GraphSubject> {
let subject: GraphSubject | undefined, patches = [];
return watchQuery(
meld,
async state => {
return subject = await state.get(id);
},
async (update, state) => {
if (subject != null) {
[subject, patches] = produceWithPatches(subject,
// @ts-ignore: TS cannot cope with mutable GraphSubject
mutable => updateSubject(mutable, update));
return patches.length ? subject : undefined;
} else {
return subject = await state.get(id);
}
}
);
}
140 changes: 140 additions & 0 deletions test/rx.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
import { clone, MeldClone } from '../src/index';
import { MemoryLevel } from 'memory-level';
import { MockRemotes, testConfig } from './testClones';
import { watchQuery, watchSubject } from '../src/rx/index';
import { take, toArray } from 'rxjs/operators';
import { firstValueFrom } from 'rxjs';
import { setTimeout } from 'timers/promises';

describe('Rx utilities', () => {
let api: MeldClone;

beforeEach(async () => {
api = await clone(new MemoryLevel, MockRemotes, testConfig());
});

afterEach(() => api.close().catch(() => {/*Some tests do close*/}));

test('watch query emits if found sync', async () => {
const observable = watchQuery(api, () => 1, () => 2);
const found = firstValueFrom(observable.pipe(take(2), toArray()));
await api.write({ '@id': 'fred', name: 'Fred' });
await expect(found).resolves.toEqual([1, 2]);
});

test('watch query does not emit if not found on read sync', async () => {
const observable = watchQuery(api, () => undefined, () => 2);
const found = firstValueFrom(observable.pipe(take(1), toArray()));
await api.write({ '@id': 'fred', name: 'Fred' });
await expect(found).resolves.toEqual([2]);
});

test('watch query does not emit if not found on update sync', async () => {
const observable = watchQuery(api, () => 1, () => undefined);
const found = firstValueFrom(observable.pipe(take(1), toArray()));
await api.write({ '@id': 'fred', name: 'Fred' });
await expect(found).resolves.toEqual([1]);
});

test('watch query emits if found async on read', async () => {
const observable = watchQuery(api, () => setTimeout(1, 1), () => 2);
const found = firstValueFrom(observable.pipe(take(2), toArray()));
await api.write({ '@id': 'fred', name: 'Fred' });
await expect(found).resolves.toEqual([1, 2]);
});

test('watch query emits if found async on update', async () => {
const observable = watchQuery(api,
() => 1, () => setTimeout(1, 2));
const found = firstValueFrom(observable.pipe(take(2), toArray()));
await api.write({ '@id': 'fred', name: 'Fred' });
await expect(found).resolves.toEqual([1, 2]);
});

test('watch query completes if clone closed', async () => {
const observable = watchQuery(api, () => 1, () => 2);
const found = firstValueFrom(observable.pipe(toArray()));
await api.write({ '@id': 'fred', name: 'Fred' });
await api.close();
await expect(found).resolves.toEqual([1, 2]);
});

test('watch can unsubscribe between read and update', async () => {
const observable = watchQuery(api, () => 1, () => 2);
const found: number[] = [];
const subs = observable.subscribe(value => {
found.push(value);
subs.unsubscribe();
});
await api.write({ '@id': 'fred', name: 'Fred' });
expect(found).toEqual([1]);
});

test('watch reads immediately', async () => {
const observable = watchQuery(api, state => state.get('fred'));
await api.write({ '@id': 'fred', name: 'Fred' });
const found = firstValueFrom(observable.pipe(take(1), toArray()));
await expect(found).resolves.toEqual([{ '@id': 'fred', name: 'Fred' }]);
});

test('watch reads on update', async () => {
const observable = watchQuery(api, state => state.get('fred'));
const found = firstValueFrom(observable.pipe(take(1), toArray()));
await api.write({ '@id': 'fred', name: 'Fred' });
await expect(found).resolves.toEqual([{ '@id': 'fred', name: 'Fred' }]);
});

test('watch subject initial state', async () => {
const observable = watchSubject(api, 'fred');
await api.write({ '@id': 'fred', name: 'Fred' });
const found = firstValueFrom(observable.pipe(take(1), toArray()));
await expect(found).resolves.toEqual([{ '@id': 'fred', name: 'Fred' }]);
});

test('watch subject update', async () => {
const observable = watchSubject(api, 'fred');
const found = firstValueFrom(observable.pipe(take(1), toArray()));
await api.write({ '@id': 'fred', name: 'Fred' });
await expect(found).resolves.toEqual([{ '@id': 'fred', name: 'Fred' }]);
});

test('watch subject update', async () => {
const observable = watchSubject(api, 'fred');
await api.write({ '@id': 'fred', name: 'Fred' });
const found = firstValueFrom(observable.pipe(take(2), toArray()));
await api.write({ '@update': { '@id': 'fred', name: 'Flintstone' } });
await expect(found).resolves.toEqual([
{ '@id': 'fred', name: 'Fred' },
{ '@id': 'fred', name: 'Flintstone' }
]);
});

test('watched subject did not change', async () => {
const observable = watchSubject(api, 'fred');
await api.write({ '@id': 'fred', name: 'Fred' });
const found = firstValueFrom(observable.pipe(take(2), toArray()));
// A non-overlapping update should be ignored
await api.write({ '@id': 'wilma', name: 'Wilma' } );
await api.write({ '@update': { '@id': 'fred', name: 'Flintstone' } });
await expect(found).resolves.toEqual([
{ '@id': 'fred', name: 'Fred' },
{ '@id': 'fred', name: 'Flintstone' }
]);
});

test('watch subject update with other data', async () => {
const observable = watchSubject(api, 'fred');
await api.write({ '@id': 'fred', name: 'Fred' });
const found = firstValueFrom(observable.pipe(take(2), toArray()));
// A non-overlapping update should be ignored
await api.write({
'@id': 'wilma',
name: 'Wilma',
spouse: { '@id': 'fred', name: 'Flintstone' }
});
await expect(found).resolves.toEqual([
{ '@id': 'fred', name: 'Fred' },
{ '@id': 'fred', name: ['Fred', 'Flintstone'] }
]);
});
});

0 comments on commit d708319

Please sign in to comment.