Skip to content

Documentation for Nebulaevents Module

AggregationEvent

Bases: NodeEvent

Source code in nebula/core/nebulaevents.py
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
class AggregationEvent(NodeEvent):
    def __init__(self, updates: dict, expected_nodes: set, missing_nodes: set):
        """Event triggered when model aggregation is ready.

        Args:
            updates (dict): Dictionary containing model updates.
            expected_nodes (set): Set of nodes expected to participate in aggregation.
            missing_nodes (set): Set of nodes that did not send their update.
        """
        self._updates = updates
        self._expected_nodes = expected_nodes
        self._missing_nodes = missing_nodes

    def __str__(self):
        return "Aggregation Ready"

    def update_updates(self, new_updates: dict):
        """Allows an external module to update the updates dictionary."""
        self._updates = new_updates

    async def get_event_data(self) -> tuple[dict, set, set]:
        """Retrieves the aggregation event data.

        Returns:
            tuple[dict, set, set]:
                - updates (dict): Model updates.
                - expected_nodes (set): Expected nodes.
                - missing_nodes (set): Missing nodes.
        """
        return (self._updates, self._expected_nodes, self._missing_nodes)

    async def is_concurrent(self) -> bool:
        return False

__init__(updates, expected_nodes, missing_nodes)

Event triggered when model aggregation is ready.

Parameters:

Name Type Description Default
updates dict

Dictionary containing model updates.

required
expected_nodes set

Set of nodes expected to participate in aggregation.

required
missing_nodes set

Set of nodes that did not send their update.

required
Source code in nebula/core/nebulaevents.py
106
107
108
109
110
111
112
113
114
115
116
def __init__(self, updates: dict, expected_nodes: set, missing_nodes: set):
    """Event triggered when model aggregation is ready.

    Args:
        updates (dict): Dictionary containing model updates.
        expected_nodes (set): Set of nodes expected to participate in aggregation.
        missing_nodes (set): Set of nodes that did not send their update.
    """
    self._updates = updates
    self._expected_nodes = expected_nodes
    self._missing_nodes = missing_nodes

get_event_data() async

Retrieves the aggregation event data.

Returns:

Type Description
tuple[dict, set, set]

tuple[dict, set, set]: - updates (dict): Model updates. - expected_nodes (set): Expected nodes. - missing_nodes (set): Missing nodes.

Source code in nebula/core/nebulaevents.py
125
126
127
128
129
130
131
132
133
134
async def get_event_data(self) -> tuple[dict, set, set]:
    """Retrieves the aggregation event data.

    Returns:
        tuple[dict, set, set]:
            - updates (dict): Model updates.
            - expected_nodes (set): Expected nodes.
            - missing_nodes (set): Missing nodes.
    """
    return (self._updates, self._expected_nodes, self._missing_nodes)

update_updates(new_updates)

Allows an external module to update the updates dictionary.

Source code in nebula/core/nebulaevents.py
121
122
123
def update_updates(self, new_updates: dict):
    """Allows an external module to update the updates dictionary."""
    self._updates = new_updates

BeaconRecievedEvent

Bases: NodeEvent

Source code in nebula/core/nebulaevents.py
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
class BeaconRecievedEvent(NodeEvent):
    def __init__(self, source, geoloc):
        """
        Initializes an BeaconRecievedEvent.

        Args:
            source (str): The received beacon source.
            geoloc (tuple): The geolocalzition associated with the received beacon source.
        """
        self._source = source
        self._geoloc = geoloc

    def __str__(self):
        return "Beacon recieved"

    async def get_event_data(self) -> tuple[str, tuple[float, float]]:
        """
        Retrieves the event data.

        Returns:
            tuple[str, tuple[float, float]]: A tuple containing:
                - The beacon's source.
                - the device geolocalization (latitude, longitude).
        """
        return (self._source, self._geoloc)

    async def is_concurrent(self) -> bool:
        return True

__init__(source, geoloc)

