Skip to content

Commit

Permalink
Merge branch 'master' into release-0.18.x
Browse files Browse the repository at this point in the history
  • Loading branch information
tomachalek committed Jul 25, 2024
2 parents d50a605 + a41115f commit eb634c4
Show file tree
Hide file tree
Showing 87 changed files with 978 additions and 1,450 deletions.
1 change: 0 additions & 1 deletion conf/config.default.xml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
<no_anonymous_access>0</no_anonymous_access>
<jwt_secret>my_secret</jwt_secret>
<enabled_websockets>0</enabled_websockets>
<preflight_query_timeout_ms>3000</preflight_query_timeout_ms>
</global>
<calc_backend>
<type>rq</type>
Expand Down
15 changes: 0 additions & 15 deletions conf/config.rng
Original file line number Diff line number Diff line change
Expand Up @@ -302,11 +302,6 @@
<text />
</element>
</optional>
<element name="use_conc_toolbar">
<a:documentation>If true then a special toolbar allowing easier setting of
common concordance view options is shown</a:documentation>
<ref name="boolValues" />
</element>
<element name="anonymous_user_conc_login_prompt">
<a:documentation>If true then for anonymous user, on concordance page a prompt to log-in always appears</a:documentation>
<ref name="boolValues" />
Expand Down Expand Up @@ -415,16 +410,6 @@
</oneOrMore>
</element>
</optional>
<optional>
<element name="preflight_query_timeout_ms">
<a:documentation>
For preflight queries, this specifies how long KonText client-side waits
for the query. In case it is not finished in the limit, the client will
consider the query a &quot;long one&quot; and warn user accordingly.
</a:documentation>
<data type="nonNegativeInteger" />
</element>
</optional>
</interleave>
</element>
<element name="calc_backend">
Expand Down
2 changes: 0 additions & 2 deletions lib/action/model/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,6 @@ async def add_globals(self, app: Sanic, action_props: ActionProps, result: Dict[
result['locale'] = self._req.ui_lang
result['messages'] = []
result['uses_corp_instance'] = False
result['use_conc_toolbar'] = False
result['shuffle_min_result_warning'] = 0
result['multilevel_freq_dist_max_levels'] = 0
if action_props.page_model is None:
Expand All @@ -126,7 +125,6 @@ async def add_globals(self, app: Sanic, action_props: ActionProps, result: Dict[
result['issue_reporting_action'] = None
result['help_links'] = {}
result['_version'] = None
result['preflight_query_timeout_ms'] = settings.get_int('global', 'preflight_query_timeout_ms', 0)
return result

def init_menu(self, result):
Expand Down
38 changes: 24 additions & 14 deletions lib/action/model/concordance/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.

import hashlib
import os
import re
import urllib.parse
from collections import OrderedDict
from typing import Any, Dict, List, Optional, Tuple, Union
import asyncio
import logging

import conclib
import plugins
Expand Down Expand Up @@ -290,6 +291,13 @@ async def store_unbound_query_chain(self, chain: List[Tuple[str, ConcFormArgs]])
new_ids, _ = await self._store_conc_params()
self._active_q_data = await qp.open(new_ids[-1])

async def _archive_conc(self, user_id, conc_id):
with plugins.runtime.QUERY_PERSISTENCE as qp:
try:
await qp.archive(user_id, conc_id)
except Exception as ex:
logging.getLogger(__name__).error('Failed to archive concordance {}: {}'.format(conc_id, ex))

async def _store_conc_params(self) -> Tuple[List[str], Optional[int]]:
"""
Stores concordance operation if the query_persistence plugin is installed
Expand All @@ -303,8 +311,15 @@ async def _store_conc_params(self) -> Tuple[List[str], Optional[int]]:
with plugins.runtime.QUERY_PERSISTENCE as qp:
prev_data = self._active_q_data if self._active_q_data is not None else {}
use_history, curr_data = self.export_query_data()
ans = [await qp.store(self.session_get('user', 'id'),
curr_data=curr_data, prev_data=self._active_q_data)]
user_id = self.session_get('user', 'id')
qp_store_id = await qp.store(user_id, curr_data=curr_data, prev_data=self._active_q_data)
ans = [qp_store_id]

# archive the concordance, it may take a bit longer, so we
# do this as a non-blocking operation
task = asyncio.create_task(self._archive_conc(user_id, qp_store_id))
task.add_done_callback(lambda r: None) # we need this to ensure completion

history_ts = await self._save_query_to_history(ans[0], curr_data) if use_history else None
lines_groups = prev_data.get('lines_groups', self._lines_groups.serialize())
for q_idx, op in self._auto_generated_conc_ops:
Expand All @@ -315,7 +330,12 @@ async def _store_conc_params(self) -> Tuple[List[str], Optional[int]]:
q=getattr(self.args, 'q')[:q_idx + 1],
corpora=self.get_current_aligned_corpora(), usesubcorp=self.args.usesubcorp,
lastop_form=op.to_dict(), user_id=self.session_get('user', 'id'))
ans.append(await qp.store(self.session_get('user', 'id'), curr_data=curr, prev_data=prev))
qp_store_id = await qp.store(self.session_get('user', 'id'), curr_data=curr, prev_data=prev)
# archive the concordance, it may take a bit longer, so we
# do this as a non-blocking operation
task = asyncio.create_task(self._archive_conc(user_id, qp_store_id))
task.add_done_callback(lambda r: None) # we need this to ensure completion
ans.append(qp_store_id)
return ans, history_ts

def select_current_aligned_corpora(self, active_only: bool) -> List[str]:
Expand Down Expand Up @@ -668,16 +688,6 @@ def go_to_restore_conc(self, return_action: str):
args.append(('next', return_action))
raise ImmediateRedirectException(self._req.create_url('restore_conc', args))

@property
def preflight_id(self) -> Optional[str]:
"""
Return an identifier for storing both preflight and regular query
under the same key for further analysis and processing.
"""
if len(self.args.q) > 0:
return hashlib.sha1(self.args.q[0].encode('utf-8')).hexdigest()
return None


class ConcPluginCtx(CorpusPluginCtx):
pass
18 changes: 0 additions & 18 deletions lib/action/model/corpus.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,6 @@

T = TypeVar('T')


PREFLIGHT_MIN_LARGE_CORPUS = 500_000_000
"""Specifies a minimum size of a corpus to be used along with preflight queries"""


async def empty_query_store(s, uh, res):
    """A no-op query storage hook used where no actual storing is required."""
    return None

Expand Down Expand Up @@ -515,14 +510,6 @@ async def _add_corpus_related_globals(self, result, maincorp):
result['sentence_struct'] = corp_info.sentence_struct
result['doc_struct'] = self.corp.get_conf('DOCSTRUCTURE')
result['simple_query_default_attrs'] = corp_info.simple_query_default_attrs
if corp_info.preflight_subcorpus:
result['conc_preflight'] = dict(
corpname=corp_info.preflight_subcorpus.corpus_name,
subc=corp_info.preflight_subcorpus.id,
threshold_ipm=self.corp.preflight_warn_ipm,
alt_corp=corp_info.alt_corp)
else:
result['conc_preflight'] = None
poslist = []
for tagset in corp_info.tagsets:
if tagset.ident == corp_info.default_tagset:
Expand Down Expand Up @@ -612,7 +599,6 @@ async def add_globals(self, app, action_props, result):
'global', 'shuffle_min_result_warning', 100000)

result['has_subcmixer'] = plugins.runtime.SUBCMIXER.exists
result['use_conc_toolbar'] = settings.get_bool('global', 'use_conc_toolbar')
with plugins.runtime.QUERY_PERSISTENCE as qp:
result['conc_url_ttl_days'] = qp.get_conc_ttl_days(self.session_get('user', 'id'))

Expand Down Expand Up @@ -720,10 +706,6 @@ async def attach_aligned_query_params(self, tpl_out: Dict[str, Any]) -> None:
al] = [{'n': self._req.translate(x.pos), 'v': x.pattern} for x in poslist]
tpl_out['input_languages'][al] = corp_info.metadata.default_virt_keyboard if corp_info.metadata.default_virt_keyboard else corp_info.collator_locale

async def create_preflight_subcorpus(self) -> str:
with plugins.runtime.SUBC_STORAGE as sc:
return await sc.create_preflight(self.subcpath, self.corp.corpname)


class CorpusPluginCtx(UserPluginCtx, AbstractCorpusPluginCtx):

Expand Down
2 changes: 1 addition & 1 deletion lib/conclib/calc/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ def compute_conc(self, corp: AbstractKCorpus, q: Tuple[str, ...], cutoff: int) -
return ans_conc


class TaskRegistration(GeneralWorker):
class ConcRegistration(GeneralWorker):

def __init__(self, task_id: str):
super().__init__(task_id=task_id)
Expand Down
32 changes: 17 additions & 15 deletions lib/conclib/search.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import logging
import os
from typing import List, Optional, Tuple, Union
import uuid

import aiofiles.os
import bgcalc
Expand All @@ -30,7 +31,7 @@
from conclib.calc import (
check_result, del_silent, extract_manatee_error, find_cached_conc_base,
wait_for_conc)
from conclib.calc.base import GeneralWorker
from conclib.calc.base import GeneralWorker, ConcRegistration
from conclib.common import KConc
from conclib.empty import InitialConc
from conclib.errors import ConcCalculationStatusException
Expand All @@ -39,30 +40,33 @@
from plugin_types.conc_cache import ConcCacheStatus

TASK_TIME_LIMIT = settings.get_int('calc_backend', 'task_time_limit', 300)
CONC_REGISTER_TASK_LIMIT = 5 # task itself should be super-fast
CONC_REGISTER_WAIT_LIMIT = 20 # client may be forced to wait loger due to other tasks
CONC_BG_SYNC_ALIGNED_CORP_THRESHOLD = 50000000
CONC_BG_SYNC_SINGLE_CORP_THRESHOLD = 2000000000


async def _get_async_conc(corp, user_id, q, corp_cache_key, cutoff, minsize) -> KConc:
async def _get_async_conc(corp: AbstractKCorpus, user_id, q, cutoff, minsize) -> KConc:
    """
    Obtain a concordance using Manatee's asynchronous calculation: register
    the calculation locally and dispatch the actual computation to a worker,
    then wait (up to the configured limit) for at least `minsize` lines.

    Returns a PyConc once (partial) results are available in the cache, or
    an InitialConc placeholder if nothing is available yet.
    """
    cache_map = plugins.runtime.CONC_CACHE.instance.get_mapping(corp)
    status = await cache_map.get_calc_status(corp.cache_key, q, cutoff)
    if not status or status.error:
        worker = bgcalc.calc_backend_client(settings)
        # BUG FIX: the original used str(uuid.uuid1().hex.encode()) which
        # produces the repr of a bytes object ("b'...'"); .hex is already
        # a plain string suitable as a task ID.
        conc_task_id = uuid.uuid1().hex
        task = ConcRegistration(task_id=conc_task_id)
        reg_args = await task.run(corp.portable_ident, corp.cache_key, q, cutoff)
        if not reg_args.get('already_running', False):
            worker.send_task_sync(
                'conc_calculate', object.__class__,
                args=(reg_args, user_id, corp.portable_ident, corp.cache_key, q, cutoff),
                soft_time_limit=TASK_TIME_LIMIT)
    conc_avail = await wait_for_conc(
        cache_map=cache_map, corp_cache_key=corp.cache_key, q=q, cutoff=cutoff, minsize=minsize)
    if conc_avail:
        return PyConc(corp, 'l', await cache_map.readable_cache_path(corp.cache_key, q, cutoff))
    else:
        return InitialConc(corp, await cache_map.readable_cache_path(corp.cache_key, q, cutoff))


async def get_bg_conc(
Expand Down Expand Up @@ -230,9 +234,7 @@ async def get_conc(
calc_from = 1
# use Manatee asynchronous conc. calculation (= show 1st page once it's avail.)
if asnc and len(q) == 1:
conc = await _get_async_conc(
corp=corp, user_id=user_id, q=q, corp_cache_key=corp.cache_key,
cutoff=cutoff, minsize=minsize)
conc = await _get_async_conc(corp=corp, user_id=user_id, q=q, cutoff=cutoff, minsize=minsize)
# do the calc here and return (OK for small to mid-sized corpora without alignments)
else:
conc = await _get_sync_conc(
Expand Down
14 changes: 0 additions & 14 deletions lib/corplib/abstract.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,20 +220,6 @@ def subcorpus_id(self) -> Optional[str]:
"""
pass

@property
@abstractmethod
def preflight_warn_ipm(self) -> int:
"""
Specify a rounded instances per million (i.p.m.) threshold at which
a preflight search query should be deemed a warning, indicating that
the resulting computation may be excessively lengthy.
In case the calculation cannot be performed due to corpus size
issues (e.g. size == 0), the 1,000,000 (i.e. all tokens match)
should be returned.
"""
pass

@property
@abstractmethod
def subcorpus_name(self) -> Optional[str]:
Expand Down
13 changes: 0 additions & 13 deletions lib/corplib/corpus.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,6 @@
from corplib.subcorpus import SubcorpusIdent
from manatee import Corpus

PREFLIGHT_THRESHOLD_FREQ = 10_000_000
"""
Specifies a minimum preflight frequency we consider
too computationally demanding which leads to a message
asking user to consider a smaller alternative corpus.
"""


class KCorpus(AbstractKCorpus):
"""
Expand Down Expand Up @@ -151,12 +144,6 @@ def compile_arf(self, attr):
def compile_docf(self, attr, doc_attr):
return self._corp.compile_docf(attr, doc_attr)

@property
def preflight_warn_ipm(self):
if self.corp.size() > 0:
return round(PREFLIGHT_THRESHOLD_FREQ / self.corp.size() * 1_000_000)
return 1_000_000

@property
def subcorpus_id(self):
return None
Expand Down
8 changes: 0 additions & 8 deletions lib/corplib/fallback.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,10 +142,6 @@ def get_structattrs(self) -> List[str]:
def get_structs(self) -> List[str]:
return []

@property
def preflight_warn_ipm(self):
return 1_000_000


class ErrorCorpus(EmptyCorpus):
"""
Expand Down Expand Up @@ -180,7 +176,3 @@ def subcorpus_name(self):
@property
def portable_ident(self):
return SubcorpusIdent(id=self.subcorpus_id, corpus_name=self._corpname) if self._usesubcorp else self._corpname

@property
def preflight_warn_ipm(self):
return 1_000_000
7 changes: 5 additions & 2 deletions lib/plugin_types/auth/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ class GetUserInfo(TypedDict):


class CorpusAccess(NamedTuple):
is_owner: bool
is_author: bool
has_read_access: bool
corpus_variant: str

Expand Down Expand Up @@ -126,6 +126,9 @@ def anonymous_user(self, plugin_ctx) -> UserInfo:
email=None,
api_key=None)

def anonymous_user_id(self):
return self._anonymous_id

def is_anonymous(self, user_id: int) -> bool:
return user_id == self._anonymous_id

Expand All @@ -142,7 +145,7 @@ def is_administrator(self, user_id: int) -> bool:
@abc.abstractmethod
async def corpus_access(self, user_dict: UserInfo, corpus_name: str) -> CorpusAccess:
"""
Return a 3-tuple (is owner, has read access, corpus variant)
Return a 3-tuple (is author, has read access, corpus variant)
"""

@abc.abstractmethod
Expand Down
2 changes: 0 additions & 2 deletions lib/plugin_types/corparch/corpus.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,8 +226,6 @@ class CorpusInfo:
simple_query_default_attrs: List[str] = field(default_factory=list)
part_of_ml_corpus: bool = False
ml_position_filter: MLPositionFilter = MLPositionFilter.none
preflight_subcorpus: Optional[SubcorpusIdent] = None
alt_corp: Optional[str] = None

def localized_desc(self, lang) -> str:
if lang.split('_')[0] == 'cs':
Expand Down
21 changes: 21 additions & 0 deletions lib/plugin_types/integration_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,3 +83,24 @@ async def on_response(self):
This is the right place to close the connection.
"""
pass

async def on_aio_task_enter(self):
"""
This is called by KonText each time it is about to create
a new asyncio task. In such case, any existing database connection
cannot be relied on. Calling this gives the backend a chance
to create a new connection within the current context.
See also on_aio_task_exit()
"""
pass

async def on_aio_task_exit(self):
"""
This is called by KonText each time an existing asyncio task
is about to finish. This gives the backend a chance to clean up
(close) the database connection.
See also on_aio_task_enter()
"""
pass
Loading

0 comments on commit eb634c4

Please sign in to comment.