
Documentation for Situationalawareness Module

ISADiscovery

Bases: ABC

Interface for Situational Awareness discovery components.

Defines methods for initializing discovery, handling late connection processes, and retrieving training-related information.

Source code in nebula/core/situationalawareness/situationalawareness.py
class ISADiscovery(ABC):
    """
    Interface for Situational Awareness discovery components.

    Defines methods for initializing discovery, handling late connection processes,
    and retrieving training-related information.
    """

    @abstractmethod
    async def init(self, sa_reasoner):
        """
        Initialize the discovery component with a corresponding reasoner.

        Args:
            sa_reasoner (ISAReasoner): The reasoner instance to coordinate with.
        """
        raise NotImplementedError

    @abstractmethod
    async def start_late_connection_process(self, connected=False, msg_type="discover_join", addrs_known=None):
        """
        Begin the late-connection discovery process for situational awareness.

        Args:
            connected (bool, optional): Whether the node is already connected. Defaults to False.
            msg_type (str, optional): Type of discovery message to send. Defaults to "discover_join".
            addrs_known (list, optional): Known addresses to use instead of active discovery.
        """
        raise NotImplementedError

    @abstractmethod
    async def get_trainning_info(self):
        """
        Retrieve information necessary for training initialization.

        Returns:
            Any: Training information produced by the discovery component.
        """
        raise NotImplementedError

get_trainning_info() abstractmethod async

Retrieve information necessary for training initialization.

Returns:

| Type | Description |
| --- | --- |
| Any | Training information produced by the discovery component. |

Source code in nebula/core/situationalawareness/situationalawareness.py
@abstractmethod
async def get_trainning_info(self):
    """
    Retrieve information necessary for training initialization.

    Returns:
        Any: Training information produced by the discovery component.
    """
    raise NotImplementedError

init(sa_reasoner) abstractmethod async

Initialize the discovery component with a corresponding reasoner.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| sa_reasoner | ISAReasoner | The reasoner instance to coordinate with. | required |

Source code in nebula/core/situationalawareness/situationalawareness.py
@abstractmethod
async def init(self, sa_reasoner):
    """
    Initialize the discovery component with a corresponding reasoner.

    Args:
        sa_reasoner (ISAReasoner): The reasoner instance to coordinate with.
    """
    raise NotImplementedError

start_late_connection_process(connected=False, msg_type='discover_join', addrs_known=None) abstractmethod async

Begin the late-connection discovery process for situational awareness.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| connected | bool | Whether the node is already connected. Defaults to False. | False |
| msg_type | str | Type of discovery message to send. Defaults to "discover_join". | 'discover_join' |
| addrs_known | list | Known addresses to use instead of active discovery. | None |

Source code in nebula/core/situationalawareness/situationalawareness.py
@abstractmethod
async def start_late_connection_process(self, connected=False, msg_type="discover_join", addrs_known=None):
    """
    Begin the late-connection discovery process for situational awareness.

    Args:
        connected (bool, optional): Whether the node is already connected. Defaults to False.
        msg_type (str, optional): Type of discovery message to send. Defaults to "discover_join".
        addrs_known (list, optional): Known addresses to use instead of active discovery.
    """
    raise NotImplementedError
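
A minimal sketch of how a concrete discovery component might satisfy this interface. The `ISADiscovery` class below is a local stand-in that mirrors the documented signatures so the example runs without NEBULA installed. `StaticDiscovery`, the example address, and the dictionary returned by `get_trainning_info` are hypothetical, since the source documents the return type only as `Any`.

```python
import asyncio
from abc import ABC, abstractmethod


class ISADiscovery(ABC):
    """Local stand-in mirroring the documented interface (not the nebula import)."""

    @abstractmethod
    async def init(self, sa_reasoner):
        raise NotImplementedError

    @abstractmethod
    async def start_late_connection_process(self, connected=False, msg_type="discover_join", addrs_known=None):
        raise NotImplementedError

    @abstractmethod
    async def get_trainning_info(self):
        raise NotImplementedError


class StaticDiscovery(ISADiscovery):
    """Hypothetical component that only records which addresses it would contact."""

    def __init__(self):
        self._reasoner = None
        self._requests = []

    async def init(self, sa_reasoner):
        # Keep a reference to the reasoner this component coordinates with.
        self._reasoner = sa_reasoner

    async def start_late_connection_process(self, connected=False, msg_type="discover_join", addrs_known=None):
        # Assumption: when addresses are supplied, use them instead of probing.
        for addr in addrs_known or []:
            self._requests.append((msg_type, addr))

    async def get_trainning_info(self):
        # Assumption: a plain dict; the real payload is not documented.
        return {"requests": list(self._requests)}


async def demo():
    discovery = StaticDiscovery()
    await discovery.init(sa_reasoner=None)
    await discovery.start_late_connection_process(addrs_known=["10.0.0.2:45000"])
    return await discovery.get_trainning_info()


training_info = asyncio.run(demo())
print(training_info)
```

Because every method is marked `@abstractmethod`, instantiating the base class directly raises `TypeError`; a subclass must override all three coroutines before it can be created.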

ISAReasoner

Bases: ABC

Interface for Situational Awareness reasoning components.

Defines methods for initializing the reasoner, accepting or rejecting connections, and querying known nodes and available actions.

Source code in nebula/core/situationalawareness/situationalawareness.py
class ISAReasoner(ABC):
    """
    Interface for Situational Awareness reasoning components.

    Defines methods for initializing the reasoner, accepting or rejecting connections,
    and querying known nodes and available actions.
    """

    @abstractmethod
    async def init(self, sa_discovery):
        """
        Initialize the reasoner with a corresponding discovery component.

        Args:
            sa_discovery (ISADiscovery): The discovery instance to coordinate with.
        """
        raise NotImplementedError

    @abstractmethod
    async def accept_connection(self, source, joining=False):
        """
        Decide whether to accept a connection from a given source node.

        Args:
            source (str): The address or identifier of the requesting node.
            joining (bool, optional): Whether the connection is part of a join process. Defaults to False.
        """
        raise NotImplementedError

    @abstractmethod
    def get_nodes_known(self, neighbors_too=False, neighbors_only=False):
        """
        Get the set of nodes known to the reasoner.

        Args:
            neighbors_too (bool, optional): Include neighbors in the result. Defaults to False.
            neighbors_only (bool, optional): Return only neighbors. Defaults to False.

        Returns:
            set: Identifiers of known nodes based on the provided filters.
        """
        raise NotImplementedError

    @abstractmethod
    def get_actions(self):
        """
        Get the list of situational awareness actions the reasoner can perform in
        response to the late-connection process.

        Returns:
            list: Available action identifiers.
        """
        raise NotImplementedError

accept_connection(source, joining=False) abstractmethod async

Decide whether to accept a connection from a given source node.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| source | str | The address or identifier of the requesting node. | required |
| joining | bool | Whether the connection is part of a join process. Defaults to False. | False |

Source code in nebula/core/situationalawareness/situationalawareness.py
@abstractmethod
async def accept_connection(self, source, joining=False):
    """
    Decide whether to accept a connection from a given source node.

    Args:
        source (str): The address or identifier of the requesting node.
        joining (bool, optional): Whether the connection is part of a join process. Defaults to False.
    """
    raise NotImplementedError

get_actions() abstractmethod

Get the list of situational awareness actions the reasoner can perform in response to the late-connection process.

Returns:

| Type | Description |
| --- | --- |
| list | Available action identifiers. |

Source code in nebula/core/situationalawareness/situationalawareness.py
@abstractmethod
def get_actions(self):
    """
    Get the list of situational awareness actions the reasoner can perform in
    response to the late-connection process.

    Returns:
        list: Available action identifiers.
    """
    raise NotImplementedError

get_nodes_known(neighbors_too=False, neighbors_only=False) abstractmethod

Get the set of nodes known to the reasoner.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| neighbors_too | bool | Include neighbors in the result. Defaults to False. | False |
| neighbors_only | bool | Return only neighbors. Defaults to False. | False |

Returns:

| Type | Description |
| --- | --- |
| set | Identifiers of known nodes based on the provided filters. |

Source code in nebula/core/situationalawareness/situationalawareness.py
@abstractmethod
def get_nodes_known(self, neighbors_too=False, neighbors_only=False):
    """
    Get the set of nodes known to the reasoner.

    Args:
        neighbors_too (bool, optional): Include neighbors in the result. Defaults to False.
        neighbors_only (bool, optional): Return only neighbors. Defaults to False.

    Returns:
        set: Identifiers of known nodes based on the provided filters.
    """
    raise NotImplementedError

init(sa_discovery) abstractmethod async

Initialize the reasoner with a corresponding discovery component.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| sa_discovery | ISADiscovery | The discovery instance to coordinate with. | required |

Source code in nebula/core/situationalawareness/situationalawareness.py
@abstractmethod
async def init(self, sa_discovery):
    """
    Initialize the reasoner with a corresponding discovery component.

    Args:
        sa_discovery (ISADiscovery): The discovery instance to coordinate with.
    """
    raise NotImplementedError
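
A minimal sketch of a reasoner that satisfies this interface, again using a local stand-in for `ISAReasoner` so it runs on its own. Several points are assumptions, not documented behavior: that `accept_connection` returns a `bool`, that `neighbors_only` takes precedence over `neighbors_too`, that the default call excludes current neighbors, and the action names `"accept"` and `"reject"`. `AllowListReasoner` itself is hypothetical.

```python
import asyncio
from abc import ABC, abstractmethod


class ISAReasoner(ABC):
    """Local stand-in mirroring the documented interface (not the nebula import)."""

    @abstractmethod
    async def init(self, sa_discovery):
        raise NotImplementedError

    @abstractmethod
    async def accept_connection(self, source, joining=False):
        raise NotImplementedError

    @abstractmethod
    def get_nodes_known(self, neighbors_too=False, neighbors_only=False):
        raise NotImplementedError

    @abstractmethod
    def get_actions(self):
        raise NotImplementedError


class AllowListReasoner(ISAReasoner):
    """Hypothetical reasoner that accepts only nodes it already knows about."""

    def __init__(self, known, neighbors):
        self._known = set(known)
        self._neighbors = set(neighbors)
        self._discovery = None

    async def init(self, sa_discovery):
        self._discovery = sa_discovery

    async def accept_connection(self, source, joining=False):
        # Assumption: the decision is reported as a bool.
        return source in self._known

    def get_nodes_known(self, neighbors_too=False, neighbors_only=False):
        if neighbors_only:
            return set(self._neighbors)
        if neighbors_too:
            return self._known | self._neighbors
        # Assumption: by default, report known nodes that are not yet neighbors.
        return self._known - self._neighbors

    def get_actions(self):
        return ["accept", "reject"]  # hypothetical action identifiers


reasoner = AllowListReasoner(known={"a", "b", "c"}, neighbors={"b"})
accepted = asyncio.run(reasoner.accept_connection("a", joining=True))
rejected = asyncio.run(reasoner.accept_connection("z"))
print(accepted, rejected, sorted(reasoner.get_nodes_known()))
```

Note that `init` and `accept_connection` are coroutines while `get_nodes_known` and `get_actions` are plain methods, matching the `async` markers in the interface.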

SituationalAwareness

High-level coordinator for Situational Awareness in the DFL federation.

Manages discovery and reasoning components, wiring them together and exposing simple methods for initialization and late-connection handling.

Source code in nebula/core/situationalawareness/situationalawareness.py
class SituationalAwareness:
    """
    High-level coordinator for Situational Awareness in the DFL federation.

    Manages discovery and reasoning components, wiring them together
    and exposing simple methods for initialization and late-connection handling.
    """

    def __init__(self, config, engine):
        """
        Initialize Situational Awareness module by creating discovery and reasoner instances.

        Args:
            config (Config): Configuration containing situational awareness settings.
            engine (Engine): The core engine of the federation for coordination.
        """
        print_msg_box(
            msg="Starting Situational Awareness module...",
            indent=2,
            title="Situational Awareness module",
        )
        self._config = config
        selector = self._config.participant["situational_awareness"]["sa_discovery"]["candidate_selector"]
        selector = selector.lower()
        model_handler = config.participant["situational_awareness"]["sa_discovery"]["model_handler"]
        self._sad = factory_sa_discovery(
            "nebula",
            self._config.participant["mobility_args"]["additional_node"]["status"],
            selector,
            model_handler,
            engine=engine,
            verbose=config.participant["situational_awareness"]["sa_discovery"]["verbose"],
        )
        self._sareasoner = factory_sa_reasoner(
            "nebula",
            self._config,
        )

    @property
    def sad(self):
        """
        Access the Situational Awareness discovery component.

        Returns:
            ISADiscovery: The discovery instance.
        """
        return self._sad

    @property
    def sar(self):
        """
        Access the Situational Awareness reasoner component.

        Returns:
            ISAReasoner: The reasoner instance.
        """
        return self._sareasoner

    async def init(self):
        """
        Initialize both discovery and reasoner components, linking them together.
        """
        await self.sad.init(self.sar)
        await self.sar.init(self.sad)

    async def start_late_connection_process(self):
        """
        Start the late-connection process via the discovery component.
        """
        await self.sad.start_late_connection_process()

    async def get_trainning_info(self):
        """
        Retrieve training information from the discovery component.

        Returns:
            Any: Information relevant to training decisions.
        """
        return await self.sad.get_trainning_info()

    async def stop(self):
        """
        Stop both discovery and reasoner components if they implement a stop method.
        """
        sad_stop = getattr(self.sad, "stop", None)
        if callable(sad_stop):
            if asyncio.iscoroutinefunction(sad_stop):
                await sad_stop()
            else:
                sad_stop()
        sar_stop = getattr(self.sar, "stop", None)
        if callable(sar_stop):
            if asyncio.iscoroutinefunction(sar_stop):
                await sar_stop()
            else:
                sar_stop()

sad property

Access the Situational Awareness discovery component.

Returns:

| Type | Description |
| --- | --- |
| ISADiscovery | The discovery instance. |

sar property

Access the Situational Awareness reasoner component.

Returns:

| Type | Description |
| --- | --- |
| ISAReasoner | The reasoner instance. |

__init__(config, engine)

Initialize Situational Awareness module by creating discovery and reasoner instances.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| config | Config | Configuration containing situational awareness settings. | required |
| engine | Engine | The core engine of the federation for coordination. | required |

Source code in nebula/core/situationalawareness/situationalawareness.py
def __init__(self, config, engine):
    """
    Initialize Situational Awareness module by creating discovery and reasoner instances.

    Args:
        config (Config): Configuration containing situational awareness settings.
        engine (Engine): The core engine of the federation for coordination.
    """
    print_msg_box(
        msg="Starting Situational Awareness module...",
        indent=2,
        title="Situational Awareness module",
    )
    self._config = config
    selector = self._config.participant["situational_awareness"]["sa_discovery"]["candidate_selector"]
    selector = selector.lower()
    model_handler = config.participant["situational_awareness"]["sa_discovery"]["model_handler"]
    self._sad = factory_sa_discovery(
        "nebula",
        self._config.participant["mobility_args"]["additional_node"]["status"],
        selector,
        model_handler,
        engine=engine,
        verbose=config.participant["situational_awareness"]["sa_discovery"]["verbose"],
    )
    self._sareasoner = factory_sa_reasoner(
        "nebula",
        self._config,
    )
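
The constructor reads four settings from `config.participant`. The sketch below shows the nesting those lookups imply, using a plain dictionary in place of the real `Config` object. All values are placeholders (`"MySelector"` and `"my_handler"` are hypothetical names, not known NEBULA options); only the key paths come from the source above.

```python
# Plain-dict stand-in for config.participant; values are placeholders.
participant = {
    "situational_awareness": {
        "sa_discovery": {
            "candidate_selector": "MySelector",  # hypothetical strategy name
            "model_handler": "my_handler",       # hypothetical strategy name
            "verbose": False,
        },
    },
    "mobility_args": {
        "additional_node": {"status": False},
    },
}

# The same lookups __init__ performs before calling the factories.
sa_discovery_cfg = participant["situational_awareness"]["sa_discovery"]
selector = sa_discovery_cfg["candidate_selector"].lower()  # normalized to lowercase
model_handler = sa_discovery_cfg["model_handler"]
verbose = sa_discovery_cfg["verbose"]
additional = participant["mobility_args"]["additional_node"]["status"]

print(selector, model_handler, verbose, additional)
```

A missing key at any level raises `KeyError`, because the constructor indexes the mapping directly and does not use `.get()` with defaults.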

get_trainning_info() async

Retrieve training information from the discovery component.

Returns:

| Type | Description |
| --- | --- |
| Any | Information relevant to training decisions. |

Source code in nebula/core/situationalawareness/situationalawareness.py
async def get_trainning_info(self):
    """
    Retrieve training information from the discovery component.

    Returns:
        Any: Information relevant to training decisions.
    """
    return await self.sad.get_trainning_info()

init() async

Initialize both discovery and reasoner components, linking them together.

Source code in nebula/core/situationalawareness/situationalawareness.py
async def init(self):
    """
    Initialize both discovery and reasoner components, linking them together.
    """
    await self.sad.init(self.sar)
    await self.sar.init(self.sad)
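
The mutual wiring performed by `init()` can be sketched with stub components. `Coordinator`, `StubDiscovery`, and `StubReasoner` are hypothetical stand-ins; unlike `SituationalAwareness`, the coordinator here receives ready-made components and does not build them through the factories.

```python
import asyncio


class StubDiscovery:
    """Hypothetical discovery stub that records what it was given."""

    def __init__(self):
        self.reasoner = None
        self.started = False

    async def init(self, sa_reasoner):
        self.reasoner = sa_reasoner

    async def start_late_connection_process(self, connected=False, msg_type="discover_join", addrs_known=None):
        self.started = True

    async def get_trainning_info(self):
        return "stub-training-info"


class StubReasoner:
    """Hypothetical reasoner stub that records its discovery peer."""

    def __init__(self):
        self.discovery = None

    async def init(self, sa_discovery):
        self.discovery = sa_discovery


class Coordinator:
    """Simplified stand-in for SituationalAwareness."""

    def __init__(self, sad, sar):
        self._sad = sad
        self._sar = sar

    async def init(self):
        # Same order as the documented source: discovery first, then reasoner.
        await self._sad.init(self._sar)
        await self._sar.init(self._sad)

    async def start_late_connection_process(self):
        await self._sad.start_late_connection_process()

    async def get_trainning_info(self):
        return await self._sad.get_trainning_info()


async def demo(coordinator):
    await coordinator.init()
    await coordinator.start_late_connection_process()
    return await coordinator.get_trainning_info()


sad, sar = StubDiscovery(), StubReasoner()
info = asyncio.run(demo(Coordinator(sad, sar)))
print(info, sad.reasoner is sar, sar.discovery is sad)
```

After `init()` each component holds a reference to the other, so either side can call its peer without going through the coordinator.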

start_late_connection_process() async

Start the late-connection process via the discovery component.

Source code in nebula/core/situationalawareness/situationalawareness.py
async def start_late_connection_process(self):
    """
    Start the late-connection process via the discovery component.
    """
    await self.sad.start_late_connection_process()

stop() async

Stop both discovery and reasoner components if they implement a stop method.

Source code in nebula/core/situationalawareness/situationalawareness.py
async def stop(self):
    """
    Stop both discovery and reasoner components if they implement a stop method.
    """
    sad_stop = getattr(self.sad, "stop", None)
    if callable(sad_stop):
        if asyncio.iscoroutinefunction(sad_stop):
            await sad_stop()
        else:
            sad_stop()
    sar_stop = getattr(self.sar, "stop", None)
    if callable(sar_stop):
        if asyncio.iscoroutinefunction(sar_stop):
            await sar_stop()
        else:
            sar_stop()
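
The optional-stop logic above repeats the same check for each component, and it could be factored into one helper along these lines. This is a sketch, not the module's code: `stop_if_supported` and the three example classes are hypothetical, and it uses `inspect.iscoroutinefunction` where the source uses `asyncio.iscoroutinefunction`.

```python
import asyncio
import inspect

events = []


async def stop_if_supported(component):
    """Call component.stop() if present, awaiting it when it is a coroutine function."""
    stop = getattr(component, "stop", None)
    if not callable(stop):
        return False
    if inspect.iscoroutinefunction(stop):
        await stop()
    else:
        stop()
    return True


class AsyncStoppable:
    async def stop(self):
        events.append("async")


class SyncStoppable:
    def stop(self):
        events.append("sync")


class NotStoppable:
    pass


async def demo():
    return [
        await stop_if_supported(AsyncStoppable()),
        await stop_if_supported(SyncStoppable()),
        await stop_if_supported(NotStoppable()),
    ]


results = asyncio.run(demo())
print(results, events)
```

Looking up `stop` with `getattr` keeps the method optional: neither `ISADiscovery` nor `ISAReasoner` declares it, so components without one are simply skipped.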

factory_sa_discovery(sa_discovery, additional, selector, model_handler, engine, verbose)

Factory function to create an ISADiscovery implementation.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| sa_discovery | str | Identifier of the discovery backend (e.g., "nebula"). | required |
| additional | bool | Additional-node status of the participant; SituationalAwareness passes mobility_args.additional_node.status. | required |
| selector | str | Candidate selector strategy name. | required |
| model_handler | str | Model handler strategy name. | required |
| engine | Engine | Reference to the engine. | required |
| verbose | bool | Enable verbose logging or output. | required |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| ISADiscovery | ISADiscovery | An instance of the requested discovery implementation. |

Raises:

| Type | Description |
| --- | --- |
| Exception | If the specified discovery service identifier is not found. |

Source code in nebula/core/situationalawareness/situationalawareness.py
def factory_sa_discovery(sa_discovery, additional, selector, model_handler, engine, verbose) -> ISADiscovery:
    """
    Factory function to create an ISADiscovery implementation.

    Args:
        sa_discovery (str): Identifier of the discovery backend (e.g., "nebula").
        additional (bool): Additional status of the node.
        selector (str): Candidate selector strategy name.
        model_handler (str): Model handler strategy name.
        engine (Engine): Reference to the engine.
        verbose (bool): Enable verbose logging or output.

    Returns:
        ISADiscovery: An instance of the requested discovery implementation.

    Raises:
        Exception: If the specified discovery service identifier is not found.
    """
    from nebula.core.situationalawareness.discovery.federationconnector import FederationConnector

    DISCOVERY = {
        "nebula": FederationConnector,
    }
    sad = DISCOVERY.get(sa_discovery)
    if sad:
        return sad(additional, selector, model_handler, engine, verbose)
    else:
        raise Exception(f"SA Discovery service {sa_discovery} not found.")
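
The factory is a dictionary lookup from a backend name to a class. A self-contained sketch of the same pattern follows; `EchoDiscovery`, the `"echo"` key, and `make_discovery` are hypothetical, and the sketch raises `ValueError` (a subclass of `Exception`) where the source raises a bare `Exception`.

```python
class EchoDiscovery:
    """Hypothetical backend standing in for FederationConnector."""

    def __init__(self, additional, selector, model_handler, engine, verbose):
        self.additional = additional
        self.selector = selector
        self.model_handler = model_handler
        self.engine = engine
        self.verbose = verbose


# Registry mapping backend identifiers to implementation classes.
DISCOVERY = {
    "echo": EchoDiscovery,
}


def make_discovery(name, additional, selector, model_handler, engine, verbose):
    """Instantiate the backend registered under `name`, or fail loudly."""
    backend = DISCOVERY.get(name)
    if backend is None:
        raise ValueError(f"SA Discovery service {name} not found.")
    return backend(additional, selector, model_handler, engine, verbose)


instance = make_discovery("echo", False, "myselector", "my_handler", engine=None, verbose=True)
print(type(instance).__name__, instance.selector)

try:
    make_discovery("missing", False, "myselector", "my_handler", engine=None, verbose=False)
except ValueError as err:
    error_message = str(err)
    print(error_message)
```

Adding a backend then means adding one entry to the registry. The original defers its import of `FederationConnector` to inside the function body, which delays loading that module until the factory is first called.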

factory_sa_reasoner(sa_reasoner, config)

Factory function to create an ISAReasoner implementation.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| sa_reasoner | str | Identifier of the reasoner backend (e.g., "nebula"). | required |
| config | Config | The configuration object for initializing the reasoner. | required |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| ISAReasoner | ISAReasoner | An instance of the requested reasoner implementation. |

Raises:

| Type | Description |
| --- | --- |
| Exception | If the specified reasoner service identifier is not found. |

Source code in nebula/core/situationalawareness/situationalawareness.py
def factory_sa_reasoner(sa_reasoner, config) -> ISAReasoner:
    """
    Factory function to create an ISAReasoner implementation.

    Args:
        sa_reasoner (str): Identifier of the reasoner backend (e.g., "nebula").
        config (Config): The configuration object for initializing the reasoner.

    Returns:
        ISAReasoner: An instance of the requested reasoner implementation.

    Raises:
        Exception: If the specified reasoner service identifier is not found.
    """
    from nebula.core.situationalawareness.awareness.sareasoner import SAReasoner

    REASONER = {
        "nebula": SAReasoner,
    }
    sar = REASONER.get(sa_reasoner)
    if sar:
        return sar(config)
    else:
        raise Exception(f"SA Reasoner service {sa_reasoner} not found.")