Skip to content

Documentation for Propagator Module

InitialModelPropagation

Bases: PropagationStrategy

Propagation strategy for sending the initial model to all newly connected nodes.

Sends a fresh model initialized by the trainer with a default weight.

Source code in nebula/core/network/propagator.py
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
class InitialModelPropagation(PropagationStrategy):
    """
    Propagation strategy for sending the initial model to all newly connected nodes.

    Sends a fresh model initialized by the trainer with a default weight.
    """

    def __init__(self, aggregator: "Aggregator", trainer: "Lightning", engine: "Engine"):
        """
        Args:
            aggregator (Aggregator): The aggregator coordinating model rounds.
            trainer (Lightning): The local trainer instance providing model parameters.
            engine (Engine): The engine managing rounds and connections.
        """
        self.aggregator = aggregator
        self.trainer = trainer
        self.engine = engine

    async def get_round(self):
        """
        Get the current training round number from the engine.

        Returns:
            int: The current round index.
        """
        return await self.engine.get_round()

    async def is_node_eligible(self, node: str) -> bool:
        """
        Determine if a node has not yet received the initial model.

        Args:
            node (str): The identifier of the target node.

        Returns:
            bool: True if the node is not already in the ready connections list.
        """
        return node not in self.engine.cm.get_ready_connections()

    def prepare_model_payload(self, node: str) -> tuple[Any, float] | None:
        """
        Prepare the initial model parameters and default weight.

        Args:
            node (str): The identifier of the target node (not used in payload).

        Returns:
            tuple[Any, float]: The initialized model parameters and default model weight.
        """
        return (
            self.trainer.get_model_parameters(initialize=True),
            self.trainer.DEFAULT_MODEL_WEIGHT,
        )

__init__(aggregator, trainer, engine)

Parameters:

Name Type Description Default
aggregator Aggregator

The aggregator coordinating model rounds.

required
trainer Lightning

The local trainer instance providing model parameters.

required
engine Engine

The engine managing rounds and connections.

required
Source code in nebula/core/network/propagator.py
62
63
64
65
66
67
68
69
70
71
def __init__(self, aggregator: "Aggregator", trainer: "Lightning", engine: "Engine"):
    """
    Args:
        aggregator (Aggregator): The aggregator coordinating model rounds.
        trainer (Lightning): The local trainer instance providing model parameters.
        engine (Engine): The engine managing rounds and connections.
    """
    self.aggregator = aggregator
    self.trainer = trainer
    self.engine = engine

get_round() async

Get the current training round number from the engine.

Returns:

Name Type Description
int

The current round index.

Source code in nebula/core/network/propagator.py
73
74
75
76
77
78
79
80
async def get_round(self):
    """
    Get the current training round number from the engine.

    Returns:
        int: The current round index.
    """
    return await self.engine.get_round()

is_node_eligible(node) async

Determine if a node has not yet received the initial model.

Parameters:

Name Type Description Default
node str

The identifier of the target node.

required

Returns:

Name Type Description
bool bool

True if the node is not already in the ready connections list.

Source code in nebula/core/network/propagator.py
82
83
84
85
86
87
88
89
90
91
92
async def is_node_eligible(self, node: str) -> bool:
    """
    Determine if a node has not yet received the initial model.

    Args:
        node (str): The identifier of the target node.

    Returns:
        bool: True if the node is not already in the ready connections list.
    """
    return node not in self.engine.cm.get_ready_connections()

prepare_model_payload(node)

Prepare the initial model parameters and default weight.

Parameters:

Name Type Description Default
node str

The identifier of the target node (not used in payload).

required

Returns:

Type Description
tuple[Any, float] | None

tuple[Any, float]: The initialized model parameters and default model weight.

Source code in nebula/core/network/propagator.py
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
def prepare_model_payload(self, node: str) -> tuple[Any, float] | None:
    """
    Prepare the initial model parameters and default weight.

    Args:
        node (str): The identifier of the target node (not used in payload).

    Returns:
        tuple[Any, float]: The initialized model parameters and default model weight.
    """
    return (
        self.trainer.get_model_parameters(initialize=True),
        self.trainer.DEFAULT_MODEL_WEIGHT,
    )

PropagationStrategy

Bases: ABC

Abstract base class defining the interface for model propagation strategies.

Subclasses implement eligibility checks and payload preparation for sending model updates to specific nodes in the federation.

