Skip to content

Commit

Permalink
Importador revamp
Browse files Browse the repository at this point in the history
- use SQLite (menos memória)
- usa uma coleção de index
- não importa se já existe
  • Loading branch information
Phenome committed Nov 29, 2023
1 parent bce6010 commit 876dddc
Showing 1 changed file with 135 additions and 26 deletions.
161 changes: 135 additions & 26 deletions src/ocorrencia.ts
Original file line number Diff line number Diff line change
@@ -1,38 +1,147 @@
import { MongoClient } from 'https://deno.land/x/[email protected]/mod.ts'

import { processaZip } from './lib/dwca.ts'
import { getEml, processaZip, type Eml } from './lib/dwca.ts'
import { calculateObjectSize } from 'npm:bson'

const refUrls = await Deno.readTextFile('./referencias/herbarios.txt').then(
(contents) => contents.split('\n')
type InsertManyParams = Parameters<typeof ocorrenciasCol.insertMany>
async function safeInsertMany(
collection: typeof ocorrenciasCol,
docs: InsertManyParams[0],
options?: InsertManyParams[1]
): ReturnType<typeof iptsCol.insertMany> {
let chunkSize = docs.length
while (true) {
try {
const chunks: (typeof docs)[] = []
for (let i = 0; i < docs.length; i += chunkSize) {
chunks.push(docs.slice(i, i + chunkSize))
}
const returns: Awaited<ReturnType<typeof ocorrenciasCol.insertMany>>[] =
[]
for (const chunk of chunks) {
if (calculateObjectSize(chunk) > 16 * 1024 * 1024) {
throw new Error('Chunk size exceeds the BSON document size limit')
}
returns.push(await collection.insertMany(chunk, options))
}
return returns.reduce((acc, cur) => ({
insertedCount: acc.insertedCount + cur.insertedCount,
insertedIds: [...acc.insertedIds, ...cur.insertedIds]
}))
} catch (_e) {
chunkSize = Math.floor(chunkSize / 2)
console.log(
`Can't insert chunk of ${docs.length} documents, trying with ${chunkSize}`
)
continue
}
}
}

type Ipt = {
id: string
version: string
} & Eml['dataset']
const processaEml = (emlJson: Eml): Ipt => {
const [id, version] =
emlJson['@packageId'].match(/(.+)\/(.+)/)?.slice(1) ?? []
return { id, version, ...emlJson.dataset }
}

const iptSources = await Deno.readTextFile('./referencias/sources.json').then(
(contents) => JSON.parse(contents)
)

const client = new MongoClient()
await client.connect(Deno.env.get('MONGO_URI') as string)
const collection = client.database('dwc2json').collection('ocorrencias')
console.debug('Cleaning collection')
console.log(await collection.deleteMany({}))
const iptsCol = client.database('dwc2json').collection('ipts')
const ocorrenciasCol = client.database('dwc2json').collection('ocorrencias')

const CHUNK_SIZE = 5000
for (const url of refUrls) {
if (!url) continue
console.debug(`Processing ${url}`)
const json = await processaZip(url)
const ocorrencias = Object.values(json)
console.debug(`Inserting entries (${ocorrencias.length})`)
for (let i = 0, n = ocorrencias.length; i < n; i += CHUNK_SIZE) {
console.log(`Inserting ${i} to ${Math.min(i + CHUNK_SIZE, n)}`)
await collection.insertMany(ocorrencias.slice(i, i + CHUNK_SIZE), {
ordered: false
})
type DbIpt = {
_id: Ipt['id']
tag: string
collection: string
} & Omit<Ipt, 'id'>
for (const { ipt: iptName, baseUrl, datasets } of iptSources) {
for (const set of datasets) {
if (!set) continue
console.debug(`Processing ${set}`)
const eml = await getEml(`${baseUrl}eml.do?r=${set}`)
const ipt = processaEml(eml)
const dbVersion = (
(await iptsCol.findOne({ _id: ipt.id })) as DbIpt | undefined
)?.version
if (dbVersion === ipt.version) {
console.debug(`${set} already on version ${ipt.version}`)
continue
}
console.log(`Version mismatch: DB[${dbVersion}] vs REMOTE[${ipt.version}]`)
console.debug(`Downloading ${set} [${baseUrl}archive.do?r=${set}]`)
const ocorrencias = await processaZip(
`${baseUrl}archive.do?r=${set}`,
true,
5000
)
console.debug(`Cleaning ${set}`)
console.log(
`Deleted ${await ocorrenciasCol.deleteMany({ iptId: ipt.id })} entries`
)
let totalProcessed = 0
for (const batch of ocorrencias) {
if (!batch || !batch.length) break
console.debug(`Compiled ${(totalProcessed += batch.length)} entries`)
await safeInsertMany(
ocorrenciasCol,
batch.map((ocorrencia) => ({
iptId: ipt.id,
ipt: iptName,
...ocorrencia[1]
})),
{
ordered: false
}
)
console.debug('Inserted')
}
console.debug(`Inserting IPT ${set}`)
const { id: _id, ...iptDb } = ipt
await iptsCol.updateOne(
{ _id: ipt.id },
{ $set: { _id, ...iptDb, tag: set, ipt: iptName } },
{ upsert: true }
)
}
}

console.log('Creating indexes')
await collection.createIndexes({
indexes: [
{
key: { scientificName: 1 },
name: 'scientificName'
}
]
})
await Promise.all([
ocorrenciasCol.createIndexes({
indexes: [
{
key: { scientificName: 1 },
name: 'scientificName'
},
{
key: { iptId: 1 },
name: 'iptId'
},
{
key: { ipt: 1 },
name: 'ipt'
}
]
}),
iptsCol.createIndexes({
indexes: [
{
key: { tag: 1 },
name: 'tag'
},
{
key: { ipt: 1 },
name: 'ipt'
}
]
})
])
console.debug('Done')

0 comments on commit 876dddc

Please sign in to comment.