From f295a7988a88abda49180b85da3623bd5aff5ab4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Thibault=20Cl=C3=A9rice?= Date: Wed, 20 Mar 2024 09:41:27 +0100 Subject: [PATCH] Extract search function from the view for more in-depth testing (ToDo one day) --- app/main/views/tokens.py | 104 +++------------ app/models/corpus.py | 118 ++++++++++++++++-- .../main/tokens_search_through_fields.html | 2 +- 3 files changed, 125 insertions(+), 99 deletions(-) diff --git a/app/main/views/tokens.py b/app/main/views/tokens.py index f7b2f90e..ff4d448d 100644 --- a/app/main/views/tokens.py +++ b/app/main/views/tokens.py @@ -2,20 +2,15 @@ stream_with_context from flask_login import current_user, login_required from slugify import slugify -from sqlalchemy.sql.elements import or_, and_ -from sqlalchemy.sql.expression import not_ -from sqlalchemy import func import math from csv import DictWriter from io import StringIO -from itertools import product -from typing import Dict, Optional, List, Tuple +from typing import Dict from .utils import render_template_with_nav_info, request_wants_json, requires_corpus_access from .. import main -from ... 
import db
-from ...models import WordToken, Corpus, ChangeRecord, TokenHistory, Bookmark, CorpusCustomDictionary
-from ...utils.forms import string_to_none, strip_or_none, column_search_filter, prepare_search_string
+from ...models import WordToken, Corpus, ChangeRecord, TokenHistory, Bookmark
+from ...utils.forms import string_to_none
 from ...utils.pagination import int_or
 from ...utils.tsv import TSV_CONFIG, stream_tsv
 from ...utils.response import stream_template
@@ -274,88 +269,17 @@ def tokens_search_through_fields(corpus_id):
     # test suppression:
     if not corpus.has_access(current_user):
         abort(403)