Source code in nebula/core/network/propagator.py
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
class PropagationStrategy(ABC):
    """
    Abstract base class defining the interface for model propagation strategies.

    Subclasses implement eligibility checks and payload preparation for sending
    model updates to specific nodes in the federation.
    """

    @abstractmethod
    async def is_node_eligible(self, node: str) -> bool:
        """
        Determine whether a given node should receive the model payload.

        Args:
            node (str): The address or identifier of the target node.

        Returns:
            bool: True if the node is eligible to receive the payload, False otherwise.
        """
        pass

    @abstractmethod
    def prepare_model_payload(self, node: str) -> tuple[Any, float] | None:
        """
        Prepare the model data and weight for transmission to a node.

        Args:
            node (str): The address or identifier of the target node.

        Returns:
            tuple[Any, float] | None: A tuple containing the model object and its associated weight,
                                       or None if no payload should be sent.
        """
        pass

is_node_eligible(node) abstractmethod async

Determine whether a given node should receive the model payload.

Parameters:

Name Type Description Default
node str

The address or identifier of the target node.

required

Returns:

Name Type Description
bool bool

True if the node is eligible to receive the payload, False otherwise.

Source code in nebula/core/network/propagator.py
27
28
29
30
31
32
33
34
35
36
37
38
@abstractmethod
async def is_node_eligible(self, node: str) -> bool:
    """
    Determine whether a given node should receive the model payload.

    Args:
        node (str): The address or identifier of the target node.

    Returns:
        bool: True if the node is eligible to receive the payload, False otherwise.
    """
    pass

prepare_model_payload(node) abstractmethod

Prepare the model data and weight for transmission to a node.

Parameters:

Name Type Description Default
node str

The address or identifier of the target node.

required

Returns:

Type Description
tuple[Any, float] | None

tuple[Any, float] | None: A tuple containing the model object and its associated weight, or None if no payload should be sent.

Source code in nebula/core/network/propagator.py
40
41
42
43
44
45
46
47
48
49
50
51
52
@abstractmethod
def prepare_model_payload(self, node: str) -> tuple[Any, float] | None:
    """
    Prepare the model data and weight for transmission to a node.

    Args:
        node (str): The address or identifier of the target node.

    Returns:
        tuple[Any, float] | None: A tuple containing the model object and its associated weight,
                                   or None if no payload should be sent.
    """
    pass

Propagator

Service responsible for propagating messages throughout the federation network.

The Propagator performs
  • Broadcasting discovery or control messages to all relevant peers.
  • Managing propagation strategies (e.g., flood, gossip, or efficient spanning tree).
  • Tracking propagation state to avoid infinite loops or redundant sends.
  • Coordinating with the CommunicationsManager and Forwarder for message dispatch.

Designed to work asynchronously, ensuring timely and scalable message dissemination across dynamically changing network topologies.

