Skip to content

Commit

Permalink
Support push for mailboxes.
Browse files Browse the repository at this point in the history
Related to linagora#33
  • Loading branch information
alagane committed May 5, 2022
1 parent 1441491 commit 9bb3c46
Show file tree
Hide file tree
Showing 8 changed files with 355 additions and 18 deletions.
10 changes: 8 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,13 @@
"url": "https://github.com/OpenPaaS-Suite/jmap-client-ts/issues"
},
"homepage": "https://github.com/OpenPaaS-Suite/jmap-client-ts#README.md",
"dependencies": {
"isomorphic-ws": "4.0.1"
},
"devDependencies": {
"@types/jest": "^26.0.20",
"@types/node-fetch": "^2.5.8",
"@types/ws": "^8.5.0",
"@typescript-eslint/eslint-plugin": "^4.13.0",
"@typescript-eslint/parser": "^4.13.0",
"axios": "^0.21.1",
Expand All @@ -48,9 +52,11 @@
"lint-staged": "^10.5.3",
"node-fetch": "^2.6.1",
"prettier": "2.1.2",
"testcontainers": "^6.4.1",
"rxjs": "^7.5.5",
"testcontainers": "^8.4.0",
"ts-jest": "^26.4.4",
"ts-node": "9.0.0",
"typescript": "4.0.3"
"typescript": "^4.6.2",
"ws": "^8.5.0"
}
}
50 changes: 48 additions & 2 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ import {
IEmailChangesResponse,
IInvocation,
} from './types';