Initializes an BeaconRecievedEvent.

Parameters:

Name Type Description Default
source str

The received beacon source.

required
geoloc tuple

The geolocalzition associated with the received beacon source.

required
Source code in nebula/core/nebulaevents.py
253
254
255
256
257
258
259
260
261
262
def __init__(self, source, geoloc):
    """
    Initializes an BeaconRecievedEvent.

    Args:
        source (str): The received beacon source.
        geoloc (tuple): The geolocalzition associated with the received beacon source.
    """
    self._source = source
    self._geoloc = geoloc

get_event_data() async

Retrieves the event data.

Returns:

Type Description
tuple[str, tuple[float, float]]

tuple[str, tuple[float, float]]: A tuple containing: - The beacon's source. - the device geolocalization (latitude, longitude).

Source code in nebula/core/nebulaevents.py
267
268
269
270
271
272
273
274
275
276
async def get_event_data(self) -> tuple[str, tuple[float, float]]:
    """
    Retrieves the event data.

    Returns:
        tuple[str, tuple[float, float]]: A tuple containing:
            - The beacon's source.
            - the device geolocalization (latitude, longitude).
    """
    return (self._source, self._geoloc)

ExperimentFinishEvent

Bases: NodeEvent

Source code in nebula/core/nebulaevents.py
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
class ExperimentFinishEvent(NodeEvent):
    def __init__(self):
        """Event triggered when experiment is going to finish."""
        pass

    def __str__(self):
        return "Experiment finished"

    async def get_event_data(self):
        pass

    async def is_concurrent(self):
        return False

__init__()

Event triggered when experiment is going to finish.

Source code in nebula/core/nebulaevents.py
91
92
93
def __init__(self):
    """Event triggered when experiment is going to finish."""
    pass

NodeFoundEvent

Bases: NodeEvent

Source code in nebula/core/nebulaevents.py
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
class NodeFoundEvent(NodeEvent):
    def __init__(self, node_addr):
        """Event triggered when a new node is found.

        Args:
            node_addr (str): Address of the neighboring node.
        """
        self._node_addr = node_addr

    def __str__(self):
        return f"Node addr: {self._node_addr} found"

    async def get_event_data(self) -> tuple[str, bool]:
        """Retrieves the node found event data.

        Returns:
            tuple[str, bool]:
                - node_addr (str): Address of the node found.
        """
        return self._node_addr

    async def is_concurrent(self) -> bool:
        return True

__init__(node_addr)

Event triggered when a new node is found.

Parameters:

Name Type Description Default
node_addr str

Address of the neighboring node.

required
Source code in nebula/core/nebulaevents.py
189
190
191
192
193
194
195
def __init__(self, node_addr):
    """Event triggered when a new node is found.

    Args:
        node_addr (str): Address of the neighboring node.
    """
    self._node_addr = node_addr

get_event_data() async

Retrieves the node found event data.

Returns:

Type Description
tuple[str, bool]

tuple[str, bool]: - node_addr (str): Address of the node found.

Source code in nebula/core/nebulaevents.py
200
201
202
203
204
205
206
207
async def get_event_data(self) -> tuple[str, bool]:
    """Retrieves the node found event data.

    Returns:
        tuple[str, bool]:
            - node_addr (str): Address of the node found.
    """
    return self._node_addr

RoundEndEvent

Bases: NodeEvent

Source code in nebula/core/nebulaevents.py
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
class RoundEndEvent(NodeEvent):
    def __init__(self, round, end_time):
        """Event triggered when round is going to start.

        Args:
            round (int): Round number.
            end_time (time): Current time when round has ended.
        """
        self._round_end_time = end_time
        self._round = round

    def __str__(self):
        return "Round ending"

    async def get_event_data(self):
        """Retrieves the round start event data.

        Returns:
            tuple[int, float]:
                -round (int): Round number.
                -end_time (time): Current time when round has ended.
        """
        return (self._round, self._round_end_time)

    async def is_concurrent(self):
        return False

