Skip to content

Documentation for Sacommand Module

AggregationCommand

Bases: SACommand

Commands related to data aggregation.

Source code in nebula/core/situationalawareness/awareness/sautils/sacommand.py
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
class AggregationCommand(SACommand):
    """
    SA command carrying an aggregation proposal.

    The target is a dict keyed by node. ``conflicts_with`` reads index 1 of
    each value and compares it numerically (presumably the aggregation weight
    of that node -- confirm against the callers that build the target).
    """

    def __init__(
        self,
        action: SACommandAction,
        owner: "SAModuleAgent",
        target: dict,
        priority: SACommandPRIO = SACommandPRIO.MEDIUM,
        parallelizable=False,
    ):
        # NOTE(review): the command is registered under SACommandType.CONNECTIVITY
        # although this is the aggregation command -- confirm this is intentional.
        super().__init__(
            SACommandType.CONNECTIVITY,
            action,
            owner,
            target,
            priority,
            parallelizable,
        )

    async def execute(self):
        """Mark the command as executed and return its target dict."""
        await self._update_command_state(SACommandState.EXECUTED)
        return self._target

    async def conflicts_with(self, other: "AggregationCommand") -> bool:
        """
        Report whether this aggregation command clashes with ``other``.

        A conflict is reported only when both conditions hold: the two targets
        do not cover the same set of nodes, and at least one node present in
        both carries a different value at index 1 of its entry.

        Parameters:
            other (AggregationCommand): Command to compare against.

        Returns:
            bool: True if both the node sets and a shared node's value differ.
        """
        mine, theirs = self._target, other._target

        # Topology check: do both proposals reference exactly the same nodes?
        same_nodes = set(mine) == set(theirs)

        # Value check, restricted to the nodes present in both targets.
        shared_value_differs = any(abs(mine[node][1] - theirs[node][1]) > 0 for node in mine if node in theirs)

        return shared_value_differs and not same_nodes

    """                                             ###############################
                                                    #     SA COMMAND FACTORY      #
                                                    ###############################
    """

conflicts_with(other) async

Determines if two commands conflict with each other.

Source code in nebula/core/situationalawareness/awareness/sautils/sacommand.py
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
async def conflicts_with(self, other: "AggregationCommand") -> bool:
    """
    Report whether this aggregation command clashes with ``other``.

    A conflict is reported only when both conditions hold: the two targets
    do not cover the same set of nodes, and at least one node present in
    both carries a different value at index 1 of its entry.

    Parameters:
        other (AggregationCommand): Command to compare against.

    Returns:
        bool: True if both the node sets and a shared node's value differ.
    """
    mine, theirs = self._target, other._target

    # Topology check: do both proposals reference exactly the same nodes?
    same_nodes = set(mine) == set(theirs)

    # Value check, restricted to the nodes present in both targets.
    shared_value_differs = any(abs(mine[node][1] - theirs[node][1]) > 0 for node in mine if node in theirs)

    return shared_value_differs and not same_nodes

ConnectivityCommand

Bases: SACommand

Commands related to connectivity.

