First sketch at outputting Parquet formatted data
iandees committed Jun 24, 2024
1 parent 12075a0 commit 2f8ec52
Showing 1 changed file with 101 additions and 3 deletions.
task/collect.js
@@ -16,6 +16,7 @@ import { mkdirp } from 'mkdirp';
import S3 from '@aws-sdk/client-s3';
import { Upload } from '@aws-sdk/lib-storage';
import archiver from 'archiver';
import parquetjs from '@dsnp/parquetjs';
import minimist from 'minimist';
import { Transform } from 'stream';

@@ -108,9 +109,15 @@ async function collect(tmp, collection, oa) {
    const zip = await zip_datas(tmp, collection_data, collection.name);

    console.error(`ok - zip created: ${zip}`);
-    await upload_collection(zip, collection.name);
+    await upload_zip_collection(zip, collection.name);
    console.error('ok - archive uploaded');

    const pq = await parquet_datas(tmp, collection_data, collection.name);

    console.error(`ok - parquet created: ${pq}`);
    await upload_parquet_collection(pq, collection.name);
    console.error('ok - parquet uploaded');

    await oa.cmd('collection', 'update', {
        ':collection': collection.id,
        size: fs.statSync(zip).size
@@ -193,7 +200,7 @@ async function get_source(oa, tmp, data, stats) {
    return path.resolve(tmp, 'sources', dir, source);
}

-async function upload_collection(file, name) {
+async function upload_zip_collection(file, name) {
    const s3uploader = new Upload({
        client: s3,
        params: {
@@ -217,7 +224,6 @@ async function upload_collection(file, name) {
        endpoint: `https://${process.env.CLOUDFLARE_ACCOUNT_ID}.r2.cloudflarestorage.com`
    });

    const r2uploader = new Upload({
        client: r2,
        params: {
@@ -231,7 +237,45 @@
    await r2uploader.done();

    console.error(`ok - uploaded: r2://${process.env.R2Bucket}/v2.openaddresses.io/${process.env.StackName}/collection-${name}.zip`);
}

async function upload_parquet_collection(file, name) {
    const s3uploader = new Upload({
        client: s3,
        params: {
            ContentType: 'application/vnd.apache.parquet',
            Body: fs.createReadStream(file),
            Bucket: process.env.Bucket,
            Key: `${process.env.StackName}/collection-${name}.parquet`
        }
    });

    await s3uploader.done();

    console.error(`ok - uploaded: s3://${process.env.Bucket}/${process.env.StackName}/collection-${name}.parquet`);

    const r2 = new S3.S3Client({
        region: 'auto',
        credentials: {
            accessKeyId: process.env.R2_ACCESS_KEY_ID,
            secretAccessKey: process.env.R2_SECRET_ACCESS_KEY
        },
        endpoint: `https://${process.env.CLOUDFLARE_ACCOUNT_ID}.r2.cloudflarestorage.com`
    });

    const r2uploader = new Upload({
        client: r2,
        params: {
            ContentType: 'application/vnd.apache.parquet',
            Body: fs.createReadStream(file),
            Bucket: process.env.R2Bucket,
            Key: `v2.openaddresses.io/${process.env.StackName}/collection-${name}.parquet`
        }
    });

    await r2uploader.done();

    console.error(`ok - uploaded: r2://${process.env.R2Bucket}/v2.openaddresses.io/${process.env.StackName}/collection-${name}.parquet`);
}

function zip_datas(tmp, datas, name) {
@@ -273,3 +317,57 @@ function zip_datas(tmp, datas, name) {
        archive.finalize();
    });
}

async function parquet_datas(tmp, datas, name) {
    const schema = new parquetjs.ParquetSchema({
        source_name: { type: 'UTF8' },
        geometry: { type: 'BYTE_ARRAY' },
        id: { type: 'UTF8', optional: true },
        pid: { type: 'UTF8', optional: true },
        number: { type: 'UTF8', optional: true },
        street: { type: 'UTF8', optional: true },
        unit: { type: 'UTF8', optional: true },
        city: { type: 'UTF8', optional: true },
        postcode: { type: 'UTF8', optional: true },
        district: { type: 'UTF8', optional: true },
        region: { type: 'UTF8', optional: true },
        addrtype: { type: 'UTF8', optional: true },
        notes: { type: 'UTF8', optional: true },
    });

    const out = path.resolve(tmp, `${name}.parquet`);
    const writer = await parquetjs.ParquetWriter.openFile(schema, out);

    for (const data of datas) {
        const resolved_data_filename = path.resolve(tmp, 'sources', data);

        // Read the file and parse it as linefeed-delimited JSON
        // (assumes a line-splitting transform, e.g. split2, is imported as `split`)
        const data_lines = fs.createReadStream(resolved_data_filename).pipe(split());

        for await (const line of data_lines) {
            if (!line) continue;

            const record = JSON.parse(line);
            const properties = record.properties;

            // appendRow is async; await it so every row lands before close
            await writer.appendRow({
                source_name: data,
                geometry: Buffer.from(JSON.stringify(record.geometry)), // TODO: Convert to WKB
                id: properties.id,
                pid: properties.pid,
                number: properties.number,
                street: properties.street,
                unit: properties.unit,
                city: properties.city,
                postcode: properties.postcode,
                district: properties.district,
                region: properties.region,
                addrtype: properties.addrtype,
                notes: properties.notes,
            });
        }

        console.error(`ok - ${resolved_data_filename} processed and appended to parquet file`);
    }

    await writer.close();
    return out;
}
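
The TODO on the geometry column calls for WKB conversion. One way to get there is the wkx package, which converts parsed GeoJSON geometries into WKB Buffers; a minimal sketch, assuming wkx were added as a dependency (it is not part of this commit):

import wkx from 'wkx';

// Convert a parsed GeoJSON geometry object into a WKB Buffer
// suitable for the BYTE_ARRAY geometry column above.
// (wkx is an assumed dependency, not added by this commit)
function geometry_to_wkb(geometry) {
    return wkx.Geometry.parseGeoJSON(geometry).toWkb();
}

The appendRow call would then use geometry: geometry_to_wkb(record.geometry) in place of the serialized-JSON stopgap.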
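
For a quick local sanity check of the output, @dsnp/parquetjs can also read the file back with its ParquetReader. A rough sketch (verify_parquet is a hypothetical helper, not part of this commit):

import parquetjs from '@dsnp/parquetjs';

// Open the written file, print the first row, and count the rest.
// (hypothetical helper for local testing only)
async function verify_parquet(file) {
    const reader = await parquetjs.ParquetReader.openFile(file);
    const cursor = reader.getCursor();

    let count = 0;
    let record;
    while ((record = await cursor.next())) {
        if (count === 0) console.error('first row:', record);
        count++;
    }

    await reader.close();
    console.error(`ok - ${file} contains ${count} rows`);
}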
