Merge pull request #6320 from mzimandl/fullsearch
Fulltext search improvements
tomachalek authored Oct 11, 2024
2 parents b668bd4 + f46c007 commit d2ff7ea
Showing 9 changed files with 136 additions and 79 deletions.
3 changes: 0 additions & 3 deletions lib/action/model/concordance/__init__.py
@@ -22,8 +22,6 @@
import urllib.parse
from collections import OrderedDict
from typing import Any, Dict, List, Optional, Tuple, Union
import asyncio
import logging
from sanic import Sanic

import conclib
@@ -302,7 +300,6 @@ async def _store_conc_params(self) -> Tuple[List[str], Optional[int]]:
ID of the stored operation (or the current ID if nothing was stored),
UNIX timestamp of stored history item (or None)
"""
application = Sanic.get_app('kontext')
with plugins.runtime.QUERY_PERSISTENCE as qp:
prev_data = self._active_q_data if self._active_q_data is not None else {}
use_history, curr_data = self.export_query_data()
1 change: 0 additions & 1 deletion lib/conclib/search.py
@@ -22,7 +22,6 @@
import logging
import os
from typing import List, Optional, Tuple, Union
import uuid

import aiofiles.os
import bgcalc
6 changes: 3 additions & 3 deletions lib/plugin_types/query_history.py
@@ -53,7 +53,7 @@ async def store(self, user_id: int, query_id: str, q_supertype: str) -> int:
"""

@abc.abstractmethod
async def make_persistent(self, user_id: int, query_id: str, q_supertype: str, created: Optional[int], name: str):
async def make_persistent(self, plugin_ctx, user_id: int, query_id: str, q_supertype: str, created: Optional[int], name: str):
"""
Finds (if implemented) a specific query history
record based on its respective concordance record.
@@ -68,14 +68,14 @@ async def make_persistent(self, user_id: int, query_id: str, q_supertype: str, c
"""

@abc.abstractmethod
async def make_transient(self, user_id: int, query_id: str, created: int, name: str):
async def make_transient(self, plugin_ctx, user_id: int, query_id: str, created: int, name: str):
"""
Remove name from the history item and let it be
removed once it gets too old
"""

@abc.abstractmethod
async def delete(self, user_id, query_id, created):
async def delete(self, plugin_ctx, user_id, query_id, created):
"""
Delete a named query from history.
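The abstract query-history interface now takes the active plugin context as the first argument of every mutating operation. As an orientation aid only, a minimal in-memory stub matching the new signatures could look like the sketch below; it does not inherit from KonText's real abstract base class (its name is not shown in this diff), ignores the plugin context and stores nothing durable.

```python
import time
from typing import Optional


class InMemoryQueryHistory:
    """Illustrative stub only: mirrors the updated method signatures
    (plugin_ctx first) and keeps records in a plain dict."""

    def __init__(self):
        self._items = {}

    async def store(self, user_id: int, query_id: str, q_supertype: str) -> int:
        created = int(time.time())
        self._items[(user_id, query_id, created)] = dict(q_supertype=q_supertype, name=None)
        return created

    async def make_persistent(
            self, plugin_ctx, user_id: int, query_id: str, q_supertype: str,
            created: Optional[int], name: str) -> bool:
        item = self._items.get((user_id, query_id, created))
        if item is None:
            return False
        item['name'] = name
        return True

    async def make_transient(self, plugin_ctx, user_id: int, query_id: str, created: int, name: str) -> bool:
        item = self._items.get((user_id, query_id, created))
        if item is None:
            return False
        item['name'] = None
        return True

    async def delete(self, plugin_ctx, user_id, query_id, created):
        return 1 if self._items.pop((user_id, query_id, created), None) is not None else 0
```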
6 changes: 3 additions & 3 deletions lib/plugins/default_query_history/__init__.py
@@ -98,7 +98,7 @@ async def store(self, user_id, query_id, q_supertype):
await self._delete_old_records(user_id)
return ts

async def make_persistent(self, user_id, query_id, q_supertype, created, name):
async def make_persistent(self, plugin_ctx, user_id, query_id, q_supertype, created, name):
k = self._mk_key(user_id)
data = await self.db.list_get(k)
last_match_idx = -1
@@ -117,7 +117,7 @@ async def make_persistent(self, user_id, query_id, q_supertype, created, name):
await self.db.list_append(self._mk_key(user_id), item)
return True

async def make_transient(self, user_id, query_id, created, name):
async def make_transient(self, plugin_ctx, user_id, query_id, created, name):
k = self._mk_key(user_id)
data = await self.db.list_get(k)
for i, item in enumerate(data):
@@ -127,7 +127,7 @@ async def make_transient(self, user_id, query_id, created, name):
return True
return False

async def delete(self, user_id, query_id, created):
async def delete(self, plugin_ctx, user_id, query_id, created):
k = self._mk_key(user_id)
data = await self.db.list_get(k)
await self.db.remove(k)
10 changes: 5 additions & 5 deletions lib/plugins/mysql_query_history/__init__.py
@@ -30,7 +30,7 @@
"""

import logging
from datetime import datetime
from datetime import datetime, timezone
from typing import Dict, Tuple

import plugins
@@ -93,7 +93,7 @@ def __init__(
self._page_num_records = int(conf.get('plugins', 'query_history')['page_num_records'])

async def store(self, user_id, query_id, q_supertype):
created = int(datetime.utcnow().timestamp())
created = int(datetime.now(timezone.utc).timestamp())
corpora = (await self._query_persistence.open(query_id))['corpora']
async with self._db.cursor() as cursor:
await cursor.executemany(
@@ -113,18 +113,18 @@ async def _update_name(self, user_id, query_id, created, new_name) -> bool:
)
return cursor.rowcount > 0

async def make_persistent(self, user_id, query_id, q_supertype, created, name) -> bool:
async def make_persistent(self, plugin_ctx, user_id, query_id, q_supertype, created, name) -> bool:
if await self._update_name(user_id, query_id, created, name):
await self._query_persistence.archive(query_id, True)
else:
c = await self.store(user_id, query_id, q_supertype)
await self._update_name(user_id, query_id, c, name)
return True

async def make_transient(self, user_id, query_id, created, name) -> bool:
async def make_transient(self, plugin_ctx, user_id, query_id, created, name) -> bool:
return await self._update_name(user_id, query_id, created, None)

async def delete(self, user_id, query_id, created):
async def delete(self, plugin_ctx, user_id, query_id, created):
async with self._db.cursor() as cursor:
await cursor.execute(
f'DELETE FROM {self.TABLE_NAME} WHERE user_id = %s AND query_id = %s AND created = %s',
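A side note on the timestamp change in store(): datetime.utcnow() returns a naive datetime, and .timestamp() interprets naive values in the host's local timezone, so the stored epoch drifts by the local UTC offset on non-UTC servers (utcnow() is also deprecated as of Python 3.12). The switch to datetime.now(timezone.utc) avoids both problems; a minimal illustration:

```python
from datetime import datetime, timezone

# Naive "UTC" value: .timestamp() treats it as local time, so the epoch is
# shifted by the host's UTC offset unless the host already runs in UTC.
naive_epoch = int(datetime.utcnow().timestamp())

# Aware UTC value: unambiguous epoch regardless of the host's timezone.
aware_epoch = int(datetime.now(timezone.utc).timestamp())

# 0 on UTC hosts, otherwise the host's UTC offset in seconds.
print(abs(naive_epoch - aware_epoch))
```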
6 changes: 3 additions & 3 deletions lib/plugins/null_query_history/__init__.py
@@ -32,13 +32,13 @@ def __init__(self):
async def store(self, user_id, query_id, q_supertype):
return int(datetime.utcnow().timestamp())

async def make_persistent(self, user_id, query_id, q_supertype, created, name) -> bool:
async def make_persistent(self, plugin_ctx, user_id, query_id, q_supertype, created, name) -> bool:
return True

async def make_transient(self, user_id, query_id, created, name) -> bool:
async def make_transient(self, plugin_ctx, user_id, query_id, created, name) -> bool:
return True

async def delete(self, user_id, query_id, created):
async def delete(self, plugin_ctx, user_id, query_id, created):
return 0

async def get_user_queries(
173 changes: 114 additions & 59 deletions lib/plugins/ucnk_query_history/__init__.py
@@ -19,8 +19,8 @@

import logging
from datetime import datetime, timezone
from urllib.parse import quote
from urllib.parse import urljoin
from typing import Optional
from urllib.parse import urljoin, urlencode
import ujson as json

import plugins
@@ -32,6 +32,7 @@
from plugins import inject
from plugins.mysql_query_history import MySqlQueryHistory
from plugins.mysql_integration_db import MySqlIntegrationDb
from plugins.ucnk_query_persistence import UCNKQueryPersistence
from plugins.common.mysql.adhocdb import AdhocDB
from plugins.common.mysql import MySQLConf

@@ -84,6 +85,27 @@ def __init__(
def supports_fulltext_search(self):
return True

async def store(self, user_id, query_id, q_supertype):
created = await super().store(user_id, query_id, q_supertype)
if isinstance(self._query_persistence, UCNKQueryPersistence):
await self._query_persistence.queue_history(conc_id=query_id, user_id=user_id, created=created)
return created

async def make_persistent(self, plugin_ctx, user_id, query_id, q_supertype, created, name) -> bool:
done = await super().make_persistent(plugin_ctx, user_id, query_id, q_supertype, created, name)
await self._update_indexed_name(plugin_ctx, query_id, user_id, created, name)
return done

async def make_transient(self, plugin_ctx, user_id, query_id, created, name) -> bool:
done = await super().make_transient(plugin_ctx, user_id, query_id, created, name)
await self._update_indexed_name(plugin_ctx, query_id, user_id, created)
return done

async def delete(self, plugin_ctx, user_id, query_id, created):
done = await super().delete(plugin_ctx, user_id, query_id, created)
await self._delete_indexed_item(plugin_ctx, query_id, user_id, created)
return done

async def delete_old_records(self):
"""
Deletes records older than ttl_days. Named records are
@@ -108,86 +130,119 @@ async def delete_old_records(self):
raise ex

@staticmethod
def generate_query_string(
def _generate_query_string(
q_supertype: str,
user_id: int,
corpname: str,
full_search_args: FullSearchArgs
archived_only: bool,
full_search_args: Optional[FullSearchArgs]
) -> str:
parts = [f'+user_id:{user_id}']
if archived_only:
parts.append('+name:/.*/')

if q_supertype:
parts.append(make_bleve_field('query_supertype', q_supertype))

if corpname:
parts.append(make_bleve_field('corpora', corpname))

if full_search_args.subcorpus:
parts.append(make_bleve_field('subcorpus', full_search_args.subcorpus))

if full_search_args.any_property_value:
parts.append(make_bleve_field('_all', full_search_args.any_property_value))

else:
if q_supertype in ('conc', 'pquery'):
if full_search_args.posattr_name:
parts.append(make_bleve_field('pos_attr_names', full_search_args.posattr_name))
if full_search_args.posattr_value:
parts.append(make_bleve_field('pos_attr_values', full_search_args.posattr_value))
if full_search_args.structattr_name:
parts.append(make_bleve_field('struct_attr_names', full_search_args.structattr_name))
if full_search_args.structattr_value:
parts.append(make_bleve_field('struct_attr_values', full_search_args.structattr_value))

elif q_supertype == 'wlist':
if full_search_args.wl_pat:
parts.append(make_bleve_field('raw_query', full_search_args.wl_pat))
if full_search_args.wl_attr:
parts.append(make_bleve_field('pos_attr_names', full_search_args.wl_attr))
if full_search_args.wl_pfilter:
parts.append(make_bleve_field('pfilter_words', full_search_args.wl_pfilter))
if full_search_args.wl_nfilter:
parts.append(make_bleve_field('nfilter_words', full_search_args.wl_nfilter))

elif q_supertype == 'kwords':
if full_search_args.wl_attr:
parts.append(make_bleve_field('pos_attr_names', full_search_args.posattr_name))

return quote(' '.join(parts))
if full_search_args is not None:
if full_search_args.subcorpus:
parts.append(make_bleve_field('subcorpus', full_search_args.subcorpus))

if full_search_args.any_property_value:
parts.append(make_bleve_field('_all', full_search_args.any_property_value))

else:
if q_supertype in ('conc', 'pquery'):
if full_search_args.posattr_name:
parts.append(make_bleve_field('pos_attr_names', full_search_args.posattr_name))
if full_search_args.posattr_value:
parts.append(make_bleve_field('pos_attr_values', full_search_args.posattr_value))
if full_search_args.structattr_name:
parts.append(make_bleve_field('struct_attr_names', full_search_args.structattr_name))
if full_search_args.structattr_value:
parts.append(make_bleve_field('struct_attr_values', full_search_args.structattr_value))

elif q_supertype == 'wlist':
if full_search_args.wl_pat:
parts.append(make_bleve_field('raw_query', full_search_args.wl_pat))
if full_search_args.wl_attr:
parts.append(make_bleve_field('pos_attr_names', full_search_args.wl_attr))
if full_search_args.wl_pfilter:
parts.append(make_bleve_field('pfilter_words', full_search_args.wl_pfilter))
if full_search_args.wl_nfilter:
parts.append(make_bleve_field('nfilter_words', full_search_args.wl_nfilter))

elif q_supertype == 'kwords':
if full_search_args.wl_attr:
parts.append(make_bleve_field('pos_attr_names', full_search_args.posattr_name))

return ' '.join(parts)

async def get_user_queries(
self, plugin_ctx, user_id, corpus_factory, from_date=None, to_date=None, q_supertype=None, corpname=None,
archived_only=False, offset=0, limit=None, full_search_args=None):

if full_search_args is None:
return await super().get_user_queries(plugin_ctx, user_id, corpus_factory, from_date, to_date, q_supertype, corpname, archived_only, offset, limit)
return await super().get_user_queries(plugin_ctx, user_id, corpus_factory, from_date, to_date, q_supertype, corpname, archived_only, offset, limit, full_search_args)

params = {
'q': self._generate_query_string(q_supertype, user_id, corpname, archived_only, full_search_args),
'order': '-created' if full_search_args is None else '-_score,-created',
'limit': limit,
'fields': 'query_supertype,name',
}

q = self.generate_query_string(q_supertype, user_id, corpname, full_search_args)
async with plugin_ctx.request.ctx.http_client.get(
urljoin(self._fulltext_service_url, f'/indexer/search') + f'?q={q}') as resp:
url_query = urlencode(list(params.items()))
url = urljoin(self._fulltext_service_url, f'/indexer/search?{url_query}')
async with plugin_ctx.request.ctx.http_client.get(url) as resp:
index_data = await resp.json()
logging.debug(index_data['hits'])
return await self.get_user_queries_from_ids(plugin_ctx, corpus_factory, user_id, index_data['hits'])

async def get_user_queries_from_ids(self, plugin_ctx, corpus_factory, user_id, queries):
if len(queries) == 0:
return []
async with self._db.cursor() as cursor:
await cursor.execute(f'''
WITH q(sort, id) AS ( VALUES {', '.join('(%s, %s)' for _ in queries)} )
SELECT q.sort, t.query_id, t.created, t.name, t.q_supertype
FROM q
JOIN {self.TABLE_NAME} AS t ON q.id = t.query_id
WHERE user_id = %s
ORDER BY q.sort ASC, t.created DESC
''', [*(x for i, q in enumerate(queries) for x in (i, q['id'])), user_id])

rows = [item for item in await cursor.fetchall()]
full_data = await self._process_rows(plugin_ctx, corpus_factory, rows)

rows = []
for hit in index_data['hits']:
user_id, created, query_id, *_ = hit['id'].split('/')
rows.append({
'query_id': query_id,
'created': int(created),
'q_supertype': hit['fields']['query_supertype'],
'name': hit['fields']['name'],
})

full_data = await self._process_rows(plugin_ctx, corpus_factory, rows)
for i, item in enumerate(full_data):
item['idx'] = i

return full_data

async def _update_indexed_name(self, plugin_ctx, query_id, user_id, created, new_name = ""):
params = {
'queryId': query_id,
'userId': user_id,
'created': created,
'name': new_name,
}

url_query = urlencode(list(params.items()))
url = urljoin(self._fulltext_service_url, f'/indexer/update?{url_query}')
async with plugin_ctx.request.ctx.http_client.get(url) as resp:
if not resp.ok:
data = await resp.json()
raise Exception(f'Failed to update query in index: {data}')

async def _delete_indexed_item(self, plugin_ctx, query_id, user_id, created):
params = {
'queryId': query_id,
'userId': user_id,
'created': created,
}

url_query = urlencode(list(params.items()))
url = urljoin(self._fulltext_service_url, f'/indexer/delete?{url_query}')
async with plugin_ctx.request.ctx.http_client.get(url) as resp:
if not resp.ok:
data = await resp.json()
raise Exception(f'Failed to delete query from index: {data}')


@inject(
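To make the new fulltext search path easier to follow, here is a standalone sketch of the request that get_user_queries() now issues: the bleve-style query string from _generate_query_string() goes into q, and the remaining parameters are serialized with urlencode() instead of manual quoting. The base URL and field values are made-up examples, and the field:value form is only an assumption about what make_bleve_field() produces (its definition is not part of this diff).

```python
from urllib.parse import urljoin, urlencode

# Made-up example values; in the plug-in these come from the configuration and
# from _generate_query_string(q_supertype, user_id, corpname, archived_only, full_search_args).
fulltext_service_url = 'http://localhost:8080/'
query_string = '+user_id:42 +name:/.*/ query_supertype:conc corpora:syn2020'

params = {
    'q': query_string,
    'order': '-_score,-created',   # relevance first when fulltext args are present
    'limit': 10,
    'fields': 'query_supertype,name',
}
url = urljoin(fulltext_service_url, '/indexer/search?' + urlencode(list(params.items())))
print(url)
# -> http://localhost:8080/indexer/search?q=%2Buser_id%3A42+...&order=-_score%2C-created&limit=10&fields=query_supertype%2Cname
```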
7 changes: 5 additions & 2 deletions lib/plugins/ucnk_query_persistence/__init__.py
@@ -77,9 +77,12 @@ def __init__(
plugin_conf = settings.get('plugins', 'query_persistence')
self._archive_queue_key = plugin_conf['archive_queue_key']

async def archive(self, conc_id, explicit):
await self.db.list_append(self._archive_queue_key, dict(key=conc_id, explicit=explicit))
async def archive(self, conc_id: str, explicit: bool):
await self.db.list_append(self._archive_queue_key, dict(type="archive", key=conc_id, explicit=explicit))
return await self.db.get(mk_key(conc_id))

async def queue_history(self, conc_id: str, created: int, user_id: int, name: str = ""):
await self.db.list_append(self._archive_queue_key, dict(type="history", key=conc_id, created=created, user_id=user_id, name=name))

def export_tasks(self):
return tuple()
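The archive queue now carries two record types, so the consuming worker can tell archiving requests apart from history-indexing requests. The two payload shapes appended by archive() and queue_history() look like this (values are made up):

```python
# Hypothetical values; the dict keys mirror archive() and queue_history() above.
archive_item = dict(type="archive", key="abc123", explicit=True)
history_item = dict(type="history", key="abc123", created=1728640000, user_id=42, name="")

for item in (archive_item, history_item):
    print(item["type"], item)
```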
3 changes: 3 additions & 0 deletions lib/views/concordance.py
@@ -542,6 +542,7 @@ async def save_query(amodel: UserActionModel, req: KRequest, resp: KResponse):
await qp.archive(conc_id, True)

hsave = await qh.make_persistent(
amodel.plugin_ctx,
amodel.session_get('user', 'id'),
req.json['query_id'],
qp.stored_query_supertype(data),
@@ -559,6 +560,7 @@ async def unsave_query(amodel: UserActionModel, req: KRequest, resp: KResponse):
# not want to keep the query in their history
with plugins.runtime.QUERY_HISTORY as qh:
ans = await qh.make_transient(
amodel.plugin_ctx,
amodel.session_get('user', 'id'),
req.json['query_id'],
req.json['created'],
@@ -573,6 +575,7 @@ async def delete_query(amodel: UserActionModel, req: KRequest, resp: KResponse):
# remove query from history (respective results are kept)
with plugins.runtime.QUERY_HISTORY as qh:
ans = await qh.delete(
amodel.plugin_ctx,
amodel.session_get('user', 'id'),
req.json['query_id'],
int(req.json['created'])