Source code in nebula/core/situationalawareness/awareness/sautils/sacommand.py
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
class ConnectivityCommand(SACommand):
    """
    SA command that changes or maintains node connectivity.

    Besides the common command data it stores an optional callable plus the
    positional arguments to invoke it with when the command is executed.
    """

    def __init__(
        self,
        action: SACommandAction,
        owner: "SAModuleAgent",
        target: str,
        priority: SACommandPRIO = SACommandPRIO.MEDIUM,
        parallelizable=False,
        action_function=None,
        *args,
    ):
        super().__init__(
            SACommandType.CONNECTIVITY,
            action,
            owner,
            target,
            priority,
            parallelizable,
        )
        # Callable (sync or coroutine function) run by execute(); may be None.
        self._action_function = action_function
        # Extra positional arguments forwarded to the callable.
        self._args = args

    async def execute(self):
        """
        Mark the command as executed, then run the stored action, if any.

        The state is switched to ``EXECUTED`` before the action is invoked.
        Coroutine functions are awaited; any other callable is called
        synchronously and its return value is ignored.
        """
        await self._update_command_state(SACommandState.EXECUTED)

        callback = self._action_function
        if not callback:
            return

        if asyncio.iscoroutinefunction(callback):
            await callback(*self._args)
            return
        callback(*self._args)

    async def conflicts_with(self, other: "ConnectivityCommand") -> bool:
        """
        Report whether this connectivity command clashes with ``other``.

        Commands whose owners resolve to the same agent never conflict.
        Otherwise the unordered pair of actions is looked up in a table that
        depends on whether both commands address the same target.

        Parameters:
            other (ConnectivityCommand): Command to compare against.

        Returns:
            bool: True if the pair of actions is listed as conflicting.
        """
        own_agent = await self._owner.get_agent()
        other_agent = await other._owner.get_agent()
        if own_agent == other_agent:
            return False

        same_target = self._target == other._target
        disconnect = SACommandAction.DISCONNECT
        if same_target:
            # Same target: only DISCONNECT against DISCONNECT clashes.
            clashing = [{disconnect}]
        else:
            # Different targets: DISCONNECT clashes with these three actions.
            clashing = [
                {disconnect, SACommandAction.RECONNECT},
                {disconnect, SACommandAction.MAINTAIN_CONNECTIONS},
                {disconnect, SACommandAction.SEARCH_CONNECTIONS},
            ]
        return {self._action, other._action} in clashing

conflicts_with(other) async

Determines if two commands conflict with each other.

Source code in nebula/core/situationalawareness/awareness/sautils/sacommand.py
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
async def conflicts_with(self, other: "ConnectivityCommand") -> bool:
    """
    Report whether this connectivity command clashes with ``other``.

    Commands whose owners resolve to the same agent never conflict.
    Otherwise the unordered pair of actions is looked up in a table that
    depends on whether both commands address the same target.

    Parameters:
        other (ConnectivityCommand): Command to compare against.

    Returns:
        bool: True if the pair of actions is listed as conflicting.
    """
    own_agent = await self._owner.get_agent()
    other_agent = await other._owner.get_agent()
    if own_agent == other_agent:
        return False

    same_target = self._target == other._target
    disconnect = SACommandAction.DISCONNECT
    if same_target:
        # Same target: only DISCONNECT against DISCONNECT clashes.
        clashing = [{disconnect}]
    else:
        # Different targets: DISCONNECT clashes with these three actions.
        clashing = [
            {disconnect, SACommandAction.RECONNECT},
            {disconnect, SACommandAction.MAINTAIN_CONNECTIONS},
            {disconnect, SACommandAction.SEARCH_CONNECTIONS},
        ]
    return {self._action, other._action} in clashing

execute() async

Executes the assigned action function with the given parameters.

Source code in nebula/core/situationalawareness/awareness/sautils/sacommand.py
215
216
217
218
219
220
221
222
async def execute(self):
    """
    Mark the command as executed, then run the stored action, if any.

    The state is switched to ``EXECUTED`` before the action is invoked.
    Coroutine functions are awaited; any other callable is called
    synchronously and its return value is ignored.
    """
    await self._update_command_state(SACommandState.EXECUTED)

    callback = self._action_function
    if not callback:
        return

    if asyncio.iscoroutinefunction(callback):
        await callback(*self._args)
        return
    callback(*self._args)

SACommand

Base class for Situational Awareness (SA) module commands.

This class defines the core structure and behavior of commands that can be issued by SA agents. Each command has an associated type, action, target, priority, and execution state. Commands may also declare whether they can be executed in parallel. Subclasses must implement the actual logic for execution and conflict detection.

Attributes:

Name Type Description
command_type SACommandType

Type of the command (e.g., parameter update, structural change).

action SACommandAction

Specific action the command performs.

owner SAModuleAgent

Reference to the module or agent that issued the command.

target Any

Target of the command (e.g., node, parameter name).

priority SACommandPRIO

Priority level of the command.

parallelizable bool

Indicates whether the command can be run concurrently.

_state SACommandState

Internal state of the command (e.g., PENDING, DISCARDED).

_state_future Future

Future that resolves when the command changes state.