__init__(round, end_time)

Event triggered when round is going to start.

Parameters:

Name Type Description Default
round int

Round number.

required
end_time time

Current time when round has ended.

required
Source code in nebula/core/nebulaevents.py
63
64
65
66
67
68
69
70
71
def __init__(self, round, end_time):
    """Event triggered when round is going to start.

    Args:
        round (int): Round number.
        end_time (time): Current time when round has ended.
    """
    self._round_end_time = end_time
    self._round = round

get_event_data() async

Retrieves the round start event data.

Returns:

Type Description

tuple[int, float]: -round (int): Round number. -end_time (time): Current time when round has ended.

Source code in nebula/core/nebulaevents.py
76
77
78
79
80
81
82
83
84
async def get_event_data(self):
    """Retrieves the round start event data.

    Returns:
        tuple[int, float]:
            -round (int): Round number.
            -end_time (time): Current time when round has ended.
    """
    return (self._round, self._round_end_time)

RoundStartEvent

Bases: NodeEvent

Source code in nebula/core/nebulaevents.py
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
class RoundStartEvent(NodeEvent):
    def __init__(self, round, start_time, expected_nodes):
        """Event triggered when round is going to start.

        Args:
            round (int): Round number.
            start_time (time): Current time when round is going to start.
        """
        self._round_start_time = start_time
        self._round = round
        self._expected_nodes = expected_nodes

    def __str__(self):
        return "Round starting"

    async def get_event_data(self):
        """Retrieves the round start event data.

        Returns:
            tuple[int, float]:
                -round (int): Round number.
                -start_time (time): Current time when round is going to start.
        """
        return (self._round, self._round_start_time, self._expected_nodes)

    async def is_concurrent(self):
        return False

__init__(round, start_time, expected_nodes)

Event triggered when round is going to start.

Parameters:

Name Type Description Default
round int

Round number.

required
start_time time

Current time when round is going to start.

required
Source code in nebula/core/nebulaevents.py
34
35
36
37
38
39
40
41
42
43
def __init__(self, round, start_time, expected_nodes):
    """Event triggered when round is going to start.

    Args:
        round (int): Round number.
        start_time (time): Current time when round is going to start.
    """
    self._round_start_time = start_time
    self._round = round
    self._expected_nodes = expected_nodes

get_event_data() async

Retrieves the round start event data.

Returns:

Type Description

tuple[int, float]: -round (int): Round number. -start_time (time): Current time when round is going to start.

Source code in nebula/core/nebulaevents.py
48
49
50
51
52
53
54
55
56
async def get_event_data(self):
    """Retrieves the round start event data.

    Returns:
        tuple[int, float]:
            -round (int): Round number.
            -start_time (time): Current time when round is going to start.
    """
    return (self._round, self._round_start_time, self._expected_nodes)

UpdateNeighborEvent

Bases: NodeEvent

Source code in nebula/core/nebulaevents.py
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
class UpdateNeighborEvent(NodeEvent):
    def __init__(self, node_addr, removed=False, joining=False):
        """Event triggered when a neighboring node is updated.

        Args:
            node_addr (str): Address of the neighboring node.
            removed (bool, optional): Indicates whether the node was removed.
                                      Defaults to False.
        """
        self._node_addr = node_addr
        self._removed = removed
        self._joining_federation = joining

    def __str__(self):
        return f"Node addr: {self._node_addr}, removed: {self._removed}"

    async def get_event_data(self) -> tuple[str, bool]:
        """Retrieves the neighbor update event data.

        Returns:
            tuple[str, bool]:
                - node_addr (str): Address of the neighboring node.
                - removed (bool): Whether the node was removed.
        """
        return (self._node_addr, self._removed)

    async def is_concurrent(self) -> bool:
        return False

    def is_joining_federation(self):
        return self._joining_federation

__init__(node_addr, removed=False, joining=False)

Event triggered when a neighboring node is updated.

