# Source code for openoptics.DeviceManager

# Copyright (c) Max-Planck-Gesellschaft zur Förderung der Wissenschaften e.V.
# Developed at the Max Planck Institute for Informatics, Network and Cloud Systems Group
#
# Author: Yiming Lei (ylei@mpi-inf.mpg.de)
#
# This software is licensed for non-commercial scientific research purposes only.
#
# License text: Creative Commons NC BY SA 4.0
# https://creativecommons.org/licenses/by-nc-sa/4.0/deed.en

import sys

from openoptics.backends.base import BackendBase


class DeviceManager:
    """
    OpenOptics DeviceManager.

    Monitor and configure the network at runtime. Talks to BMv2 switches over
    Thrift to read per-switch device metrics, scrape OCS schedule hit/miss
    counters, and set the active queue on ToR switches.
    """

    # Must match the counter names declared in
    # openoptics/backends/mininet/p4src/ocs/ocs.p4; if you rename them there,
    # rebuild ocs.json and update these constants together.
    _OCS_HIT_COUNTER = "MyIngress.ocs_hit_counter"
    _OCS_MISS_COUNTER = "MyIngress.ocs_miss_counter"

    # Directories (relative to the current working directory) that provide the
    # BMv2 CLI helper modules ``tswitch_CLI`` and ``runtime_CLI``.
    _BMV2_CLI_PATHS = (
        "../behavioral-model/targets/tor_switch",
        "../behavioral-model/tools",
    )

    def __init__(self, backend: "BackendBase", tor_ocs_ports, nb_queue,
                 event_publisher=None):
        """
        Connect Thrift clients to every ToR and optical switch of *backend*.

        Args:
            backend: Backend whose ``get_tor_switches()`` and
                ``get_optical_switches()`` return switch objects exposing
                ``name`` and ``thrift_port``.
            tor_ocs_ports: Sized collection; its length multiplied by the
                number of ToR switches gives the number of OCS counter indices
                scanned by ``get_ocs_metric``. (Presumably the OCS-facing ports
                of one ToR — confirm against the backend.)
            nb_queue: Stored as ``self.nb_queue``; not used in this class.
            event_publisher: Optional object with ``emit(sw_name, qid)``,
                called after a successful ``set_active_queue``.

        Raises:
            Any exception from ``runtime_CLI.thrift_connect`` for a ToR
            switch. Optical-switch connection failures are tolerated.
        """
        # Only add each path once: inserting unconditionally would grow
        # sys.path with duplicates on every construction. Inserting at index 1
        # in this order yields [.., tools, tor_switch, ..], as before.
        for path in self._BMV2_CLI_PATHS:
            if path not in sys.path:
                sys.path.insert(1, path)

        # These imports depend on the sys.path entries added above.
        from tswitch_CLI import TorSwitchAPI
        import runtime_CLI

        self.switches = backend.get_tor_switches()
        self.optical_switches = backend.get_optical_switches()

        # ToR switch name -> custom Thrift client (TorSwitchAPI services).
        services = TorSwitchAPI.get_thrift_services()
        self.switch_clients = {
            sw.name: runtime_CLI.thrift_connect(
                "localhost", sw.thrift_port, services
            )[0]
            for sw in self.switches
        }

        # OCS switches: we only need the standard BMv2 runtime client for
        # `bm_counter_read` — hits/misses are scraped from P4 counters.
        self.ocs_standard_clients = {}  # OCS switch name -> standard client
        std_services = runtime_CLI.RuntimeAPI.get_thrift_services(
            runtime_CLI.PreType.SimplePreLAG
        )
        for sw in self.optical_switches:
            try:
                standard_client = runtime_CLI.thrift_connect(
                    "localhost", sw.thrift_port, std_services
                )[0]
            except Exception:
                # OCS absent / not reachable: skip it. get_ocs_metric() then
                # has no entry at all for this switch.
                continue
            self.ocs_standard_clients[sw.name] = standard_client

        self.tor_ocs_ports = tor_ocs_ports
        self.nb_queue = nb_queue
        self._event_publisher = event_publisher
        # Set after the missing-counter hint has been printed (per instance).
        self._ocs_counters_warned = False

    def get_device_metric(self) -> dict:
        """
        Get device metrics (queue depth, latency, drops) from every ToR switch.

        Returns:
            ``{sw_name: metrics}`` where ``metrics`` has keys:

            - ``pq_depth``: ``{(port, queue): depth_in_packets}``
            - ``pq_latency``: ``{(port, queue): (mean_us, max_us)}`` — only
              present for (port, queue) pairs that had at least one sample in
              the current window; pairs with a missing or negative mean/max
              are omitted.
            - ``drop_ctr``: aggregate drop counter

            If querying a switch raises, that switch's entry has empty
            ``pq_depth``/``pq_latency`` and ``drop_ctr`` of 0.
        """
        dict_device_metric = {}
        for sw_name, switch_client in self.switch_clients.items():
            try:
                device_metric = switch_client.get_device_metric()
                pq_depth = {}
                pq_latency = {}
                for pq_metric in device_metric.port_queue_metrics:
                    key = (pq_metric.port, pq_metric.queue)
                    pq_depth[key] = pq_metric.depth
                    mean = getattr(pq_metric, "latency_us_mean", None)
                    mx = getattr(pq_metric, "latency_us_max", None)
                    # Drop any negative value — the latency fields are i32 and
                    # a negative reading means a pre-fix BMv2 wrote UINT32_MAX
                    # from a PHV-truncated timestamp. Not a real latency.
                    if (mean is not None and mx is not None
                            and mean >= 0 and mx >= 0):
                        pq_latency[key] = (int(mean), int(mx))
                dict_device_metric[sw_name] = {
                    "pq_depth": pq_depth,
                    "pq_latency": pq_latency,
                    "drop_ctr": device_metric.drop_ctr,
                }
            except Exception:
                # Best effort: an unreachable switch yields an empty record
                # instead of failing the whole scrape.
                dict_device_metric[sw_name] = {
                    "pq_depth": {},
                    "pq_latency": {},
                    "drop_ctr": 0,
                }
        return dict_device_metric

    def get_ocs_metric(self) -> dict:
        """
        Scrape OCS schedule hit/miss counters from every optical switch.

        Counter indices ``0 .. len(tor_ocs_ports) * max(1, #ToRs) - 1`` are
        read from each switch.

        Returns:
            ``{sw_name: {port: (hits, misses)}}`` for every optical switch
            that was reachable at construction time. Every scanned port is
            included even with (0, 0), so the dashboard chart renders from
            epoch start; cumulative counters naturally grow once traffic flows.
            Scanning a switch stops at the first failed read and keeps the
            ports read so far. A per-switch dict is empty if even the first
            read fails, e.g. because the BMv2 target doesn't define the
            counters (old binary / stale ``ocs.json``). In that case a rebuild
            hint is printed to stdout, at most once per DeviceManager instance.
        """
        result: dict = {}
        nb_ports = len(self.tor_ocs_ports) * max(1, len(self.switches))
        for sw_name, std_client in self.ocs_standard_clients.items():
            per_port: dict = {}
            for port in range(nb_ports):
                try:
                    hit = std_client.bm_counter_read(
                        0, self._OCS_HIT_COUNTER, port
                    )
                    miss = std_client.bm_counter_read(
                        0, self._OCS_MISS_COUNTER, port
                    )
                except Exception:
                    # Only a failure before any port was read indicates the
                    # counters are missing. A later failure (e.g. a counter
                    # array smaller than nb_ports) must not trigger the
                    # misleading rebuild hint.
                    if not per_port and not self._ocs_counters_warned:
                        print(
                            f"[OpenOptics] OCS '{sw_name}' does not expose "
                            f"{self._OCS_HIT_COUNTER!r} / {self._OCS_MISS_COUNTER!r}. "
                            f"Rebuild ocs.json with the current ocs.p4 to enable "
                            f"schedule hit/miss metrics."
                        )
                        self._ocs_counters_warned = True
                    break  # stop scanning this switch
                per_port[port] = (int(hit.packets), int(miss.packets))
            result[sw_name] = per_port
        return result

    def set_active_queue(self, sw_name, active_qid):
        """
        Set the active queue for a specific ToR switch.

        Best effort: if *sw_name* has no client or the Thrift call raises, the
        error is swallowed and no event is published. On success, the event
        publisher (if any) is notified with ``emit(sw_name, active_qid)``.

        Args:
            sw_name: The name of the switch to configure.
            active_qid: The ID of the queue to set as active.
        """
        try:
            self.switch_clients[sw_name].set_active_queue(active_qid)
        except Exception:
            return
        if self._event_publisher is not None:
            self._event_publisher.emit(sw_name, active_qid)