diff --git a/swagger_server/__main__.py b/swagger_server/__main__.py index 3d33dec9..a2c256ce 100644 --- a/swagger_server/__main__.py +++ b/swagger_server/__main__.py @@ -6,6 +6,7 @@ from queue import Queue import connexion +from sdx_pce.topology.manager import TopologyManager from swagger_server import encoder from swagger_server.messaging.rpc_queue_consumer import RpcConsumer @@ -20,7 +21,7 @@ def main(): if LOG_FILE: logging.basicConfig(filename=LOG_FILE, level=logging.INFO) else: - logging.basicConfig(level=logging.INFO) + logging.basicConfig(level=logging.DEBUG) # Run swagger service app = connexion.App(__name__, specification_dir="./swagger/") @@ -36,8 +37,10 @@ def main(): db_instance = DbUtils() db_instance.initialize_db() + topology_manager = TopologyManager() + thread_queue = Queue() - rpc = RpcConsumer(thread_queue, "") + rpc = RpcConsumer(thread_queue, "", topology_manager) rpc.start_sdx_consumer(thread_queue, db_instance) diff --git a/swagger_server/controllers/connection_controller.py b/swagger_server/controllers/connection_controller.py index 17a37f1d..f8312b85 100644 --- a/swagger_server/controllers/connection_controller.py +++ b/swagger_server/controllers/connection_controller.py @@ -2,11 +2,8 @@ import logging import connexion -from sdx_pce.load_balancing.te_solver import TESolver -from sdx_pce.topology.temanager import TEManager -from swagger_server.messaging.topic_queue_producer import TopicQueueProducer -from swagger_server.models.simple_link import SimpleLink +from swagger_server.handlers.connection_handler import ConnectionHandler from swagger_server.utils.db_utils import DbUtils LOG_FORMAT = ( @@ -20,6 +17,7 @@ # Get DB connection and tables set up. db_instance = DbUtils() db_instance.initialize_db() +connection_handler = ConnectionHandler(db_instance) def is_json(myjson): @@ -89,118 +87,6 @@ def place_connection(body): db_instance.add_key_value_pair_to_db("connection_data", json.dumps(body)) logger.info("Saving to database complete.") - if db_instance.read_from_db("latest_topo"): - topo_val = db_instance.read_from_db("latest_topo")["latest_topo"] - topo_json = json.loads(topo_val) - logger.info(f"Read topology {topo_val}") - - num_domain_topos = 0 - - if db_instance.read_from_db("num_domain_topos"): - num_domain_topos = db_instance.read_from_db("num_domain_topos")[ - "num_domain_topos" - ] - - # Initializing TEManager with `None` topology data is a - # work-around for - # https://github.com/atlanticwave-sdx/sdx-controller/issues/145 - temanager = TEManager(topology_data=None, connection_data=body) - lc_domain_topo_dict = {} - - # Read LC-1, LC-2, LC-3, and LC-4 topologies because of - # https://github.com/atlanticwave-sdx/sdx-controller/issues/152 - for i in range(1, int(num_domain_topos) + 2): - lc = f"LC-{i}" - logger.debug(f"Reading {lc} from DB") - curr_topo = db_instance.read_from_db(lc) - if curr_topo is None: - logger.debug(f"Read {lc} from DB: {curr_topo}") - continue - else: - # Get the actual thing minus the Mongo ObjectID. - curr_topo_str = curr_topo.get(lc) - # Just log a substring, not the whole thing. - logger.debug(f"Read {lc} from DB: {curr_topo_str[0:50]}...") - - curr_topo_json = json.loads(curr_topo_str) - lc_domain_topo_dict[curr_topo_json["domain_name"]] = curr_topo_json[ - "lc_queue_name" - ] - logger.debug(f"Adding #{i} topology {curr_topo_json.get('id')} to TEManager") - temanager.add_topology(curr_topo_json) - - for num, val in enumerate(temanager.topology_manager.topology_list): - logger.info(f"TE topology #{num}: {val}") - - graph = temanager.generate_graph_te() - if graph is None: - return "Could not generate a graph", 400 - - traffic_matrix = temanager.generate_connection_te() - if traffic_matrix is None: - return "Could not generate a traffic matrix", 400 - - logger.info(f"Generated graph: '{graph}', traffic matrix: '{traffic_matrix}'") - - solver = TESolver(graph, traffic_matrix) - solution = solver.solve() - logger.debug(f"TESolver result: {solution}") - - if solution is None or solution.connection_map is None: - return "Could not solve the request", 400 - - breakdown = temanager.generate_connection_breakdown(solution) - logger.debug(f"-- BREAKDOWN: {json.dumps(breakdown)}") - - if breakdown is None: - return "Could not break down the solution", 400 - - link_connections_dict_json = ( - db_instance.read_from_db("link_connections_dict")["link_connections_dict"] - if db_instance.read_from_db("link_connections_dict") - else None - ) - - if link_connections_dict_json: - link_connections_dict = json.loads(link_connections_dict_json) - else: - link_connections_dict = {} - - for domain, link in breakdown.items(): - port_list = [] - for key in link.keys(): - if "uni_" in key and "port_id" in link[key]: - port_list.append(link[key]["port_id"]) - - if port_list: - simple_link = SimpleLink(port_list).to_string() - - if simple_link not in link_connections_dict: - link_connections_dict[simple_link] = [] - - if body not in link_connections_dict[simple_link]: - link_connections_dict[simple_link].append(body) - - db_instance.add_key_value_pair_to_db( - "link_connections_dict", json.dumps(link_connections_dict) - ) - - logger.debug(f"Attempting to publish domain: {domain}, link: {link}") - - # From "urn:ogf:network:sdx:topology:amlight.net", attempt to - # extract a string like "amlight". - domain_name = find_between(domain, "topology:", ".net") or f"{domain}" - exchange_name = "connection" - - logger.debug( - f"Publishing '{link}' with exchange_name: {exchange_name}, " - f"routing_key: {domain_name}" - ) - - producer = TopicQueueProducer( - timeout=5, exchange_name=exchange_name, routing_key=domain_name - ) - producer.call(json.dumps(link)) - producer.stop_keep_alive() + connection_handler.place_connection(body) return "Connection published" diff --git a/swagger_server/handlers/connection_handler.py b/swagger_server/handlers/connection_handler.py index 699f19fc..69136bb9 100644 --- a/swagger_server/handlers/connection_handler.py +++ b/swagger_server/handlers/connection_handler.py @@ -1,7 +1,12 @@ import json import logging +from sdx_pce.load_balancing.te_solver import TESolver +from sdx_pce.topology.temanager import TEManager + +from swagger_server.messaging.topic_queue_producer import TopicQueueProducer from swagger_server.models.simple_link import SimpleLink +from swagger_server.utils.parse_helper import ParseHelper logger = logging.getLogger(__name__) logging.getLogger("pika").setLevel(logging.WARNING) @@ -10,31 +15,149 @@ class ConnectionHandler: def __init__(self, db_instance): self.db_instance = db_instance - pass + self.parse_helper = ParseHelper() def remove_connection(self, connection): # call pce to remove connection pass + def _send_breakdown_to_lc(self, temanager, connection, solution): + breakdown = temanager.generate_connection_breakdown(solution) + logger.debug(f"-- BREAKDOWN: {json.dumps(breakdown)}") + + if breakdown is None: + return "Could not break down the solution", 400 + + link_connections_dict_json = ( + self.db_instance.read_from_db("link_connections_dict")[ + "link_connections_dict" + ] + if self.db_instance.read_from_db("link_connections_dict") + else None + ) + + if link_connections_dict_json: + link_connections_dict = json.loads(link_connections_dict_json) + else: + link_connections_dict = {} + + for domain, link in breakdown.items(): + port_list = [] + for key in link.keys(): + if "uni_" in key and "port_id" in link[key]: + port_list.append(link[key]["port_id"]) + + if port_list: + simple_link = SimpleLink(port_list).to_string() + + if simple_link not in link_connections_dict: + link_connections_dict[simple_link] = [] + + if connection not in link_connections_dict[simple_link]: + link_connections_dict[simple_link].append(connection) + + self.db_instance.add_key_value_pair_to_db( + "link_connections_dict", json.dumps(link_connections_dict) + ) + + logger.debug(f"Attempting to publish domain: {domain}, link: {link}") + + # From "urn:ogf:network:sdx:topology:amlight.net", attempt to + # extract a string like "amlight". + domain_name = ( + self.parse_helper.find_between(domain, "topology:", ".net") + or f"{domain}" + ) + exchange_name = "connection" + + logger.debug( + f"Publishing '{link}' with exchange_name: {exchange_name}, " + f"routing_key: {domain_name}" + ) + + producer = TopicQueueProducer( + timeout=5, exchange_name=exchange_name, routing_key=domain_name + ) + producer.call(json.dumps(link)) + producer.stop_keep_alive() + def place_connection(self, connection): # call pce to generate breakdown, and place connection - pass + num_domain_topos = 0 + + if self.db_instance.read_from_db("num_domain_topos"): + num_domain_topos = self.db_instance.read_from_db("num_domain_topos")[ + "num_domain_topos" + ] + + # Initializing TEManager with `None` topology data is a + # work-around for + # https://github.com/atlanticwave-sdx/sdx-controller/issues/145 + temanager = TEManager(topology_data=None, connection_data=connection) + lc_domain_topo_dict = {} + + # Read LC-1, LC-2, LC-3, and LC-4 topologies because of + # https://github.com/atlanticwave-sdx/sdx-controller/issues/152 + for i in range(1, int(num_domain_topos) + 2): + lc = f"LC-{i}" + logger.debug(f"Reading {lc} from DB") + curr_topo = self.db_instance.read_from_db(lc) + if curr_topo is None: + logger.debug(f"Read {lc} from DB: {curr_topo}") + continue + else: + # Get the actual thing minus the Mongo ObjectID. + curr_topo_str = curr_topo.get(lc) + # Just log a substring, not the whole thing. + logger.debug(f"Read {lc} from DB: {curr_topo_str[0:50]}...") + + curr_topo_json = json.loads(curr_topo_str) + lc_domain_topo_dict[curr_topo_json["domain_name"]] = curr_topo_json[ + "lc_queue_name" + ] + logger.debug( + f"Adding #{i} topology {curr_topo_json.get('id')} to TEManager" + ) + temanager.add_topology(curr_topo_json) + + for num, val in enumerate(temanager.topology_manager.topology_list): + logger.info(f"TE topology #{num}: {val}") + + graph = temanager.generate_graph_te() + if graph is None: + return "Could not generate a graph", 400 + + traffic_matrix = temanager.generate_connection_te() + if traffic_matrix is None: + return "Could not generate a traffic matrix", 400 + + logger.info(f"Generated graph: '{graph}', traffic matrix: '{traffic_matrix}'") + + solver = TESolver(graph, traffic_matrix) + solution = solver.solve() + logger.debug(f"TESolver result: {solution}") + + if solution is None or solution.connection_map is None: + return "Could not solve the request", 400 + + self._send_breakdown_to_lc(temanager, connection, solution) def handle_link_failure(self, msg_json): - logger.debug("Handling connections that contain failed link.") - if self.db_instance.read_from_db("link_connections_dict") is None: - logger.debug("No connection has been placed yet.") - return + logger.debug("---Handling connections that contain failed link.---") link_connections_dict_str = self.db_instance.read_from_db( "link_connections_dict" ) - if link_connections_dict_str: - link_connections_dict = json.loads( - link_connections_dict_str["link_connections_dict"] - ) - else: - logger.debug("Failed to retrieve link_connections_dict from DB.") + if ( + not link_connections_dict_str + or not link_connections_dict_str["link_connections_dict"] + ): + logger.debug("No connection has been placed yet.") + return + + link_connections_dict = json.loads( + link_connections_dict_str["link_connections_dict"] + ) for link in msg_json["link_failure"]: port_list = [] diff --git a/swagger_server/handlers/lc_message_handler.py b/swagger_server/handlers/lc_message_handler.py index 99817b8d..09b17393 100644 --- a/swagger_server/handlers/lc_message_handler.py +++ b/swagger_server/handlers/lc_message_handler.py @@ -27,7 +27,7 @@ def process_lc_json_msg( msg_version = msg_json["version"] lc_queue_name = msg_json["lc_queue_name"] - logger.debug("---lc_queue_name:---") + logger.debug("Processing LC message: lc_queue_name:") logger.debug(lc_queue_name) domain_name = self.parse_helper.find_between(msg_id, "topology:", ".net") @@ -42,6 +42,7 @@ def process_lc_json_msg( # Update existing topology if domain_name in domain_list: logger.info("Updating topo") + logger.debug(msg_json) self.manager.update_topology(msg_json) if "link_failure" in msg_json: logger.info("Processing link failure.") diff --git a/swagger_server/messaging/rpc_queue_consumer.py b/swagger_server/messaging/rpc_queue_consumer.py index 0ae83839..1128a695 100644 --- a/swagger_server/messaging/rpc_queue_consumer.py +++ b/swagger_server/messaging/rpc_queue_consumer.py @@ -6,7 +6,6 @@ from queue import Queue import pika -from sdx_pce.topology.manager import TopologyManager from swagger_server.handlers.lc_message_handler import LcMessageHandler from swagger_server.utils.parse_helper import ParseHelper @@ -19,7 +18,7 @@ class RpcConsumer(object): - def __init__(self, thread_queue, exchange_name): + def __init__(self, thread_queue, exchange_name, topology_manager): self.logger = logging.getLogger(__name__) self.connection = pika.BlockingConnection( pika.ConnectionParameters(host=MQ_HOST) @@ -31,6 +30,8 @@ def __init__(self, thread_queue, exchange_name): self.channel.queue_declare(queue=SUB_QUEUE) self._thread_queue = thread_queue + self.manager = topology_manager + def on_request(self, ch, method, props, message_body): response = message_body self._thread_queue.put(message_body) @@ -58,35 +59,50 @@ def start_consumer(self): def start_sdx_consumer(self, thread_queue, db_instance): MESSAGE_ID = 0 HEARTBEAT_ID = 0 - rpc = RpcConsumer(thread_queue, "") + rpc = RpcConsumer(thread_queue, "", self.manager) t1 = threading.Thread(target=rpc.start_consumer, args=()) t1.start() - manager = TopologyManager() - lc_message_handler = LcMessageHandler(db_instance, manager) + lc_message_handler = LcMessageHandler(db_instance, self.manager) parse_helper = ParseHelper() latest_topo = {} domain_list = [] - num_domain_topos = len(domain_list) - - if db_instance.read_from_db("domain_list") is not None: - domain_list = db_instance.read_from_db("domain_list")["domain_list"] - - if db_instance.read_from_db("num_domain_topos") is not None: - db_instance.add_key_value_pair_to_db("num_domain_topos", num_domain_topos) - for topo in range(1, num_domain_topos + 1): + num_domain_topos = 0 + # For testing + # db_instance.add_key_value_pair_to_db("link_connections_dict", {}) + + # This part reads from DB when SDX controller initially starts. + # It looks for domain_list, and num_domain_topos, if they are already in DB, + # Then use the existing ones from DB. + domain_list_from_db = db_instance.read_from_db("domain_list") + latest_topo_from_db = db_instance.read_from_db("latest_topo") + num_domain_topos_from_db = db_instance.read_from_db("num_domain_topos") + + if domain_list_from_db: + domain_list = domain_list_from_db["domain_list"] + logger.debug("Read domain_list from db: ") + logger.debug(domain_list) + + if latest_topo_from_db: + latest_topo = latest_topo_from_db["latest_topo"] + logger.debug("Read latest_topo from db: ") + logger.debug(latest_topo) + + if num_domain_topos_from_db: + num_domain_topos = num_domain_topos_from_db["num_domain_topos"] + logger.debug("Read num_domain_topos from db: ") + logger.debug(num_domain_topos) + for topo in range(1, num_domain_topos + 2): db_key = f"LC-{topo}" - logger.debug(f"Reading {db_key} from DB") topology = db_instance.read_from_db(db_key) - logger.debug(f"Read {db_key}: {topology}") - if topology is None: - continue - else: + + if topology: # Get the actual thing minus the Mongo ObjectID. topology = topology[db_key] - topo_json = json.loads(topology) - manager.add_topology(topo_json) + topo_json = json.loads(topology) + self.manager.add_topology(topo_json) + logger.debug(f"Read {db_key}: {topology}") while True: # Queue.get() will block until there's an item in the queue. @@ -110,23 +126,7 @@ def start_sdx_consumer(self, thread_queue, db_instance): logger.info("got message from MQ: " + str(msg)) else: db_instance.add_key_value_pair_to_db(str(MESSAGE_ID), msg) - logger.debug("Save to database complete.") - logger.debug("message ID:" + str(MESSAGE_ID)) - value = db_instance.read_from_db(str(MESSAGE_ID)) - logger.debug("got value from DB:") - logger.debug(value) + logger.debug( + "Save to database complete. message ID: " + str(MESSAGE_ID) + ) MESSAGE_ID += 1 - - -if __name__ == "__main__": - thread_queue = Queue() - rpc = RpcConsumer(thread_queue) - - t1 = threading.Thread(target=rpc.start_consumer, args=()) - t1.start() - - while True: - if not thread_queue.empty(): - print("-----thread-----got message: " + str(thread_queue.get())) - print("----------") - # rpc.start_consumer()