diff --git a/CHANGELOG.md b/CHANGELOG.md
index 5d5568f549..1c49b5d00e 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -8,6 +8,17 @@ Versioned according to [Semantic Versioning](http://semver.org/).
 Changed:
 
   * ocrd_network: Use `ocrd-all-tool.json` bundled by core instead of download from website, #1257, #1260
+  * `ocrd network client processing processor` supports blocking behavior with `--block` by polling job status, #1265, #1269
+
+Added:
+
+  * `ocrd network client workflow check-status` to get the status of a workflow job, #1269
+  * `ocrd network client processing check-status` to get the status of a processing (processor) job, #1269
+  * `ocrd network client workflow run` to run a workflow on the Processing Server, optionally blocking until it finishes, #1265, #1269
+  * Environment variables `OCRD_NETWORK_CLIENT_POLLING_SLEEP` and `OCRD_NETWORK_CLIENT_POLLING_TIMEOUT` to control polling interval and timeout for `ocrd network client {processing processor,workflow run}`, #1269
+  * `ocrd network client discovery processors` to list the processors deployed on the Processing Server, #1269
+  * `ocrd network client discovery processor` to get the `ocrd-tool.json` of a deployed processor, #1269
+  * `ocrd network client processing check-log` to retrieve the log data for a processing job, #1269
   * `ocrd workspace clone`/`Resolver.workspace_from_url`: with `clobber_mets=False`, raise a FileExistsError for existing mets.xml on disk, #563, #1268
   * `ocrd workspace find --download`: print the correct, up-to-date field, not `None`, #1202, #1266
diff --git a/Makefile b/Makefile
index 0608b0b738..4997066d1b 100644
--- a/Makefile
+++ b/Makefile
@@ -286,7 +286,7 @@ network-module-test: assets
 INTEGRATION_TEST_IN_DOCKER = docker exec core_test
 network-integration-test:
 	$(DOCKER_COMPOSE) --file tests/network/docker-compose.yml up -d
-	-$(INTEGRATION_TEST_IN_DOCKER) pytest -k 'test_integration_' -v --ignore-glob="$(TESTDIR)/network/*ocrd_all*.py"
+	-$(INTEGRATION_TEST_IN_DOCKER) pytest -k 'test_integration_' -v --ignore-glob="tests/network/*ocrd_all*.py"
 	$(DOCKER_COMPOSE) --file tests/network/docker-compose.yml down --remove-orphans
 
 network-integration-test-cicd:
diff --git a/src/ocrd/cli/__init__.py b/src/ocrd/cli/__init__.py
index 9b80abeb4d..70d738f083 100644
--- a/src/ocrd/cli/__init__.py
+++ b/src/ocrd/cli/__init__.py
@@ -35,6 +35,10 @@
 \b
 {config.describe('OCRD_MAX_PROCESSOR_CACHE')}
 \b
+{config.describe('OCRD_NETWORK_CLIENT_POLLING_SLEEP')}
+\b
+{config.describe('OCRD_NETWORK_CLIENT_POLLING_TIMEOUT')}
+\b
 {config.describe('OCRD_NETWORK_SERVER_ADDR_PROCESSING')}
 \b
 {config.describe('OCRD_NETWORK_SERVER_ADDR_WORKFLOW')}
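The two variables documented above drive the blocking client behavior introduced below: `OCRD_NETWORK_CLIENT_POLLING_SLEEP` is the pause between two status polls and `OCRD_NETWORK_CLIENT_POLLING_TIMEOUT` caps the total waiting time. A minimal sketch of the resulting polling budget, mirroring `Client.__init__` further down in this diff (the numbers are the defaults from `src/ocrd_utils/config.py`):

    timeout = 3600               # OCRD_NETWORK_CLIENT_POLLING_TIMEOUT, in seconds
    wait = 30                    # OCRD_NETWORK_CLIENT_POLLING_SLEEP, in seconds
    tries = int(timeout / wait)  # at most 120 polls before giving up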
diff --git a/src/ocrd_network/cli/client.py b/src/ocrd_network/cli/client.py
index 8086658e04..9c7f15c88f 100644
--- a/src/ocrd_network/cli/client.py
+++ b/src/ocrd_network/cli/client.py
@@ -1,9 +1,11 @@
 import click
-from typing import Optional
-
-from ocrd.decorators import parameter_option
-from ocrd_network import Client
+from json import dumps
+from typing import List, Optional, Tuple
+from ocrd.decorators.parameter_option import parameter_option, parameter_override_option
 from ocrd_utils import DEFAULT_METS_BASENAME
+from ocrd_utils.introspect import set_json_key_value_overrides
+from ocrd_utils.str import parse_json_string_or_file
+from ..client import Client
 
 
 @click.group('client')
@@ -23,6 +25,34 @@ def discovery_cli():
     pass
 
 
+@discovery_cli.command('processors')
+@click.option('--address',
+              help='The address of the Processing Server. If not provided, '
+                   'the "OCRD_NETWORK_SERVER_ADDR_PROCESSING" env variable is used by default')
+def check_deployed_processors(address: Optional[str]):
+    """
+    Get a list of deployed processing workers/processor servers.
+    Each processor is shown only once, regardless of the number of deployed instances.
+    """
+    client = Client(server_addr_processing=address)
+    processors_list = client.check_deployed_processors()
+    print(dumps(processors_list, indent=4))
+
+
+@discovery_cli.command('processor')
+@click.option('--address',
+              help='The address of the Processing Server. If not provided, '
+                   'the "OCRD_NETWORK_SERVER_ADDR_PROCESSING" env variable is used by default')
+@click.argument('processor_name', required=True, type=click.STRING)
+def check_processor_ocrd_tool(address: Optional[str], processor_name: str):
+    """
+    Get the `ocrd-tool.json` of a deployed processor specified with `processor_name`
+    """
+    client = Client(server_addr_processing=address)
+    ocrd_tool = client.check_deployed_processor_ocrd_tool(processor_name=processor_name)
+    print(dumps(ocrd_tool, indent=4))
+
+
 @client_cli.group('processing')
 def processing_cli():
     """
@@ -31,56 +61,92 @@ def processing_cli():
     pass
 
 
-@processing_cli.command('processor')
+@processing_cli.command('check-log')
+@click.option('--address',
+              help='The address of the Processing Server. If not provided, '
+                   'the "OCRD_NETWORK_SERVER_ADDR_PROCESSING" env variable is used by default')
+@click.option('-j', '--processing-job-id', required=True)
+def check_processing_job_log(address: Optional[str], processing_job_id: str):
+    """
+    Check the log of a previously submitted processing job.
+    """
+    client = Client(server_addr_processing=address)
+    response = client.check_job_log(job_id=processing_job_id)
+    print(response._content.decode(encoding='utf-8'))
+
+
+@processing_cli.command('check-status')
+@click.option('--address',
+              help='The address of the Processing Server. If not provided, '
+                   'the "OCRD_NETWORK_SERVER_ADDR_PROCESSING" env variable is used by default')
+@click.option('-j', '--processing-job-id', required=True)
+def check_processing_job_status(address: Optional[str], processing_job_id: str):
+    """
+    Check the status of a previously submitted processing job.
+    """
+    client = Client(server_addr_processing=address)
+    job_status = client.check_job_status(processing_job_id)
+    assert job_status
+    print(f"Processing job status: {job_status}")
+
+
+@processing_cli.command('run')
 @click.argument('processor_name', required=True, type=click.STRING)
-@click.option('--address')
+@click.option('--address',
+              help='The address of the Processing Server. If not provided, '
+                   'the "OCRD_NETWORK_SERVER_ADDR_PROCESSING" env variable is used by default')
 @click.option('-m', '--mets', required=True, default=DEFAULT_METS_BASENAME)
 @click.option('-I', '--input-file-grp', default='OCR-D-INPUT')
 @click.option('-O', '--output-file-grp', default='OCR-D-OUTPUT')
 @click.option('-g', '--page-id')
 @parameter_option
+@parameter_override_option
 @click.option('--result-queue-name')
 @click.option('--callback-url')
 @click.option('--agent-type', default='worker')
-def send_processing_request(
-    address: Optional[str],
-    processor_name: str,
-    mets: str,
-    input_file_grp: str,
-    output_file_grp: Optional[str],
-    page_id: Optional[str],
-    parameter: Optional[dict],
-    result_queue_name: Optional[str],
-    callback_url: Optional[str],
-    # TODO: This is temporally available to toggle
-    # between the ProcessingWorker/ProcessorServer
-    agent_type: Optional[str]
+@click.option('-b', '--block', is_flag=True, default=False,
+              help='If set, the client will block until the job times out, fails or succeeds.')
+def send_processing_job_request(
+        address: Optional[str],
+        processor_name: str,
+        mets: str,
+        input_file_grp: str,
+        output_file_grp: Optional[str],
+        page_id: Optional[str],
+        parameter: List[str],
+        parameter_override: List[Tuple[str, str]],
+        result_queue_name: Optional[str],
+        callback_url: Optional[str],
+        # TODO: This is temporarily available to toggle
+        # between the ProcessingWorker/ProcessorServer
+        agent_type: Optional[str],
+        block: Optional[bool]
 ):
+    """
+    Submit a processing job to the processing server.
+    """
     req_params = {
         "path_to_mets": mets,
         "description": "OCR-D Network client request",
         "input_file_grps": input_file_grp.split(','),
-        "parameters": parameter if parameter else {},
-        "agent_type": agent_type,
+        "agent_type": agent_type
     }
     if output_file_grp:
         req_params["output_file_grps"] = output_file_grp.split(',')
     if page_id:
         req_params["page_id"] = page_id
+    req_params["parameters"] = set_json_key_value_overrides(parse_json_string_or_file(*parameter), *parameter_override)
     if result_queue_name:
         req_params["result_queue_name"] = result_queue_name
     if callback_url:
         req_params["callback_url"] = callback_url
-
-    client = Client(
-        server_addr_processing=address
-    )
-    response = client.send_processing_request(
-        processor_name=processor_name,
-        req_params=req_params
-    )
-    processing_job_id = response.get('job_id', None)
+    client = Client(server_addr_processing=address)
+    processing_job_id = client.send_processing_job_request(
+        processor_name=processor_name, req_params=req_params)
+    assert processing_job_id
     print(f"Processing job id: {processing_job_id}")
+    if block:
+        client.poll_job_status(job_id=processing_job_id)
 
 
 @client_cli.group('workflow')
@@ -91,6 +157,44 @@ def workflow_cli():
     """
     pass
 
 
+@workflow_cli.command('check-status')
+@click.option('--address', help='The address of the Processing Server. If not provided, '
+                                'the "OCRD_NETWORK_SERVER_ADDR_PROCESSING" env variable is used by default')
+@click.option('-j', '--workflow-job-id', required=True)
+def check_workflow_job_status(address: Optional[str], workflow_job_id: str):
+    """
+    Check the status of a previously submitted workflow job.
+    """
+    client = Client(server_addr_processing=address)
+    job_status = client.check_workflow_status(workflow_job_id)
+    assert job_status
+    print(f"Workflow job status: {job_status}")
+
+
+@workflow_cli.command('run')
+@click.option('--address', help='The address of the Processing Server. If not provided, '
+                                'the "OCRD_NETWORK_SERVER_ADDR_PROCESSING" env variable is used by default')
+@click.option('-m', '--path-to-mets', required=True)
+@click.option('-w', '--path-to-workflow', required=True)
+@click.option('-b', '--block', is_flag=True, default=False,
+              help='If set, the client will block until the job times out, fails or succeeds.')
+def send_workflow_job_request(
+        address: Optional[str],
+        path_to_mets: str,
+        path_to_workflow: str,
+        block: Optional[bool]
+):
+    """
+    Submit a workflow job to the processing server.
+    """
+    client = Client(server_addr_processing=address)
+    workflow_job_id = client.send_workflow_job_request(path_to_wf=path_to_workflow, path_to_mets=path_to_mets)
+    assert workflow_job_id
+    print(f"Workflow job id: {workflow_job_id}")
+    if block:
+        client.poll_workflow_status(job_id=workflow_job_id)
+
+
 @client_cli.group('workspace')
 def workspace_cli():
     """
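Taken together, `processing run --block` reduces to two `Client` calls: submit, then poll. A minimal Python equivalent of `ocrd network client processing run ocrd-dummy -m mets.xml --block`, where the server address and workspace path are hypothetical examples:

    from ocrd_network.client import Client

    client = Client(server_addr_processing="http://localhost:8000")  # hypothetical address
    processing_job_id = client.send_processing_job_request(
        processor_name="ocrd-dummy",
        req_params={
            "path_to_mets": "/data/mets.xml",  # hypothetical workspace
            "description": "OCR-D Network client request",
            "input_file_grps": ["OCR-D-IMG"],
            "output_file_grps": ["OCR-D-OUTPUT"],
            "parameters": {},
            "agent_type": "worker",
        })
    print(f"Processing job id: {processing_job_id}")
    job_state = client.poll_job_status(job_id=processing_job_id)  # what --block waits on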
diff --git a/src/ocrd_network/client.py b/src/ocrd_network/client.py
index 9fa0b3994a..8ec8e541ea 100644
--- a/src/ocrd_network/client.py
+++ b/src/ocrd_network/client.py
@@ -1,37 +1,63 @@
-from json import dumps, loads
-from requests import post as requests_post
+from typing import Optional
 from ocrd_utils import config, getLogger, LOG_FORMAT
+from .client_utils import (
+    get_ps_deployed_processors,
+    get_ps_deployed_processor_ocrd_tool,
+    get_ps_processing_job_log,
+    get_ps_processing_job_status,
+    get_ps_workflow_job_status,
+    poll_job_status_till_timeout_fail_or_success,
+    poll_wf_status_till_timeout_fail_or_success,
+    post_ps_processing_request,
+    post_ps_workflow_request,
+    verify_server_protocol
+)
 
-from .constants import NETWORK_PROTOCOLS
-
-# TODO: This is just a conceptual implementation and first try to
-# trigger further discussions on how this should look like.
 class Client:
     def __init__(
         self,
-        server_addr_processing: str = config.OCRD_NETWORK_SERVER_ADDR_PROCESSING,
-        server_addr_workflow: str = config.OCRD_NETWORK_SERVER_ADDR_WORKFLOW,
-        server_addr_workspace: str = config.OCRD_NETWORK_SERVER_ADDR_WORKSPACE
+        server_addr_processing: Optional[str],
+        timeout: int = config.OCRD_NETWORK_CLIENT_POLLING_TIMEOUT,
+        wait: int = config.OCRD_NETWORK_CLIENT_POLLING_SLEEP
     ):
         self.log = getLogger(f"ocrd_network.client")
+        if not server_addr_processing:
+            server_addr_processing = config.OCRD_NETWORK_SERVER_ADDR_PROCESSING
         self.server_addr_processing = server_addr_processing
-        self.server_addr_workflow = server_addr_workflow
-        self.server_addr_workspace = server_addr_workspace
-
-    def send_processing_request(self, processor_name: str, req_params: dict):
         verify_server_protocol(self.server_addr_processing)
-        req_url = f"{self.server_addr_processing}/processor/{processor_name}"
-        req_headers = {"Content-Type": "application/json; charset=utf-8"}
-        req_json = loads(dumps(req_params))
-        self.log.info(f"Sending processing request to: {req_url}")
-        self.log.debug(req_json)
-        response = requests_post(url=req_url, headers=req_headers, json=req_json)
-        return response.json()
-
-
-def verify_server_protocol(address: str):
-    for protocol in NETWORK_PROTOCOLS:
-        if address.startswith(protocol):
-            return
-    raise ValueError(f"Wrong/Missing protocol in the server address: {address}, must be one of: {NETWORK_PROTOCOLS}")
+        self.polling_timeout = timeout
+        self.polling_wait = wait
+        self.polling_tries = int(timeout / wait)
+
+    def check_deployed_processors(self):
+        return get_ps_deployed_processors(ps_server_host=self.server_addr_processing)
+
+    def check_deployed_processor_ocrd_tool(self, processor_name: str):
+        return get_ps_deployed_processor_ocrd_tool(
+            ps_server_host=self.server_addr_processing, processor_name=processor_name)
+
+    def check_job_log(self, job_id: str):
+        return get_ps_processing_job_log(self.server_addr_processing, processing_job_id=job_id)
+
+    def check_job_status(self, job_id: str):
+        return get_ps_processing_job_status(self.server_addr_processing, processing_job_id=job_id)
+
+    def check_workflow_status(self, workflow_job_id: str):
+        return get_ps_workflow_job_status(self.server_addr_processing, workflow_job_id=workflow_job_id)
+
+    def poll_job_status(self, job_id: str) -> str:
+        return poll_job_status_till_timeout_fail_or_success(
+            ps_server_host=self.server_addr_processing, job_id=job_id, tries=self.polling_tries, wait=self.polling_wait)
+
+    def poll_workflow_status(self, job_id: str) -> str:
+        return poll_wf_status_till_timeout_fail_or_success(
+            ps_server_host=self.server_addr_processing, job_id=job_id, tries=self.polling_tries, wait=self.polling_wait)
+
+    def send_processing_job_request(self, processor_name: str, req_params: dict) -> str:
+        return post_ps_processing_request(
+            ps_server_host=self.server_addr_processing, processor=processor_name, job_input=req_params)
+
+    def send_workflow_job_request(self, path_to_wf: str, path_to_mets: str):
+        return post_ps_workflow_request(
+            ps_server_host=self.server_addr_processing, path_to_wf=path_to_wf, path_to_mets=path_to_mets)
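The same submit-then-poll pattern applies to workflows; a sketch of the programmatic counterpart of `ocrd network client workflow run --block` (paths and address are hypothetical):

    from ocrd_network.client import Client

    client = Client(server_addr_processing="http://localhost:8000")  # hypothetical
    wf_job_id = client.send_workflow_job_request(
        path_to_wf="workflow.txt",      # hypothetical `ocrd process`-style workflow file
        path_to_mets="/data/mets.xml")  # hypothetical
    print(f"Workflow job id: {wf_job_id}")
    final_state = client.poll_workflow_status(job_id=wf_job_id)  # final JobState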
f"{ps_server_host}/processor/log/{processing_job_id}" + response = request_get(url=request_url, headers={"accept": "application/json; charset=utf-8"}) + return response + + +def get_ps_processing_job_status(ps_server_host: str, processing_job_id: str) -> str: + request_url = f"{ps_server_host}/processor/job/{processing_job_id}" + response = request_get(url=request_url, headers={"accept": "application/json; charset=utf-8"}) + assert response.status_code == 200, f"Processing server: {request_url}, {response.status_code}" + job_state = response.json()["state"] + assert job_state + return job_state + + +def get_ps_workflow_job_status(ps_server_host: str, workflow_job_id: str) -> str: + request_url = f"{ps_server_host}/workflow/job-simple/{workflow_job_id}" + response = request_get(url=request_url, headers={"accept": "application/json; charset=utf-8"}) + assert response.status_code == 200, f"Processing server: {request_url}, {response.status_code}" + job_state = response.json()["state"] + assert job_state + return job_state + + +def post_ps_processing_request(ps_server_host: str, processor: str, job_input: dict) -> str: + request_url = f"{ps_server_host}/processor/run/{processor}" + response = request_post( + url=request_url, + headers={"accept": "application/json; charset=utf-8"}, + json=job_input + ) + assert response.status_code == 200, f"Processing server: {request_url}, {response.status_code}" + processing_job_id = response.json()["job_id"] + assert processing_job_id + return processing_job_id + + +# TODO: Can be extended to include other parameters such as page_wise +def post_ps_workflow_request(ps_server_host: str, path_to_wf: str, path_to_mets: str) -> str: + request_url = f"{ps_server_host}/workflow/run?mets_path={path_to_mets}&page_wise=True" + response = request_post( + url=request_url, + headers={"accept": "application/json; charset=utf-8"}, + files={"workflow": open(path_to_wf, "rb")} + ) + # print(response.json()) + # print(response.__dict__) + assert response.status_code == 200, f"Processing server: {request_url}, {response.status_code}" + wf_job_id = response.json()["job_id"] + assert wf_job_id + return wf_job_id + + +def verify_server_protocol(address: str): + for protocol in NETWORK_PROTOCOLS: + if address.startswith(protocol): + return + raise ValueError(f"Wrong/Missing protocol in the server address: {address}, must be one of: {NETWORK_PROTOCOLS}") diff --git a/src/ocrd_network/processing_server.py b/src/ocrd_network/processing_server.py index e142802268..34c22e5cf6 100644 --- a/src/ocrd_network/processing_server.py +++ b/src/ocrd_network/processing_server.py @@ -651,7 +651,7 @@ async def list_processors(self) -> List[str]: # There is no caching on the Processing Server side processor_names_list = self.deployer.find_matching_network_agents( docker_only=False, native_only=False, worker_only=False, server_only=False, - str_names_only=True, unique_only=True + str_names_only=True, unique_only=True, sort=True ) return processor_names_list diff --git a/src/ocrd_network/runtime_data/deployer.py b/src/ocrd_network/runtime_data/deployer.py index 124c9fbbe2..b956904d07 100644 --- a/src/ocrd_network/runtime_data/deployer.py +++ b/src/ocrd_network/runtime_data/deployer.py @@ -35,7 +35,7 @@ def __init__(self, config_path: str) -> None: # TODO: Reconsider this. 
diff --git a/src/ocrd_network/runtime_data/deployer.py b/src/ocrd_network/runtime_data/deployer.py
index 124c9fbbe2..b956904d07 100644
--- a/src/ocrd_network/runtime_data/deployer.py
+++ b/src/ocrd_network/runtime_data/deployer.py
@@ -35,7 +35,7 @@ def __init__(self, config_path: str) -> None:
     # TODO: Reconsider this.
     def find_matching_network_agents(
         self, worker_only: bool = False, server_only: bool = False, docker_only: bool = False,
-        native_only: bool = False, str_names_only: bool = False, unique_only: bool = False
+        native_only: bool = False, str_names_only: bool = False, unique_only: bool = False, sort: bool = False
     ) -> Union[List[str], List[object]]:
         """Finds and returns a list of matching data objects of type:
         `DataProcessingWorker` and `DataProcessorServer`.
@@ -46,6 +46,7 @@ def find_matching_network_agents(
         :py:attr:`native_only` match only native network agents (DataProcessingWorker and DataProcessorServer)
         :py:attr:`str_names_only` returns the processor_name field instead of the Data* object
         :py:attr:`unique_only` remove duplicate names from the matches
+        :py:attr:`sort` sort the result
 
         `worker_only` and `server_only` are mutually exclusive to each other
         `docker_only` and `native_only` are mutually exclusive to each other
@@ -64,6 +65,10 @@ def find_matching_network_agents(
             msg = f"Value 'unique_only' is allowed only together with 'str_names_only'"
             self.log.exception(msg)
             raise ValueError(msg)
+        if sort and not str_names_only:
+            msg = "Value 'sort' is allowed only together with 'str_names_only'"
+            self.log.exception(msg)
+            raise ValueError(msg)
 
         # Find all matching objects of type DataProcessingWorker or DataProcessorServer
         matched_objects = []
@@ -88,8 +93,12 @@ def find_matching_network_agents(
         matched_names = [match.processor_name for match in matched_objects]
         if not unique_only:
             return matched_names
-        # Removes any duplicate entries from matched names
-        return list(dict.fromkeys(matched_names))
+        # Remove any duplicate entries from the matched names
+        list_matched = list(dict.fromkeys(matched_names))
+        if not sort:
+            return list_matched
+        list_matched.sort()
+        return list_matched
 
     def resolve_processor_server_url(self, processor_name) -> str:
         processor_server_url = ''
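The dedupe-then-sort step in isolation: `dict.fromkeys` keeps the first occurrence of each name (preserving discovery order), while `sort=True` trades that order for a stable alphabetical listing in `GET /processor`. A quick illustration with made-up processor names:

    names = ["ocrd-dummy", "ocrd-cis-ocropy-binarize", "ocrd-dummy"]
    unique = list(dict.fromkeys(names))  # ['ocrd-dummy', 'ocrd-cis-ocropy-binarize']
    print(sorted(unique))                # ['ocrd-cis-ocropy-binarize', 'ocrd-dummy']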
diff --git a/src/ocrd_network/server_utils.py b/src/ocrd_network/server_utils.py
index cc0c59ec67..9d8628170c 100644
--- a/src/ocrd_network/server_utils.py
+++ b/src/ocrd_network/server_utils.py
@@ -125,14 +125,13 @@ def request_processor_server_tool_json(logger: Logger, processor_server_base_url
             urljoin(base=processor_server_base_url, url="info"),
             headers={"Content-Type": "application/json"}
         )
-        if response.status_code != 200:
-            message = f"Failed to retrieve tool json from: {processor_server_base_url}, code: {response.status_code}"
-            raise_http_exception(logger, status.HTTP_404_NOT_FOUND, message)
-        return response.json()
     except Exception as error:
         message = f"Failed to retrieve ocrd tool json from: {processor_server_base_url}"
         raise_http_exception(logger, status.HTTP_404_NOT_FOUND, message, error)
-
+    if response.status_code != 200:
+        message = f"Failed to retrieve tool json from: {processor_server_base_url}, code: {response.status_code}"
+        raise_http_exception(logger, status.HTTP_404_NOT_FOUND, message)
+    return response.json()
 
 async def forward_job_to_processor_server(
     logger: Logger, job_input: PYJobInput, processor_server_base_url: str
@@ -193,11 +192,14 @@ def parse_workflow_tasks(logger: Logger, workflow_content: str) -> List[Processo
 
 
 def raise_http_exception(logger: Logger, status_code: int, message: str, error: Exception = None) -> None:
-    logger.exception(f"{message} {error}")
+    if error:
+        message = f"{message} {error}"
+    logger.exception(message)
     raise HTTPException(status_code=status_code, detail=message)
 
 
 def validate_job_input(logger: Logger, processor_name: str, ocrd_tool: dict, job_input: PYJobInput) -> None:
+    # logger.warning(f"Job input: {job_input}")
     if bool(job_input.path_to_mets) == bool(job_input.workspace_id):
         message = (
             "Wrong processing job input format. "
@@ -210,12 +212,12 @@ def validate_job_input(logger: Logger, processor_name: str, ocrd_tool: dict, job
         raise_http_exception(logger, status.HTTP_404_NOT_FOUND, message)
     try:
         report = ParameterValidator(ocrd_tool).validate(dict(job_input.parameters))
-        if not report.is_valid:
-            message = f"Failed to validate processing job input against the tool json of processor: {processor_name}\n"
-            raise_http_exception(logger, status.HTTP_404_BAD_REQUEST, message + report.errors)
     except Exception as error:
         message = f"Failed to validate processing job input against the ocrd tool json of processor: {processor_name}"
-        raise_http_exception(logger, status.HTTP_404_BAD_REQUEST, message, error)
+        raise_http_exception(logger, status.HTTP_400_BAD_REQUEST, message, error)
+    if report and not report.is_valid:
+        message = f"Failed to validate processing job input against the tool json of processor: {processor_name}\n"
+        raise_http_exception(logger, status.HTTP_400_BAD_REQUEST, f"{message}{report.errors}")
 
 
 def validate_workflow(logger: Logger, workflow: str) -> None:
diff --git a/src/ocrd_utils/config.py b/src/ocrd_utils/config.py
index b3a3e9537d..063af930c8 100644
--- a/src/ocrd_utils/config.py
+++ b/src/ocrd_utils/config.py
@@ -145,6 +145,16 @@ def _ocrd_download_timeout_parser(val):
     description="Default address of Processing Server to connect to (for `ocrd network client processing`).",
     default=(True, ''))
 
+config.add("OCRD_NETWORK_CLIENT_POLLING_SLEEP",
+    description="How many seconds to sleep before polling again.",
+    parser=int,
+    default=(True, 30))
+
+config.add("OCRD_NETWORK_CLIENT_POLLING_TIMEOUT",
+    description="Timeout for a blocking ocrd network client (in seconds).",
+    parser=int,
+    default=(True, 3600))
+
 config.add("OCRD_NETWORK_SERVER_ADDR_WORKFLOW",
     description="Default address of Workflow Server to connect to (for `ocrd network client workflow`).",
     default=(True, ''))
diff --git a/tests/network/config.py b/tests/network/config.py
index 67c4ff24b7..e22cc6ce9d 100644
--- a/tests/network/config.py
+++ b/tests/network/config.py
@@ -55,6 +55,20 @@
     parser=_ocrd_download_timeout_parser
 )
 
+test_config.add(
+    "OCRD_NETWORK_CLIENT_POLLING_SLEEP",
+    description="How many seconds to sleep before polling again.",
+    parser=int,
+    default=(True, 30)
+)
+
+test_config.add(
+    "OCRD_NETWORK_CLIENT_POLLING_TIMEOUT",
+    description="Timeout for a blocking ocrd network client (in seconds).",
+    parser=int,
+    default=(True, 3600)
+)
+
 test_config.add(
     name="OCRD_NETWORK_SERVER_ADDR_PROCESSING",
     description="Default address of Processing Server to connect to (for `ocrd network client processing`).",
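The polling knobs need not come from the environment: the reworked `Client` also accepts them as constructor arguments and derives its retry budget from them. A minimal sketch (address and values are hypothetical):

    from ocrd_network.client import Client

    client = Client("http://localhost:8000", timeout=600, wait=10)
    assert client.polling_tries == 60  # a 600 s budget at one poll every 10 s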
diff --git a/tests/network/test_integration_5_processing_server.py b/tests/network/test_integration_5_processing_server.py
index bce67bbe69..bf5fadee3c 100644
--- a/tests/network/test_integration_5_processing_server.py
+++ b/tests/network/test_integration_5_processing_server.py
@@ -1,10 +1,13 @@
 from pathlib import Path
 from requests import get as request_get
+from src.ocrd_network.client_utils import (
+    poll_job_status_till_timeout_fail_or_success, poll_wf_status_till_timeout_fail_or_success,
+    post_ps_processing_request, post_ps_workflow_request)
 from src.ocrd_network.constants import AgentType, JobState
 from src.ocrd_network.logging_utils import get_processing_job_logging_file_path
 from tests.base import assets
 from tests.network.config import test_config
-from tests.network.utils import poll_till_timeout_fail_or_success, post_ps_processing_request, post_ps_workflow_request
+
 
 PROCESSING_SERVER_URL = test_config.PROCESSING_SERVER_URL
@@ -40,10 +43,8 @@ def test_processing_server_processing_request():
         "parameters": {}
     }
     test_processor = "ocrd-dummy"
-    processing_job_id = post_ps_processing_request(PROCESSING_SERVER_URL, test_processor, test_processing_job_input)
-    job_state = poll_till_timeout_fail_or_success(
-        test_url=f"{PROCESSING_SERVER_URL}/processor/job/{processing_job_id}", tries=10, wait=10
-    )
+    process_job_id = post_ps_processing_request(PROCESSING_SERVER_URL, test_processor, test_processing_job_input)
+    job_state = poll_job_status_till_timeout_fail_or_success(PROCESSING_SERVER_URL, process_job_id, tries=10, wait=10)
     assert job_state == JobState.success
 
     # Check the existence of the results locally
@@ -58,9 +59,7 @@ def test_processing_server_workflow_request():
     workspace_root = "kant_aufklaerung_1784/data"
     path_to_mets = assets.path_to(f"{workspace_root}/mets.xml")
     wf_job_id = post_ps_workflow_request(PROCESSING_SERVER_URL, path_to_dummy_wf, path_to_mets)
-    job_state = poll_till_timeout_fail_or_success(
-        test_url=f"{PROCESSING_SERVER_URL}/workflow/job-simple/{wf_job_id}", tries=30, wait=10
-    )
+    job_state = poll_wf_status_till_timeout_fail_or_success(PROCESSING_SERVER_URL, wf_job_id, tries=10, wait=10)
     assert job_state == JobState.success
 
     # Check the existence of the results locally
diff --git a/tests/network/test_integration_6_client.py b/tests/network/test_integration_6_client.py
new file mode 100644
index 0000000000..1a693ed0b1
--- /dev/null
+++ b/tests/network/test_integration_6_client.py
@@ -0,0 +1,38 @@
+from pathlib import Path
+from src.ocrd_network.constants import AgentType, JobState
+from tests.base import assets
+from tests.network.config import test_config
+from ocrd_network.client import Client
+
+PROCESSING_SERVER_URL = test_config.PROCESSING_SERVER_URL
+timeout = test_config.OCRD_NETWORK_CLIENT_POLLING_TIMEOUT
+wait = test_config.OCRD_NETWORK_CLIENT_POLLING_SLEEP
+
+
+def test_client_processing_processor():
+    workspace_root = "kant_aufklaerung_1784/data"
+    path_to_mets = assets.path_to(f"{workspace_root}/mets.xml")
+    client = Client(PROCESSING_SERVER_URL, timeout, wait)
+    req_params = {
+        "path_to_mets": path_to_mets,
+        "description": "OCR-D Network client request",
+        "input_file_grps": ["OCR-D-IMG"],
+        "output_file_grps": ["OCR-D-DUMMY-TEST-CLIENT"],
+        "parameters": {},
+        "agent_type": AgentType.PROCESSING_WORKER
+    }
+    processing_job_id = client.send_processing_job_request(processor_name="ocrd-dummy", req_params=req_params)
+    assert processing_job_id
+    print(f"Processing job id: {processing_job_id}")
+    assert JobState.success == client.poll_job_status(processing_job_id)
+
+
+def test_client_processing_workflow():
+    workspace_root = "kant_aufklaerung_1784/data"
+    path_to_mets = assets.path_to(f"{workspace_root}/mets.xml")
+    # TODO: Improve the path resolution
+    path_to_dummy_wf = f"{Path(__file__).parent.resolve()}/dummy-workflow.txt"
+    client = Client(PROCESSING_SERVER_URL, timeout, wait)
+    wf_job_id = client.send_workflow_job_request(path_to_dummy_wf, path_to_mets)
+    print(f"Workflow job id: {wf_job_id}")
+    assert JobState.success == client.poll_workflow_status(wf_job_id)
diff --git a/tests/network/test_integration_ocrd_all.py b/tests/network/test_integration_ocrd_all.py
index d54d9f2fd5..bebfcf7623 100644
--- a/tests/network/test_integration_ocrd_all.py
+++ b/tests/network/test_integration_ocrd_all.py
@@ -1,6 +1,7 @@
+from src.ocrd_network.client_utils import poll_wf_status_till_timeout_fail_or_success, post_ps_workflow_request
 from src.ocrd_network.constants import JobState
 from tests.network.config import test_config
-from tests.network.utils import poll_till_timeout_fail_or_success, post_ps_workflow_request
+
 
 PROCESSING_SERVER_URL = test_config.PROCESSING_SERVER_URL
@@ -11,9 +12,5 @@ def test_ocrd_all_workflow():
     path_to_wf = "/ocrd-data/assets/ocrd_all-test-workflow.txt"
     path_to_mets = "/data/mets.xml"
     wf_job_id = post_ps_workflow_request(PROCESSING_SERVER_URL, path_to_wf, path_to_mets)
-    job_state = poll_till_timeout_fail_or_success(
-        test_url=f"{PROCESSING_SERVER_URL}/workflow/job-simple/{wf_job_id}",
-        tries=30,
-        wait=10
-    )
+    job_state = poll_wf_status_till_timeout_fail_or_success(PROCESSING_SERVER_URL, wf_job_id, tries=30, wait=10)
     assert job_state == JobState.success
diff --git a/tests/network/utils.py b/tests/network/utils.py
deleted file mode 100644
index dbf594a894..0000000000
--- a/tests/network/utils.py
+++ /dev/null
@@ -1,47 +0,0 @@
-from requests import get as request_get, post as request_post
-from time import sleep
-from src.ocrd_network.constants import JobState
-
-
-def poll_till_timeout_fail_or_success(test_url: str, tries: int, wait: int) -> JobState:
-    job_state = JobState.unset
-    while tries > 0:
-        sleep(wait)
-        response = request_get(url=test_url)
-        assert response.status_code == 200, f"Processing server: {test_url}, {response.status_code}"
-        job_state = response.json()["state"]
-        if job_state == JobState.success or job_state == JobState.failed:
-            break
-        tries -= 1
-    return job_state
-
-
-def post_ps_processing_request(ps_server_host: str, test_processor: str, test_job_input: dict) -> str:
-    test_url = f"{ps_server_host}/processor/run/{test_processor}"
-    response = request_post(
-        url=test_url,
-        headers={"accept": "application/json"},
-        json=test_job_input
-    )
-    # print(response.json())
-    # print(response.__dict__)
-    assert response.status_code == 200, f"Processing server: {test_url}, {response.status_code}"
-    processing_job_id = response.json()["job_id"]
-    assert processing_job_id
-    return processing_job_id
-
-
-# TODO: Can be extended to include other parameters such as page_wise
-def post_ps_workflow_request(ps_server_host: str, path_to_test_wf: str, path_to_test_mets: str) -> str:
-    test_url = f"{ps_server_host}/workflow/run?mets_path={path_to_test_mets}&page_wise=True"
-    response = request_post(
-        url=test_url,
-        headers={"accept": "application/json"},
-        files={"workflow": open(path_to_test_wf, "rb")}
-    )
-    # print(response.json())
-    # print(response.__dict__)
-    assert response.status_code == 200, f"Processing server: {test_url}, {response.status_code}"
-    wf_job_id = response.json()["job_id"]
-    assert wf_job_id
-    return wf_job_id
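Finally, the discovery and log commands driven programmatically, as a minimal sketch (server address and job id are hypothetical placeholders):

    from json import dumps
    from ocrd_network.client import Client

    client = Client(server_addr_processing="http://localhost:8000")
    # `ocrd network client discovery processors`
    print(dumps(client.check_deployed_processors(), indent=4))
    # `ocrd network client discovery processor ocrd-dummy`
    print(dumps(client.check_deployed_processor_ocrd_tool("ocrd-dummy"), indent=4))
    # `ocrd network client processing check-log -j <job-id>`; returns the raw response
    print(client.check_job_log(job_id="<job-id>").text)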