From d085982dcf4dd800731ac869b3bb998892261f50 Mon Sep 17 00:00:00 2001 From: Martin Zimandl Date: Fri, 5 Jan 2024 15:44:39 +0100 Subject: [PATCH 1/3] get rid of async file line iteration --- lib/bgcalc/jsonl_cache.py | 1 - lib/bgcalc/keywords/__init__.py | 14 +++++++------- lib/bgcalc/pquery/storage.py | 9 +++++---- lib/bgcalc/wordlist/__init__.py | 6 +++--- .../default_taghelper/loaders/positional.py | 8 ++++---- 5 files changed, 19 insertions(+), 19 deletions(-) diff --git a/lib/bgcalc/jsonl_cache.py b/lib/bgcalc/jsonl_cache.py index 0e411002f0..5b6b995948 100644 --- a/lib/bgcalc/jsonl_cache.py +++ b/lib/bgcalc/jsonl_cache.py @@ -12,7 +12,6 @@ # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. -import aiofiles import ujson as json diff --git a/lib/bgcalc/keywords/__init__.py b/lib/bgcalc/keywords/__init__.py index 966182f769..d695aaabab 100644 --- a/lib/bgcalc/keywords/__init__.py +++ b/lib/bgcalc/keywords/__init__.py @@ -15,11 +15,9 @@ import hashlib import math import os -import sys -import time from dataclasses import dataclass from functools import wraps -from typing import List, Tuple, Union +from typing import List, Union import aiofiles import aiofiles.os @@ -102,9 +100,9 @@ async def wrapper(corp: KCorpus, ref_corp: KCorpus, args: KeywordsFormArgs, max_ LineDataClass = CNCKeywordLine if manatee_is_custom_cnc else KeywordLine if await aiofiles.os.path.exists(path): - async with aiofiles.open(path, 'r') as fr: - await fr.readline() - return [LineDataClass.from_dict(json.loads(item)) async for item in fr][:max_items] + with open(path, 'r') as fr: + fr.readline() + return [LineDataClass.from_dict(json.loads(item)) for item in fr][:max_items] else: ans = await f(corp, ref_corp, args, max_items) # ans = sorted(ans, key=lambda x: x[1], reverse=True) @@ -117,9 +115,11 @@ async def wrapper(corp: KCorpus, ref_corp: KCorpus, args: KeywordsFormArgs, max_ return wrapper + def filter_nan(v: float, round_num): 
return None if math.isnan(v) else round(v, round_num) + @cached async def keywords(corp: KCorpus, ref_corp: KCorpus, args: KeywordsFormArgs, max_items: int) -> KeywordsResultType: c_wl = corp.get_attr(args.wlattr) @@ -135,7 +135,7 @@ async def keywords(corp: KCorpus, ref_corp: KCorpus, args: KeywordsFormArgs, max wlminfreq=args.wlminfreq, pfilter_words=[], nfilter_words=[], wlnums=args.wlnums, attrfreq=attrfreq) - words =[x[0] for x in wl_items] + words = [x[0] for x in wl_items] simple_n = 1.0 # this does not apply for CNC-custom manatee-open keywords keyword = Keyword( diff --git a/lib/bgcalc/pquery/storage.py b/lib/bgcalc/pquery/storage.py index 50d9562190..498ca99ab4 100644 --- a/lib/bgcalc/pquery/storage.py +++ b/lib/bgcalc/pquery/storage.py @@ -12,10 +12,10 @@ # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. +import csv import hashlib import os.path from dataclasses import dataclass -from dataclasses_json import dataclass_json from functools import wraps from typing import List @@ -27,6 +27,7 @@ from action.argmapping.pquery import PqueryFormArgs from bgcalc.adapter.abstract import AbstractBgClient from bgcalc.csv_cache import load_cached_full, load_cached_partial +from dataclasses_json import dataclass_json from .errors import PqueryArgumentError, PqueryResultNotFound @@ -60,9 +61,9 @@ async def wrapper(worker: AbstractBgClient, pquery: PqueryFormArgs, raw_queries, path = _create_cache_path(pquery) if await aiofiles.os.path.exists(path): - async with aiofiles.open(path, 'r') as fr: - csv_reader = aiocsv.AsyncReader(fr) - return [item async for item in csv_reader] + with open(path, 'r') as fr: + csv_reader = csv.reader(fr) + return [item for item in csv_reader] else: ans = await f(worker, pquery, raw_queries, subcpath, user_id, collator_locale) num_lines = ans[0][1] diff --git a/lib/bgcalc/wordlist/__init__.py b/lib/bgcalc/wordlist/__init__.py index ffec1d9c8b..812ab49e3d 100644 --- 
a/lib/bgcalc/wordlist/__init__.py +++ b/lib/bgcalc/wordlist/__init__.py @@ -69,9 +69,9 @@ async def wrapper(corp: AbstractKCorpus, args: WordlistFormArgs, max_items: int) path = _create_cache_path(args) if await aiofiles.os.path.exists(path): - async with aiofiles.open(path, 'r') as fr: - await fr.readline() - return [json.loads(item) async for item in fr] + with open(path, 'r') as fr: + fr.readline() + return [json.loads(item) for item in fr] else: ans = await f(corp, args, sys.maxsize) ans = sorted(ans, key=lambda x: x[1], reverse=True) diff --git a/lib/plugins/default_taghelper/loaders/positional.py b/lib/plugins/default_taghelper/loaders/positional.py index 877fc9b2ec..d3650a0a23 100644 --- a/lib/plugins/default_taghelper/loaders/positional.py +++ b/lib/plugins/default_taghelper/loaders/positional.py @@ -115,8 +115,8 @@ async def _get_initial_values(self, lang, translate): if not await aiofiles.os.path.exists(tst_path): await aiofiles.os.mkdir(tst_path, 0o775) ans = [set() for _ in range(tagset['num_pos'])] - async with aiofiles.open(self.variants_file_path) as fr: - async for line in fr: + with open(self.variants_file_path) as fr: + for line in fr: line = line.strip() + (tagset['num_pos'] - len(line.strip())) * '-' for i in range(tagset['num_pos']): value = ''.join([char_replac_tab.get(x, x) for x in line[i]]) @@ -161,8 +161,8 @@ async def _calculate_variant(self, required_pattern, lang, translate): char_replac_tab = dict(self.__class__.SPEC_CHAR_REPLACEMENTS) patt = re.compile(required_pattern) matching_tags = [] - async with aiofiles.open(self.variants_file_path) as fr: - async for line in fr: + with open(self.variants_file_path) as fr: + for line in fr: line = line.strip() + (tagset['num_pos'] - len(line.strip())) * '-' if patt.match(line): matching_tags.append(line) From c5e7ad4697d18b5bb2055b9af48f50e451a8a8a2 Mon Sep 17 00:00:00 2001 From: Martin Zimandl Date: Mon, 8 Jan 2024 13:55:23 +0100 Subject: [PATCH 2/3] implement AsyncBatchWriter --- 
lib/bgcalc/freqs/storage.py | 9 +++--- lib/bgcalc/keywords/__init__.py | 7 +++-- lib/bgcalc/wordlist/__init__.py | 7 +++-- lib/plugins/default_subcmixer/__init__.py | 11 +++---- .../default_taghelper/loaders/keyval.py | 6 ++-- .../masm_live_attributes/doclist/writer.py | 5 ++-- lib/plugins/masm_subcmixer/__init__.py | 7 +++-- lib/plugins/mysql_subc_storage/__init__.py | 7 +++-- lib/util.py | 30 +++++++++++++++++-- 9 files changed, 60 insertions(+), 29 deletions(-) diff --git a/lib/bgcalc/freqs/storage.py b/lib/bgcalc/freqs/storage.py index c1d7d34e44..88e0a86371 100644 --- a/lib/bgcalc/freqs/storage.py +++ b/lib/bgcalc/freqs/storage.py @@ -30,6 +30,7 @@ from bgcalc.freqs.types import FreqCalcArgs, FreqCalcResult from conclib.freq import FreqData, FreqItem from dataclasses_json import dataclass_json +from util import AsyncBatchWriter def _cache_file_path(args: FreqCalcArgs): @@ -101,17 +102,17 @@ async def wrapper(args: FreqCalcArgs) -> FreqCalcResult: data, cache_path = await find_cached_result(args) if data is None: data: FreqCalcResult = await func(args) - async with aiofiles.open(cache_path, 'w') as fw: + async with aiofiles.open(cache_path, 'w') as fw, AsyncBatchWriter(fw, 100) as bw: common_md = CommonMetadata(num_blocks=len(data.freqs), conc_size=data.conc_size) - await fw.write(json.dumps(common_md.to_dict()) + '\n') + await bw.write(json.dumps(common_md.to_dict()) + '\n') for freq in data.freqs: block_md = BlockMetadata( head=[_Head(**x) for x in freq.Head], skipped_empty=freq.SkippedEmpty, no_rel_sorting=freq.NoRelSorting, size=len(freq.Items)) - await fw.write(json.dumps(block_md.to_dict()) + '\n') + await bw.write(json.dumps(block_md.to_dict()) + '\n') for item in freq.Items: - await fw.write(json.dumps(item.to_dict()) + '\n') + await bw.write(json.dumps(item.to_dict()) + '\n') return data return wrapper diff --git a/lib/bgcalc/keywords/__init__.py b/lib/bgcalc/keywords/__init__.py index d695aaabab..f8f980ab61 100644 --- a/lib/bgcalc/keywords/__init__.py 
+++ b/lib/bgcalc/keywords/__init__.py @@ -30,6 +30,7 @@ from corplib.corpus import KCorpus from dataclasses_json import dataclass_json from manatee import Keyword # TODO wrap this out +from util import AsyncBatchWriter CNC_SCORE_TYPES = ('logL', 'chi2', 'din') @@ -107,10 +108,10 @@ async def wrapper(corp: KCorpus, ref_corp: KCorpus, args: KeywordsFormArgs, max_ ans = await f(corp, ref_corp, args, max_items) # ans = sorted(ans, key=lambda x: x[1], reverse=True) num_lines = len(ans) - async with aiofiles.open(path, 'w') as fw: - await fw.write(json.dumps(dict(total=num_lines)) + '\n') + async with aiofiles.open(path, 'w') as fw, AsyncBatchWriter(fw, 100) as bw: + await bw.write(json.dumps(dict(total=num_lines)) + '\n') for item in ans: - await fw.write(item.to_json() + '\n') + await bw.write(item.to_json() + '\n') return ans[:max_items] return wrapper diff --git a/lib/bgcalc/wordlist/__init__.py b/lib/bgcalc/wordlist/__init__.py index 812ab49e3d..a4577c84f1 100644 --- a/lib/bgcalc/wordlist/__init__.py +++ b/lib/bgcalc/wordlist/__init__.py @@ -29,6 +29,7 @@ from corplib import frq_db from corplib.corpus import AbstractKCorpus from manatee import Structure # TODO wrap this out +from util import AsyncBatchWriter def _create_cache_path(form: WordlistFormArgs) -> str: @@ -76,10 +77,10 @@ async def wrapper(corp: AbstractKCorpus, args: WordlistFormArgs, max_items: int) ans = await f(corp, args, sys.maxsize) ans = sorted(ans, key=lambda x: x[1], reverse=True) num_lines = len(ans) - async with aiofiles.open(path, 'w') as fw: - await fw.write(json.dumps(dict(total=num_lines)) + '\n') + async with aiofiles.open(path, 'w') as fw, AsyncBatchWriter(fw, 100) as bw: + await bw.write(json.dumps(dict(total=num_lines)) + '\n') for item in ans: - await fw.write(json.dumps(item) + '\n') + await bw.write(json.dumps(item) + '\n') return ans[:max_items] return wrapper diff --git a/lib/plugins/default_subcmixer/__init__.py b/lib/plugins/default_subcmixer/__init__.py index 
abb9115409..17b775412b 100644 --- a/lib/plugins/default_subcmixer/__init__.py +++ b/lib/plugins/default_subcmixer/__init__.py @@ -15,10 +15,10 @@ # along with this program; if not, write to the Free Software # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. +import os import struct from collections import defaultdict from typing import Any, Dict, List -import os import aiofiles import plugins @@ -27,16 +27,17 @@ from action.krequest import KRequest from action.model.corpus import CorpusActionModel from action.response import KResponse +from corplib.abstract import create_new_subc_ident from plugin_types.subcmixer import AbstractSubcMixer from plugin_types.subcmixer.error import ( ResultNotFoundException, SubcMixerException) from plugins import inject from sanic.blueprints import Blueprint +from util import AsyncBatchWriter from .category_tree import CategoryExpression, CategoryTree from .database import Database from .metadata_model import MetadataModel -from corplib.abstract import create_new_subc_ident bp = Blueprint('default_subcmixer') @@ -74,10 +75,10 @@ async def subcmixer_create_subcorpus(amodel: CorpusActionModel, req: KRequest, r struct_indices = sorted([int(x) for x in req.form.get('ids').split(',')]) id_attr = req.form.get('idAttr').split('.') attr = amodel.corp.get_struct(id_attr[0]) - async with aiofiles.open(os.path.join(amodel.subcpath, subc_id.data_path), 'wb') as fw: + async with aiofiles.open(os.path.join(amodel.subcpath, subc_id.data_path), 'wb') as fw, AsyncBatchWriter(fw, 100) as bw: for idx in struct_indices: - await fw.write(struct.pack(' bool: @@ -47,9 +48,9 @@ async def export_xml(data: List[DocListItem], target_path: str) -> bool: async def export_jsonl(data: List[DocListItem], target_path: str) -> bool: if len(data) == 0: return False - async with aiofiles.open(target_path, 'w') as fw: + async with aiofiles.open(target_path, 'w') as fw, AsyncBatchWriter(fw, 100) as bw: for item in data: - await 
fw.write(json.dumps(item.to_dict()) + "\n") return True diff --git a/lib/plugins/masm_subcmixer/__init__.py b/lib/plugins/masm_subcmixer/__init__.py index 959eb40b0f..9734ca58a5 100644 --- a/lib/plugins/masm_subcmixer/__init__.py +++ b/lib/plugins/masm_subcmixer/__init__.py @@ -37,6 +37,7 @@ from plugin_types.subcmixer.error import ( ResultNotFoundException, SubcMixerException) from sanic.blueprints import Blueprint +from util import AsyncBatchWriter bp = Blueprint('masm_subcmixer', url_prefix='subcorpus') @@ -78,10 +79,10 @@ async def subcmixer_create_subcorpus(amodel: CorpusActionModel, req: KRequest, r struct_idxs = sorted(attr.str2id(sid) for sid in struct_ids) subc_id = await create_new_subc_ident(amodel.subcpath, amodel.corp.corpname) - async with aiofiles.open(os.path.join(amodel.subcpath, subc_id.data_path), 'wb') as fw: + async with aiofiles.open(os.path.join(amodel.subcpath, subc_id.data_path), 'wb') as fw, AsyncBatchWriter(fw, 100) as bw: for idx in struct_idxs: - await fw.write(struct.pack(' str: @@ -46,3 +47,26 @@ def int2chash(hex_num: int, length: int) -> str: ans.append(_KEY_ALPHABET[p]) hex_num = int(hex_num / len(_KEY_ALPHABET)) return ''.join([str(x) for x in ans]) + + +class AsyncBatchWriter(AsyncContextDecorator): + def __init__(self, f, batch_size: int): + self.f = f + self.batch_size = batch_size + self.lines = [] + + async def write(self, data: str): + self.lines.append(data) + if len(self.lines) > self.batch_size: + await self.flush() + + async def flush(self): + await self.f.writelines(self.lines) + self.lines = [] + + async def __aenter__(self): + return self + + async def __aexit__(self, *exc): + await self.flush() + return False From b8f1f2c2e595b11647ed611f67e34b545ac2b55a Mon Sep 17 00:00:00 2001 From: Martin Zimandl Date: Mon, 8 Jan 2024 14:48:25 +0100 Subject: [PATCH 3/3] AsyncBatchWriter handles files internally --- lib/bgcalc/freqs/storage.py | 2 +- lib/bgcalc/keywords/__init__.py
| 2 +- lib/bgcalc/wordlist/__init__.py | 2 +- lib/plugins/default_subcmixer/__init__.py | 3 +-- lib/plugins/masm_live_attributes/doclist/writer.py | 2 +- lib/plugins/masm_subcmixer/__init__.py | 3 +-- lib/plugins/mysql_subc_storage/__init__.py | 3 +-- lib/util.py | 10 ++++++++-- 8 files changed, 15 insertions(+), 12 deletions(-) diff --git a/lib/bgcalc/freqs/storage.py b/lib/bgcalc/freqs/storage.py index 88e0a86371..d03fb574b3 100644 --- a/lib/bgcalc/freqs/storage.py +++ b/lib/bgcalc/freqs/storage.py @@ -102,7 +102,7 @@ async def wrapper(args: FreqCalcArgs) -> FreqCalcResult: data, cache_path = await find_cached_result(args) if data is None: data: FreqCalcResult = await func(args) - async with aiofiles.open(cache_path, 'w') as fw, AsyncBatchWriter(fw, 100) as bw: + async with AsyncBatchWriter(cache_path, 'w', 100) as bw: common_md = CommonMetadata(num_blocks=len(data.freqs), conc_size=data.conc_size) await bw.write(json.dumps(common_md.to_dict()) + '\n') for freq in data.freqs: diff --git a/lib/bgcalc/keywords/__init__.py b/lib/bgcalc/keywords/__init__.py index f8f980ab61..30d7869b17 100644 --- a/lib/bgcalc/keywords/__init__.py +++ b/lib/bgcalc/keywords/__init__.py @@ -108,7 +108,7 @@ async def wrapper(corp: KCorpus, ref_corp: KCorpus, args: KeywordsFormArgs, max_ ans = await f(corp, ref_corp, args, max_items) # ans = sorted(ans, key=lambda x: x[1], reverse=True) num_lines = len(ans) - async with aiofiles.open(path, 'w') as fw, AsyncBatchWriter(fw, 100) as bw: + async with AsyncBatchWriter(path, 'w', 100) as bw: await bw.write(json.dumps(dict(total=num_lines)) + '\n') for item in ans: await bw.write(item.to_json() + '\n') diff --git a/lib/bgcalc/wordlist/__init__.py b/lib/bgcalc/wordlist/__init__.py index a4577c84f1..1a29aa7e7e 100644 --- a/lib/bgcalc/wordlist/__init__.py +++ b/lib/bgcalc/wordlist/__init__.py @@ -77,7 +77,7 @@ async def wrapper(corp: AbstractKCorpus, args: WordlistFormArgs, max_items: int) ans = await f(corp, args, sys.maxsize) ans = sorted(ans, 
key=lambda x: x[1], reverse=True) num_lines = len(ans) - async with aiofiles.open(path, 'w') as fw, AsyncBatchWriter(fw, 100) as bw: + async with AsyncBatchWriter(path, 'w', 100) as bw: await bw.write(json.dumps(dict(total=num_lines)) + '\n') for item in ans: await bw.write(json.dumps(item) + '\n') diff --git a/lib/plugins/default_subcmixer/__init__.py b/lib/plugins/default_subcmixer/__init__.py index 17b775412b..4050040ee9 100644 --- a/lib/plugins/default_subcmixer/__init__.py +++ b/lib/plugins/default_subcmixer/__init__.py @@ -20,7 +20,6 @@ from collections import defaultdict from typing import Any, Dict, List -import aiofiles import plugins import ujson as json from action.control import http_action @@ -75,7 +74,7 @@ async def subcmixer_create_subcorpus(amodel: CorpusActionModel, req: KRequest, r struct_indices = sorted([int(x) for x in req.form.get('ids').split(',')]) id_attr = req.form.get('idAttr').split('.') attr = amodel.corp.get_struct(id_attr[0]) - async with aiofiles.open(os.path.join(amodel.subcpath, subc_id.data_path), 'wb') as fw, AsyncBatchWriter(fw, 100) as bw: + async with AsyncBatchWriter(os.path.join(amodel.subcpath, subc_id.data_path), 'wb', 100) as bw: for idx in struct_indices: await bw.write(struct.pack(' bool: async def export_jsonl(data: List[DocListItem], target_path: str) -> bool: if len(data) == 0: return False - async with aiofiles.open(target_path, 'w') as fw, AsyncBatchWriter(fw, 100) as bw: + async with AsyncBatchWriter(target_path, 'w', 100) as bw: for item in data: await bw.write(json.dumps(item.to_dict()) + "\n") return True diff --git a/lib/plugins/masm_subcmixer/__init__.py b/lib/plugins/masm_subcmixer/__init__.py index 9734ca58a5..c13b391266 100644 --- a/lib/plugins/masm_subcmixer/__init__.py +++ b/lib/plugins/masm_subcmixer/__init__.py @@ -21,7 +21,6 @@ from dataclasses import dataclass from typing import List, Optional -import aiofiles import aiohttp import plugins import ujson as json @@ -79,7 +78,7 @@ async def 
subcmixer_create_subcorpus(amodel: CorpusActionModel, req: KRequest, r struct_idxs = sorted(attr.str2id(sid) for sid in struct_ids) subc_id = await create_new_subc_ident(amodel.subcpath, amodel.corp.corpname) - async with aiofiles.open(os.path.join(amodel.subcpath, subc_id.data_path), 'wb') as fw, AsyncBatchWriter(fw, 100) as bw: + async with AsyncBatchWriter(os.path.join(amodel.subcpath, subc_id.data_path), 'wb', 100) as bw: for idx in struct_idxs: await bw.write(struct.pack(' str: class AsyncBatchWriter(AsyncContextDecorator): - def __init__(self, f, batch_size: int): - self.f = f + def __init__(self, filename: str, mode: str, batch_size: int): + self.filename = filename + self.mode = mode + self.f = None self.batch_size = batch_size self.lines = [] @@ -65,8 +69,10 @@ async def flush(self): self.lines = [] async def __aenter__(self): + self.f = await aiofiles.open(self.filename, self.mode).__aenter__() return self async def __aexit__(self, *exc): await self.flush() + await self.f.close() return False