Skip to content

Commit

Permalink
Merge pull request #6083 from mzimandl/aiofiles-fix
Browse files Browse the repository at this point in the history
aiofiles fixes
  • Loading branch information
tomachalek authored Jan 8, 2024
2 parents 447ad99 + b8f1f2c commit cde9826
Show file tree
Hide file tree
Showing 12 changed files with 85 additions and 51 deletions.
9 changes: 5 additions & 4 deletions lib/bgcalc/freqs/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
from bgcalc.freqs.types import FreqCalcArgs, FreqCalcResult
from conclib.freq import FreqData, FreqItem
from dataclasses_json import dataclass_json
from util import AsyncBatchWriter


def _cache_file_path(args: FreqCalcArgs):
Expand Down Expand Up @@ -101,17 +102,17 @@ async def wrapper(args: FreqCalcArgs) -> FreqCalcResult:
data, cache_path = await find_cached_result(args)
if data is None:
data: FreqCalcResult = await func(args)
async with aiofiles.open(cache_path, 'w') as fw:
async with AsyncBatchWriter(cache_path, 'w', 100) as bw:
common_md = CommonMetadata(num_blocks=len(data.freqs), conc_size=data.conc_size)
await fw.write(json.dumps(common_md.to_dict()) + '\n')
await bw.write(json.dumps(common_md.to_dict()) + '\n')
for freq in data.freqs:
block_md = BlockMetadata(
head=[_Head(**x) for x in freq.Head],
skipped_empty=freq.SkippedEmpty,
no_rel_sorting=freq.NoRelSorting,
size=len(freq.Items))
await fw.write(json.dumps(block_md.to_dict()) + '\n')
await bw.write(json.dumps(block_md.to_dict()) + '\n')
for item in freq.Items:
await fw.write(json.dumps(item.to_dict()) + '\n')
await bw.write(json.dumps(item.to_dict()) + '\n')
return data
return wrapper
1 change: 0 additions & 1 deletion lib/bgcalc/jsonl_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.

import aiofiles
import ujson as json


Expand Down
21 changes: 11 additions & 10 deletions lib/bgcalc/keywords/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,9 @@
import hashlib
import math
import os
import sys
import time
from dataclasses import dataclass
from functools import wraps
from typing import List, Tuple, Union
from typing import List, Union

import aiofiles
import aiofiles.os
Expand All @@ -32,6 +30,7 @@
from corplib.corpus import KCorpus
from dataclasses_json import dataclass_json
from manatee import Keyword # TODO wrap this out
from util import AsyncBatchWriter

CNC_SCORE_TYPES = ('logL', 'chi2', 'din')

