Skip to content

Documentation for Sanetwork Module

SANetwork

Bases: SAMComponent

Network situational awareness component responsible for monitoring and managing the communication context within the federation.

This component handles
  • Tracking active and potential peer nodes.
  • Evaluating network conditions for situational awareness decisions.
  • Integrating with discovery and reasoning modules for dynamic topology updates.

Inherits from SAMComponent to participate in the broader Situational Awareness pipeline.

Source code in nebula/core/situationalawareness/awareness/sanetwork/sanetwork.py
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
class SANetwork(SAMComponent):
    """
    Network situational awareness component responsible for monitoring and managing
    the communication context within the federation.

    This component handles:
      - Tracking active and potential peer nodes.
      - Evaluating network conditions for situational awareness decisions.
      - Integrating with discovery and reasoning modules for dynamic topology updates.

    Inherits from SAMComponent to participate in the broader Situational Awareness pipeline.
    """

    NEIGHBOR_VERIFICATION_TIMEOUT = 30

    def __init__(self, config):
        self._neighbor_policy = config["neighbor_policy"]  # topology
        self._neighbor_policy = self._neighbor_policy.lower()
        self._strict_topology = config["strict_topology"]  # strict_topology
        print_msg_box(
            msg=f"Starting Network SA\nNeighbor Policy: {self._neighbor_policy}\nStrict: {self._strict_topology}",
            indent=2,
            title="Network SA module",
        )
        self._sar = config["sar"]  # sar
        self._addr = config["addr"]  # addr
        self._neighbor_policy = factory_NeighborPolicy(self._neighbor_policy)
        self._restructure_process_lock = Locker(name="restructure_process_lock", async_lock=True)
        self._restructure_cooldown = 0
        self._verbose = config["verbose"]  # verbose
        self._cm = CommunicationsManager.get_instance()
        self._sa_network_agent = SANetworkAgent(self)

        # Track verification tasks for proper cleanup during shutdown
        self._verification_tasks = set()
        self._verification_tasks_lock = asyncio.Lock()

    @property
    def sar(self) -> SAReasoner:
        """SA Reasoner"""
        return self._sar

    @property
    def cm(self):
        """Communication Manager"""
        return self._cm

    @property
    def np(self):
        """Neighbor Policy"""
        return self._neighbor_policy

    @property
    def sana(self):
        """SA Network Agent"""
        return self._sa_network_agent

    async def init(self):
        """
        Initialize the SANetwork component by deploying external connection services,
        subscribing to relevant events, starting beaconing, and configuring neighbor policies.

        Actions performed:
        1. If not an additional participant, start and subscribe to beacon and finish events.
        2. Otherwise, initialize ECS without running it.
        3. Build and apply the neighbor policy using current direct and undirected connections.
        4. Subscribe to node discovery and neighbor update events.
        5. Register this agent with the situational awareness network agent.
        """
        if not self.sar.is_additional_participant():
            logging.info("Deploying External Connection Service")
            await self.cm.start_external_connection_service()
            await EventManager.get_instance().subscribe_node_event(BeaconRecievedEvent, self.beacon_received)
            await EventManager.get_instance().subscribe_node_event(ExperimentFinishEvent, self.experiment_finish)
            await self.cm.start_beacon()
        else:
            logging.info("Deploying External Connection Service | No running")
            await self.cm.start_external_connection_service(run_service=False)

        logging.info("Building neighbor policy configuration..")
        await self.np.set_config([
            await self.cm.get_addrs_current_connections(only_direct=True, myself=False),
            await self.cm.get_addrs_current_connections(only_direct=False, only_undirected=False, myself=False),
            self._addr,
            self._strict_topology,
        ])

        await EventManager.get_instance().subscribe_node_event(NodeFoundEvent, self._process_node_found_event)
        await EventManager.get_instance().subscribe_node_event(UpdateNeighborEvent, self._process_update_neighbor_event)
        await self.sana.register_sa_agent()

    async def sa_component_actions(self):
        """
        Perform periodic situational awareness checks for network conditions.

        This method evaluates the external connection service status and analyzes
        the robustness of the current network topology.
        """
        logging.info("SA Network evaluating current scenario")
        await self._check_external_connection_service_status()
        await self._analize_topology_robustness()

    """                                                     ###############################
                                                            #       NEIGHBOR POLICY       #
                                                            ###############################
    """

    async def _process_node_found_event(self, nfe: NodeFoundEvent):
        """
        Handle an event indicating a new node has been discovered.

        Args:
            nfe (NodeFoundEvent): The event containing the discovered node's address.
        """
        node_addr = await nfe.get_event_data()
        await self.np.meet_node(node_addr)

    async def _process_update_neighbor_event(self, une: UpdateNeighborEvent):
        """
        Handle an update to the neighbor set, such as node join or leave.

        Args:
            une (UpdateNeighborEvent): The event containing the neighbor address and removal flag.
        """
        node_addr, removed = await une.get_event_data()
        if self._verbose:
            logging.info(f"Processing Update Neighbor Event, node addr: {node_addr}, remove: {removed}")
        await self.np.update_neighbors(node_addr, removed)

    async def meet_node(self, node):
        """
        Propose a meeting (connection) with a newly discovered node if it is not self.

        Args:
            node (str): The address of the node to meet.
        """
        if node != self._addr:
            await self.np.meet_node(node)

    async def get_nodes_known(self, neighbors_too=False, neighbors_only=False):
        """
        Retrieve the list of known nodes in the network.

        Args:
            neighbors_too (bool, optional): Include neighbors in the result. Defaults to False.
            neighbors_only (bool, optional): Return only neighbors. Defaults to False.

        Returns:
            set: Addresses of known nodes based on the provided filters.
        """
        return await self.np.get_nodes_known(neighbors_too, neighbors_only)

    async def neighbors_left(self):
        """
        Check whether any direct neighbor connections remain.

        Returns:
            bool: True if there are one or more direct neighbor connections, False otherwise.
        """
        return len(await self.cm.get_addrs_current_connections(only_direct=True, myself=False)) > 0

    async def accept_connection(self, source, joining=False):
        """
        Decide whether to accept an incoming connection request from a source node.

        Args:
            source (str): The address of the requesting node.
            joining (bool, optional): True if this is part of a join process. Defaults to False.

        Returns:
            bool: True if the connection should be accepted, False otherwise.
        """
        accepted = await self.np.accept_connection(source, joining)
        return accepted

    async def need_more_neighbors(self):
        """
        Determine if the network requires additional neighbor connections.

        Returns:
            bool: True if more neighbors are needed, False otherwise.
        """
        return await self.np.need_more_neighbors()

    async def get_actions(self):
        """
        Retrieve the set of situational awareness actions applicable to the current network state.

        Returns:
            list: Identifiers of available network actions.
        """
        return await self.np.get_actions()

    """                                                     ###############################
                                                            # EXTERNAL CONNECTION SERVICE #
                                                            ###############################
    """

    async def _check_external_connection_service_status(self):
        """
        Ensure the external connection service is running; if not, initialize and start beaconing.

        This method checks the ECS status, starts it if necessary,
        subscribes to beacon events, and initiates beacon transmission.
        """
        if not await self.cm.is_external_connection_service_running():
            logging.info("🔄 External Service not running | Starting service...")
            await self.cm.init_external_connection_service()
            await EventManager.get_instance().subscribe_node_event(BeaconRecievedEvent, self.beacon_received)
            await self.cm.start_beacon()

    async def experiment_finish(self, efe: ExperimentFinishEvent):
        """
        Handle the completion of an experiment by shutting down the external connection service.

        Args:
            efe (ExperimentFinishEvent): The event indicating the experiment has finished.
        """
        await self.cm.stop_external_connection_service()

    async def beacon_received(self, beacon_recieved_event: BeaconRecievedEvent):
        """
        Process a received beacon event by publishing a NodeFoundEvent for the given address.

        Extracts the address and geolocation from the beacon event and notifies
        the system that a new node has been discovered.

        Args:
            beacon_recieved_event (BeaconRecievedEvent): The event containing beacon data.
        """
        addr, geoloc = await beacon_recieved_event.get_event_data()
        latitude, longitude = geoloc
        nfe = NodeFoundEvent(addr)
        asyncio.create_task(EventManager.get_instance().publish_node_event(nfe))

    """                                                     ###############################
                                                            #    REESTRUCTURE TOPOLOGY    #
                                                            ###############################
    """

    def _update_restructure_cooldown(self):
        """
        Decrement or wrap the restructure cooldown counter.

        Uses modulo arithmetic to ensure the cooldown cycles correctly,
        preventing frequent restructuring operations.
        """
        if self._restructure_cooldown > 0:
            self._restructure_cooldown = (self._restructure_cooldown + 1) % RESTRUCTURE_COOLDOWN

    def _restructure_available(self):
        """
        Check if restructuring is currently allowed based on the cooldown.

        Returns:
            bool: True if cooldown is zero (restructure allowed), False otherwise.
        """
        if self._restructure_cooldown:
            if self._verbose:
                logging.info("Reestructure on cooldown")
        return self._restructure_cooldown == 0

    def get_restructure_process_lock(self):
        """
        Retrieve the asynchronous lock protecting the restructure process.

        Returns:
            asyncio.Lock: Lock to ensure only one restructure operation runs at a time.
        """
        return self._restructure_process_lock

    async def _analize_topology_robustness(self):
        """
        Analyze the current network topology to assess robustness and suggest SA actions.

        Performs the following checks:
        1. If no neighbors remain, suggest reconnection to the federation.
        2. If more neighbors are needed and restructuring is off cooldown, suggest removing or searching for neighbors.
        3. If excess neighbors exist, suggest disconnecting according to policy.
        4. Otherwise, suggest maintaining current connections.
        5. If a restructure is already in progress, suggest idling.

        Uses neighbor policy decisions and cooldown logic to produce situational awareness commands.
        """
        # TODO update the way of checking
        logging.info("🔄 Analizing node network robustness...")
        if not self._restructure_process_lock.locked():
            if not await self.neighbors_left():
                if self._verbose:
                    logging.info("No Neighbors left | reconnecting with Federation")
                await self.sana.create_and_suggest_action(
                    SACommandAction.RECONNECT, self.reconnect_to_federation, False, None
                )
            elif await self.np.need_more_neighbors() and self._restructure_available():
                if self._verbose:
                    logging.info("Suggesting to Remove neighbors according to policy...")
                if await self.np.any_leftovers_neighbors():
                    nodes_to_remove = await self.np.get_neighbors_to_remove()
                    await self.sana.create_and_suggest_action(
                        SACommandAction.DISCONNECT, self.cm.disconnect, True, nodes_to_remove
                    )
                if self._verbose:
                    logging.info("Insufficient Robustness | Upgrading robustness | Searching for more connections")
                self._update_restructure_cooldown()
                possible_neighbors = await self.np.get_posible_neighbors()
                possible_neighbors = await self.cm.apply_restrictions(possible_neighbors)
                if not possible_neighbors:
                    if self._verbose:
                        logging.info("All possible neighbors using nodes known are restricted...")
                else:
                    pass
                await self.sana.create_and_suggest_action(
                    SACommandAction.SEARCH_CONNECTIONS, self.upgrade_connection_robustness, False, possible_neighbors
                )
            elif await self.np.any_leftovers_neighbors():
                nodes_to_remove = await self.np.get_neighbors_to_remove()
                if self._verbose:
                    logging.info(f"Excess neighbors | removing: {list(nodes_to_remove)}")
                await self.sana.create_and_suggest_action(
                    SACommandAction.DISCONNECT, self.cm.disconnect, False, nodes_to_remove
                )
            else:
                if self._verbose:
                    logging.info("Sufficient Robustness | no actions required")
                await self.sana.create_and_suggest_action(
                    SACommandAction.MAINTAIN_CONNECTIONS,
                    self.cm.clear_unused_undirect_connections,
                    more_suggestions=False,
                )
        else:
            if self._verbose:
                logging.info("❗️ Reestructure/Reconnecting process already running...")
            await self.sana.create_and_suggest_action(SACommandAction.IDLE, more_suggestions=False)

    async def reconnect_to_federation(self):
        """
        Clear any connection restrictions and initiate a late‐connection discovery process
        to rejoin the federation.

        Steps:
        1. Acquire the restructure lock.
        2. Clear blacklist and recently disconnected restrictions.
        3. If known node addresses exist, use them for discovery; otherwise, perform a fresh discovery.
        4. Release the restructure lock.
        """
        logging.info("Going to reconnect with federation...")
        await self._restructure_process_lock.acquire_async()
        await self.cm.clear_restrictions()
        # If we got some refs, try to reconnect to them
        if len(await self.np.get_nodes_known()) > 0:
            if self._verbose:
                logging.info("Reconnecting | Addrs availables")
            await self.sar.sad.start_late_connection_process(
                connected=False, msg_type="discover_nodes", addrs_known=await self.np.get_nodes_known()
            )
        else:
            if self._verbose:
                logging.info("Reconnecting | NO Addrs availables")
            await self.sar.sad.start_late_connection_process(connected=False, msg_type="discover_nodes")
        await self._restructure_process_lock.release_async()

    async def upgrade_connection_robustness(self, possible_neighbors):
        """
        Attempt to strengthen network robustness by discovering or reconnecting to additional neighbors.

        Steps:
        1. Acquire the restructure lock.
        2. If possible_neighbors is non‐empty, use them for a targeted late‐connection discovery.
        3. Otherwise, perform a generic discovery of federation nodes.
        4. Release the restructure lock.

        Args:
            possible_neighbors (set): Addresses of candidate nodes for connection enhancement.
        """
        await self._restructure_process_lock.acquire_async()
        # If we got some refs, try to connect to them
        if possible_neighbors and len(possible_neighbors) > 0:
            if self._verbose:
                logging.info(f"Reestructuring | Addrs availables | addr list: {possible_neighbors}")
            await self.sar.sad.start_late_connection_process(
                connected=True, msg_type="discover_nodes", addrs_known=possible_neighbors
            )
        else:
            if self._verbose:
                logging.info("Reestructuring | NO Addrs availables")
            await self.sar.sad.start_late_connection_process(connected=True, msg_type="discover_nodes")
        await self._restructure_process_lock.release_async()

    async def stop_connections_with_federation(self):
        """
        Disconnect from all current federation neighbors after a short delay.

        1. Waits for a predefined sleep period (to allow in‐flight messages to complete).
        2. Blacklists each direct neighbor.
        3. Disconnects from each neighbor without mutual handshake.
        """
        await asyncio.sleep(10)
        logging.info("### DISCONNECTING FROM FEDERATON ###")
        neighbors = await self.np.get_nodes_known(neighbors_only=True)
        for n in neighbors:
            await self.cm.add_to_blacklist(n)
        for n in neighbors:
            await self.cm.disconnect(n, mutual_disconnection=False, forced=True)

    async def verify_neighbors_stablished(self, nodes: set):
        """
        Verify that a set of connection attempts has succeeded within a timeout.

        Args:
            nodes (set): The set of node addresses for which connections were attempted.

        Behavior:
        1. Sleeps for NEIGHBOR_VERIFICATION_TIMEOUT seconds.
        2. Compares the originally requested nodes against the currently known neighbors.
        3. Logs any addresses that failed to establish and instructs the policy to forget them.
        """
        if not nodes:
            return

        await asyncio.sleep(self.NEIGHBOR_VERIFICATION_TIMEOUT)
        logging.info("Verifyng all connections were stablished")
        nodes_to_forget = nodes.copy()
        neighbors = await self.np.get_nodes_known(neighbors_only=True)
        if neighbors:
            nodes_to_forget.difference_update(neighbors)
        logging.info(f"Connections dont stablished: {nodes_to_forget}")
        await self.forget_nodes(nodes_to_forget)

    async def create_verification_task(self, nodes: set):
        """
        Create and track a verification task for neighbor establishment.

        Args:
            nodes (set): The set of node addresses for which connections were attempted.

        Returns:
            asyncio.Task: The created verification task.
        """
        verification_task = asyncio.create_task(self.verify_neighbors_stablished(nodes))

        async with self._verification_tasks_lock:
            self._verification_tasks.add(verification_task)

        return verification_task

    async def forget_nodes(self, nodes_to_forget):
        """
        Instruct the neighbor policy to remove specified nodes from its known set.

        Args:
            nodes_to_forget (set): Addresses of nodes to be purged from policy memory.
        """
        await self.np.forget_nodes(nodes_to_forget)

    async def stop(self):
        """
        Stop the SANetwork component by releasing locks and clearing any pending operations.
        """
        logging.info("🛑  Stopping SANetwork...")

        # Cancel all verification tasks
        async with self._verification_tasks_lock:
            if self._verification_tasks:
                tasks_to_cancel = [task for task in self._verification_tasks if not task.done()]
                logging.info(f"🛑  Cancelling {len(tasks_to_cancel)} verification tasks...")
                for task in tasks_to_cancel:
                    task.cancel()
                    try:
                        await task
                    except asyncio.CancelledError:
                        pass
                self._verification_tasks.clear()
                logging.info("🛑  All verification tasks cancelled")

        # Release any held locks
        try:
            if self._restructure_process_lock.locked():
                self._restructure_process_lock.release()
        except Exception as e:
            logging.warning(f"Error releasing restructure_process_lock: {e}")

        logging.info("✅  SANetwork stopped successfully")

    """                                                     ###############################
                                                            #       SA NETWORK AGENT      #
                                                            ###############################
    """

