Merge branch 'master' into release-0.18.x
tomachalek committed Sep 27, 2024
2 parents cd9e101 + 95fc181 commit 475d4be
Showing 8 changed files with 92 additions and 17 deletions.
1 change: 1 addition & 0 deletions lib/action/model/base.py
@@ -125,6 +125,7 @@ async def add_globals(self, app: Sanic, action_props: ActionProps, result: Dict[
result['issue_reporting_action'] = None
result['help_links'] = {}
result['_version'] = None
result['supports_query_history_fulltext'] = False
return result

def init_menu(self, result):
2 changes: 1 addition & 1 deletion lib/conclib/search.py
@@ -52,7 +52,7 @@ async def _get_async_conc(corp: AbstractKCorpus, user_id, q, cutoff, minsize) ->
status = await cache_map.get_calc_status(corp.cache_key, q, cutoff)
if not status or status.error:
worker = bgcalc.calc_backend_client(settings)
task = ConcRegistration(task_id=None) # task ID will be filled by worker
task = ConcRegistration(task_id=None) # task ID will be filled by worker
reg_args = await task.run(corp.portable_ident, corp.cache_key, q, cutoff)
if not reg_args.get('already_running', False):
worker.send_task_sync(
9 changes: 7 additions & 2 deletions lib/plugin_types/general_storage.py
@@ -268,8 +268,13 @@ async def get_instance(self, plugin_id):
"""
return self

async def subscribe_task(self, channel_id: str, handler: Callable[[str], Awaitable[bool]]):
async def subscribe_channel(self, channel_id: str, handler: Callable[[str], Awaitable[bool]]):
"""
Returns:
"""
raise NotImplementedError

async def publish(self, channel_id: str, msg: str):
async def publish_channel(self, channel_id: str, msg: str):
raise NotImplementedError
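
The renamed subscribe_channel / publish_channel methods give KeyValueStorage a minimal pub/sub interface. A short usage sketch follows, assuming a concrete KeyValueStorage backend such as the Redis plugin below; the channel name and handler are illustrative, and the meaning of the handler's boolean return value is inferred from the handler in public/app.py.

from plugin_types.general_storage import KeyValueStorage

CHANNEL = 'my_plugin_channel'  # illustrative channel name

async def handler(msg: str) -> bool:
    # awaited once per published message; the returned bool tells the
    # subscriber whether it should stop (False = keep listening)
    print('received:', msg)
    return False

async def receiver(db: KeyValueStorage):
    # blocks and dispatches incoming messages to the handler; backends
    # without pub/sub support raise NotImplementedError
    await db.subscribe_channel(CHANNEL, handler)

async def sender(db: KeyValueStorage):
    # any other coroutine or process can notify the subscribers
    await db.publish_channel(CHANNEL, 'hello')
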
4 changes: 2 additions & 2 deletions lib/plugins/redis_db/__init__.py
@@ -269,7 +269,7 @@ async def hash_set_map(self, key, mapping):
async def keys(self, pattern: str = '*'):
return [key.decode() for key in await self._redis.keys(pattern)]

async def subscribe_task(self, channel_id: str, handler: Callable[[str], Awaitable[bool]]):
async def subscribe_channel(self, channel_id: str, handler: Callable[[str], Awaitable[bool]]):
psub = self._redis.pubsub()
async with psub as channel:
await channel.subscribe(channel_id)
@@ -289,7 +289,7 @@ async def subscribe_task(self, channel_id: str, handler: Callable[[str], Awaitab
await channel.unsubscribe(channel_id)
await psub.close()

async def publish(self, channel_id: str, msg: str):
async def publish_channel(self, channel_id: str, msg: str):
await self._redis.publish(channel_id, msg)


61 changes: 55 additions & 6 deletions lib/plugins/ucnk_query_history/__init__.py
@@ -18,11 +18,14 @@
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.

import logging
from datetime import datetime, timezone
import ujson as json

import plugins
from plugin_types.query_persistence import AbstractQueryPersistence
from plugin_types.subc_storage import AbstractSubcArchive
from plugin_types.auth import AbstractAuth
from plugin_types.general_storage import KeyValueStorage
from plugins import inject
from plugins.mysql_query_history import MySqlQueryHistory
from plugins.mysql_integration_db import MySqlIntegrationDb
@@ -31,28 +34,74 @@


class UcnkQueryHistory(MySqlQueryHistory):
"""
UcnkQueryHistory is a modified version of MySqlQueryHistory which adds
fulltext search in stored queries via an external service (Camus).
"""

def __init__(
self,
conf,
db: MySqlIntegrationDb,
kvdb: KeyValueStorage,
query_persistence: AbstractQueryPersistence,
subc_archive: AbstractSubcArchive,
auth: AbstractAuth):
super().__init__(conf, db, query_persistence, subc_archive, auth)
self._kvdb = kvdb
self._del_channel = conf.get('plugins', 'query_history').get(
'fulltext_deleting_channel', 'query_history_fulltext_del_channel'
)
self._del_chunk_size = int(conf.get('plugins', 'query_history').get(
'fulltext_num_delete_per_check', '500')
)

def supports_fulltext_search(self):
return True

async def delete_old_records(self):
"""
Deletes unnamed records older than ttl_days, in chunks of at most
fulltext_num_delete_per_check rows, and publishes each deleted row to
the configured deletion channel so the fulltext index can be updated.
Named records are kept intact.
"""
async with self._db.connection() as conn:
async with await conn.cursor(dictionary=True) as cursor:
await self._db.begin_tx(cursor)
try:
await cursor.execute(
f'SELECT query_id, user_id, created FROM {self.TABLE_NAME} WHERE created < %s AND name IS NULL LIMIT %s',
(int(datetime.now(tz=timezone.utc).timestamp()) - self.ttl_days * 3600 * 24, self._del_chunk_size)
)
for row in await cursor.fetchall():
await cursor.execute(
f'DELETE FROM {self.TABLE_NAME} WHERE query_id = %s AND user_id = %s AND created = %s',
(row['query_id'], row['user_id'], row['created']))
await self._kvdb.publish_channel(self._del_channel, json.dumps(row))
await conn.commit_tx()
except Exception as ex:
await conn.rollback_tx()
raise ex


@inject(
plugins.runtime.INTEGRATION_DB,
plugins.runtime.QUERY_PERSISTENCE,
plugins.runtime.SUBC_STORAGE,
plugins.runtime.AUTH
plugins.runtime.AUTH,
plugins.runtime.DB
)
def create_instance(
conf,
integ_db: MySqlIntegrationDb,
query_persistence: AbstractQueryPersistence,
subc_archive: AbstractSubcArchive,
auth: AbstractAuth
auth: AbstractAuth,
kvdb: KeyValueStorage
):
plugin_conf = conf.get('plugins', 'auth')
if integ_db and integ_db.is_active and 'mysql_host' not in plugin_conf:
auth_plg_conf = conf.get('plugins', 'auth')
if integ_db and integ_db.is_active and 'mysql_host' not in auth_plg_conf:
db = integ_db
logging.getLogger(__name__).info(f'ucnk_query_history uses integration_db[{integ_db.info}]')
else:
db = AdhocDB(MySQLConf.from_conf(plugin_conf))
return UcnkQueryHistory(conf, db, query_persistence, subc_archive, auth)
db = AdhocDB(MySQLConf.from_conf(auth_plg_conf))
return UcnkQueryHistory(conf, db, kvdb, query_persistence, subc_archive, auth)
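
To summarize the deletion flow: delete_old_records removes unnamed records in chunks and publishes each deleted row (query_id, user_id, created) as JSON on the fulltext_deleting_channel; according to the config documentation below, the Camus service listens on that channel and drops the matching fulltext documents. A minimal consumer sketch, assuming it shares the same KeyValueStorage backend; the logging call stands in for the actual fulltext removal:

import logging

import ujson as json

from plugin_types.general_storage import KeyValueStorage

async def watch_deletions(kvdb: KeyValueStorage,
                          channel: str = 'query_history_fulltext_del_channel'):
    async def handle(msg: str) -> bool:
        # each message is the JSON-serialized row selected in delete_old_records
        row = json.loads(msg)
        logging.getLogger(__name__).info(
            'drop fulltext document for query_id=%s, user_id=%s, created=%s',
            row['query_id'], row['user_id'], row['created'])
        return False  # keep listening
    await kvdb.subscribe_channel(channel, handle)
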
13 changes: 13 additions & 0 deletions lib/plugins/ucnk_query_history/config.rng
@@ -13,6 +13,19 @@
<element name="ttl_days">
<data type="positiveInteger" />
</element>
<optional>
<element name="fulltext_deleting_channel">
<text />
</element>
<a:documentation>A Redis PUBSUB channel through which KonText sends requests
to delete items. The receiver is the Camus service. Both applications must use
the same name for the channel.</a:documentation>
</optional>
<optional>
<element name="fulltext_num_delete_per_check">
<data type="positiveInteger" />
</element>
</optional>
<element name="fulltext_service_url">
<text />
</element>
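
Both new elements are optional; when omitted, the plugin falls back to the defaults hard-coded in lib/plugins/ucnk_query_history/__init__.py ('query_history_fulltext_del_channel' and 500). An illustrative query_history section matching the extended schema; the values and the surrounding <module> element are examples only:

<query_history>
    <module>ucnk_query_history</module>
    <ttl_days>720</ttl_days>
    <fulltext_deleting_channel>query_history_fulltext_del_channel</fulltext_deleting_channel>
    <fulltext_num_delete_per_check>500</fulltext_num_delete_per_check>
    <fulltext_service_url>http://localhost:8080/</fulltext_service_url>
</query_history>
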
10 changes: 5 additions & 5 deletions public/app.py
@@ -227,7 +227,7 @@ async def signal_handler(msg: str):
return False # return value signals whether the receiver should stop

with plugins.runtime.DB as db:
await db.subscribe_task(SIGNAL_CHANNEL_ID, signal_handler)
await db.subscribe_channel(SIGNAL_CHANNEL_ID, signal_handler)
logging.getLogger(__name__).debug(
"Worker `%s` subscribed to `%s`", app.m.pid, SIGNAL_CHANNEL_ID)

@@ -238,7 +238,7 @@ async def signal_handler(msg: str):
try:
receiver.result()
except NotImplementedError:
logging.info("DB subscribe_task not implemented, crossworker signal handler disabled")
logging.info("DB subscribe_channel not implemented, crossworker signal handler disabled")
except Exception as e:
logging.error("Error while running receiver", exc_info=e)
app.ctx.receiver = receiver
@@ -319,7 +319,7 @@ async def soft_reset(req):
logging.getLogger(__name__).warning("expected = {}".format(application.ctx.soft_restart_token))
if req.args.get('key') == application.ctx.soft_restart_token:
with plugins.runtime.DB as db:
await db.publish(SIGNAL_CHANNEL_ID, 'kontext.internal.reset')
await db.publish_channel(SIGNAL_CHANNEL_ID, 'kontext.internal.reset')
return json(dict(ok=True))
else:
raise NotFound()
@@ -328,7 +328,7 @@ async def soft_reset(req):
async def sigusr1_handler():
logging.getLogger(__name__).warning('Caught signal SIGUSR1')
with plugins.runtime.DB as db:
await db.publish(SIGNAL_CHANNEL_ID, 'kontext.internal.reset')
await db.publish_channel(SIGNAL_CHANNEL_ID, 'kontext.internal.reset')


def get_locale(request: Request) -> str:
@@ -387,7 +387,7 @@ def get_locale(request: Request) -> str:

if args.soft_reset:
with plugins.runtime.DB as db:
asyncio.run(db.publish(SIGNAL_CHANNEL_ID, 'kontext.internal.reset'))
asyncio.run(db.publish_channel(SIGNAL_CHANNEL_ID, 'kontext.internal.reset'))
print('Soft reset signal published')
sys.exit(0)

9 changes: 8 additions & 1 deletion public/files/js/models/searchHistory/index.ts
@@ -150,7 +150,7 @@ export class SearchHistoryModel extends StatefulModel<SearchHistoryModelState> {
[Actions.ToggleQueryHistoryWidget, MainMenuActions.ShowQueryHistory],
action => {
this.changeState(state => {
state.isBusy = true
state.isBusy = true;
});
if (this.isToggleWidgetAction(action)) {
this.performLoadAction(true);
@@ -415,6 +415,13 @@ export class SearchHistoryModel extends StatefulModel<SearchHistoryModelState> {
this.pageModel.showMessage('error', err);
},
});

} else {
this.changeState(
state => {
state.isBusy = false;
}
);
}
}