Source code in nebula/core/network/propagator.py
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
class Propagator:
    """
    Service responsible for propagating messages throughout the federation network.

    The Propagator performs:
      - Broadcasting discovery or control messages to all relevant peers.
      - Managing propagation strategies (e.g., flood, gossip, or efficient spanning tree).
      - Tracking propagation state to avoid infinite loops or redundant sends.
      - Coordinating with the CommunicationsManager and Forwarder for message dispatch.

    Designed to work asynchronously, ensuring timely and scalable message dissemination
    across dynamically changing network topologies.
    """

    def __init__(self):
        self._cm = None
        self._running = asyncio.Event()

    @property
    def cm(self):
        """
        Lazy-load and return the CommunicationsManager instance for sending messages.

        Returns:
            CommunicationsManager: The singleton communications manager.
        """
        if not self._cm:
            from nebula.core.network.communications import CommunicationsManager

            self._cm = CommunicationsManager.get_instance()
            return self._cm
        else:
            return self._cm

    async def start(self):
        """
        Initialize the Propagator by retrieving core components and configuration,
        setting up propagation intervals, history buffer, and strategy instances.

        This method must be called before any propagation cycles to ensure that
        all dependencies (engine, trainer, aggregator, etc.) are available.
        """
        await EventManager.get_instance().subscribe_node_event(ModelPropagationEvent, self._propagate)
        self.engine: Engine = self.cm.engine
        self.config: Config = self.cm.get_config()
        self.addr = self.cm.get_addr()
        self.aggregator: Aggregator = self.engine.aggregator
        self.trainer: Lightning = self.engine._trainer

        self.status_history = deque(maxlen=self.config.participant["propagator_args"]["history_size"])

        self.interval = self.config.participant["propagator_args"]["propagate_interval"]
        self.model_interval = self.config.participant["propagator_args"]["propagate_model_interval"]
        self.early_stop = self.config.participant["propagator_args"]["propagation_early_stop"]
        self.stable_rounds_count = 0

        # Propagation strategies (adapt to the specific use case)
        self.strategies = {
            "initialization": InitialModelPropagation(self.aggregator, self.trainer, self.engine),
            "stable": StableModelPropagation(self.aggregator, self.trainer, self.engine),
        }
        print_msg_box(
            msg="Starting propagator functionality...\nModel propagation through the network",
            indent=2,
            title="Propagator",
        )
        self._running.set()

    async def get_round(self):
        """
        Retrieve the current federated learning round number.

        Returns:
            int: The current round index from the engine.
        """
        return await self.engine.get_round()

    def update_and_check_neighbors(self, strategy, eligible_neighbors):
        """
        Update the history of eligible neighbors and determine if propagation should continue.

        Appends the current list of eligible neighbors to a bounded history. If the history
        buffer fills with identical entries, propagation is halted to prevent redundant sends.

        Args:
            strategy (PropagationStrategy): The propagation strategy in use.
            eligible_neighbors (list): List of neighbor addresses eligible for propagation.

        Returns:
            bool: True if propagation should continue, False if it should stop due to repeated history.
        """
        # Update the status of eligible neighbors
        current_status = [n for n in eligible_neighbors]

        # Check if the deque is full and the new status is different from the last one
        if self.status_history and current_status != self.status_history[-1]:
            logging.info(
                f"Status History deque is full and the new status is different from the last one: {list(self.status_history)}"
            )
            self.status_history.append(current_status)
            return True

        # Add the current status to the deque
        logging.info(f"Adding current status to the deque: {current_status}")
        self.status_history.append(current_status)

        # If the deque is full and all elements are the same, stop propagation
        if len(self.status_history) == self.status_history.maxlen and all(
            s == self.status_history[0] for s in self.status_history
        ):
            logging.info(
                f"Propagator exited for {self.status_history.maxlen} equal rounds: {list(self.status_history)}"
            )
            return False

        return True

    def reset_status_history(self):
        """
        Clear the history buffer of neighbor eligibility statuses.

        This is typically done at the start of a new propagation cycle.
        """
        self.status_history.clear()

    async def _propagate(self, mpe: ModelPropagationEvent):
        """
        Execute a single propagation cycle using the specified strategy.

        1. Resets status history.
        2. Validates the strategy and current round.
        3. Identifies eligible neighbors.
        4. Updates history and checks for repeated statuses.
        5. Prepares and serializes the model payload.
        6. Sends the model message to each eligible neighbor.
        7. Waits for the configured interval before concluding.

        Args:
            strategy_id (str): Key identifying which propagation strategy to use
                            (e.g., "initialization" or "stable").

        Returns:
            bool: True if propagation occurred (payload sent), False if halted early.
        """
        eligible_neighbors, strategy_id = await mpe.get_event_data()

        self.reset_status_history()
        if strategy_id not in self.strategies:
            logging.info(f"Strategy {strategy_id} not found.")
            return False
        if await self.get_round() is None:
            logging.info("Propagation halted: round is not set.")
            return False

        strategy = self.strategies[strategy_id]
        logging.info(f"Starting model propagation with strategy: {strategy_id}")

        # current_connections = await self.cm.get_addrs_current_connections(only_direct=True)
        # eligible_neighbors = [
        #     neighbor_addr for neighbor_addr in current_connections if await strategy.is_node_eligible(neighbor_addr)
        # ]
        logging.info(f"Eligible neighbors for model propagation: {eligible_neighbors}")
        if not eligible_neighbors:
            logging.info("Propagation complete: No eligible neighbors.")
            return False

        logging.info("Checking repeated statuses during propagation")
        if not self.update_and_check_neighbors(strategy, eligible_neighbors):
            logging.info("Exiting propagation due to repeated statuses.")
            return False

        model_params, weight = strategy.prepare_model_payload(None)
        if model_params:
            serialized_model = (
                model_params if isinstance(model_params, bytes) else self.trainer.serialize_model(model_params)
            )
        else:
            serialized_model = None

        current_round = await self.get_round()
        round_number = -1 if strategy_id == "initialization" else current_round
        parameters = serialized_model
        message = self.cm.create_message("model", "", round_number, parameters, weight)
        for neighbor_addr in eligible_neighbors:
            logging.info(
                f"Sending model to {neighbor_addr} with round {await self.get_round()}: weight={weight} | size={sys.getsizeof(serialized_model) / (1024** 2) if serialized_model is not None else 0} MB"
            )
            asyncio.create_task(self.cm.send_message(neighbor_addr, message, "model"))
            # asyncio.create_task(self.cm.send_model(neighbor_addr, round_number, serialized_model, weight))

        await asyncio.sleep(self.interval)
        return True

    async def get_model_information(self, dest_addr, strategy_id: str, init=False):
        """
        Retrieve the serialized model payload and round metadata for making an offer to a node.

        Args:
            dest_addr (str): The address of the destination node.
            strategy_id (str): Key identifying which propagation strategy to use.
            init (bool, optional): If True, bypasses strategy and round validation (used for initial offers). Defaults to False.

        Returns:
            tuple(bytes, int, int) | None:
                A tuple containing:
                - serialized_model (bytes): The model payload ready for transmission.
                - total_rounds (int): The configured total number of rounds.
                - current_round (int): The current federated learning round.
                Returns None if the strategy is invalid, the round is unset, or no payload is prepared.
        """
        if not init:
            if strategy_id not in self.strategies:
                logging.info(f"Strategy {strategy_id} not found.")
                return None
            if await self.get_round() is None:
                logging.info("Propagation halted: round is not set.")
                return None

        strategy = self.strategies[strategy_id]
        logging.info(f"Preparing model information with strategy to make an offer: {strategy_id}")

        model_params, weight = strategy.prepare_model_payload(None)
        rounds = self.engine.total_rounds

        if model_params:
            serialized_model = (
                model_params if isinstance(model_params, bytes) else self.trainer.serialize_model(model_params)
            )
            return (serialized_model, rounds, await self.get_round())

        return None

    async def stop(self):
        logging.info("🌐  Stopping Propagator module...")
        self._running.clear()

    async def is_running(self):
        return self._running.is_set()