cm property

Communication Manager

np property

Neighbor Policy

sana property

SA Network Agent

sar property

SA Reasoner

accept_connection(source, joining=False) async

Decide whether to accept an incoming connection request from a source node.

Parameters:

Name Type Description Default
source str

The address of the requesting node.

required
joining bool

True if this is part of a join process. Defaults to False.

False

Returns:

Name Type Description
bool

True if the connection should be accepted, False otherwise.

Source code in nebula/core/situationalawareness/awareness/sanetwork/sanetwork.py
198
199
200
201
202
203
204
205
206
207
208
209
210
async def accept_connection(self, source, joining=False):
    """
    Decide whether to accept an incoming connection request from a source node.

    Args:
        source (str): The address of the requesting node.
        joining (bool, optional): True if this is part of a join process. Defaults to False.

    Returns:
        bool: True if the connection should be accepted, False otherwise.
    """
    accepted = await self.np.accept_connection(source, joining)
    return accepted

beacon_received(beacon_recieved_event) async

Process a received beacon event by publishing a NodeFoundEvent for the given address.

Extracts the address and geolocation from the beacon event and notifies the system that a new node has been discovered.

Parameters:

Name Type Description Default
beacon_recieved_event BeaconRecievedEvent

The event containing beacon data.

required
Source code in nebula/core/situationalawareness/awareness/sanetwork/sanetwork.py
257
258
259
260
261
262
263
264
265
266
267
268
269
270
async def beacon_received(self, beacon_recieved_event: BeaconRecievedEvent):
    """
    Process a received beacon event by publishing a NodeFoundEvent for the given address.

    Extracts the address and geolocation from the beacon event and notifies
    the system that a new node has been discovered.

    Args:
        beacon_recieved_event (BeaconRecievedEvent): The event containing beacon data.
    """
    addr, geoloc = await beacon_recieved_event.get_event_data()
    latitude, longitude = geoloc
    nfe = NodeFoundEvent(addr)
    asyncio.create_task(EventManager.get_instance().publish_node_event(nfe))