Source code in nebula/core/situationalawareness/awareness/sautils/sacommand.py
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
class SACommand:
    """
    Common base for the commands issued by Situational Awareness (SA) agents.

    A command bundles what should be done (type and action), on what (target),
    on whose behalf (owner) and how urgently (priority). It also tracks its
    own state and exposes a future that is resolved when a state is assigned.
    Subclasses provide the execution and conflict-detection logic.

    Attributes:
        _command_type (SACommandType): Category the command belongs to.
        _action (SACommandAction): Concrete action the command requests.
        _owner (SAModuleAgent): Issuer of the command; its ``get_agent()`` coroutine is awaited by ``get_owner``.
        _target (Any): What the command acts on (e.g., node, parameter name).
        _priority (SACommandPRIO): Priority level; compared through its ``value``.
        _parallelizable (bool): Whether the command may run concurrently with others.
        _state (SACommandState): Current state, initially ``PENDING``.
        _state_future (asyncio.Future): Resolved with the first state assigned via ``_update_command_state``.
    """

    def __init__(
        self,
        command_type: SACommandType,
        action: SACommandAction,
        owner: "SAModuleAgent",
        target,
        priority: SACommandPRIO = SACommandPRIO.MEDIUM,
        parallelizable=False,
    ):
        # What the command is and who asked for it.
        self._command_type = command_type
        self._action = action
        self._owner = owner
        self._target = target  # Could be a node, parameter, etc.

        # Scheduling hints.
        self._priority = priority
        self._parallelizable = parallelizable

        # Lifecycle tracking: every command starts out pending.
        self._state = SACommandState.PENDING
        # NOTE(review): asyncio.get_event_loop() is deprecated when no loop is running on
        # recent Python versions; presumably commands are always created inside the loop.
        self._state_future = asyncio.get_event_loop().create_future()

    # NOTE(review): SACommand declares no ABC base in this listing, so @abstractmethod
    # is not enforced at instantiation; a missing override only fails when called.
    @abstractmethod
    async def execute(self):
        """
        Carry out the command's action on its target.

        Placeholder in the base class: it always raises, so every subclass
        has to supply the real behaviour.

        Raises:
            NotImplementedError: Always, in this base implementation.
        """
        raise NotImplementedError

    @abstractmethod
    async def conflicts_with(self, other: "SACommand") -> bool:
        """
        Decide whether this command and ``other`` are in conflict.

        Placeholder in the base class: it always raises, so every subclass
        has to supply its own conflict rule.

        Parameters:
            other (SACommand): Another command instance to check for conflicts.

        Returns:
            bool: True if there is a conflict, False otherwise.

        Raises:
            NotImplementedError: Always, in this base implementation.
        """
        raise NotImplementedError

    async def discard_command(self) -> None:
        """
        Move this command to the ``DISCARDED`` state.
        """
        await self._update_command_state(SACommandState.DISCARDED)

    def got_higher_priority_than(self, other_prio: SACommandPRIO) -> bool:
        """
        Tell whether this command outranks the given priority.

        The comparison is strict and is made on the ``value`` of both
        priorities, so equal priorities yield False.

        Args:
            other_prio (SACommandPRIO): The priority of the other command.

        Returns:
            bool: True if this command has higher priority, False otherwise.
        """
        own_value, other_value = self._priority.value, other_prio.value
        return own_value > other_value

    def get_prio(self):
        """
        Return the priority this command was created with.

        Returns:
            SACommandPRIO: The command's priority enum.
        """
        return self._priority

    async def get_owner(self):
        """
        Resolve the agent behind this command.

        Returns:
            The result of awaiting ``get_agent()`` on the command's owner.
        """
        agent = await self._owner.get_agent()
        return agent

    def get_action(self) -> SACommandAction:
        """
        Return the action this command requests.

        Returns:
            SACommandAction: The command's action enum.
        """
        return self._action

    async def _update_command_state(self, sacs: SACommandState):
        """
        Store the new state and resolve the state future if still unresolved.

        Only the first state assigned reaches the future; later updates change
        ``_state`` but leave the already-resolved future untouched.

        Args:
            sacs (SACommandState): The new state to assign to the command.
        """
        self._state = sacs
        pending_future = self._state_future
        if pending_future.done():
            return
        pending_future.set_result(sacs)

    def get_state_future(self) -> asyncio.Future:
        """
        Return the future tied to this command's state.

        Returns:
            asyncio.Future: Future resolved with the first state assigned to the command.
        """
        return self._state_future

    def is_parallelizable(self) -> bool:
        """
        Tell whether this command may run in parallel with others.

        Returns:
            bool: True if parallel execution is allowed, False otherwise.
        """
        return self._parallelizable

    def __repr__(self):
        class_name = self.__class__.__name__
        return (
            f"{class_name}(Type={self._command_type.value}, Action={self._action.value}, "
            f"Target={self._target}, Priority={self._priority.value})"
        )

    """                                             ###############################
                                                    #     SA COMMAND SUBCLASS     #
                                                    ###############################
    """