-    # nom des colonnes disponibles pour le corpus (POS, form, etc)
-    columns = tuple(["form"] + [
-        col if col == "POS" else col.lower()
-        for col in corpus.get_columns_headings()
-    ])
-
-    input_values: Dict[str, Optional[str]] = {}
-
-    # make a dict with values split for each OR operator
-    fields: Dict[str, List[str]] = {}
-    source_dict: Dict[str, str] = request.form if request.method == "POST" else request.args
-
-    for name in columns:
-        value: Optional[str] = strip_or_none(source_dict.get(name))
-        input_values[name] = value
-
-        # split values with the '|' OR operator but keep escaped '\|' ones
-        if value:
-            fields[name] = prepare_search_string(value)
-
-    # all search combinations
-    flat_fields: List[List[Tuple[str, str]]] = [
-        [
-            (field, value)
-            for value in fields[field]
-        ]
-        for field in fields
-    ]
-    # Création combinaison de recherches possibles pipe product
-    # If source_dict = {"POS": "NOM|VER", "lemma": "mang*"}
-    # Then flat_fields = [[("POS", "NOM"), ("POS", "VER")], [("lemma", "mang*")]]
-    # And search_branches :
-    # [{"POS": "NOM", "lemma": "mang*"}, {"POS": "VER", "lemma": "mang*"}]
-    # * => flat_fields = [["a", "b"], ["c"]]
-    # product(*flat_fields) == product(flat_fields[0], flat_fields[1])
-    search_branches: List[Dict[str, str]] = [dict(prod) for prod in product(*flat_fields)]
-
-    value_filters = []
-    case_insensitive = True
-    if 'caseBox' in source_dict:
-        case_insensitive = False
-    # for each branch filter (= OR clauses if any)
-    for search_branch in search_branches:
-        # filtre minimal = bon corpus (id)
-        branch_filters = [WordToken.corpus == corpus_id]
-
-        # for each field (lemma, pos, form, morph)
-        for name, value in search_branch.items():
-            # transformation couple clé valeur en filtre SQLalchemy
-            branch_filters.extend(column_search_filter(getattr(WordToken, name), value, case_sensitive=case_insensitive))
-
-        value_filters.append(branch_filters)
-
-    if not value_filters: # If the search is empty, we only search for the corpus_id
-        value_filters.append([WordToken.corpus == corpus_id])
-
-    # there is at least one OR clause
-    # get sort arguments (sort per default by WordToken.order_id)
-    order_by_key = request.args.get("orderBy")
-    order_by = {
-        "order_id": WordToken.order_id,
-        "lemma": func.lower(WordToken.lemma),
-        "pos": func.lower(WordToken.POS),
-        "form": func.lower(WordToken.form),
-        "morph": func.lower(WordToken.morph),
-    }
-    if order_by_key not in order_by:
-        order_by_key = "order_id"
-    order_by = order_by.get(order_by_key)
-
-    args = []
-
-    if len(value_filters) > 1:
-        and_filters = [and_(*branch_filters) for branch_filters in value_filters]
-        args = [or_(*and_filters)]
-    elif len(value_filters) == 1:
-        args = value_filters[0]
-
-    tokens = WordToken.query.filter(*args).order_by(
-        order_by.desc()
-        if bool(int(request.args.get("desc", "0"))) # default sort order is ascending
-        else order_by
+
+    form: Dict[str, str] = request.form if request.method == "POST" else request.args
+    token_dict: Dict[str, str] = {
+        key: value for key, value in form.items() if key not in {"caseBox", "page", "limit"}
+    }
+    # Sort key and direction are read from the query string, exactly as the inlined code did
+    tokens, order_by_key, input_values = corpus.token_search(
+        token_dict=token_dict,
+        order_by_key=request.args.get("orderBy"),
+        case_sensitive='caseBox' not in form,
+        desc=bool(int(request.args.get("desc", "0")))
     )
 
     page = int_or(request.args.get("page"), 1)
diff --git a/app/models/corpus.py b/app/models/corpus.py
index 7300566e..57fd2bdb
100644
--- a/app/models/corpus.py
+++ b/app/models/corpus.py
@@ -2,22 +2,25 @@
 import csv
 import io
 import enum
-from typing import Iterable, Optional, Dict, List
+from typing import Iterable, Optional, Dict, List, Tuple
+from itertools import product
 # PIP Packages
 import unidecode
+import sqlalchemy.exc
 from sqlalchemy.ext.associationproxy import association_proxy
 from sqlalchemy.orm import backref
-import sqlalchemy.exc
-from sqlalchemy import func, literal, not_
+from sqlalchemy import func, literal, not_, or_, and_
 from werkzeug.exceptions import BadRequest
 from flask import url_for
+
 # Application imports
-from .. import db
-from ..utils import validate_length
-from ..utils.forms import strip_or_none
-from ..utils.tsv import TSV_CONFIG
-from ..errors import MissingTokenColumnValue, NoTokensInput
+from app import db
+from app.utils import validate_length
+from app.utils.tsv import TSV_CONFIG
+from app.errors import MissingTokenColumnValue, NoTokensInput
 from app.utils import PreferencesUpdateError
+from app.utils.forms import strip_or_none, column_search_filter, prepare_search_string
+
 # Models
 from .user import User
 from .control_lists import ControlLists, AllowedPOS, AllowedMorph, AllowedLemma, PublicationStatus
@@ -595,6 +598,105 @@ def insert_custom_dictionary_value(self, category: str, string: str) -> bool:
         ))
         db.session.commit()
 