create_verification_task(nodes) async

Create and track a verification task for neighbor establishment.

Parameters:

Name Type Description Default
nodes set

The set of node addresses for which connections were attempted.

required

Returns:

Type Description

asyncio.Task: The created verification task.

Source code in nebula/core/situationalawareness/awareness/sanetwork/sanetwork.py
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
async def create_verification_task(self, nodes: set):
    """
    Create and track a verification task for neighbor establishment.

    Args:
        nodes (set): The set of node addresses for which connections were attempted.

    Returns:
        asyncio.Task: The created verification task.
    """
    verification_task = asyncio.create_task(self.verify_neighbors_stablished(nodes))

    async with self._verification_tasks_lock:
        self._verification_tasks.add(verification_task)

    return verification_task

experiment_finish(efe) async

Handle the completion of an experiment by shutting down the external connection service.

Parameters:

Name Type Description Default
efe ExperimentFinishEvent

The event indicating the experiment has finished.

required
Source code in nebula/core/situationalawareness/awareness/sanetwork/sanetwork.py
248
249
250
251
252
253
254
255
async def experiment_finish(self, efe: ExperimentFinishEvent):
    """
    Handle the completion of an experiment by shutting down the external connection service.

    Args:
        efe (ExperimentFinishEvent): The event indicating the experiment has finished.
    """
    await self.cm.stop_external_connection_service()