Expand Down Expand Up @@ -102,24 +101,26 @@ async def wrapper(corp: KCorpus, ref_corp: KCorpus, args: KeywordsFormArgs, max_
LineDataClass = CNCKeywordLine if manatee_is_custom_cnc else KeywordLine

if await aiofiles.os.path.exists(path):
async with aiofiles.open(path, 'r') as fr:
await fr.readline()
return [LineDataClass.from_dict(json.loads(item)) async for item in fr][:max_items]
with open(path, 'r') as fr:
fr.readline()
return [LineDataClass.from_dict(json.loads(item)) for item in fr][:max_items]
else:
ans = await f(corp, ref_corp, args, max_items)
# ans = sorted(ans, key=lambda x: x[1], reverse=True)
num_lines = len(ans)
async with aiofiles.open(path, 'w') as fw:
await fw.write(json.dumps(dict(total=num_lines)) + '\n')
async with AsyncBatchWriter(path, 'w', 100) as bw:
await bw.write(json.dumps(dict(total=num_lines)) + '\n')
for item in ans:
await fw.write(item.to_json() + '\n')
await bw.write(item.to_json() + '\n')
return ans[:max_items]

return wrapper


def filter_nan(v: float, round_num):
    """Round *v* to *round_num* decimal places, mapping NaN to None.

    Used to make float scores JSON-safe: NaN has no JSON representation,
    so it is replaced by None (-> null) instead.
    """
    if math.isnan(v):
        return None
    return round(v, round_num)


@cached
async def keywords(corp: KCorpus, ref_corp: KCorpus, args: KeywordsFormArgs, max_items: int) -> KeywordsResultType:
c_wl = corp.get_attr(args.wlattr)
Expand All @@ -135,7 +136,7 @@ async def keywords(corp: KCorpus, ref_corp: KCorpus, args: KeywordsFormArgs, max
wlminfreq=args.wlminfreq, pfilter_words=[],
nfilter_words=[], wlnums=args.wlnums,
attrfreq=attrfreq)
words =[x[0] for x in wl_items]
words = [x[0] for x in wl_items]

simple_n = 1.0 # this does not apply for CNC-custom manatee-open keywords
keyword = Keyword(
Expand Down
9 changes: 5 additions & 4 deletions lib/bgcalc/pquery/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.

import csv
import hashlib
import os.path
from dataclasses import dataclass
from dataclasses_json import dataclass_json
from functools import wraps
from typing import List

Expand All @@ -27,6 +27,7 @@
from action.argmapping.pquery import PqueryFormArgs
from bgcalc.adapter.abstract import AbstractBgClient
from bgcalc.csv_cache import load_cached_full, load_cached_partial
from dataclasses_json import dataclass_json

from .errors import PqueryArgumentError, PqueryResultNotFound

Expand Down Expand Up @@ -60,9 +61,9 @@ async def wrapper(worker: AbstractBgClient, pquery: PqueryFormArgs, raw_queries,
path = _create_cache_path(pquery)

if await aiofiles.os.path.exists(path):
async with aiofiles.open(path, 'r') as fr:
csv_reader = aiocsv.AsyncReader(fr)
return [item async for item in csv_reader]
with open(path, 'r') as fr:
csv_reader = csv.reader(fr)
return [item for item in csv_reader]
else:
ans = await f(worker, pquery, raw_queries, subcpath, user_id, collator_locale)
num_lines = ans[0][1]
Expand Down
13 changes: 7 additions & 6 deletions lib/bgcalc/wordlist/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from corplib import frq_db
from corplib.corpus import AbstractKCorpus
from manatee import Structure # TODO wrap this out
from util import AsyncBatchWriter


def _create_cache_path(form: WordlistFormArgs) -> str:
Expand Down Expand Up @@ -69,17 +70,17 @@ async def wrapper(corp: AbstractKCorpus, args: WordlistFormArgs, max_items: int)
path = _create_cache_path(args)

if await aiofiles.os.path.exists(path):
async with aiofiles.open(path, 'r') as fr:
await fr.readline()
return [json.loads(item) async for item in fr]
with open(path, 'r') as fr:
fr.readline()
return [json.loads(item) for item in fr]
else:
ans = await f(corp, args, sys.maxsize)
ans = sorted(ans, key=lambda x: x[1], reverse=True)
num_lines = len(ans)
async with aiofiles.open(path, 'w') as fw:
await fw.write(json.dumps(dict(total=num_lines)) + '\n')
async with AsyncBatchWriter(path, 'w', 100) as bw:
await bw.write(json.dumps(dict(total=num_lines)) + '\n')
for item in ans:
await fw.write(json.dumps(item) + '\n')
await bw.write(json.dumps(item) + '\n')
return ans[:max_items]

return wrapper
Expand Down
12 changes: 6 additions & 6 deletions lib/plugins/default_subcmixer/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,28 +15,28 @@
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.

import os
import struct
from collections import defaultdict
from typing import Any, Dict, List
import os

import aiofiles
import plugins
import ujson as json
from action.control import http_action
from action.krequest import KRequest
from action.model.corpus import CorpusActionModel
from action.response import KResponse
from corplib.abstract import create_new_subc_ident
from plugin_types.subcmixer import AbstractSubcMixer
from plugin_types.subcmixer.error import (
ResultNotFoundException, SubcMixerException)
from plugins import inject
from sanic.blueprints import Blueprint
from util import AsyncBatchWriter

from .category_tree import CategoryExpression, CategoryTree
from .database import Database
from .metadata_model import MetadataModel
from corplib.abstract import create_new_subc_ident

bp = Blueprint('default_subcmixer')

Expand Down Expand Up @@ -74,10 +74,10 @@ async def subcmixer_create_subcorpus(amodel: CorpusActionModel, req: KRequest, r
struct_indices = sorted([int(x) for x in req.form.get('ids').split(',')])
id_attr = req.form.get('idAttr').split('.')
attr = amodel.corp.get_struct(id_attr[0])
async with aiofiles.open(os.path.join(amodel.subcpath, subc_id.data_path), 'wb') as fw:
async with AsyncBatchWriter(os.path.join(amodel.subcpath, subc_id.data_path), 'wb', 100) as bw:
for idx in struct_indices:
await fw.write(struct.pack('<q', attr.beg(idx)))
await fw.write(struct.pack('<q', attr.end(idx)))
await bw.write(struct.pack('<q', attr.beg(idx)))
await bw.write(struct.pack('<q', attr.end(idx)))
return dict(status=True)


Expand Down
6 changes: 3 additions & 3 deletions lib/plugins/default_taghelper/loaders/keyval.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@ async def _initialize_tags(self):
if await self.is_available(lambda x: x):
async with aiofiles.open(self.variants_file_path, 'r') as f:
self.initial_values = json.loads(await f.read())
for item in self.initial_values:
for i, v in enumerate(item):
item[i] = tuple(v)
for item in self.initial_values:
for i, v in enumerate(item):
item[i] = tuple(v)
else:
self.initial_values = []

Expand Down
8 changes: 4 additions & 4 deletions lib/plugins/default_taghelper/loaders/positional.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,8 @@ async def _get_initial_values(self, lang, translate):
if not await aiofiles.os.path.exists(tst_path):
await aiofiles.os.mkdir(tst_path, 0o775)
ans = [set() for _ in range(tagset['num_pos'])]
async with aiofiles.open(self.variants_file_path) as fr:
async for line in fr:
with open(self.variants_file_path) as fr:
for line in fr:
line = line.strip() + (tagset['num_pos'] - len(line.strip())) * '-'
for i in range(tagset['num_pos']):
value = ''.join([char_replac_tab.get(x, x) for x in line[i]])
Expand Down Expand Up @@ -161,8 +161,8 @@ async def _calculate_variant(self, required_pattern, lang, translate):
char_replac_tab = dict(self.__class__.SPEC_CHAR_REPLACEMENTS)
patt = re.compile(required_pattern)
matching_tags = []
async with aiofiles.open(self.variants_file_path) as fr:
async for line in fr:
with open(self.variants_file_path) as fr:
for line in fr:
line = line.strip() + (tagset['num_pos'] - len(line.strip())) * '-'
if patt.match(line):
matching_tags.append(line)
Expand Down
5 changes: 3 additions & 2 deletions lib/plugins/masm_live_attributes/doclist/writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from openpyxl import Workbook
from plugins.masm_live_attributes.doclist import DocListItem
from templating import Type2XML
from util import AsyncBatchWriter


async def export_csv(data: List[DocListItem], target_path: str) -> bool:
Expand All @@ -47,9 +48,9 @@ async def export_xml(data: List[DocListItem], target_path: str) -> bool:
async def export_jsonl(data: List[DocListItem], target_path: str) -> bool:
if len(data) == 0:
return False
async with aiofiles.open(target_path, 'w') as fw:
async with AsyncBatchWriter(target_path, 'w', 100) as bw:
for item in data:
await fw.write(json.dumps(item.to_dict()) + "\n")
await bw.write(json.dumps(item.to_dict()) + "\n")
return True


Expand Down
8 changes: 4 additions & 4 deletions lib/plugins/masm_subcmixer/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
from dataclasses import dataclass
from typing import List, Optional

import aiofiles
import aiohttp
import plugins
import ujson as json
Expand All @@ -37,6 +36,7 @@
from plugin_types.subcmixer.error import (
ResultNotFoundException, SubcMixerException)
from sanic.blueprints import Blueprint
from util import AsyncBatchWriter

bp = Blueprint('masm_subcmixer', url_prefix='subcorpus')

Expand Down Expand Up @@ -78,10 +78,10 @@ async def subcmixer_create_subcorpus(amodel: CorpusActionModel, req: KRequest, r
struct_idxs = sorted(attr.str2id(sid) for sid in struct_ids)

subc_id = await create_new_subc_ident(amodel.subcpath, amodel.corp.corpname)
async with aiofiles.open(os.path.join(amodel.subcpath, subc_id.data_path), 'wb') as fw:
async with AsyncBatchWriter(os.path.join(amodel.subcpath, subc_id.data_path), 'wb', 100) as bw:
for idx in struct_idxs:
await fw.write(struct.pack('<q', mstruct.beg(idx)))
await fw.write(struct.pack('<q', mstruct.end(idx)))
await bw.write(struct.pack('<q', mstruct.beg(idx)))
await bw.write(struct.pack('<q', mstruct.end(idx)))
subc = await amodel.cf.get_corpus(subc_id)
author = amodel.plugin_ctx.user_dict
specification = CreateSubcorpusArgs(
Expand Down
8 changes: 4 additions & 4 deletions lib/plugins/mysql_subc_storage/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
from datetime import datetime
from typing import Any, Dict, List, Optional, Union

import aiofiles
import plugins
import ujson as json
from action.argmapping.subcorpus import (
Expand All @@ -36,6 +35,7 @@
from plugins.mysql_integration_db import MySqlIntegrationDb
from pymysql.err import IntegrityError
from sanic import Sanic
from util import AsyncBatchWriter

try:
from markdown import markdown
Expand Down Expand Up @@ -160,9 +160,9 @@ async def create_preflight(self, subc_root_dir, corpname):
new ID of the subcorpus
"""
subc_id = await create_new_subc_ident(subc_root_dir, corpname)
async with aiofiles.open(os.path.join(subc_root_dir, subc_id.data_path), 'wb') as fw:
await fw.write(struct.pack('<q', 0))
await fw.write(struct.pack('<q', self.preflight_subcorpus_size))
async with AsyncBatchWriter(os.path.join(subc_root_dir, subc_id.data_path), 'wb', 100) as bw:
await bw.write(struct.pack('<q', 0))
await bw.write(struct.pack('<q', self.preflight_subcorpus_size))
subcname = f'{corpname}-preflight'
async with self._db.cursor() as cursor:
await cursor.execute('START TRANSACTION')
Expand Down
36 changes: 33 additions & 3 deletions lib/util.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import asyncio
from contextlib import AsyncContextDecorator
from functools import partial, wraps
from typing import AsyncIterator, TypeVar

import aiofiles

T = TypeVar('T')


Expand Down Expand Up @@ -30,9 +33,9 @@ async def anext(ait: AsyncIterator):


_KEY_ALPHABET = (
[chr(x) for x in range(ord('a'), ord('z') + 1)] +
[chr(x) for x in range(ord('A'), ord('Z') + 1)] +
['%d' % i for i in range(10)])
[chr(x) for x in range(ord('a'), ord('z') + 1)] +
[chr(x) for x in range(ord('A'), ord('Z') + 1)] +
['%d' % i for i in range(10)])


def int2chash(hex_num: int, length: int) -> str:
Expand All @@ -46,3 +49,30 @@ def int2chash(hex_num: int, length: int) -> str:
ans.append(_KEY_ALPHABET[p])
hex_num = int(hex_num / len(_KEY_ALPHABET))
return ''.join([str(x) for x in ans])


class AsyncBatchWriter(AsyncContextDecorator):
def __init__(self, filename: str, mode: str, batch_size: int):
self.filename = filename
self.mode = mode
self.f = None
self.batch_size = batch_size
self.lines = []

async def write(self, data: str):
self.lines.append(data)
if len(self.lines) > self.batch_size:
await self.flush()

async def flush(self):
await self.f.writelines(self.lines)
self.lines = []

async def __aenter__(self):
self.f = await aiofiles.open(self.filename, self.mode).__aenter__()
return self

async def __aexit__(self, *exc):
await self.flush()
self.f.close()
return False

0 comments on commit cde9826

Please sign in to comment.