+    def token_search(
+        self,
+        token_dict: Dict[str, str],
+        order_by_key: str = "order_id",
+        desc: bool = False,
+        case_sensitive: bool = False
+    ):
+        """ Perform a complex search on the tokens of this corpus, returns a query
+
+        ToDo: Add a proximity filter that allows contextual search
+        ToDo: Add test outside of the interface
+
+        :param token_dict: Raw search string per column name; keys that are not columns of this corpus are ignored
+        :param order_by_key: One of order_id, lemma, pos, form, morph; any other value falls back to order_id
+        :param desc: Sort in descending order when true, ascending otherwise
+        :param case_sensitive: Passed unchanged to column_search_filter for every column
+        :return: tokens (WordToken query), order_by_key (the key actually used), input_values (stripped value per column)
+
+        Example: corpus.token_search({"form": "d*"})
+        """
+        # Names of the columns available for this corpus (POS, form, etc)
+        columns = tuple(["form"] + [
+            col if col == "POS" else col.lower()
+            for col in self.get_columns_headings()
+        ])
+
+        # Cleaned up values from the input form
+        input_values: Dict[str, Optional[str]] = {}
+
+        # make a dict with values split for each OR operator
+        fields: Dict[str, List[str]] = {}
+
+        for name in columns:
+            value: Optional[str] = strip_or_none(token_dict.get(name))
+            input_values[name] = value
+
+            # split values with the '|' OR operator but keep escaped '\|' ones
+            if value:
+                fields[name] = prepare_search_string(value)
+
+        # all search combinations (kept under its own name: `fields` is a dict, this is a list of lists)
+        flat_fields: List[List[Tuple[str, str]]] = [
+            [
+                (field, value)
+                for value in fields[field]
+            ]
+            for field in fields
+        ]
+        # Build every possible search combination: cartesian product of the '|' alternatives
+        # If token_dict = {"POS": "NOM|VER", "lemma": "mang*"}
+        # Then flat_fields = [[("POS", "NOM"), ("POS", "VER")], [("lemma", "mang*")]]
+        # And search_branches :
+        # [{"POS": "NOM", "lemma": "mang*"}, {"POS": "VER", "lemma": "mang*"}]
+        # * => flat_fields = [["a", "b"], ["c"]]
+        # product(*flat_fields) == product(flat_fields[0], flat_fields[1])
+        search_branches: List[Dict[str, str]] = [dict(prod) for prod in product(*flat_fields)]
+
+        value_filters = []
+        # for each branch filter (= OR clauses if any)
+        for search_branch in search_branches:
+            # minimal filter = the right corpus (id)
+            branch_filters = [WordToken.corpus == self.id]
+
+            # for each field (lemma, pos, form, morph)
+            for name, value in search_branch.items():
+                # turn the key/value pair into SQLAlchemy filters
+                branch_filters.extend(
+                    column_search_filter(getattr(WordToken, name), value, case_sensitive=case_sensitive))
+
+            value_filters.append(branch_filters)
+
+        if not value_filters:  # If the search is empty, we only search for the corpus_id
+            value_filters.append([WordToken.corpus == self.id])
+
+        # there is at least one OR clause
+        # get sort arguments (sort per default by WordToken.order_id)
+        order_by = {
+            "order_id": WordToken.order_id,
+            "lemma": func.lower(WordToken.lemma),
+            "pos": func.lower(WordToken.POS),
+            "form": func.lower(WordToken.form),
+            "morph": func.lower(WordToken.morph),
+        }
+        if order_by_key not in order_by:
+            order_by_key = "order_id"
+        order_by = order_by.get(order_by_key)
+
+        args = []
+
+        if len(value_filters) > 1:
+            and_filters = [and_(*branch_filters) for branch_filters in value_filters]
+            args = [or_(*and_filters)]
+        elif len(value_filters) == 1:
+            args = value_filters[0]
+
+        tokens = WordToken.query.filter(*args).order_by(order_by.desc() if desc else order_by)
+
+        return tokens, order_by_key, input_values
+
 
 class WordToken(db.Model):
     """ A word token is a word from a corpus with primary annotation
diff --git a/app/templates/main/tokens_search_through_fields.html b/app/templates/main/tokens_search_through_fields.html
index f425dbec..2db2e862 100644
--- a/app/templates/main/tokens_search_through_fields.html
+++ b/app/templates/main/tokens_search_through_fields.html
@@ -59,7 +59,7 @@

{{ _('Corpus') }} {{ corpus.na - Deactivate case sensitivity (majuscule and minuscule are taken into consideration in the search). +
{{ _('* can be used to match partial words, eg.') }} ADV* {{ _('! can be used to negate a match, eg.') }} !PRE