forget_nodes(nodes_to_forget) async

Instruct the neighbor policy to remove specified nodes from its known set.

Parameters:

Name Type Description Default
nodes_to_forget set

Addresses of nodes to be purged from policy memory.

required
Source code in nebula/core/situationalawareness/awareness/sanetwork/sanetwork.py
482
483
484
485
486
487
488
489
async def forget_nodes(self, nodes_to_forget):
    """
    Instruct the neighbor policy to remove specified nodes from its known set.

    Args:
        nodes_to_forget (set): Addresses of nodes to be purged from policy memory.
    """
    await self.np.forget_nodes(nodes_to_forget)

get_actions() async

Retrieve the set of situational awareness actions applicable to the current network state.

Returns:

Name Type Description
list

Identifiers of available network actions.

Source code in nebula/core/situationalawareness/awareness/sanetwork/sanetwork.py
221
222
223
224
225
226
227
228
async def get_actions(self):
    """
    Retrieve the set of situational awareness actions applicable to the current network state.

    Returns:
        list: Identifiers of available network actions.
    """
    return await self.np.get_actions()

get_nodes_known(neighbors_too=False, neighbors_only=False) async

Retrieve the list of known nodes in the network.

Parameters:

Name Type Description Default
neighbors_too bool

Include neighbors in the result. Defaults to False.