cm property

Lazy-load and return the CommunicationsManager instance for sending messages.

Returns:

Name Type Description
CommunicationsManager

The singleton communications manager.

get_model_information(dest_addr, strategy_id, init=False) async

Retrieve the serialized model payload and round metadata for making an offer to a node.

Parameters:

Name Type Description Default
dest_addr str

The address of the destination node.

required
strategy_id str

Key identifying which propagation strategy to use.

required
init bool

If True, bypasses strategy and round validation (used for initial offers). Defaults to False.

False

Returns:

Type Description

tuple(bytes, int, int) | None: A tuple containing: - serialized_model (bytes): The model payload ready for transmission. - total_rounds (int): The configured total number of rounds. - current_round (int): The current federated learning round. Returns None if the strategy is invalid, the round is unset, or no payload is prepared.

Source code in nebula/core/network/propagator.py
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
async def get_model_information(self, dest_addr, strategy_id: str, init=False):
    """
    Retrieve the serialized model payload and round metadata for making an offer to a node.

    Args:
        dest_addr (str): The address of the destination node.
        strategy_id (str): Key identifying which propagation strategy to use.
        init (bool, optional): If True, bypasses strategy and round validation (used for initial offers). Defaults to False.

    Returns:
        tuple(bytes, int, int) | None:
            A tuple containing:
            - serialized_model (bytes): The model payload ready for transmission.
            - total_rounds (int): The configured total number of rounds.
            - current_round (int): The current federated learning round.
            Returns None if the strategy is invalid, the round is unset, or no payload is prepared.
    """
    if not init:
        if strategy_id not in self.strategies:
            logging.info(f"Strategy {strategy_id} not found.")
            return None
        if await self.get_round() is None:
            logging.info("Propagation halted: round is not set.")
            return None

    strategy = self.strategies[strategy_id]
    logging.info(f"Preparing model information with strategy to make an offer: {strategy_id}")

    model_params, weight = strategy.prepare_model_payload(None)
    rounds = self.engine.total_rounds

    if model_params:
        serialized_model = (
            model_params if isinstance(model_params, bytes) else self.trainer.serialize_model(model_params)
        )
        return (serialized_model, rounds, await self.get_round())

    return None

get_round() async

