Skip to content

Commit

Permalink
Merge branch 'main' into support/3.2
Browse files Browse the repository at this point in the history
  • Loading branch information
dsuch committed Sep 2, 2023
2 parents d36c267 + c482127 commit 9abf09e
Show file tree
Hide file tree
Showing 11 changed files with 125 additions and 56 deletions.
15 changes: 14 additions & 1 deletion code/zato-common/src/zato/common/aux_server/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ class AuxServerConfig:
conf_file_name: 'str'
crypto_manager: 'CryptoManager'
crypto_manager_class: 'type_[CryptoManager]'
parent_server_name: 'str'
parent_server_pid: 'int'

def __init__(self) -> 'None':
self.main = Bunch()
Expand Down Expand Up @@ -166,9 +168,15 @@ class AuxServer:
config_class: 'type_[AuxServerConfig]'
crypto_manager_class: 'type_[CryptoManager]'
has_credentials: 'bool' = True
parent_server_name: 'str'
parent_server_pid: 'int'

def __init__(self, config:'AuxServerConfig') -> 'None':
self.config = config

self.parent_server_name = config.parent_server_name
self.parent_server_pid = config.parent_server_pid

main = self.config.main

if main.crypto.use_tls:
Expand Down Expand Up @@ -217,7 +225,9 @@ def start(
username='', # type: str
password='', # type: str
callback_func=None, # type: callnone
server_type_suffix='' # type: str
server_type_suffix='', # type: str
parent_server_name='', # type: str
parent_server_pid=-1, # type: int
) -> 'None':

# Functionality that needs to run before configuration is created
Expand All @@ -239,6 +249,9 @@ def start(
class_.crypto_manager_class,
)

config.parent_server_name = parent_server_name
config.parent_server_pid = parent_server_pid

username = username or 'ipc.username.not.set.' + CryptoManager.generate_secret().decode('utf8')
password = password or 'ipc.password.not.set.' + CryptoManager.generate_secret().decode('utf8')

Expand Down
10 changes: 7 additions & 3 deletions code/zato-common/src/zato/common/ipc/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
if 0:
from zato.common.ipc.client import IPCResponse
from zato.common.typing_ import callable_
from zato.server.base.parallel import ParallelServer

# ################################################################################################################################
# ################################################################################################################################
Expand All @@ -34,12 +35,13 @@ class IPCAPI:
""" API through which IPC is performed.
"""
pid: 'int'
server: 'IPCServer'
parallel_server: 'ParallelServer'
username: 'str'
password: 'str'
on_message_callback: 'callable_'

def __init__(self) -> 'None':
def __init__(self, parallel_server:'ParallelServer') -> 'None':
self.parallel_server = parallel_server
self.username = IPC.Credentials.Username
self.password = ''

Expand Down Expand Up @@ -113,7 +115,9 @@ def invoke_by_pid(
cluster_name=cluster_name,
server_name=server_name,
server_pid=target_pid,
timeout=timeout
timeout=timeout,
source_server_name=self.parallel_server.name,
source_server_pid=self.parallel_server.pid,
)
return response

Expand Down
6 changes: 5 additions & 1 deletion code/zato-common/src/zato/common/ipc/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,14 +76,18 @@ def invoke(
cluster_name, # type: str
server_name, # type: str
server_pid, # type: int
timeout=90, # type: int
timeout=90, # type: int
source_server_name, # type: str
source_server_pid, # type: int
) -> 'IPCResponse':

# This is where we can find the IPC server to invoke ..
url = f'http://{self.host}:{self.port}/{url_path}'