False
neighbors_only bool

Return only neighbors. Defaults to False.

False

Returns:

Name Type Description
set

Addresses of known nodes based on the provided filters.

Source code in nebula/core/situationalawareness/awareness/sanetwork/sanetwork.py
176
177
178
179
180
181
182
183
184
185
186
187
async def get_nodes_known(self, neighbors_too=False, neighbors_only=False):
    """
    Retrieve the list of known nodes in the network.

    Args:
        neighbors_too (bool, optional): Include neighbors in the result. Defaults to False.
        neighbors_only (bool, optional): Return only neighbors. Defaults to False.

    Returns:
        set: Addresses of known nodes based on the provided filters.
    """
    return await self.np.get_nodes_known(neighbors_too, neighbors_only)

get_restructure_process_lock()

Retrieve the asynchronous lock protecting the restructure process.

Returns:

Type Description

asyncio.Lock: Lock to ensure only one restructure operation runs at a time.

Source code in nebula/core/situationalawareness/awareness/sanetwork/sanetwork.py
299
300
301
302
303
304
305
306
def get_restructure_process_lock(self):
    """
    Retrieve the asynchronous lock protecting the restructure process.

    Returns:
        asyncio.Lock: Lock to ensure only one restructure operation runs at a time.
    """
    return self._restructure_process_lock

init() async

Initialize the SANetwork component by deploying external connection services, subscribing to relevant events, starting beaconing, and configuring neighbor policies.

Actions performed: 1. If not an additional participant, start and subscribe to beacon and finish events. 2. Otherwise, initialize ECS without running it. 3. Build and apply the neighbor policy using current direct and undirected connections. 4. Subscribe to node discovery and neighbor update events. 5. Register this agent with the situational awareness network agent.

Source code in nebula/core/situationalawareness/awareness/sanetwork/sanetwork.py
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
async def init(self):
    """
    Initialize the SANetwork component by deploying external connection services,
    subscribing to relevant events, starting beaconing, and configuring neighbor policies.

    Actions performed:
    1. If not an additional participant, start and subscribe to beacon and finish events.
    2. Otherwise, initialize ECS without running it.
    3. Build and apply the neighbor policy using current direct and undirected connections.
    4. Subscribe to node discovery and neighbor update events.
    5. Register this agent with the situational awareness network agent.
    """
    if not self.sar.is_additional_participant():
        logging.info("Deploying External Connection Service")
        await self.cm.start_external_connection_service()
        await EventManager.get_instance().subscribe_node_event(BeaconRecievedEvent, self.beacon_received)
        await EventManager.get_instance().subscribe_node_event(ExperimentFinishEvent, self.experiment_finish)
        await self.cm.start_beacon()
    else:
        logging.info("Deploying External Connection Service | No running")
        await self.cm.start_external_connection_service(run_service=False)

    logging.info("Building neighbor policy configuration..")
    await self.np.set_config([
        await self.cm.get_addrs_current_connections(only_direct=True, myself=False),
        await self.cm.get_addrs_current_connections(only_direct=False, only_undirected=False, myself=False),
        self._addr,
        self._strict_topology,
    ])

    await EventManager.get_instance().subscribe_node_event(NodeFoundEvent, self._process_node_found_event)
    await EventManager.get_instance().subscribe_node_event(UpdateNeighborEvent, self._process_update_neighbor_event)
    await self.sana.register_sa_agent()

meet_node(node) async

Propose a meeting (connection) with a newly discovered node if it is not self.

Parameters:

Name Type Description Default
node str

The address of the node to meet.

required
Source code in nebula/core/situationalawareness/awareness/sanetwork/sanetwork.py
166
167
168
169
170
171
172
173
174
async def meet_node(self, node):
    """
    Propose a meeting (connection) with a newly discovered node if it is not self.

    Args:
        node (str): The address of the node to meet.
    """
    if node != self._addr:
        await self.np.meet_node(node)

need_more_neighbors() async

Determine if the network requires additional neighbor connections.

Returns:

Name Type Description
bool

True if more neighbors are needed, False otherwise.

