Skip to content

Commit 96d9bdc

Browse files
committed
Review
- breaking up the code in smaller modules - improving error handling - adding job queue inspection methods
1 parent 9542242 commit 96d9bdc

File tree

11 files changed

+791
-329
lines changed

11 files changed

+791
-329
lines changed

eoxs_wps_async/backend.py

Lines changed: 38 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
#
55
# Authors: Martin Paces <martin.paces@eox.at>
66
#-------------------------------------------------------------------------------
7-
# Copyright (C) 2016 EOX IT Services GmbH
7+
# Copyright (C) 2016-2023 EOX IT Services GmbH
88
#
99
# Permission is hereby granted, free of charge, to any person obtaining a copy
1010
# of this software and associated documentation files (the "Software"), to deal
@@ -26,85 +26,53 @@
2626
#-------------------------------------------------------------------------------
2727
# pylint: disable=unused-argument, too-many-arguments, no-self-use
2828

29-
from uuid import uuid4
3029
from logging import getLogger
31-
from eoxserver.services.ows.wps.exceptions import ServerBusy, NoApplicableCode
3230
from eoxs_wps_async.util import cached_property
3331
from eoxs_wps_async.config import get_wps_config
3432
from eoxs_wps_async.client import Client, ClientError
35-
from eoxs_wps_async.handler import (
36-
check_job_id, get_job_logger, get_response_url, get_response,
37-
is_valid_job_id,
38-
)
33+
from eoxs_wps_async.protocol import ClientProtocol
34+
from eoxs_wps_async.handler import get_response_url, get_response
3935

4036
LOGGER_NAME = "eoxserver.services.ows.wps"
4137

4238

43-
class WPSAsyncBackend():
39+
class WPSAsyncBackend:
4440
""" WPS asynchronous back-end. """
4541
supported_versions = ("1.0.0",)
4642

4743
def execute(self, process, raw_inputs, resp_form, extra_parts=None,
4844
job_id=None, version="1.0.0", **kwargs):
4945
""" Asynchronous process execution. """
50-
job_id = check_job_id(job_id or str(uuid4()))
51-
logger = get_job_logger(job_id, LOGGER_NAME)
52-
53-
response, *payload = self._request(
54-
"EXECUTE",
55-
job_id,
56-
process.identifier,
57-
raw_inputs,
58-
resp_form,
59-
extra_parts
60-
)
61-
62-
if response == "OK":
63-
return job_id
64-
if response == "BUSY":
65-
raise ServerBusy("The server is busy!")
66-
if response == "OWSEXC":
67-
raise payload[0]
68-
if response == "ERROR":
69-
raise NoApplicableCode(payload[0], "eoxs_wps_async.daemon")
70-
71-
return self._handle_unknown_response(response, logger)
46+
self._send_request(self._protocol.execute_request(
47+
job_id, process.identifier, raw_inputs, resp_form, extra_parts,
48+
))
49+
return job_id
7250

7351
def purge(self, job_id, process_id=None, **kwargs):
7452
""" Purge the job from the system by removing all the resources
7553
occupied by the job.
7654
If the optional process_id is provided then the discard() callback
7755
of the process is executed.
7856
"""
79-
job_id = check_job_id(job_id)
80-
logger = get_job_logger(job_id, LOGGER_NAME)
81-
82-
response, *payload = self._request("PURGE", job_id, process_id)
83-
84-
if response == "OK":
85-
return
86-
if response == "ERROR":
87-
raise ClientError(payload[0])
88-
89-
self._handle_unknown_response(response, logger)
57+
self._send_request(self._protocol.purge_request(job_id, process_id))
9058

9159
def list(self, job_ids=None, **kwargs):
9260
""" List current jobs. The list can be restricted by the given job_ids.
9361
"""
94-
logger = getLogger(LOGGER_NAME)
95-
96-
if job_ids is not None:
97-
# convert to a list and reject invalid job ids
98-
job_ids = [id_ for id_ in job_ids if is_valid_job_id(id_)]
62+
job_ids, *_ = self._send_request(self._protocol.list_request(job_ids))
63+
return job_ids
9964

100-
response, *payload = self._request("LIST", job_ids)
101-
102-
if response == "OK":
103-
return payload[0]
104-
if response == "ERROR":
105-
raise ClientError(payload[0])
65+
def list_queue(self, job_ids=None, **kwargs):
66+
""" List queued jobs. The list can be restricted by the given job_ids.
67+
"""
68+
jobs, *_ = self._send_request(self._protocol.list_queue_request(job_ids))
69+
return jobs
10670

107-
return self._handle_unknown_response(response, logger)
71+
def get_queue_size(self, **kwargs):
72+
""" Get number of queued jobs.
73+
"""
74+
size, *_ = self._send_request(self._protocol.get_queue_size_request())
75+
return size
10876

10977
def get_response_url(self, job_id):
11078
""" Return response URL for the given job identifier. """
@@ -116,11 +84,28 @@ def get_response(self, job_id):
11684
"""
11785
return get_response(job_id, self._conf)
11886

87+
def _send_request(self, request, logger=None):
88+
""" Send request and handle response. """
89+
90+
def _request(request):
91+
with self._client as client:
92+
client.send(request)
93+
return client.recv()
94+
95+
return self._protocol.handle_response(
96+
_request(request), logger=logger
97+
)
98+
11999
@cached_property
120100
def _conf(self):
121101
""" Get configuration. """
122102
return get_wps_config()
123103

104+
@cached_property
105+
def _protocol(self):
106+
""" Get client protocol instance. """
107+
return ClientProtocol(getLogger(LOGGER_NAME))
108+
124109
@property
125110
def _client(self):
126111
""" Get connection to the execution daemon. """
@@ -137,16 +122,6 @@ def _client(self):
137122
self._conf.socket_connection_timeout,
138123
)
139124

140-
def _request(self, *request):
141-
with self._client as client:
142-
client.send(request)
143-
return client.recv()
144-
145-
def _handle_unknown_response(self, response, logger):
146-
message = f"Unknown response! RESP={response!r}"
147-
logger.error(message)
148-
raise ValueError(message)
149-
150125

151126
# alias for backward compatibility
152127
WPSAsyncBackendBase = WPSAsyncBackend

0 commit comments

Comments
 (0)