Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

colossus pruning service #4988

Merged
merged 11 commits into from
Dec 1, 2023
Merged
4 changes: 4 additions & 0 deletions storage-node/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
### 3.9.0

- Add background pruning worker to delete data objects which the node is no longer obligated to store. New optional argument `--cleanup` and `--cleanupInterval`

### 3.8.1

- Hotfix: Fix call stack size exceeded when handling large number of initial object to sync.
Expand Down
74 changes: 74 additions & 0 deletions storage-node/client/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,61 @@ export interface StatusResponse {
* @memberof StatusResponse
*/
'queryNodeStatus': StatusResponseQueryNodeStatus;
/**
*
* @type {Array<string>}
* @memberof StatusResponse
*/
'uploadBuckets': Array<string>;
/**
*
* @type {Array<string>}
* @memberof StatusResponse
*/
'downloadBuckets': Array<string>;
/**
*
* @type {StatusResponseSync}
* @memberof StatusResponse
*/
'sync': StatusResponseSync;
/**
*
* @type {StatusResponseCleanup}
* @memberof StatusResponse
*/
'cleanup': StatusResponseCleanup;
}
/**
*
* @export
* @interface StatusResponseCleanup
*/
export interface StatusResponseCleanup {
/**
*
* @type {boolean}
* @memberof StatusResponseCleanup
*/
'enabled': boolean;
/**
*
* @type {number}
* @memberof StatusResponseCleanup
*/
'interval': number;
/**
*
* @type {number}
* @memberof StatusResponseCleanup
*/
'maxQnLaggingThresholdInBlocks'?: number;
/**
*
* @type {number}
* @memberof StatusResponseCleanup
*/
'minReplicationThresholdForPruning'?: number;
}
/**
*
Expand All @@ -134,6 +189,25 @@ export interface StatusResponseQueryNodeStatus {
*/
'blocksProcessed': number;
}
/**
*
* @export
* @interface StatusResponseSync
*/
export interface StatusResponseSync {
/**
*
* @type {boolean}
* @memberof StatusResponseSync
*/
'enabled': boolean;
/**
*
* @type {number}
* @memberof StatusResponseSync
*/
'interval': number;
}
/**
*
* @export
Expand Down
2 changes: 1 addition & 1 deletion storage-node/package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "storage-node",
"description": "Joystream storage subsystem.",
"version": "3.8.1",
"version": "3.9.0-beta-2",
"author": "Joystream contributors",
"bin": {
"storage-node": "./bin/run"
Expand Down
37 changes: 37 additions & 0 deletions storage-node/src/api-spec/openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,10 @@ components:
required:
- version
- queryNodeStatus
- uploadBuckets
- downloadBuckets
- sync
- cleanup
properties:
version:
type: string
Expand All @@ -315,6 +319,39 @@ components:
blocksProcessed:
type: integer
minimum: 0
uploadBuckets:
type: array
items:
type: string
downloadBuckets:
type: array
items:
type: string
sync:
type: object
required:
- 'enabled'
- 'interval'
properties:
enabled:
type: boolean
interval:
type: integer
cleanup:
type: object
required:
- 'enabled'
- 'interval'
properties:
enabled:
type: boolean
interval:
type: integer
maxQnLaggingThresholdInBlocks:
type: integer
minReplicationThresholdForPruning:
type: integer

DataObjectResponse:
type: array
items:
Expand Down
63 changes: 63 additions & 0 deletions storage-node/src/commands/dev/cleanup.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import { Command, flags } from '@oclif/command'
import stringify from 'fast-safe-stringify'
import logger from '../../services/logger'
import { QueryNodeApi } from '../../services/queryNode/api'
import { performCleanup } from '../../services/sync/cleanupService'

/**
* CLI command:
* Prunes outdated data objects: removes all the local stored data objects that the operator is no longer obliged to store.
* storage.
*
* @remarks
* Should be run only during the development.
* Shell command: "dev:cleanup"
*/
export default class DevCleanup extends Command {
static description = `Runs the data objects cleanup/pruning workflow. It removes all the local stored data objects that the operator is no longer obliged to store`

static flags = {
help: flags.help({ char: 'h' }),
workerId: flags.integer({
char: 'w',
required: true,
description: 'Storage node operator worker ID.',
}),
bucketId: flags.integer({
char: 'b',
required: true,
description: 'The buckerId to sync prune/cleanup',
}),
cleanupWorkersNumber: flags.integer({
char: 'p',
required: false,
description: 'Cleanup/Pruning workers number (max async operations in progress).',
default: 20,
}),
queryNodeEndpoint: flags.string({
char: 'q',
required: false,
default: 'http://localhost:8081/graphql',
description: 'Query node endpoint (e.g.: http://some.com:8081/graphql)',
}),
uploads: flags.string({
char: 'd',
required: true,
description: 'Data uploading directory (absolute path).',
}),
}

async run(): Promise<void> {
const { flags } = this.parse(DevCleanup)
const bucketId = flags.bucketId.toString()
const qnApi = new QueryNodeApi(flags.queryNodeEndpoint)
logger.info('Cleanup...')

try {
await performCleanup(flags.workerId, [bucketId], flags.cleanupWorkersNumber, qnApi, flags.uploads)
} catch (err) {
logger.error(err)
logger.error(stringify(err))
}
}
}
8 changes: 5 additions & 3 deletions storage-node/src/commands/dev/sync.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import { Command, flags } from '@oclif/command'
import { performSync } from '../../services/sync/synchronizer'
import logger from '../../services/logger'
import stringify from 'fast-safe-stringify'
import logger from '../../services/logger'
import { QueryNodeApi } from '../../services/queryNode/api'
import { performSync } from '../../services/sync/synchronizer'

/**
* CLI command:
Expand Down Expand Up @@ -62,6 +63,7 @@ export default class DevSync extends Command {
async run(): Promise<void> {
const { flags } = this.parse(DevSync)
const bucketId = flags.bucketId.toString()
const qnApi = new QueryNodeApi(flags.queryNodeEndpoint)
logger.info('Syncing...')

try {
Expand All @@ -71,7 +73,7 @@ export default class DevSync extends Command {
[bucketId],
flags.syncWorkersNumber,
flags.syncWorkersTimeout,
flags.queryNodeEndpoint,
qnApi,
flags.uploads,
flags.dataSourceOperatorUrl
)
Expand Down
Loading
Loading