Parameters:

Name Type Description Default
node_addr str

Address of the neighboring node.

required
removed bool

Indicates whether the node was removed. Defaults to False.

False
Source code in nebula/core/nebulaevents.py
141
142
143
144
145
146
147
148
149
150
151
def __init__(self, node_addr, removed=False, joining=False):
    """Event triggered when a neighboring node is updated.

    Args:
        node_addr (str): Address of the neighboring node.
        removed (bool, optional): Indicates whether the node was removed.
                                  Defaults to False.
    """
    self._node_addr = node_addr
    self._removed = removed
    self._joining_federation = joining

get_event_data() async

Retrieves the neighbor update event data.

Returns:

Type Description
tuple[str, bool]

tuple[str, bool]: - node_addr (str): Address of the neighboring node. - removed (bool): Whether the node was removed.

Source code in nebula/core/nebulaevents.py
156
157
158
159
160
161
162
163
164
async def get_event_data(self) -> tuple[str, bool]:
    """Retrieves the neighbor update event data.

    Returns:
        tuple[str, bool]:
            - node_addr (str): Address of the neighboring node.
            - removed (bool): Whether the node was removed.
    """
    return (self._node_addr, self._removed)

UpdateReceivedEvent

Bases: NodeEvent

Source code in nebula/core/nebulaevents.py
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
class UpdateReceivedEvent(NodeEvent):
    def __init__(self, decoded_model, weight, source, round, local=False):
        """
        Initializes an UpdateReceivedEvent.

        Args:
            decoded_model (Any): The received model update.
            weight (float): The weight associated with the received update.
            source (str): The identifier or address of the node that sent the update.
            round (int): The round number in which the update was received.
            local (bool): Local update
        """
        self._source = source
        self._round = round
        self._model = decoded_model
        self._weight = weight
        self._local = local

    def __str__(self):
        return f"Update received from source: {self._source}, round: {self._round}"

    async def get_event_data(self) -> tuple[object, int, str, int, bool]:
        """
        Retrieves the event data.

        Returns:
            tuple[Any, float, str, int, bool]: A tuple containing:
                - The received model update.
                - The weight associated with the update.
                - The source node identifier.
                - The round number of the update.
                - If the update is local
        """
        return (self._model, self._weight, self._source, self._round, self._local)

    async def is_concurrent(self) -> bool:
        return False

__init__(decoded_model, weight, source, round, local=False)

Initializes an UpdateReceivedEvent.

Parameters:

Name Type Description Default
decoded_model Any

The received model update.

required
weight float

The weight associated with the received update.

required
source str

The identifier or address of the node that sent the update.

required
round int

The round number in which the update was received.

required
local bool

Local update

False
Source code in nebula/core/nebulaevents.py
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
def __init__(self, decoded_model, weight, source, round, local=False):
    """
    Initializes an UpdateReceivedEvent.

    Args:
        decoded_model (Any): The received model update.
        weight (float): The weight associated with the received update.
        source (str): The identifier or address of the node that sent the update.
        round (int): The round number in which the update was received.
        local (bool): Local update
    """
    self._source = source
    self._round = round
    self._model = decoded_model
    self._weight = weight
    self._local = local

get_event_data() async

Retrieves the event data.

Returns:

Type Description
tuple[object, int, str, int, bool]

tuple[Any, float, str, int, bool]: A tuple containing: - The received model update. - The weight associated with the update. - The source node identifier. - The round number of the update. - If the update is local

Source code in nebula/core/nebulaevents.py
234
235
236
237
238
239
240
241
242
243
244
245
246
async def get_event_data(self) -> tuple[object, int, str, int, bool]:
    """
    Retrieves the event data.

    Returns:
        tuple[Any, float, str, int, bool]: A tuple containing:
            - The received model update.
            - The weight associated with the update.
            - The source node identifier.
            - The round number of the update.
            - If the update is local
    """
    return (self._model, self._weight, self._source, self._round, self._local)