Merge pull request #196 from atlanticwave-sdx/161-add-topo
Enable adding connection after link failure
congwang09 authored Sep 19, 2023
2 parents 9553444 + 63ab0ab commit b65b5b2
Showing 5 changed files with 184 additions and 171 deletions.
7 changes: 5 additions & 2 deletions swagger_server/__main__.py
@@ -6,6 +6,7 @@
from queue import Queue

import connexion
from sdx_pce.topology.manager import TopologyManager

from swagger_server import encoder
from swagger_server.messaging.rpc_queue_consumer import RpcConsumer
@@ -20,7 +21,7 @@ def main():
if LOG_FILE:
logging.basicConfig(filename=LOG_FILE, level=logging.INFO)
else:
logging.basicConfig(level=logging.INFO)
logging.basicConfig(level=logging.DEBUG)

# Run swagger service
app = connexion.App(__name__, specification_dir="./swagger/")
@@ -36,8 +37,10 @@ def main():
db_instance = DbUtils()
db_instance.initialize_db()

topology_manager = TopologyManager()

thread_queue = Queue()
rpc = RpcConsumer(thread_queue, "")
rpc = RpcConsumer(thread_queue, "", topology_manager)
rpc.start_sdx_consumer(thread_queue, db_instance)


120 changes: 3 additions & 117 deletions swagger_server/controllers/connection_controller.py
@@ -2,11 +2,8 @@
import logging

import connexion
from sdx_pce.load_balancing.te_solver import TESolver
from sdx_pce.topology.temanager import TEManager

from swagger_server.messaging.topic_queue_producer import TopicQueueProducer
from swagger_server.models.simple_link import SimpleLink
from swagger_server.handlers.connection_handler import ConnectionHandler
from swagger_server.utils.db_utils import DbUtils

LOG_FORMAT = (
@@ -20,6 +17,7 @@
# Get DB connection and tables set up.
db_instance = DbUtils()
db_instance.initialize_db()
connection_handler = ConnectionHandler(db_instance)


def is_json(myjson):
@@ -89,118 +87,6 @@ def place_connection(body):
db_instance.add_key_value_pair_to_db("connection_data", json.dumps(body))
logger.info("Saving to database complete.")

if db_instance.read_from_db("latest_topo"):
topo_val = db_instance.read_from_db("latest_topo")["latest_topo"]
topo_json = json.loads(topo_val)
logger.info(f"Read topology {topo_val}")

num_domain_topos = 0

if db_instance.read_from_db("num_domain_topos"):
num_domain_topos = db_instance.read_from_db("num_domain_topos")[
"num_domain_topos"
]

# Initializing TEManager with `None` topology data is a
# work-around for
# https://github.com/atlanticwave-sdx/sdx-controller/issues/145
temanager = TEManager(topology_data=None, connection_data=body)
lc_domain_topo_dict = {}

# Read LC-1, LC-2, LC-3, and LC-4 topologies because of
# https://github.com/atlanticwave-sdx/sdx-controller/issues/152
for i in range(1, int(num_domain_topos) + 2):
lc = f"LC-{i}"
logger.debug(f"Reading {lc} from DB")
curr_topo = db_instance.read_from_db(lc)
if curr_topo is None:
logger.debug(f"Read {lc} from DB: {curr_topo}")
continue
else:
# Get the actual thing minus the Mongo ObjectID.
curr_topo_str = curr_topo.get(lc)
# Just log a substring, not the whole thing.
logger.debug(f"Read {lc} from DB: {curr_topo_str[0:50]}...")

curr_topo_json = json.loads(curr_topo_str)
lc_domain_topo_dict[curr_topo_json["domain_name"]] = curr_topo_json[
"lc_queue_name"
]
logger.debug(f"Adding #{i} topology {curr_topo_json.get('id')} to TEManager")
temanager.add_topology(curr_topo_json)

for num, val in enumerate(temanager.topology_manager.topology_list):
logger.info(f"TE topology #{num}: {val}")

graph = temanager.generate_graph_te()
if graph is None:
return "Could not generate a graph", 400

traffic_matrix = temanager.generate_connection_te()
if traffic_matrix is None:
return "Could not generate a traffic matrix", 400

logger.info(f"Generated graph: '{graph}', traffic matrix: '{traffic_matrix}'")

solver = TESolver(graph, traffic_matrix)
solution = solver.solve()
logger.debug(f"TESolver result: {solution}")

if solution is None or solution.connection_map is None:
return "Could not solve the request", 400

breakdown = temanager.generate_connection_breakdown(solution)
logger.debug(f"-- BREAKDOWN: {json.dumps(breakdown)}")

if breakdown is None:
return "Could not break down the solution", 400

link_connections_dict_json = (
db_instance.read_from_db("link_connections_dict")["link_connections_dict"]
if db_instance.read_from_db("link_connections_dict")
else None
)

if link_connections_dict_json:
link_connections_dict = json.loads(link_connections_dict_json)
else:
link_connections_dict = {}

for domain, link in breakdown.items():
port_list = []
for key in link.keys():
if "uni_" in key and "port_id" in link[key]:
port_list.append(link[key]["port_id"])

if port_list:
simple_link = SimpleLink(port_list).to_string()

if simple_link not in link_connections_dict:
link_connections_dict[simple_link] = []

if body not in link_connections_dict[simple_link]:
link_connections_dict[simple_link].append(body)

db_instance.add_key_value_pair_to_db(
"link_connections_dict", json.dumps(link_connections_dict)
)

logger.debug(f"Attempting to publish domain: {domain}, link: {link}")

# From "urn:ogf:network:sdx:topology:amlight.net", attempt to
# extract a string like "amlight".
domain_name = find_between(domain, "topology:", ".net") or f"{domain}"
exchange_name = "connection"

logger.debug(
f"Publishing '{link}' with exchange_name: {exchange_name}, "
f"routing_key: {domain_name}"
)

producer = TopicQueueProducer(
timeout=5, exchange_name=exchange_name, routing_key=domain_name
)
producer.call(json.dumps(link))
producer.stop_keep_alive()
connection_handler.place_connection(body)

return "Connection published"
147 changes: 135 additions & 12 deletions swagger_server/handlers/connection_handler.py
@@ -1,7 +1,12 @@
import json
import logging

from sdx_pce.load_balancing.te_solver import TESolver
from sdx_pce.topology.temanager import TEManager

from swagger_server.messaging.topic_queue_producer import TopicQueueProducer
from swagger_server.models.simple_link import SimpleLink
from swagger_server.utils.parse_helper import ParseHelper

logger = logging.getLogger(__name__)
logging.getLogger("pika").setLevel(logging.WARNING)
@@ -10,31 +15,149 @@
class ConnectionHandler:
def __init__(self, db_instance):
self.db_instance = db_instance
pass
self.parse_helper = ParseHelper()

def remove_connection(self, connection):
# call pce to remove connection
pass

def _send_breakdown_to_lc(self, temanager, connection, solution):
breakdown = temanager.generate_connection_breakdown(solution)
logger.debug(f"-- BREAKDOWN: {json.dumps(breakdown)}")

if breakdown is None:
return "Could not break down the solution", 400

link_connections_dict_json = (
self.db_instance.read_from_db("link_connections_dict")[
"link_connections_dict"
]
if self.db_instance.read_from_db("link_connections_dict")
else None
)

if link_connections_dict_json:
link_connections_dict = json.loads(link_connections_dict_json)
else:
link_connections_dict = {}

for domain, link in breakdown.items():
port_list = []
for key in link.keys():
if "uni_" in key and "port_id" in link[key]:
port_list.append(link[key]["port_id"])

if port_list:
simple_link = SimpleLink(port_list).to_string()

if simple_link not in link_connections_dict:
link_connections_dict[simple_link] = []

if connection not in link_connections_dict[simple_link]:
link_connections_dict[simple_link].append(connection)

self.db_instance.add_key_value_pair_to_db(
"link_connections_dict", json.dumps(link_connections_dict)
)

logger.debug(f"Attempting to publish domain: {domain}, link: {link}")

# From "urn:ogf:network:sdx:topology:amlight.net", attempt to
# extract a string like "amlight".
domain_name = (
self.parse_helper.find_between(domain, "topology:", ".net")
or f"{domain}"
)
exchange_name = "connection"

logger.debug(
f"Publishing '{link}' with exchange_name: {exchange_name}, "
f"routing_key: {domain_name}"
)

producer = TopicQueueProducer(
timeout=5, exchange_name=exchange_name, routing_key=domain_name
)
producer.call(json.dumps(link))
producer.stop_keep_alive()

def place_connection(self, connection):
# call pce to generate breakdown, and place connection
pass
num_domain_topos = 0

if self.db_instance.read_from_db("num_domain_topos"):
num_domain_topos = self.db_instance.read_from_db("num_domain_topos")[
"num_domain_topos"
]

# Initializing TEManager with `None` topology data is a
# work-around for
# https://github.com/atlanticwave-sdx/sdx-controller/issues/145
temanager = TEManager(topology_data=None, connection_data=connection)
lc_domain_topo_dict = {}

# Read LC-1, LC-2, LC-3, and LC-4 topologies because of
# https://github.com/atlanticwave-sdx/sdx-controller/issues/152
for i in range(1, int(num_domain_topos) + 2):
lc = f"LC-{i}"
logger.debug(f"Reading {lc} from DB")
curr_topo = self.db_instance.read_from_db(lc)
if curr_topo is None:
logger.debug(f"Read {lc} from DB: {curr_topo}")
continue
else:
# Get the actual thing minus the Mongo ObjectID.
curr_topo_str = curr_topo.get(lc)
# Just log a substring, not the whole thing.
logger.debug(f"Read {lc} from DB: {curr_topo_str[0:50]}...")

curr_topo_json = json.loads(curr_topo_str)
lc_domain_topo_dict[curr_topo_json["domain_name"]] = curr_topo_json[
"lc_queue_name"
]
logger.debug(
f"Adding #{i} topology {curr_topo_json.get('id')} to TEManager"
)
temanager.add_topology(curr_topo_json)

for num, val in enumerate(temanager.topology_manager.topology_list):
logger.info(f"TE topology #{num}: {val}")

graph = temanager.generate_graph_te()
if graph is None:
return "Could not generate a graph", 400

traffic_matrix = temanager.generate_connection_te()
if traffic_matrix is None:
return "Could not generate a traffic matrix", 400

logger.info(f"Generated graph: '{graph}', traffic matrix: '{traffic_matrix}'")

solver = TESolver(graph, traffic_matrix)
solution = solver.solve()
logger.debug(f"TESolver result: {solution}")

if solution is None or solution.connection_map is None:
return "Could not solve the request", 400

self._send_breakdown_to_lc(temanager, connection, solution)

def handle_link_failure(self, msg_json):
logger.debug("Handling connections that contain failed link.")
if self.db_instance.read_from_db("link_connections_dict") is None:
logger.debug("No connection has been placed yet.")
return
logger.debug("---Handling connections that contain failed link.---")
link_connections_dict_str = self.db_instance.read_from_db(
"link_connections_dict"
)

if link_connections_dict_str:
link_connections_dict = json.loads(
link_connections_dict_str["link_connections_dict"]
)
else:
logger.debug("Failed to retrieve link_connections_dict from DB.")
if (
not link_connections_dict_str
or not link_connections_dict_str["link_connections_dict"]
):
logger.debug("No connection has been placed yet.")
return

link_connections_dict = json.loads(
link_connections_dict_str["link_connections_dict"]
)

for link in msg_json["link_failure"]:
port_list = []
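
The bookkeeping in _send_breakdown_to_lc is what makes the commit's link-failure recovery possible: each placed connection is stored under a key derived from the link's two port IDs, so handle_link_failure can later look up the affected requests. The sketch below shows that keying scheme in isolation; the port URNs and the SimpleLink serialization format are assumptions made for illustration.

```python
import json

# Hypothetical breakdown entry for one domain, mirroring the shape the
# handler iterates over: "uni_*" keys that carry a "port_id".
link = {
    "uni_a": {"port_id": "urn:sdx:port:amlight.net:A1:1"},
    "uni_z": {"port_id": "urn:sdx:port:amlight.net:B1:2"},
}
connection = {"id": "example-connection-request"}  # hypothetical request body

port_list = [v["port_id"] for k, v in link.items()
             if "uni_" in k and "port_id" in v]

# Stand-in for SimpleLink(port_list).to_string(); the real serialization
# format is defined in swagger_server.models.simple_link.
simple_link = json.dumps(sorted(port_list))

link_connections_dict = {}
link_connections_dict.setdefault(simple_link, [])
if connection not in link_connections_dict[simple_link]:
    link_connections_dict[simple_link].append(connection)

# A later link-failure event can now recover every request placed on that link.
print(link_connections_dict[simple_link])
```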
3 changes: 2 additions & 1 deletion swagger_server/handlers/lc_message_handler.py
@@ -27,7 +27,7 @@ def process_lc_json_msg(
msg_version = msg_json["version"]

lc_queue_name = msg_json["lc_queue_name"]
logger.debug("---lc_queue_name:---")
logger.debug("Processing LC message: lc_queue_name:")
logger.debug(lc_queue_name)

domain_name = self.parse_helper.find_between(msg_id, "topology:", ".net")
@@ -42,6 +42,7 @@
# Update existing topology
if domain_name in domain_list:
logger.info("Updating topo")
logger.debug(msg_json)
self.manager.update_topology(msg_json)
if "link_failure" in msg_json:
logger.info("Processing link failure.")
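
For the end-to-end picture implied by the commit title, the LC message handler detects a link_failure field and the connection handler re-places the stored requests. The exact hand-off sits in the trailing part of this file's diff, which is not shown, so the sketch below is an assumption about that glue rather than the committed code.

```python
# Assumed wiring between LCMessageHandler and ConnectionHandler.
def process_link_failure(msg_json, connection_handler):
    if "link_failure" in msg_json:
        # handle_link_failure() (see connection_handler.py above) reads
        # link_connections_dict from the DB and re-places each connection
        # that traversed a failed link.
        connection_handler.handle_link_failure(msg_json)
```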