Retrieve the current federated learning round number.

Returns:

Name Type Description
int

The current round index from the engine.

Source code in nebula/core/network/propagator.py
234
235
236
237
238
239
240
241
async def get_round(self):
    """
    Retrieve the current federated learning round number.

    Returns:
        int: The current round index from the engine.
    """
    return await self.engine.get_round()

reset_status_history()

Clear the history buffer of neighbor eligibility statuses.

This is typically done at the start of a new propagation cycle.

Source code in nebula/core/network/propagator.py
283
284
285
286
287
288
289
def reset_status_history(self):
    """
    Clear the history buffer of neighbor eligibility statuses.

    This is typically done at the start of a new propagation cycle.
    """
    self.status_history.clear()

start() async

Initialize the Propagator by retrieving core components and configuration, setting up propagation intervals, history buffer, and strategy instances.

This method must be called before any propagation cycles to ensure that all dependencies (engine, trainer, aggregator, etc.) are available.

Source code in nebula/core/network/propagator.py
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
async def start(self):
    """
    Initialize the Propagator by retrieving core components and configuration,
    setting up propagation intervals, history buffer, and strategy instances.

    This method must be called before any propagation cycles to ensure that
    all dependencies (engine, trainer, aggregator, etc.) are available.
    """
    await EventManager.get_instance().subscribe_node_event(ModelPropagationEvent, self._propagate)
    self.engine: Engine = self.cm.engine
    self.config: Config = self.cm.get_config()
    self.addr = self.cm.get_addr()
    self.aggregator: Aggregator = self.engine.aggregator
    self.trainer: Lightning = self.engine._trainer

    self.status_history = deque(maxlen=self.config.participant["propagator_args"]["history_size"])

    self.interval = self.config.participant["propagator_args"]["propagate_interval"]
    self.model_interval = self.config.participant["propagator_args"]["propagate_model_interval"]
    self.early_stop = self.config.participant["propagator_args"]["propagation_early_stop"]
    self.stable_rounds_count = 0

    # Propagation strategies (adapt to the specific use case)
    self.strategies = {
        "initialization": InitialModelPropagation(self.aggregator, self.trainer, self.engine),
        "stable": StableModelPropagation(self.aggregator, self.trainer, self.engine),
    }
    print_msg_box(
        msg="Starting propagator functionality...\nModel propagation through the network",
        indent=2,
        title="Propagator",
    )
    self._running.set()

update_and_check_neighbors(strategy, eligible_neighbors)

Update the history of eligible neighbors and determine if propagation should continue.

Appends the current list of eligible neighbors to a bounded history. If the history buffer fills with identical entries, propagation is halted to prevent redundant sends.

Parameters:

Name Type Description Default
strategy PropagationStrategy

The propagation strategy in use.

required
eligible_neighbors list

List of neighbor addresses eligible for propagation.

required

Returns:

Name Type Description
bool

True if propagation should continue, False if it should stop due to repeated history.

Source code in nebula/core/network/propagator.py
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
def update_and_check_neighbors(self, strategy, eligible_neighbors):
    """
    Update the history of eligible neighbors and determine if propagation should continue.

    Appends the current list of eligible neighbors to a bounded history. If the history
    buffer fills with identical entries, propagation is halted to prevent redundant sends.

    Args:
        strategy (PropagationStrategy): The propagation strategy in use.
        eligible_neighbors (list): List of neighbor addresses eligible for propagation.

    Returns:
        bool: True if propagation should continue, False if it should stop due to repeated history.
    """
    # Update the status of eligible neighbors
    current_status = [n for n in eligible_neighbors]

    # Check if the deque is full and the new status is different from the last one
    if self.status_history and current_status != self.status_history[-1]:
        logging.info(
            f"Status History deque is full and the new status is different from the last one: {list(self.status_history)}"
        )
        self.status_history.append(current_status)
        return True

    # Add the current status to the deque
    logging.info(f"Adding current status to the deque: {current_status}")
    self.status_history.append(current_status)

    # If the deque is full and all elements are the same, stop propagation
    if len(self.status_history) == self.status_history.maxlen and all(
        s == self.status_history[0] for s in self.status_history
    ):
        logging.info(
            f"Propagator exited for {self.status_history.maxlen} equal rounds: {list(self.status_history)}"
        )
        return False

    return True

StableModelPropagation

Bases: PropagationStrategy

Propagation strategy for sending model updates after the initial round.