Source code in nebula/core/situationalawareness/awareness/sanetwork/sanetwork.py
212
213
214
215
216
217
218
219
async def need_more_neighbors(self):
    """
    Determine if the network requires additional neighbor connections.

    Returns:
        bool: True if more neighbors are needed, False otherwise.
    """
    return await self.np.need_more_neighbors()

neighbors_left() async

Check whether any direct neighbor connections remain.

Returns:

Name Type Description
bool

True if there are one or more direct neighbor connections, False otherwise.

Source code in nebula/core/situationalawareness/awareness/sanetwork/sanetwork.py
189
190
191
192
193
194
195
196
async def neighbors_left(self):
    """
    Check whether any direct neighbor connections remain.

    Returns:
        bool: True if there are one or more direct neighbor connections, False otherwise.
    """
    return len(await self.cm.get_addrs_current_connections(only_direct=True, myself=False)) > 0

reconnect_to_federation() async

Clear any connection restrictions and initiate a late‐connection discovery process to rejoin the federation.

Steps: 1. Acquire the restructure lock. 2. Clear blacklist and recently disconnected restrictions. 3. If known node addresses exist, use them for discovery; otherwise, perform a fresh discovery. 4. Release the restructure lock.

Source code in nebula/core/situationalawareness/awareness/sanetwork/sanetwork.py
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
async def reconnect_to_federation(self):
    """
    Clear any connection restrictions and initiate a late‐connection discovery process
    to rejoin the federation.

    Steps:
    1. Acquire the restructure lock.
    2. Clear blacklist and recently disconnected restrictions.
    3. If known node addresses exist, use them for discovery; otherwise, perform a fresh discovery.
    4. Release the restructure lock.
    """
    logging.info("Going to reconnect with federation...")
    await self._restructure_process_lock.acquire_async()
    await self.cm.clear_restrictions()
    # If we got some refs, try to reconnect to them
    if len(await self.np.get_nodes_known()) > 0:
        if self._verbose:
            logging.info("Reconnecting | Addrs availables")
        await self.sar.sad.start_late_connection_process(
            connected=False, msg_type="discover_nodes", addrs_known=await self.np.get_nodes_known()
        )
    else:
        if self._verbose:
            logging.info("Reconnecting | NO Addrs availables")
        await self.sar.sad.start_late_connection_process(connected=False, msg_type="discover_nodes")
    await self._restructure_process_lock.release_async()

sa_component_actions() async

Perform periodic situational awareness checks for network conditions.

This method evaluates the external connection service status and analyzes the robustness of the current network topology.

Source code in nebula/core/situationalawareness/awareness/sanetwork/sanetwork.py
128
129
130
131
132
133
134
135
136
137
async def sa_component_actions(self):
    """
    Perform periodic situational awareness checks for network conditions.

    This method evaluates the external connection service status and analyzes
    the robustness of the current network topology.
    """
    logging.info("SA Network evaluating current scenario")
    await self._check_external_connection_service_status()
    await self._analize_topology_robustness()

stop() async

Stop the SANetwork component by releasing locks and clearing any pending operations.

Source code in nebula/core/situationalawareness/awareness/sanetwork/sanetwork.py
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
async def stop(self):
    """
    Stop the SANetwork component by releasing locks and clearing any pending operations.
    """
    logging.info("🛑  Stopping SANetwork...")

    # Cancel all verification tasks
    async with self._verification_tasks_lock:
        if self._verification_tasks:
            tasks_to_cancel = [task for task in self._verification_tasks if not task.done()]
            logging.info(f"🛑  Cancelling {len(tasks_to_cancel)} verification tasks...")
            for task in tasks_to_cancel:
                task.cancel()
                try:
                    await task
                except asyncio.CancelledError:
                    pass
            self._verification_tasks.clear()
            logging.info("🛑  All verification tasks cancelled")

    # Release any held locks
    try:
        if self._restructure_process_lock.locked():
            self._restructure_process_lock.release()
    except Exception as e:
        logging.warning(f"Error releasing restructure_process_lock: {e}")

    logging.info("✅  SANetwork stopped successfully")

stop_connections_with_federation() async

Disconnect from all current federation neighbors after a short delay.

  1. Waits for a predefined sleep period (to allow in‐flight messages to complete).
  2. Blacklists each direct neighbor.
  3. Disconnects from each neighbor without mutual handshake.
Source code in nebula/core/situationalawareness/awareness/sanetwork/sanetwork.py
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
async def stop_connections_with_federation(self):
    """
    Disconnect from all current federation neighbors after a short delay.

    1. Waits for a predefined sleep period (to allow in‐flight messages to complete).
    2. Blacklists each direct neighbor.
    3. Disconnects from each neighbor without mutual handshake.
    """
    await asyncio.sleep(10)
    logging.info("### DISCONNECTING FROM FEDERATON ###")
    neighbors = await self.np.get_nodes_known(neighbors_only=True)
    for n in neighbors:
        await self.cm.add_to_blacklist(n)
    for n in neighbors:
        await self.cm.disconnect(n, mutual_disconnection=False, forced=True)

upgrade_connection_robustness(possible_neighbors) async

Attempt to strengthen network robustness by discovering or reconnecting to additional neighbors.

Steps: 1. Acquire the restructure lock. 2. If possible_neighbors is non‐empty, use them for a targeted late‐connection discovery. 3. Otherwise, perform a generic discovery of federation nodes. 4. Release the restructure lock.