# .. prepare the full request ..
data = dumps({
'source_server_name': source_server_name,
'source_server_pid': source_server_pid,
'action': SERVER_IPC.INVOKE.value,
'username': self.username,
'password': self.password,
Expand Down
8 changes: 5 additions & 3 deletions code/zato-common/src/zato/common/util/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -1982,7 +1982,8 @@ def wait_for_dict_key_by_get_func(

def _predicate_dict_key(*_ignored_args, **_ignored_kwargs) -> 'any_':
try:
return get_key_func(key)
value = get_key_func(key)
return value
except KeyError:
return False

Expand All @@ -2001,9 +2002,10 @@ def wait_for_dict_key(
# using the dict's own .get method.

def _predicate_dict_key(*_ignored_args, **_ignored_kwargs) -> 'any_':
return _dict.get(key)
value = _dict.get(key)
return value

return wait_for_dict_key_by_get_func(_predicate_dict_key, timeout, interval)
return wait_for_dict_key_by_get_func(_predicate_dict_key, key, timeout, interval)

# ################################################################################################################################

Expand Down
2 changes: 1 addition & 1 deletion code/zato-server/src/zato/server/base/parallel/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ def __init__(self) -> 'None':
self.crypto_use_tls = False
self.pid = -1
self.sync_internal = False
self.ipc_api = IPCAPI()
self.ipc_api = IPCAPI(self)
self.fifo_response_buffer_size = -1
self.is_first_worker = False
self.process_idx = -1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -459,10 +459,8 @@ def dispatch(
_exc = stack_format(e, style='color', show_vals='like_source', truncate_vals=5000,
add_summary=True, source_lines=20) if stack_format else _format_exc # type: str

# TODO: This should be configurable. Some people may want such
# things to be on DEBUG whereas for others ERROR will make most sense
# in given circumstances.
logger.error(
# Log what happened
logger.info(
'Caught an exception, cid:`%s`, status_code:`%s`, `%s`', cid, status_code, _exc)

try:
Expand Down
78 changes: 60 additions & 18 deletions code/zato-server/src/zato/server/pubsub/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
from zato.common.odb.model import WebSocketClientPubSubKeys
from zato.common.odb.query.pubsub.queue import set_to_delete
from zato.common.typing_ import cast_, dict_, optional
from zato.common.util.api import spawn_greenlet, wait_for_dict_key_by_get_func
from zato.common.util.api import spawn_greenlet, wait_for_dict_key, wait_for_dict_key_by_get_func
from zato.common.util.time_ import datetime_from_ms, utcnow_as_ms
from zato.server.pubsub.core.endpoint import EndpointAPI
from zato.server.pubsub.core.trigger import NotifyPubSubTasksTrigger
Expand Down Expand Up @@ -251,12 +251,36 @@ def get_all_subscriptions(self) -> 'strsubdict':
"""
return self.subscriptions_by_sub_key

# ################################################################################################################################

def _wait_for_sub_key(self, sub_key:'str') -> 'None':

# Log what we are about to do
logger.info('Waiting for sub_key -> %s', sub_key)

# Make sure the key is there.
wait_for_dict_key(self.subscriptions_by_sub_key, sub_key, timeout=180)

# ################################################################################################################################

def _get_subscription_by_sub_key(self, sub_key:'str') -> 'Subscription':
""" Low-level implementation of self.get_subscription_by_sub_key, must be called with self.lock held.
"""
return self.subscriptions_by_sub_key[sub_key]
# Make sure the key is there ..
# self._wait_for_sub_key(sub_key)

# .. get the subscription ..
sub = self.subscriptions_by_sub_key.get(sub_key)

# .. and return it to the caller if it exists ..
if sub:
return sub

# .. otherwise, raise an error ..
else:
msg = 'No such subscription `{}` among `{}`'.format(sub_key, sorted(self.subscriptions_by_sub_key))
logger.info(msg)
raise KeyError(msg)

# ################################################################################################################################

Expand Down Expand Up @@ -914,7 +938,10 @@ def add_wsx_client_pubsub_keys(
'channel_name': channel_name,
'pub_client_id': pub_client_id,
'endpoint_type': PUBSUB.ENDPOINT_TYPE.WEB_SOCKETS.id,
'wsx_info': wsx_info
'wsx_info': wsx_info,
'source': 'pubsub.PubSub',
'source_server_name': self.server.name,
'source_server_pid': self.server.pid,
})

# ################################################################################################################################
Expand Down Expand Up @@ -982,33 +1009,48 @@ def format_sk_servers(self, default:'str'='---', sub_pattern_matched:'str'=defau
def _set_sub_key_server(
self,
config, # type: stranydict
*,
ignore_missing_sub_key, # type: bool
_endpoint_type=PUBSUB.ENDPOINT_TYPE # type: type_[PUBSUB.ENDPOINT_TYPE]
) -> 'None':
""" Low-level implementation of self.set_sub_key_server - must be called with self.lock held.
"""
sub = self._get_subscription_by_sub_key(config['sub_key'])
config['endpoint_id'] = sub.endpoint_id
config['endpoint_name'] = self.endpoint_api.get_by_id(sub.endpoint_id)
self.sub_key_servers[config['sub_key']] = SubKeyServer(config)

endpoint_type = config['endpoint_type']
try:
# Try to see if we have such a subscription ..
sub = self._get_subscription_by_sub_key(config['sub_key'])
except KeyError:

# .. if we do not, it may be because it was already deleted
# before we have been invoked and this may be potentially ignored.

if not ignore_missing_sub_key:
raise

else:

config['endpoint_id'] = sub.endpoint_id
config['endpoint_name'] = self.endpoint_api.get_by_id(sub.endpoint_id)
self.sub_key_servers[config['sub_key']] = SubKeyServer(config)

endpoint_type = config['endpoint_type']

config['wsx'] = int(endpoint_type == _endpoint_type.WEB_SOCKETS.id)
config['srv'] = int(endpoint_type == _endpoint_type.SERVICE.id)
config['wsx'] = int(endpoint_type == _endpoint_type.WEB_SOCKETS.id)
config['srv'] = int(endpoint_type == _endpoint_type.SERVICE.id)

sks_table = self.format_sk_servers()
msg = 'Set sk_server{}for sub_key `%(sub_key)s` (wsx/srv:%(wsx)s/%(srv)s) - `%(server_name)s:%(server_pid)s`, ' + \
'current sk_servers:\n{}'
msg = msg.format(' ' if config['server_pid'] else ' (no PID) ', sks_table)
sks_table = self.format_sk_servers()
msg = 'Set sk_server{}for sub_key `%(sub_key)s` (wsx/srv:%(wsx)s/%(srv)s) - `%(server_name)s:%(server_pid)s`, ' + \
'current sk_servers:\n{}'
msg = msg.format(' ' if config['server_pid'] else ' (no PID) ', sks_table)

logger.info(msg, config)
logger_zato.info(msg, config)
logger.info(msg, config)
logger_zato.info(msg, config)

# ################################################################################################################################

def set_sub_key_server(self, config:'anydict') -> 'None':
with self.lock:
self._set_sub_key_server(config)
self._set_sub_key_server(config, ignore_missing_sub_key=True)

# ################################################################################################################################

Expand Down Expand Up @@ -1129,7 +1171,7 @@ def add_missing_server_for_sub_key(
config['server_pid'] = self.get_server_pid_for_sub_key(data.server_name, sub_key)

# OK, set up the server with what we found above
self._set_sub_key_server(config)
self._set_sub_key_server(config, ignore_missing_sub_key=False)

# ################################################################################################################################

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ def handle(self):
'sub_key': config['sub_key'],
'endpoint_type': config['endpoint_type'],
'task_delivery_interval': config['task_delivery_interval'],
'source': 'delivery.CreateDeliveryTask',
'source_server_name': self.server.name,
'source_server_pid': self.server.pid,
}

# Update in-RAM state of workers
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@
from zato.server.service import AsIs, Int, List
from zato.server.service.internal import AdminService, AdminSIO, GetListAdminSIO

len_keys = 'subscriptions_by_topic', 'subscriptions_by_sub_key', 'sub_key_servers', 'endpoints', 'topics', \
'sec_id_to_endpoint_id', 'ws_channel_id_to_endpoint_id', 'service_id_to_endpoint_id', \
len_keys = 'subscriptions_by_topic', 'subscriptions_by_sub_key', 'sub_key_servers', \
'pubsub_tool_by_sub_key', 'pubsub_tools'

# ################################################################################################################################
Expand Down Expand Up @@ -49,6 +48,11 @@ def handle(self):
'endpoint_meta_max_history': self.pubsub.endpoint_meta_max_history,
'data_prefix_len': self.pubsub.data_prefix_len,
'data_prefix_short_len': self.pubsub.data_prefix_short_len,
'endpoints': self.pubsub.endpoint_api.endpoints,
'sec_id_to_endpoint_id': self.pubsub.endpoint_api.sec_id_to_endpoint_id,
'ws_channel_id_to_endpoint_id': self.pubsub.endpoint_api.ws_channel_id_to_endpoint_id,
'service_id_to_endpoint_id': self.pubsub.endpoint_api.service_id_to_endpoint_id,
'topics': self.pubsub.topic_api.topics,
}

for key in len_keys:
Expand Down
4 changes: 2 additions & 2 deletions code/zato-server/test/zato/pubsub/test_pubapi_services.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@ def test_wsx_services_invoker(self) -> 'None':
service_name = 'helpers.pubsub.pubapi-invoker'

# Run the test now
self.run_zato_service_test(service_name)
_ = self.run_zato_service_test(service_name)

# ################################################################################################################################
# ################################################################################################################################

if __name__ == '__main__':
main()
_ = main()

# ################################################################################################################################
# ################################################################################################################################
41 changes: 20 additions & 21 deletions code/zato-web-admin/src/zato/admin/urls.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@
from zato.admin.web.views.pubsub import endpoint as pubsub_endpoint
from zato.admin.web.views.pubsub import message as pubsub_message
from zato.admin.web.views.pubsub import subscription as pubsub_subscription
from zato.admin.web.views.pubsub.task import sync as pubsub_task_sync
from zato.admin.web.views.pubsub.task import delivery as pubsub_task
from zato.admin.web.views.pubsub.task.delivery import message as pubsub_task_message
from zato.admin.web.views.pubsub.task.delivery import server as pubsub_task_delivery_server
Expand Down Expand Up @@ -1572,32 +1571,32 @@
login_required(pubsub_task_message.get), name='pubsub-task-message'),

# PubSub objects / tools
url(r'^zato/pubsub/task/sync/$',
login_required(pubsub_task_sync.Index()), name=pubsub_task_sync.Index.url_name),

# url(r'^zato/pubsub/task/sync/$',
# login_required(pubsub_task_sync.Index()), name=pubsub_task_sync.Index.url_name),
#
# PubSub tools - dict keys
url(r'^zato/pubsub/task/sync/dict-keys/(?P<dict_name>.*)/cluster/(?P<cluster>.*)/(?P<server_name>.*)/(?P<server_pid>.*)/$',
login_required(pubsub_task_sync.SubscriptionDictKeys()), name=pubsub_task_sync.SubscriptionDictKeys.url_name),

# url(r'^zato/pubsub/task/sync/dict-keys/(?P<dict_name>.*)/cluster/(?P<cluster>.*)/(?P<server_name>.*)/(?P<server_pid>.*)/$',
# login_required(pubsub_task_sync.SubscriptionDictKeys()), name=pubsub_task_sync.SubscriptionDictKeys.url_name),
#
# PubSub tools - dict values - subscriptions
url(r'^zato/pubsub/task/sync/dict-values/sub/(?P<dict_name>.*)/cluster/(?P<cluster>.*)/(?P<server_name>.*)/(?P<server_pid>.*)/$',
login_required(pubsub_task_sync.DictValuesSubscriptions()), name=pubsub_task_sync.DictValuesSubscriptions.url_name),

# url(r'^zato/pubsub/task/sync/dict-values/sub/(?P<dict_name>.*)/cluster/(?P<cluster>.*)/(?P<server_name>.*)/(?P<server_pid>.*)/$',
# login_required(pubsub_task_sync.DictValuesSubscriptions()), name=pubsub_task_sync.DictValuesSubscriptions.url_name),
#
# PubSub tools - dict values - sub key servers
url(r'^zato/pubsub/task/sync/dict-values/sks/(?P<dict_name>.*)/cluster/(?P<cluster>.*)/(?P<server_name>.*)/(?P<server_pid>.*)/$',
login_required(pubsub_task_sync.DictValuesSubKeyServer()), name=pubsub_task_sync.DictValuesSubKeyServer.url_name),

# url(r'^zato/pubsub/task/sync/dict-values/sks/(?P<dict_name>.*)/cluster/(?P<cluster>.*)/(?P<server_name>.*)/(?P<server_pid>.*)/$',
# login_required(pubsub_task_sync.DictValuesSubKeyServer()), name=pubsub_task_sync.DictValuesSubKeyServer.url_name),
#
# PubSub tools - dict values - endpoints
url(r'^zato/pubsub/task/sync/dict-values/endpoint/(?P<dict_name>.*)/cluster/(?P<cluster>.*)/(?P<server_name>.*)/(?P<server_pid>.*)/$',
login_required(pubsub_task_sync.DictValuesEndpoints()), name=pubsub_task_sync.DictValuesEndpoints.url_name),

# url(r'^zato/pubsub/task/sync/dict-values/endpoint/(?P<dict_name>.*)/cluster/(?P<cluster>.*)/(?P<server_name>.*)/(?P<server_pid>.*)/$',
# login_required(pubsub_task_sync.DictValuesEndpoints()), name=pubsub_task_sync.DictValuesEndpoints.url_name),
#
# PubSub tools - dict values - topics
url(r'^zato/pubsub/task/sync/dict-values/topic/(?P<dict_name>.*)/cluster/(?P<cluster>.*)/(?P<server_name>.*)/(?P<server_pid>.*)/$',
login_required(pubsub_task_sync.DictValuesTopics()), name=pubsub_task_sync.DictValuesTopics.url_name),

# url(r'^zato/pubsub/task/sync/dict-values/topic/(?P<dict_name>.*)/cluster/(?P<cluster>.*)/(?P<server_name>.*)/(?P<server_pid>.*)/$',
# login_required(pubsub_task_sync.DictValuesTopics()), name=pubsub_task_sync.DictValuesTopics.url_name),
#
# PubSub tools - event list
url(r'^zato/pubsub/task/sync/event-list/cluster/(?P<cluster>.*)/(?P<server_name>.*)/(?P<server_pid>.*)/$',
login_required(pubsub_task_sync.EventList()), name=pubsub_task_sync.EventList.url_name),
# url(r'^zato/pubsub/task/sync/event-list/cluster/(?P<cluster>.*)/(?P<server_name>.*)/(?P<server_pid>.*)/$',
# login_required(pubsub_task_sync.EventList()), name=pubsub_task_sync.EventList.url_name),

# Per-server delivery tasks

Expand Down

0 comments on commit 9abf09e

Please sign in to comment.