conflicts_with(other) abstractmethod async

Determine whether this command conflicts with another command.

This method must be implemented by subclasses to define conflict logic, e.g., whether two commands target the same resource in incompatible ways. Used during arbitration to resolve simultaneous command suggestions.

Parameters:

Name Type Description Default
other SACommand

Another command instance to check for conflicts.

required

Returns:

Name Type Description
bool bool

True if there is a conflict, False otherwise.

Source code in nebula/core/situationalawareness/awareness/sautils/sacommand.py
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
@abstractmethod
async def conflicts_with(self, other: "SACommand") -> bool:
    """
    Decide whether this command and ``other`` are in conflict.

    Placeholder in the base class: it raises ``NotImplementedError``
    unconditionally, so every subclass has to supply its own conflict rule,
    e.g., whether two commands target the same resource in incompatible ways.

    Parameters:
        other (SACommand): Another command instance to check for conflicts.

    Returns:
        bool: True if there is a conflict, False otherwise.

    Raises:
        NotImplementedError: Always, in this base implementation.
    """
    raise NotImplementedError

discard_command() async

Mark this command as discarded, updating its internal state accordingly.

Source code in nebula/core/situationalawareness/awareness/sautils/sacommand.py
112
113
114
115
116
async def discard_command(self) -> None:
    """
    Move this command to the ``DISCARDED`` state.

    Delegates to ``_update_command_state`` with ``SACommandState.DISCARDED``.
    """
    await self._update_command_state(SACommandState.DISCARDED)

execute() abstractmethod async

Execute the command's action on the specified target.

This method must be implemented by subclasses to define the actual logic of how the command affects the system. It may involve sending messages, modifying local or global state, or interacting with external components.

Source code in nebula/core/situationalawareness/awareness/sautils/sacommand.py
84
85
86
87
88
89
90
91
92
93
@abstractmethod
async def execute(self):
    """
    Carry out the command's action on its target.

    Placeholder in the base class: it raises ``NotImplementedError``
    unconditionally, so every subclass has to supply the real behaviour.

    Raises:
        NotImplementedError: Always, in this base implementation.
    """
    raise NotImplementedError

get_action()

Get the action associated with this command.

Returns:

Name Type Description
SACommandAction SACommandAction

The command's action enum.

Source code in nebula/core/situationalawareness/awareness/sautils/sacommand.py
148
149
150
151
152
153
154
155
def get_action(self) -> SACommandAction:
    """
    Return the action this command requests.

    Returns:
        SACommandAction: The value stored in ``_action``.
    """
    return self._action

get_owner() async

Asynchronously obtain the owner agent of this command.

Returns:

Name Type Description
Agent

The agent instance that owns this command.

Source code in nebula/core/situationalawareness/awareness/sautils/sacommand.py
139
140
141
142
143
144
145
146
async def get_owner(self):
    """
    Resolve the agent behind this command.

    Returns:
        The result of awaiting ``get_agent()`` on the command's owner.
    """
    return await self._owner.get_agent()

get_prio()

Retrieve the priority level of this command.

Returns:

Name Type Description
SACommandPRIO

The command's priority enum.