Parameters:

Name Type Description Default
possible_neighbors set

Addresses of candidate nodes for connection enhancement.

required
Source code in nebula/core/situationalawareness/awareness/sanetwork/sanetwork.py
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
async def upgrade_connection_robustness(self, possible_neighbors):
    """
    Attempt to strengthen network robustness by discovering or reconnecting to additional neighbors.

    Steps:
    1. Acquire the restructure lock.
    2. If possible_neighbors is non‐empty, use them for a targeted late‐connection discovery.
    3. Otherwise, perform a generic discovery of federation nodes.
    4. Release the restructure lock.

    Args:
        possible_neighbors (set): Addresses of candidate nodes for connection enhancement.
    """
    await self._restructure_process_lock.acquire_async()
    # If we got some refs, try to connect to them
    if possible_neighbors and len(possible_neighbors) > 0:
        if self._verbose:
            logging.info(f"Reestructuring | Addrs availables | addr list: {possible_neighbors}")
        await self.sar.sad.start_late_connection_process(
            connected=True, msg_type="discover_nodes", addrs_known=possible_neighbors
        )
    else:
        if self._verbose:
            logging.info("Reestructuring | NO Addrs availables")
        await self.sar.sad.start_late_connection_process(connected=True, msg_type="discover_nodes")
    await self._restructure_process_lock.release_async()

verify_neighbors_stablished(nodes) async

Verify that a set of connection attempts has succeeded within a timeout.

Parameters:

Name Type Description Default
nodes set

The set of node addresses for which connections were attempted.

required

Behavior: 1. Sleeps for NEIGHBOR_VERIFICATION_TIMEOUT seconds. 2. Compares the originally requested nodes against the currently known neighbors. 3. Logs any addresses that failed to establish and instructs the policy to forget them.

Source code in nebula/core/situationalawareness/awareness/sanetwork/sanetwork.py
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
async def verify_neighbors_stablished(self, nodes: set):
    """
    Verify that a set of connection attempts has succeeded within a timeout.

    Args:
        nodes (set): The set of node addresses for which connections were attempted.

    Behavior:
    1. Sleeps for NEIGHBOR_VERIFICATION_TIMEOUT seconds.
    2. Compares the originally requested nodes against the currently known neighbors.
    3. Logs any addresses that failed to establish and instructs the policy to forget them.
    """
    if not nodes:
        return

    await asyncio.sleep(self.NEIGHBOR_VERIFICATION_TIMEOUT)
    logging.info("Verifyng all connections were stablished")
    nodes_to_forget = nodes.copy()
    neighbors = await self.np.get_nodes_known(neighbors_only=True)
    if neighbors:
        nodes_to_forget.difference_update(neighbors)
    logging.info(f"Connections dont stablished: {nodes_to_forget}")
    await self.forget_nodes(nodes_to_forget)

SANetworkAgent

Bases: SAModuleAgent

Source code in nebula/core/situationalawareness/awareness/sanetwork/sanetwork.py
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
class SANetworkAgent(SAModuleAgent):
    def __init__(self, sanetwork: SANetwork):
        self._san = sanetwork

    async def get_agent(self) -> str:
        return "SANetwork_MainNetworkAgent"

    async def register_sa_agent(self):
        await SuggestionBuffer.get_instance().register_event_agents(RoundEndEvent, self)

    async def suggest_action(self, sac: SACommand):
        await SuggestionBuffer.get_instance().register_suggestion(RoundEndEvent, self, sac)

    async def notify_all_suggestions_done(self, event_type):
        await SuggestionBuffer.get_instance().notify_all_suggestions_done_for_agent(self, event_type)

    async def create_and_suggest_action(
        self, saca: SACommandAction, function: Callable = None, more_suggestions=False, *args
    ):
        """
        Create a situational awareness command based on the specified action and suggest it for arbitration.

        Depending on the SACommandAction provided, this method:
        - Instantiates the appropriate SACommand via the factory.
        - Submits the command to the arbitration process (`suggest_action`).
        - Optionally finalizes suggestion collection (`notify_all_suggestions_done`).
        - In some cases waits for execution.

        Args:
            saca (SACommandAction): The situational awareness action to suggest (e.g., SEARCH_CONNECTIONS, RECONNECT).
            function (Callable, optional): The function to execute if the command is chosen. Defaults to None.
            more_suggestions (bool, optional): If False, marks the end of suggestion gathering. Defaults to False.
            *args: Additional positional arguments passed to the SACommand constructor to be used as function parameters.
        """
        sac = None
        if saca == SACommandAction.MAINTAIN_CONNECTIONS:
            sac = factory_sa_command(
                "connectivity", SACommandAction.MAINTAIN_CONNECTIONS, self, "", SACommandPRIO.MEDIUM, False, function
            )
            await self.suggest_action(sac)
            await self.notify_all_suggestions_done(RoundEndEvent)
        elif saca == SACommandAction.SEARCH_CONNECTIONS:
            sac = factory_sa_command(
                "connectivity",
                SACommandAction.SEARCH_CONNECTIONS,
                self,
                "",
                SACommandPRIO.MEDIUM,
                True,
                function,
                *args,
            )
            await self.suggest_action(sac)
            if not more_suggestions:
                await self.notify_all_suggestions_done(RoundEndEvent)
            sa_command_state = await sac.get_state_future()  # By using 'await' we get future.set_result()
            if sa_command_state == SACommandState.EXECUTED:
                (nodes_to_forget,) = args
                await self._san.create_verification_task(nodes_to_forget)
        elif saca == SACommandAction.RECONNECT:
            sac = factory_sa_command(
                "connectivity", SACommandAction.RECONNECT, self, "", SACommandPRIO.HIGH, True, function
            )
            await self.suggest_action(sac)
            if not more_suggestions:
                await self.notify_all_suggestions_done(RoundEndEvent)
        elif saca == SACommandAction.DISCONNECT:
            nodes = args[0] if isinstance(args[0], set) else set(args)
            for node in nodes:
                sac = factory_sa_command(
                    "connectivity",
                    SACommandAction.DISCONNECT,
                    self,
                    node,
                    SACommandPRIO.HIGH,
                    True,
                    function,
                    node,
                    True,
                )
                # TODO Check executed state to ensure node is removed
                await self.suggest_action(sac)
            if not more_suggestions:
                await self.notify_all_suggestions_done(RoundEndEvent)
        elif saca == SACommandAction.IDLE:
            sac = factory_sa_command(
                "connectivity", SACommandAction.IDLE, self, "", SACommandPRIO.LOW, False, function, None
            )
            await self.suggest_action(sac)
            if not more_suggestions:
                await self.notify_all_suggestions_done(RoundEndEvent)

