Documentation for Noderole Module

Role

Bases: Enum

This class defines the participant roles of the platform.

Source code in nebula/core/noderole.py
class Role(Enum):
    """
    This class defines the participant roles of the platform.
    """
    TRAINER = "trainer"
    AGGREGATOR = "aggregator"
    TRAINER_AGGREGATOR = "trainer_aggregator"
    PROXY = "proxy"
    IDLE = "idle"
    SERVER = "server"
    MALICIOUS = "malicious"
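
Because each member's value is a plain string, a role can be recovered from configuration text with an ordinary enum lookup. The sketch below repeats the enum so it runs without NEBULA installed; `parse_role` is a hypothetical helper, not part of the module.

```python
from enum import Enum


class Role(Enum):
    """Local copy of the enum above so the snippet runs on its own."""
    TRAINER = "trainer"
    AGGREGATOR = "aggregator"
    TRAINER_AGGREGATOR = "trainer_aggregator"
    PROXY = "proxy"
    IDLE = "idle"
    SERVER = "server"
    MALICIOUS = "malicious"


def parse_role(text: str) -> Role:
    """Hypothetical helper: normalize a config string and look it up by value.

    Enum lookup by value raises ValueError when no member matches.
    """
    return Role(text.strip().lower())


if __name__ == "__main__":
    print(parse_role(" Aggregator "))    # Role.AGGREGATOR
    print(Role["IDLE"].value)            # idle
    print([role.value for role in Role])
```

Lookup by value (`Role("trainer")`) and lookup by member name (`Role["TRAINER"]`) are both standard `Enum` behavior; only the normalization step is an assumption.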

RoleBehavior

Bases: ABC

Abstract base class that defines the role-specific behavior of a node in centralized (CFL), decentralized (DFL), or semi-decentralized (SDFL) federated learning systems.

Each subclass encapsulates the logic needed for a particular node role (e.g., trainer, aggregator), providing custom implementations for role-related operations such as training cycles, update aggregation, and recovery strategies.

Attributes:

Name Type Description
_next_role Role

The role to which the node is expected to transition.

_next_role_locker Locker

An asynchronous lock to protect access to _next_role.

_source_to_notificate Optional[Any]

The source node to notify once a role change is applied.

Source code in nebula/core/noderole.py
class RoleBehavior(ABC):
    """
    Abstract base class for defining the role-specific behavior of a node in CFL, DFL, or SDFL systems.

    Each subclass encapsulates the logic needed for a particular node role (e.g., trainer, aggregator),
    providing custom implementations for role-related operations such as training cycles,
    update aggregation, and recovery strategies.

    Attributes:
        _next_role (Role): The role to which the node is expected to transition.
        _next_role_locker (Locker): An asynchronous lock to protect access to _next_role.
        _source_to_notificate (Optional[Any]): The source node to notify once a role change is applied.
    """
    def __init__(self):
        self._next_role: Role = None
        self._next_role_locker = Locker("next_role_locker", async_lock=True)
        self._source_to_notificate = None

    @abstractmethod
    def get_role(self):
        """
        Returns the Role enum value representing the current role of the node.
        """
        raise NotImplementedError

    @abstractmethod
    def get_role_name(self, effective=False):
        """
        Returns a string representation of the current role.

        Args:
            effective (bool): Whether to return the name of the current effective role when going as malicious.

        Returns:
            str: Name of the role.
        """
        raise NotImplementedError

    @abstractmethod
    async def extended_learning_cycle(self):
        """
        Performs the main learning or aggregation cycle associated with the current role.

        This method encapsulates all the logic tied to the behavior of the node in its current role,
        including training, aggregating updates, and coordinating with neighbors.
        """
        raise NotImplementedError

    @abstractmethod
    async def select_nodes_to_wait(self):
        """
        Determines which neighbors the node should wait for during the current cycle.

        This logic varies depending on whether the node is an aggregator, trainer, or other role.

        Returns:
            Set[Any]: A set of neighbor node identifiers to wait for.
        """
        raise NotImplementedError

    @abstractmethod
    async def resolve_missing_updates(self):
        """
        Defines the fallback strategy when expected model updates are not received.

        For example, an aggregator might default to a fresh model, while a trainer might proceed
        with its own local model.

        Returns:
            Any: The resolution outcome depending on the role's specific logic.
        """
        raise NotImplementedError

    async def set_next_role(self, role: Role, source_to_notificate = None):
        """
        Schedules a role change and optionally stores the source to notify upon completion.

        Args:
            role (Role): The new role to transition to.
            source_to_notificate (Optional[Any]): Identifier of the node that triggered the change.
        """
        async with self._next_role_locker:
            self._next_role = role
            self._source_to_notificate = source_to_notificate

    async def get_next_role(self) -> Role:
        """
        Retrieves and clears the next role value.

        Returns:
            Role: The next role to transition into.
        """
        async with self._next_role_locker:
            next_role = self._next_role
            self._next_role = None
        return next_role

    async def get_source_to_notificate(self):
        """
        Retrieves and clears the stored source to notify after a role change.

        Returns:
            Any: The source node identifier, or None if not set.
        """
        async with self._next_role_locker:
            source_to_notificate = self._source_to_notificate
            self._source_to_notificate = None
        return source_to_notificate

    async def update_role_needed(self):
        """
        Checks whether a role update is scheduled.

        Returns:
            bool: True if a role update is pending, False otherwise.
        """
        async with self._next_role_locker:
            updt_needed = self._next_role is not None
        return updt_needed
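
To make the contract concrete, here is a minimal sketch of a subclass that fills in all five abstract methods. It is not NEBULA's implementation: the base class is a trimmed stand-in that uses `asyncio.Lock` in place of `Locker`, and `TrainerBehavior`, its `neighbors` argument, and the placeholder return values are all hypothetical.

```python
import asyncio
from abc import ABC, abstractmethod
from enum import Enum


class Role(Enum):
    TRAINER = "trainer"
    AGGREGATOR = "aggregator"


class RoleBehavior(ABC):
    """Trimmed stand-in for the class above (asyncio.Lock replaces Locker)."""

    def __init__(self):
        self._next_role = None
        self._next_role_locker = asyncio.Lock()
        self._source_to_notificate = None

    @abstractmethod
    def get_role(self):
        raise NotImplementedError

    @abstractmethod
    def get_role_name(self, effective=False):
        raise NotImplementedError

    @abstractmethod
    async def extended_learning_cycle(self):
        raise NotImplementedError

    @abstractmethod
    async def select_nodes_to_wait(self):
        raise NotImplementedError

    @abstractmethod
    async def resolve_missing_updates(self):
        raise NotImplementedError


class TrainerBehavior(RoleBehavior):
    """Hypothetical trainer role; method bodies are placeholders."""

    def __init__(self, neighbors):
        super().__init__()
        self._neighbors = set(neighbors)
        self.rounds_done = 0

    def get_role(self):
        return Role.TRAINER

    def get_role_name(self, effective=False):
        return self.get_role().value

    async def extended_learning_cycle(self):
        self.rounds_done += 1  # stands in for one round of local training

    async def select_nodes_to_wait(self):
        return set(self._neighbors)

    async def resolve_missing_updates(self):
        return "use-local-model"  # placeholder outcome


async def main():
    behavior = TrainerBehavior(neighbors={"node-a", "node-b"})
    await behavior.extended_learning_cycle()
    print(behavior.get_role_name(), sorted(await behavior.select_nodes_to_wait()))


if __name__ == "__main__":
    asyncio.run(main())
```

Instantiating the base class directly raises `TypeError`, which is how `ABC` enforces that every role supplies all five methods.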

extended_learning_cycle() abstractmethod async

Performs the main learning or aggregation cycle associated with the current role.

This method encapsulates all the logic tied to the behavior of the node in its current role, including training, aggregating updates, and coordinating with neighbors.

Source code in nebula/core/noderole.py
@abstractmethod
async def extended_learning_cycle(self):
    """
    Performs the main learning or aggregation cycle associated with the current role.

    This method encapsulates all the logic tied to the behavior of the node in its current role,
    including training, aggregating updates, and coordinating with neighbors.
    """
    raise NotImplementedError

get_next_role() async

Retrieves and clears the next role value.

Returns:

Name Type Description
Role Role

The next role to transition into.

Source code in nebula/core/noderole.py
async def get_next_role(self) -> Role:
    """
    Retrieves and clears the next role value.

    Returns:
        Role: The next role to transition into.
    """
    async with self._next_role_locker:
        next_role = self._next_role
        self._next_role = None
    return next_role
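
The getter is destructive: reading the pending role also resets it to `None`. A small sketch of that read-and-clear behavior, using a stand-in class (`PendingRole`, hypothetical) with `asyncio.Lock` assumed in place of `Locker`:

```python
import asyncio
from enum import Enum


class Role(Enum):
    TRAINER = "trainer"
    AGGREGATOR = "aggregator"


class PendingRole:
    """Stand-in holding only the pending-role state shown above."""

    def __init__(self):
        self._next_role = None
        self._lock = asyncio.Lock()  # assumption: plays the part of Locker

    async def set_next_role(self, role):
        async with self._lock:
            self._next_role = role

    async def get_next_role(self):
        # Read and reset under the same lock so no other task sees a stale value.
        async with self._lock:
            next_role, self._next_role = self._next_role, None
        return next_role


async def read_twice():
    pending = PendingRole()
    await pending.set_next_role(Role.AGGREGATOR)
    first = await pending.get_next_role()
    second = await pending.get_next_role()  # already consumed by the first read
    return first, second


if __name__ == "__main__":
    print(asyncio.run(read_twice()))
```

Callers that need the value more than once should keep the returned `Role` rather than calling the getter again.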

get_role() abstractmethod

Returns the Role enum value representing the current role of the node.

Source code in nebula/core/noderole.py
@abstractmethod
def get_role(self):
    """
    Returns the Role enum value representing the current role of the node.
    """
    raise NotImplementedError

get_role_name(effective=False) abstractmethod

Returns a string representation of the current role.

Parameters:

Name Type Description Default
effective bool

Whether to return the name of the effective role when the node is acting as malicious.

False

Returns:

Name Type Description
str

Name of the role.

Source code in nebula/core/noderole.py
@abstractmethod
def get_role_name(self, effective=False):
    """
    Returns a string representation of the current role.

    Args:
        effective (bool): Whether to return the name of the current effective role when going as malicious.

    Returns:
        str: Name of the role.
    """
    raise NotImplementedError
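
The docstring does not spell out what the `effective` flag returns, so the following is only one plausible reading, stated as an assumption: by default a malicious node reports its declared role name, and with `effective=True` it reports the role it is performing in the federation. `MaliciousSketch` and `performed_role` are hypothetical names.

```python
from enum import Enum


class Role(Enum):
    TRAINER = "trainer"
    MALICIOUS = "malicious"


class MaliciousSketch:
    """Hypothetical role object: declared role MALICIOUS, performing another role."""

    def __init__(self, performed_role: Role):
        self._role = Role.MALICIOUS
        self._performed_role = performed_role

    def get_role(self) -> Role:
        return self._role

    def get_role_name(self, effective: bool = False) -> str:
        # Assumption: effective=True selects the role the node performs,
        # otherwise the declared role name is returned.
        if effective:
            return self._performed_role.value
        return self._role.value


if __name__ == "__main__":
    node = MaliciousSketch(performed_role=Role.TRAINER)
    print(node.get_role_name())                # malicious
    print(node.get_role_name(effective=True))  # trainer
```

For non-malicious roles under this reading, both calls would return the same string.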

get_source_to_notificate() async

Retrieves and clears the stored source to notify after a role change.

Returns:

Name Type Description
Any

The source node identifier, or None if not set.

Source code in nebula/core/noderole.py
async def get_source_to_notificate(self):
    """
    Retrieves and clears the stored source to notify after a role change.

    Returns:
        Any: The source node identifier, or None if not set.
    """
    async with self._next_role_locker:
        source_to_notificate = self._source_to_notificate
        self._source_to_notificate = None
    return source_to_notificate

resolve_missing_updates() abstractmethod async

Defines the fallback strategy when expected model updates are not received.

For example, an aggregator might default to a fresh model, while a trainer might proceed with its own local model.

Returns:

Name Type Description
Any

The resolution outcome depending on the role's specific logic.

Source code in nebula/core/noderole.py
@abstractmethod
async def resolve_missing_updates(self):
    """
    Defines the fallback strategy when expected model updates are not received.

    For example, an aggregator might default to a fresh model, while a trainer might proceed
    with its own local model.

    Returns:
        Any: The resolution outcome depending on the role's specific logic.
    """
    raise NotImplementedError
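
The two fallbacks mentioned in the docstring could look roughly like this. Everything here is illustrative: models are plain lists of floats, a "fresh model" is assumed to mean all-zero weights, and the class names are hypothetical.

```python
import asyncio
from typing import List


def fresh_model(size: int) -> List[float]:
    """Hypothetical 'fresh model': all-zero weights of the given size."""
    return [0.0] * size


class AggregatorFallback:
    """Sketch: with no updates to aggregate, start again from a fresh model."""

    def __init__(self, model_size: int):
        self._model_size = model_size

    async def resolve_missing_updates(self) -> List[float]:
        return fresh_model(self._model_size)


class TrainerFallback:
    """Sketch: with no aggregated model received, keep the local model."""

    def __init__(self, local_model: List[float]):
        self._local_model = list(local_model)

    async def resolve_missing_updates(self) -> List[float]:
        return list(self._local_model)  # copy so callers cannot mutate our state


async def main():
    aggregator = AggregatorFallback(model_size=3)
    trainer = TrainerFallback(local_model=[0.2, -0.1, 0.7])
    return (await aggregator.resolve_missing_updates(),
            await trainer.resolve_missing_updates())


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Since the return type is documented only as `Any`, callers have to know which role they are dealing with before interpreting the result.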

select_nodes_to_wait() abstractmethod async

Determines which neighbors the node should wait for during the current cycle.

This logic varies depending on whether the node is an aggregator, trainer, or other role.

Returns:

Type Description
Set[Any]

A set of neighbor node identifiers to wait for.

Source code in nebula/core/noderole.py
@abstractmethod
async def select_nodes_to_wait(self):
    """
    Determines which neighbors the node should wait for during the current cycle.

    This logic varies depending on whether the node is an aggregator, trainer, or other role.

    Returns:
        Set[Any]: A set of neighbor node identifiers to wait for.
    """
    raise NotImplementedError
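
One way a caller might combine this method with `resolve_missing_updates()` is to wait for the selected nodes up to a deadline and fall back if any are still missing. This is a sketch under stated assumptions, not NEBULA's waiting logic: `WaitingSketch`, `collect_updates`, the `asyncio.Queue` inbox, and the string payloads are all hypothetical.

```python
import asyncio
from typing import Dict, Set


class WaitingSketch:
    """Hypothetical role object; waits for every neighbor it was given."""

    def __init__(self, neighbors: Set[str]):
        self._neighbors = set(neighbors)

    async def select_nodes_to_wait(self) -> Set[str]:
        return set(self._neighbors)

    async def resolve_missing_updates(self) -> str:
        return "local-model"  # placeholder fallback


async def collect_updates(behavior, inbox: asyncio.Queue, timeout: float) -> Dict[str, str]:
    """Gather (sender, update) pairs until all awaited nodes report or time runs out."""
    pending = await behavior.select_nodes_to_wait()
    received: Dict[str, str] = {}
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    while pending:
        remaining = deadline - loop.time()
        if remaining <= 0:
            break
        try:
            sender, update = await asyncio.wait_for(inbox.get(), remaining)
        except asyncio.TimeoutError:
            break
        if sender in pending:  # ignore senders we were not waiting for
            pending.discard(sender)
            received[sender] = update
    if pending:
        # Some expected updates never arrived: defer to the role's fallback.
        received["fallback"] = await behavior.resolve_missing_updates()
    return received


async def main():
    inbox: asyncio.Queue = asyncio.Queue()
    await inbox.put(("node-a", "update-a"))  # node-b never reports
    behavior = WaitingSketch({"node-a", "node-b"})
    return await collect_updates(behavior, inbox, timeout=0.05)


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Tracking a single deadline, rather than restarting the timeout on every message, keeps the total wait bounded even when updates trickle in slowly.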

set_next_role(role, source_to_notificate=None) async

Schedules a role change and optionally stores the source to notify upon completion.

Parameters:

Name Type Description Default
role Role

The new role to transition to.

required
source_to_notificate Optional[Any]

Identifier of the node that triggered the change.

None

Source code in nebula/core/noderole.py
async def set_next_role(self, role: Role, source_to_notificate = None):
    """
    Schedules a role change and optionally stores the source to notify upon completion.

    Args:
        role (Role): The new role to transition to.
        source_to_notificate (Optional[Any]): Identifier of the node that triggered the change.
    """
    async with self._next_role_locker:
        self._next_role = role
        self._source_to_notificate = source_to_notificate
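
Only one pending change is stored, so a second call overwrites the first, including its source. The sketch below demonstrates that last-write-wins behavior with a stand-in class (`RoleScheduler`, hypothetical; `asyncio.Lock` assumed in place of `Locker`). The `snapshot` method exists only for inspection here and is not part of the documented API.

```python
import asyncio
from enum import Enum


class Role(Enum):
    TRAINER = "trainer"
    AGGREGATOR = "aggregator"
    IDLE = "idle"


class RoleScheduler:
    """Stand-in for the scheduling part of RoleBehavior."""

    def __init__(self):
        self._next_role = None
        self._source_to_notificate = None
        self._lock = asyncio.Lock()  # assumption: plays the part of Locker

    async def set_next_role(self, role, source_to_notificate=None):
        async with self._lock:
            self._next_role = role
            self._source_to_notificate = source_to_notificate

    async def snapshot(self):
        """Hypothetical non-destructive peek, used only to inspect state."""
        async with self._lock:
            return self._next_role, self._source_to_notificate


async def schedule_twice():
    scheduler = RoleScheduler()
    await scheduler.set_next_role(Role.AGGREGATOR, source_to_notificate="node-a")
    await scheduler.set_next_role(Role.IDLE)  # no source given this time
    return await scheduler.snapshot()


if __name__ == "__main__":
    print(asyncio.run(schedule_twice()))
```

After the second call the pending role is `Role.IDLE` and the stored source is `None`, so "node-a" would never be notified. Callers that must not lose a request need their own queueing on top.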

update_role_needed() async

Checks whether a role update is scheduled.

Returns:

Name Type Description
bool

True if a role update is pending, False otherwise.

Source code in nebula/core/noderole.py
async def update_role_needed(self):
    """
    Checks whether a role update is scheduled.

    Returns:
        bool: True if a role update is pending, False otherwise.
    """
    async with self._next_role_locker:
        updt_needed = self._next_role is not None
    return updt_needed
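
Taken together, the four concrete methods support a check, consume, notify sequence at a round boundary. The sketch below shows one way to wire them up. `RoleState` is a stand-in that mirrors those methods with `asyncio.Lock` assumed in place of `Locker`; `apply_pending_role` and the `notify` callback are hypothetical.

```python
import asyncio
from enum import Enum


class Role(Enum):
    TRAINER = "trainer"
    AGGREGATOR = "aggregator"


class RoleState:
    """Stand-in mirroring the four concrete RoleBehavior methods."""

    def __init__(self):
        self._next_role = None
        self._source_to_notificate = None
        self._lock = asyncio.Lock()

    async def set_next_role(self, role, source_to_notificate=None):
        async with self._lock:
            self._next_role = role
            self._source_to_notificate = source_to_notificate

    async def get_next_role(self):
        async with self._lock:
            role, self._next_role = self._next_role, None
        return role

    async def get_source_to_notificate(self):
        async with self._lock:
            source, self._source_to_notificate = self._source_to_notificate, None
        return source

    async def update_role_needed(self):
        async with self._lock:
            return self._next_role is not None


async def apply_pending_role(state, current_role, notify):
    """Hypothetical round-boundary hook: switch roles and tell the requester."""
    if not await state.update_role_needed():
        return current_role
    new_role = await state.get_next_role()
    if new_role is None:  # another task consumed it between the two calls
        return current_role
    source = await state.get_source_to_notificate()
    if source is not None:
        await notify(source, new_role)
    return new_role


async def main():
    state = RoleState()
    notified = []

    async def notify(source, role):
        notified.append((source, role))

    unchanged = await apply_pending_role(state, Role.TRAINER, notify)
    await state.set_next_role(Role.AGGREGATOR, source_to_notificate="node-a")
    changed = await apply_pending_role(state, Role.TRAINER, notify)
    return unchanged, changed, notified


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Each call takes the lock separately, so the check and the two reads are not atomic as a group. The extra `None` guard covers the case where another task consumes the pending role in between.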