Skip to content

Commit

Permalink
load CBM as part of controller startup
Browse files Browse the repository at this point in the history
  • Loading branch information
kthare10 committed Sep 20, 2024
1 parent 9801cc3 commit d128b3c
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 2 deletions.
44 changes: 44 additions & 0 deletions fabric_cf/actor/core/core/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@
import traceback
from typing import TYPE_CHECKING

from fabric_cf.actor.fim.fim_helper import FimHelper
from fim.graph.networkx_property_graph_disjoint import NetworkXGraphImporterDisjoint
from fim.graph.resources.networkx_abqm import NetworkXABQMFactory
from fim.user import GraphFormat
from fim.user.topology import AdvertizedTopology

from fabric_cf.actor.fim.plugins.broker.aggregate_bqm_plugin import AggregatedBQMPlugin
from fim.pluggable import PluggableRegistry, PluggableType
from fim.slivers.base_sliver import BaseSliver
Expand Down Expand Up @@ -86,6 +92,8 @@ def __init__(self, *, identity: AuthToken = None, clock: ActorClock = None):
self.thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=2,
thread_name_prefix=self.__class__.__name__)
self.pluggable_registry = PluggableRegistry()
self.combined_broker_model_graph_id = None
self.combined_broker_model = None

def __getstate__(self):
state = self.__dict__.copy()
Expand Down Expand Up @@ -113,6 +121,9 @@ def __getstate__(self):
if hasattr(self, 'pluggable_registry'):
del state['pluggable_registry']

if hasattr(self, 'combined_broker_model'):
del state['combined_broker_model']

return state

def __setstate__(self, state):
Expand Down Expand Up @@ -290,6 +301,9 @@ def initialize(self, *, config: ActorConfig):
self.registry.set_slices_plugin(plugin=self.plugin)
self.registry.initialize()

# Load the combined broker model with level=0
self.load_combined_broker_model()

self.initialized = True

def process_redeeming(self):
Expand Down Expand Up @@ -459,6 +473,36 @@ def poa_info(self, *, poa: Poa, caller: AuthToken):

self.wrapper.poa_info(poa=poa, caller=caller)

def load_combined_broker_model(self):
if self.combined_broker_model_graph_id is None:
self.logger.debug("Creating an empty Combined Broker Model Graph")
from fabric_cf.actor.core.manage.management_utils import ManagementUtils
mgmt_actor = ManagementUtils.get_local_actor()
brokers = self.get_brokers()
broker = None
if brokers is not None:
broker = ID(uid=next(iter(brokers), None).get_guid())
if not broker:
self.logger.error("Unable to determine the Broker ID")
return

level = 0
graph_format = GraphFormat.GRAPHML
model = mgmt_actor.get_broker_query_model(broker=broker, level=level, graph_format=graph_format,
id_token=None)

if model is None or model.get_model() is None or model.get_model() == '':
self.logger.error(f"Resource(s) not found for level: {level} format: {graph_format}!")
return
self.combined_broker_model = FimHelper.get_neo4j_cbm_graph_from_string_direct(
graph_str=model.get_model(), ignore_validation=True)
else:
self.logger.debug(f"Loading an existing Combined Broker Model Graph: {self.combined_broker_model_graph_id}")
self.combined_broker_model = FimHelper.get_neo4j_cbm_graph(graph_id=self.combined_broker_model_graph_id)
self.combined_broker_model_graph_id = self.combined_broker_model.get_graph_id()
self.logger.debug(
f"Successfully loaded an Combined Broker Model Graph: {self.combined_broker_model_graph_id}")

@staticmethod
def get_management_object_class() -> str:
return ControllerManagementObject.__name__
Expand Down
2 changes: 1 addition & 1 deletion fabric_cf/actor/core/kernel/reservation_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1004,7 +1004,7 @@ def probe_join_state(self):

# This is a regular request for modifying network resources to an upstream broker.
self.sequence_ticket_out += 1
print(f"Issuing an extend ticket {sliver_to_str(sliver=self.get_requested_resources().get_sliver())}")
self.logger.debug(f"Issuing an extend ticket {sliver_to_str(sliver=self.get_requested_resources().get_sliver())}")
RPCManagerSingleton.get().extend_ticket(reservation=self)

# Update ASM with Reservation Info
Expand Down
1 change: 0 additions & 1 deletion fabric_cf/actor/core/policy/network_node_inventory.py
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,6 @@ def __check_components(self, *, rid: ID, requested_components: AttachedComponent

for c in comps_to_remove:
self.logger.debug(f"Excluding component: {c.get_name()}")
print(f"Excluding component: {c.get_name()}")
graph_node.attached_components_info.remove_device(name=c.get_name())

self.logger.debug(f"requested_components: {requested_components.devices.values()} for reservation# {rid}")
Expand Down

0 comments on commit d128b3c

Please sign in to comment.