Sends the latest trained model to neighbors.

Source code in nebula/core/network/propagator.py
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
class StableModelPropagation(PropagationStrategy):
    """
    Propagation strategy for sending model updates after the initial round.

    Sends the latest trained model to neighbors.
    """

    def __init__(self, aggregator: "Aggregator", trainer: "Lightning", engine: "Engine"):
        """
        Args:
            aggregator (Aggregator): The aggregator coordinating model rounds.
            trainer (Lightning): The local trainer instance providing model parameters and weight.
            engine (Engine): The engine managing rounds, connections, and addresses.
        """
        self.aggregator = aggregator
        self.trainer = trainer
        self.engine = engine
        self.addr = self.engine.get_addr()

    async def get_round(self):
        """
        Get the current training round number from the engine.

        Returns:
            int: The current round index.
        """
        return await self.engine.get_round()

    async def is_node_eligible(self, node: str) -> bool:
        """
        Determine if a node requires a model update based on aggregation state.

        Args:
            node (str): The identifier of the target node.

        Returns:
            bool: True if the node is pending aggregation or its last federated round
                  is less than the current round.
        """
        return (node not in self.aggregator.get_nodes_pending_models_to_aggregate()) or (
            self.engine.cm.connections[node].get_federated_round() < await self.get_round()
        )

    def prepare_model_payload(self, node: str) -> tuple[Any, float] | None:
        """
        Prepare the current model parameters and their corresponding weight.

        Args:
            node (str): The identifier of the target node (not used in payload).

        Returns:
            tuple[Any, float]: The model parameters and model weight for propagation.
        """
        return self.trainer.get_model_parameters(), self.trainer.get_model_weight()

__init__(aggregator, trainer, engine)

Parameters:

Name Type Description Default
aggregator Aggregator

The aggregator coordinating model rounds.

required
trainer Lightning

The local trainer instance providing model parameters and weight.

required
engine Engine

The engine managing rounds, connections, and addresses.

required
Source code in nebula/core/network/propagator.py
117
118
119
120
121
122
123
124
125
126
127
def __init__(self, aggregator: "Aggregator", trainer: "Lightning", engine: "Engine"):
    """
    Args:
        aggregator (Aggregator): The aggregator coordinating model rounds.
        trainer (Lightning): The local trainer instance providing model parameters and weight.
        engine (Engine): The engine managing rounds, connections, and addresses.
    """
    self.aggregator = aggregator
    self.trainer = trainer
    self.engine = engine
    self.addr = self.engine.get_addr()

get_round() async

Get the current training round number from the engine.

Returns:

Name Type Description
int

The current round index.

Source code in nebula/core/network/propagator.py
129
130
131
132
133
134
135
136
async def get_round(self):
    """
    Get the current training round number from the engine.

    Returns:
        int: The current round index.
    """
    return await self.engine.get_round()

is_node_eligible(node) async

Determine if a node requires a model update based on aggregation state.

Parameters:

Name Type Description Default
node str

The identifier of the target node.

required

Returns:

Name Type Description
bool bool

True if the node is pending aggregation or its last federated round is less than the current round.

Source code in nebula/core/network/propagator.py
138
139
140
141
142
143
144
145
146
147
148
149
150
151
async def is_node_eligible(self, node: str) -> bool:
    """
    Determine if a node requires a model update based on aggregation state.

    Args:
        node (str): The identifier of the target node.

    Returns:
        bool: True if the node is pending aggregation or its last federated round
              is less than the current round.
    """
    return (node not in self.aggregator.get_nodes_pending_models_to_aggregate()) or (
        self.engine.cm.connections[node].get_federated_round() < await self.get_round()
    )

prepare_model_payload(node)

Prepare the current model parameters and their corresponding weight.

Parameters:

Name Type Description Default
node str

The identifier of the target node (not used in payload).

required

Returns:

Type Description
tuple[Any, float] | None

tuple[Any, float]: The model parameters and model weight for propagation.

Source code in nebula/core/network/propagator.py
153
154
155
156
157
158
159
160
161
162
163
def prepare_model_payload(self, node: str) -> tuple[Any, float] | None:
    """
    Prepare the current model parameters and their corresponding weight.

    Args:
        node (str): The identifier of the target node (not used in payload).

    Returns:
        tuple[Any, float]: The model parameters and model weight for propagation.
    """
    return self.trainer.get_model_parameters(), self.trainer.get_model_weight()