create_and_suggest_action(saca, function=None, more_suggestions=False, *args) async

Create a situational awareness command based on the specified action and suggest it for arbitration.

Depending on the SACommandAction provided, this method: - Instantiates the appropriate SACommand via the factory. - Submits the command to the arbitration process (suggest_action). - Optionally finalizes suggestion collection (notify_all_suggestions_done). - In some cases waits for execution.

Parameters:

Name Type Description Default
saca SACommandAction

The situational awareness action to suggest (e.g., SEARCH_CONNECTIONS, RECONNECT).

required
function Callable

The function to execute if the command is chosen. Defaults to None.

None
more_suggestions bool

If False, marks the end of suggestion gathering. Defaults to False.

False
*args

Additional positional arguments passed to the SACommand constructor to be used as function parameters.

()
Source code in nebula/core/situationalawareness/awareness/sanetwork/sanetwork.py
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
async def create_and_suggest_action(
    self, saca: SACommandAction, function: Callable = None, more_suggestions=False, *args
):
    """
    Create a situational awareness command based on the specified action and suggest it for arbitration.

    Depending on the SACommandAction provided, this method:
    - Instantiates the appropriate SACommand via the factory.
    - Submits the command to the arbitration process (`suggest_action`).
    - Optionally finalizes suggestion collection (`notify_all_suggestions_done`).
    - In some cases waits for execution.

    Args:
        saca (SACommandAction): The situational awareness action to suggest (e.g., SEARCH_CONNECTIONS, RECONNECT).
        function (Callable, optional): The function to execute if the command is chosen. Defaults to None.
        more_suggestions (bool, optional): If False, marks the end of suggestion gathering. Defaults to False.
        *args: Additional positional arguments passed to the SACommand constructor to be used as function parameters.
    """
    sac = None
    if saca == SACommandAction.MAINTAIN_CONNECTIONS:
        sac = factory_sa_command(
            "connectivity", SACommandAction.MAINTAIN_CONNECTIONS, self, "", SACommandPRIO.MEDIUM, False, function
        )
        await self.suggest_action(sac)
        await self.notify_all_suggestions_done(RoundEndEvent)
    elif saca == SACommandAction.SEARCH_CONNECTIONS:
        sac = factory_sa_command(
            "connectivity",
            SACommandAction.SEARCH_CONNECTIONS,
            self,
            "",
            SACommandPRIO.MEDIUM,
            True,
            function,
            *args,
        )
        await self.suggest_action(sac)
        if not more_suggestions:
            await self.notify_all_suggestions_done(RoundEndEvent)
        sa_command_state = await sac.get_state_future()  # By using 'await' we get future.set_result()
        if sa_command_state == SACommandState.EXECUTED:
            (nodes_to_forget,) = args
            await self._san.create_verification_task(nodes_to_forget)
    elif saca == SACommandAction.RECONNECT:
        sac = factory_sa_command(
            "connectivity", SACommandAction.RECONNECT, self, "", SACommandPRIO.HIGH, True, function
        )
        await self.suggest_action(sac)
        if not more_suggestions:
            await self.notify_all_suggestions_done(RoundEndEvent)
    elif saca == SACommandAction.DISCONNECT:
        nodes = args[0] if isinstance(args[0], set) else set(args)
        for node in nodes:
            sac = factory_sa_command(
                "connectivity",
                SACommandAction.DISCONNECT,
                self,
                node,
                SACommandPRIO.HIGH,
                True,
                function,
                node,
                True,
            )
            # TODO Check executed state to ensure node is removed
            await self.suggest_action(sac)
        if not more_suggestions:
            await self.notify_all_suggestions_done(RoundEndEvent)
    elif saca == SACommandAction.IDLE:
        sac = factory_sa_command(
            "connectivity", SACommandAction.IDLE, self, "", SACommandPRIO.LOW, False, function, None
        )
        await self.suggest_action(sac)
        if not more_suggestions:
            await self.notify_all_suggestions_done(RoundEndEvent)