import { PushClient } from './push';
import { Observable } from 'rxjs';
import * as WebSocket from 'isomorphic-ws';
export class Client {
private readonly DEFAULT_USING = ['urn:ietf:params:jmap:core', 'urn:ietf:params:jmap:mail'];

Expand All @@ -35,31 +37,45 @@ export class Client {

private sessionUrl: string;
private overriddenApiUrl?: string;
private overriddenPushUrl?: string;
private session?: ISession;

private pushClient?: PushClient;

constructor({
sessionUrl,
accessToken,
overriddenApiUrl,
overriddenPushUrl,
transport,
httpHeaders,
}: {
sessionUrl: string;
accessToken: string;
overriddenApiUrl?: string;
overriddenPushUrl?: string;
transport: Transport;
httpHeaders?: { [headerName: string]: string };
}) {
this.sessionUrl = sessionUrl;
if (overriddenApiUrl) {
this.overriddenApiUrl = overriddenApiUrl;
}
if (overriddenPushUrl) {
this.overriddenPushUrl = overriddenPushUrl;
}
this.transport = transport;
this.httpHeaders = {
Accept: 'application/json;jmapVersion=rfc-8621',
Authorization: `Bearer ${accessToken}`,
...(httpHeaders ? httpHeaders : {}),
};
this.pushClient = new PushClient({
client: this,
transport,
webSocketConstructor: (url: string) => new WebSocket(url),
httpHeaders: this.httpHeaders,
});
}

public fetchSession(): Promise<void> {
Expand Down Expand Up @@ -140,7 +156,7 @@ export class Client {
}

private request<ResponseType>(methodName: IMethodName, args: IArguments) {
const apiUrl = this.overriddenApiUrl || this.getSession().apiUrl;
const apiUrl = this.getApiUrl();
return this.transport
.post<{
sessionState: string;
Expand All @@ -164,6 +180,32 @@ export class Client {
});
}

public getApiUrl(): string {
return this.overriddenApiUrl || this.getSession().apiUrl;
}

public getPushUrl(): string {
return (
this.overriddenPushUrl || this.getSession().capabilities['urn:ietf:params:jmap:websocket'].url
);
}

public pushStart(): Promise<void> {
if (this.pushClient) {
return this.pushClient.start();
}

this.throwWebsocketUndefined();
}

public pushMailbox(): Observable<{ [accountId: string]: string }> {
if (this.pushClient) {
return this.pushClient.mailbox();
}

this.throwWebsocketUndefined();
}

private replaceAccountId<U extends IReplaceableAccountId>(input: U): U {
return input.accountId !== null
? input
Expand All @@ -176,4 +218,8 @@ export class Client {
private getCapabilities() {
return this.session?.capabilities ? Object.keys(this.session.capabilities) : this.DEFAULT_USING;
}

private throwWebsocketUndefined(): never {
throw new Error('Websocket API is not defined');
}
}
142 changes: 142 additions & 0 deletions src/push.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
import { defer, Observable, Subject } from 'rxjs';
import { Client } from '.';
import { ENTITY_TYPES, IEntityType, IStateChange, ITypeState, IWebSocketPushEnable } from './types';
import { Transport } from './utils/transport';
import * as ws from 'isomorphic-ws';

export class PushClient {
private client: Client;
private transport: Transport;
private httpHeaders: { [headerName: string]: string };
private webSocket?: ws;
private webSocketSubjects: { [_ in IEntityType]: Subject<{ [accountId: string]: string }> };

private started?: Promise<void>;
private enabledDataTypes: { [_ in IEntityType]?: boolean };

constructor({
client,
transport,
httpHeaders,
}: {
client: Client;
transport: Transport;
httpHeaders: { [headerName: string]: string };
}) {
this.client = client;
this.transport = transport;
this.httpHeaders = httpHeaders;
this.webSocketSubjects = {
Mailbox: new Subject(),
Email: new Subject(),
EmailSubmission: new Subject(),
};
this.enabledDataTypes = {};
}

public start(): Promise<void> {
if (!this.started) {
this.started = new Promise(resolve => {
this.transport
.post<{
value: string;
}>(`${this.client.getApiUrl()}/ws/ticket`, '', this.httpHeaders)
.then(response => {
const ticket = response.value;
const webSocket = new ws(`${this.client.getPushUrl()}?ticket=${ticket}`);
webSocket.onopen = () => {
this.webSocket = webSocket;
this.sendSubscriptions(webSocket, this.enabledDataTypes);
resolve();
};
webSocket.onclose = () => {
for (const subject of Object.values(this.webSocketSubjects)) {
subject.complete();
}

delete this.webSocket;
};
webSocket.onmessage = message => {
const data = JSON.parse(message.data as string);
if (data['@type'] == 'StateChange') {
for (const entityType of ENTITY_TYPES) {
if (this.stateChangeContainsType(data, entityType)) {
this.webSocketSubjects[entityType].next(
this.transformStateChange(data.changed, entityType),
);
}
}
}
};
webSocket.onerror = event => {
console.log(`Error with websocket: ${JSON.stringify(event)}`);
for (const subject of Object.values(this.webSocketSubjects)) {
subject.error(event);
}

delete this.webSocket;
};
});
});
}

return this.started;
}

public stop(): void {
this.webSocket?.close();
}

public mailbox(): Observable<{ [accountId: string]: string }> {
return defer(() => {
if (!this.enabledDataTypes.Mailbox) {
this.enabledDataTypes.Mailbox = true;

if (this.webSocket) {
this.sendSubscriptions(this.webSocket, this.enabledDataTypes);
}
}

return this.webSocketSubjects.Mailbox.asObservable();
});
}

private sendSubscriptions(
webSocket: ws,
enabledDataTypes: {
[_ in IEntityType]?: boolean;
},
) {
const payload: IWebSocketPushEnable = {
'@type': 'WebSocketPushEnable',
dataTypes: Object.keys(enabledDataTypes),
};
webSocket.send(JSON.stringify(payload));
}

private stateChangeContainsType(stateChange: IStateChange, type: IEntityType): boolean {
for (const accountId of Object.keys(stateChange.changed)) {
if (stateChange.changed[accountId][type]) {
return true;
}
}

return false;
}

private transformStateChange(
changed: { [accountId: string]: ITypeState },
type: IEntityType,
): { [accountId: string]: string } {
const changedFlattened: { [accountId: string]: string } = {};

for (const accountId of Object.keys(changed)) {
const entityState = changed[accountId][type];
if (entityState) {
changedFlattened[accountId] = entityState;
}
}

return changedFlattened;
}
}
56 changes: 48 additions & 8 deletions src/types.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
export const ENTITY_TYPES = ['Mailbox', 'Email', 'EmailSubmission'] as const;
export type IEntityType = typeof ENTITY_TYPES[number];

export type IMethodName =
| 'Mailbox/get'
| 'Mailbox/changes'
Expand Down Expand Up @@ -137,6 +140,37 @@ export type IEmailQueryArguments = IQueryArguments<IEmailFilterCondition>;

export type IEmailQueryResponse = IQueryResponse;

/**
* See https://jmap.io/spec-core.html#the-statechange-object
*/
export interface IStateChange {
'@type': 'StateChange';
changed: {
[accountId: string]: ITypeState;
};
}

/**
* See https://tools.ietf.org/html/rfc8887#section-4.3.5.2
*/
export interface IWebSocketPushEnable {
'@type': 'WebSocketPushEnable';
dataTypes: string[] | null;
pushState?: string;
}

/**
* See https://tools.ietf.org/html/rfc8887#section-4.3.5.3
*/
export interface IWebSocketPushDisable {
'@type': 'WebSocketPushDisable';
}

/**
* See https://jmap.io/spec-core.html#the-statechange-object
*/
export type ITypeState = Partial<{ [_ in IEntityType]: string }>;

/**
* See https://jmap.io/spec-core.html#query
*/
Expand Down Expand Up @@ -167,14 +201,20 @@ export interface IRequest {
* See https://jmap.io/spec-core.html#the-jmap-session-resource
*/
export interface ICapabilities {
maxSizeUpload: number;
maxConcurrentUpload: number;
maxSizeRequest: number;
maxConcurrentRequests: number;
maxCallsInRequest: number;
maxObjectsInGet: number;
maxObjectsInSet: number;
collationAlgorithms: string[];
'urn:ietf:params:jmap:core': {
maxSizeUpload: number;
maxConcurrentUpload: number;
maxSizeRequest: number;
maxConcurrentRequests: number;
maxCallsInRequest: number;
maxObjectsInGet: number;
maxObjectsInSet: number;
collationAlgorithms: string[];
};
'urn:ietf:params:jmap:websocket': {
url: string;
supportsPush: boolean;
};
}

/**
Expand Down
Loading

0 comments on commit 9bb3c46

Please sign in to comment.