Skip to content

Commit

Permalink
AsyncBatchWriter handles files internally
Browse files Browse the repository at this point in the history
  • Loading branch information
mzimandl committed Jan 8, 2024
1 parent c5e7ad4 commit b8f1f2c
Show file tree
Hide file tree
Showing 8 changed files with 15 additions and 12 deletions.
2 changes: 1 addition & 1 deletion lib/bgcalc/freqs/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ async def wrapper(args: FreqCalcArgs) -> FreqCalcResult:
data, cache_path = await find_cached_result(args)
if data is None:
data: FreqCalcResult = await func(args)
async with aiofiles.open(cache_path, 'w') as fw, AsyncBatchWriter(fw, 100) as bw:
async with AsyncBatchWriter(cache_path, 'w', 100) as bw:
common_md = CommonMetadata(num_blocks=len(data.freqs), conc_size=data.conc_size)
await bw.write(json.dumps(common_md.to_dict()) + '\n')
for freq in data.freqs:
Expand Down
2 changes: 1 addition & 1 deletion lib/bgcalc/keywords/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ async def wrapper(corp: KCorpus, ref_corp: KCorpus, args: KeywordsFormArgs, max_
ans = await f(corp, ref_corp, args, max_items)
# ans = sorted(ans, key=lambda x: x[1], reverse=True)
num_lines = len(ans)
async with aiofiles.open(path, 'w') as fw, AsyncBatchWriter(fw, 100) as bw:
async with AsyncBatchWriter(path, 'w', 100) as bw:
await bw.write(json.dumps(dict(total=num_lines)) + '\n')
for item in ans:
await bw.write(item.to_json() + '\n')
Expand Down
2 changes: 1 addition & 1 deletion lib/bgcalc/wordlist/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ async def wrapper(corp: AbstractKCorpus, args: WordlistFormArgs, max_items: int)
ans = await f(corp, args, sys.maxsize)
ans = sorted(ans, key=lambda x: x[1], reverse=True)
num_lines = len(ans)
async with aiofiles.open(path, 'w') as fw, AsyncBatchWriter(fw, 100) as bw:
async with AsyncBatchWriter(path, 'w', 100) as bw:
await bw.write(json.dumps(dict(total=num_lines)) + '\n')
for item in ans:
await bw.write(json.dumps(item) + '\n')
Expand Down
3 changes: 1 addition & 2 deletions lib/plugins/default_subcmixer/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
from collections import defaultdict
from typing import Any, Dict, List

import aiofiles
import plugins
import ujson as json
from action.control import http_action
Expand Down Expand Up @@ -75,7 +74,7 @@ async def subcmixer_create_subcorpus(amodel: CorpusActionModel, req: KRequest, r
struct_indices = sorted([int(x) for x in req.form.get('ids').split(',')])
id_attr = req.form.get('idAttr').split('.')
attr = amodel.corp.get_struct(id_attr[0])
async with aiofiles.open(os.path.join(amodel.subcpath, subc_id.data_path), 'wb') as fw, AsyncBatchWriter(fw, 100) as bw:
async with AsyncBatchWriter(os.path.join(amodel.subcpath, subc_id.data_path), 'wb', 100) as bw:
for idx in struct_indices:
await bw.write(struct.pack('<q', attr.beg(idx)))
await bw.write(struct.pack('<q', attr.end(idx)))
Expand Down
2 changes: 1 addition & 1 deletion lib/plugins/masm_live_attributes/doclist/writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ async def export_xml(data: List[DocListItem], target_path: str) -> bool:
async def export_jsonl(data: List[DocListItem], target_path: str) -> bool:
if len(data) == 0:
return False
async with aiofiles.open(target_path, 'w') as fw, AsyncBatchWriter(fw, 100) as bw:
async with AsyncBatchWriter(target_path, 'w', 100) as bw:
for item in data:
await bw.write(json.dumps(item.to_dict()) + "\n")
return True
Expand Down
3 changes: 1 addition & 2 deletions lib/plugins/masm_subcmixer/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
from dataclasses import dataclass
from typing import List, Optional

import aiofiles
import aiohttp
import plugins
import ujson as json
Expand Down Expand Up @@ -79,7 +78,7 @@ async def subcmixer_create_subcorpus(amodel: CorpusActionModel, req: KRequest, r
struct_idxs = sorted(attr.str2id(sid) for sid in struct_ids)

subc_id = await create_new_subc_ident(amodel.subcpath, amodel.corp.corpname)
async with aiofiles.open(os.path.join(amodel.subcpath, subc_id.data_path), 'wb') as fw, AsyncBatchWriter(fw, 100) as bw:
async with AsyncBatchWriter(os.path.join(amodel.subcpath, subc_id.data_path), 'wb', 100) as bw:
for idx in struct_idxs:
await bw.write(struct.pack('<q', mstruct.beg(idx)))
await bw.write(struct.pack('<q', mstruct.end(idx)))
Expand Down
3 changes: 1 addition & 2 deletions lib/plugins/mysql_subc_storage/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
from datetime import datetime
from typing import Any, Dict, List, Optional, Union

import aiofiles
import plugins
import ujson as json
from action.argmapping.subcorpus import (
Expand Down Expand Up @@ -161,7 +160,7 @@ async def create_preflight(self, subc_root_dir, corpname):
new ID of the subcorpus
"""
subc_id = await create_new_subc_ident(subc_root_dir, corpname)
async with aiofiles.open(os.path.join(subc_root_dir, subc_id.data_path), 'wb') as fw, AsyncBatchWriter(fw, 100) as bw:
async with AsyncBatchWriter(os.path.join(subc_root_dir, subc_id.data_path), 'wb', 100) as bw:
await bw.write(struct.pack('<q', 0))
await bw.write(struct.pack('<q', self.preflight_subcorpus_size))
subcname = f'{corpname}-preflight'
Expand Down
10 changes: 8 additions & 2 deletions lib/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
from functools import partial, wraps
from typing import AsyncIterator, TypeVar

import aiofiles

T = TypeVar('T')


Expand Down Expand Up @@ -50,8 +52,10 @@ def int2chash(hex_num: int, length: int) -> str:


class AsyncBatchWriter(AsyncContextDecorator):
def __init__(self, f, batch_size: int):
self.f = f
def __init__(self, filename: str, mode: str, batch_size: int):
self.filename = filename
self.mode = mode
self.f = None
self.batch_size = batch_size
self.lines = []

Expand All @@ -65,8 +69,10 @@ async def flush(self):
self.lines = []

async def __aenter__(self):
    """Open the target file and return this writer.

    The file at ``self.filename`` is opened via ``aiofiles`` using
    ``self.mode`` and the resulting handle is stored in ``self.f``.
    """
    # The object returned by aiofiles.open() can be awaited directly to
    # obtain the opened file handle.
    handle = await aiofiles.open(self.filename, self.mode)
    self.f = handle
    return self

async def __aexit__(self, *exc):
    """Flush any buffered data and close the underlying file.

    Args:
        *exc: exception type, value and traceback passed in by the
            ``async with`` machinery (unused).

    Returns:
        False, so an exception raised inside the ``async with`` body
        is never suppressed.
    """
    try:
        await self.flush()
    finally:
        # self.f is opened through aiofiles, whose close() is a coroutine:
        # calling it without ``await`` only creates a coroutine object and
        # leaves the file open. The ``finally`` also guarantees the handle
        # is released when the final flush fails.
        await self.f.close()
    return False

0 comments on commit b8f1f2c

Please sign in to comment.