Source code in nebula/core/situationalawareness/awareness/sautils/sacommand.py
130
131
132
133
134
135
136
137
def get_prio(self):
    """
    Return the priority stored on this command.

    Returns:
        SACommandPRIO: The command's priority enum.
    """
    return self._priority

get_state_future()

Get the Future that will be completed when the command's state changes.

Returns:

Type Description
Future

Future that resolves to the command's final state.

Source code in nebula/core/situationalawareness/awareness/sautils/sacommand.py
168
169
170
171
172
173
174
175
def get_state_future(self) -> asyncio.Future:
    """
    Return the future tied to this command's state.

    The future is resolved by ``_update_command_state`` only while it is still
    unresolved, so it carries the first state assigned to the command.

    Returns:
        asyncio.Future: Future resolved with the first state assigned to the command.
    """
    return self._state_future

got_higher_priority_than(other_prio)

Compare this command's priority against another.

Parameters:

Name Type Description Default
other_prio SACommandPRIO

The priority of the other command.

required

Returns:

Name Type Description
bool

True if this command has higher priority, False otherwise.

Source code in nebula/core/situationalawareness/awareness/sautils/sacommand.py
118
119
120
121
122
123
124
125
126
127
128
def got_higher_priority_than(self, other_prio: SACommandPRIO) -> bool:
    """
    Tell whether this command outranks the given priority.

    The comparison is strict and is made on the ``value`` of both
    priorities, so equal priorities yield False.

    Args:
        other_prio (SACommandPRIO): The priority of the other command.

    Returns:
        bool: True if this command has higher priority, False otherwise.
    """
    return self._priority.value > other_prio.value

is_parallelizable()

Indicates whether this command can be executed in parallel with others.

Returns:

Name Type Description
bool

True if parallel execution is allowed, False otherwise.

Source code in nebula/core/situationalawareness/awareness/sautils/sacommand.py
177
178
179
180
181
182
183
184
def is_parallelizable(self) -> bool:
    """
    Tell whether this command may run in parallel with others.

    Returns:
        bool: True if parallel execution is allowed, False otherwise.
    """
    return self._parallelizable

SACommandState

Bases: Enum

Source code in nebula/core/situationalawareness/awareness/sautils/sacommand.py
34
35
36
37
38
39
40
41
42
class SACommandState(Enum):
    """Lifecycle states a situational awareness command can be in."""

    PENDING = "pending"  # initial state set in SACommand.__init__
    DISCARDED = "discarded"  # assigned by SACommand.discard_command()
    EXECUTED = "executed"  # assigned by the execute() implementations of the command subclasses


# The section banner used to be a bare string literal placed right after
# EXECUTED inside the enum body, where documentation tools read it as the
# attribute docstring of EXECUTED. It is a plain comment now.
#
#                                             ###############################
#                                             #      SA COMMAND CLASS       #
#                                             ###############################

EXECUTED = 'executed' class-attribute instance-attribute

State assigned to a command once its `execute()` method has run.

factory_sa_command(sacommand_type, *config)

Factory function to create situational awareness command instances.

Parameters:

Name Type Description Default
sacommand_type str

Identifier of the command type (e.g., "connectivity", "aggregation").

required
*config

Positional arguments to initialize the specific command class.

()

Returns:

Name Type Description
SACommand SACommand

An instance of the requested situational awareness command.

Raises:

Type Description
ValueError

If the provided command type is not recognized.

Source code in nebula/core/situationalawareness/awareness/sautils/sacommand.py
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
def factory_sa_command(sacommand_type, *config) -> SACommand:
    """
    Build a situational awareness command from its type identifier.

    Args:
        sacommand_type (str): Either "connectivity" or "aggregation".
        *config: Positional arguments forwarded unchanged to the constructor
            of the selected command class.

    Returns:
        SACommand: A new instance of the command class registered for the identifier.

    Raises:
        ValueError: If the identifier is not one of the registered types.
    """
    command_classes = {
        "connectivity": ConnectivityCommand,
        "aggregation": AggregationCommand,
    }

    command_cls = command_classes.get(sacommand_type)
    if command_cls is not None:
        return command_cls(*config)
    raise ValueError(f"Unknown SACommand type: {sacommand_type}")