Documentation for Aggregator Module
Aggregator
Bases: ABC
Source code in nebula/core/aggregation/aggregator.py
us (property)
The UpdateHandler for the current federation type (e.g., DFL-UpdateHandler, CFL-UpdateHandler, ...).
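As a rough illustration of what the us property exposes, here is a minimal sketch in which the handler is chosen once from the federation type and the property simply returns it. The factory make_update_handler, the "DFL"/"CFL" keys, the handler classes, and AggregatorSketch are hypothetical names used only for this sketch; they are not Nebula's actual API.

```python
from abc import ABC, abstractmethod


class UpdateHandler(ABC):
    """Hypothetical minimal interface for a federation-specific update handler."""

    @abstractmethod
    def name(self) -> str:
        """Return a human-readable handler name."""


class DFLUpdateHandler(UpdateHandler):
    def name(self) -> str:
        return "DFL-UpdateHandler"


class CFLUpdateHandler(UpdateHandler):
    def name(self) -> str:
        return "CFL-UpdateHandler"


# Hypothetical mapping from federation type to handler class.
_HANDLERS = {"DFL": DFLUpdateHandler, "CFL": CFLUpdateHandler}


def make_update_handler(federation: str) -> UpdateHandler:
    """Hypothetical factory: build the handler that matches the federation type."""
    try:
        return _HANDLERS[federation]()
    except KeyError:
        raise ValueError(f"unknown federation type: {federation!r}") from None


class AggregatorSketch(ABC):
    def __init__(self, federation: str):
        # The handler is created once; the property only exposes it.
        self._update_handler = make_update_handler(federation)

    @property
    def us(self) -> UpdateHandler:
        """Federation type UpdateHandler (e.g. DFL-UpdateHandler, CFL-UpdateHandler...)."""
        return self._update_handler
```

Exposing the handler through a read-only property means callers can use it but cannot rebind it on the aggregator.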
get_aggregation() (async)
Handles the aggregation process for a training round.
This method waits until all expected model updates from federation nodes arrive or a timeout occurs.
It uses an asynchronous lock to coordinate access and includes an early-exit mechanism if all
updates are received before the timeout. Once either condition is met, it releases the lock,
collects the updates, identifies any missing nodes, and publishes an AggregationEvent.
Finally, it runs the aggregation algorithm and returns the result.
Returns:
Type | Description |
---|---|
Any | The result of the aggregation process, as returned by |
Raises:
Type | Description |
---|---|
TimeoutError | If the aggregation lock is not acquired within the defined timeout. |
CancelledError | If the aggregation lock acquisition is cancelled. |
Exception | For any other unexpected errors during the aggregation process. |
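The waiting logic described for get_aggregation() can be sketched with asyncio: the round holds a lock while updates are being collected, the receiving side releases it as soon as every expected node has reported (the early exit), and get_aggregation() waits to acquire that lock for at most timeout seconds before aggregating whatever arrived. Everything here is an assumption for illustration: RoundSketch, start_round, receive, the AggregationEvent dataclass stand-in, and the unweighted mean used as a placeholder aggregation algorithm are not Nebula's implementation.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class AggregationEvent:
    """Hypothetical stand-in: the updates received and the nodes that never reported."""
    updates: dict
    missing_nodes: set


class RoundSketch:
    """Hypothetical, simplified model of one aggregation round (not Nebula's class)."""

    def __init__(self, expected: set, timeout: float):
        self.expected = set(expected)
        self.timeout = timeout
        self.updates: dict = {}
        self.events: list = []        # stand-in for publishing to an event bus
        self._lock = asyncio.Lock()   # held while the round is still collecting

    async def start_round(self) -> None:
        await self._lock.acquire()

    def receive(self, node: str, value: float) -> None:
        self.updates[node] = value
        # Early exit: once every expected node has reported, free the lock so
        # get_aggregation() does not have to wait for the full timeout.
        if self.expected.issubset(self.updates) and self._lock.locked():
            self._lock.release()

    async def get_aggregation(self) -> float:
        try:
            # Wait until receive() frees the lock, or give up after the timeout.
            await asyncio.wait_for(self._lock.acquire(), self.timeout)
        except asyncio.TimeoutError:
            pass  # aggregate whatever arrived before the deadline
        # The lock is now held either by us (early exit) or still by the round
        # (timeout); free it so the next round can start.
        if self._lock.locked():
            self._lock.release()
        missing = self.expected.difference(self.updates)
        self.events.append(AggregationEvent(dict(self.updates), missing))
        return self.run_aggregation(self.updates)

    def run_aggregation(self, updates: dict) -> float:
        # Placeholder algorithm: unweighted mean of scalar "models".
        if not updates:
            raise ValueError("no updates received")
        return sum(updates.values()) / len(updates)
```

For example, with expected nodes {"a", "b"}, if only "a" reports, the call returns once the timeout expires and the recorded event lists {"b"} as missing.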
update_federation_nodes(federation_nodes) (async)
Updates the current set of nodes expected to participate in the upcoming aggregation round.
This method informs the update handler (us) about the new set of federation nodes,
clears any pending models, and attempts to acquire the aggregation lock to prepare
for model aggregation. If the aggregation process is already running, it releases the lock
and tries again to ensure proper cleanup between rounds.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
federation_nodes | set | A set of addresses representing the nodes expected to contribute updates for the next aggregation round. | required |
Raises:
Type | Description |
---|---|
Exception | If the aggregation process is already running and the lock cannot be released. |
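The round setup performed by update_federation_nodes() can be sketched in the same asyncio style: record the expected nodes in the update handler, release a lock left over from a previous round, clear pending models, then acquire the lock again for the new round. RecordingHandler, set_expected_nodes, NodeSetSketch, and raising RuntimeError when the acquisition times out are hypothetical choices for this sketch; the documented method raises a generic Exception.

```python
import asyncio


class RecordingHandler:
    """Hypothetical update handler that only remembers the expected nodes."""

    def __init__(self) -> None:
        self.expected: set = set()

    async def set_expected_nodes(self, federation_nodes: set) -> None:
        self.expected = set(federation_nodes)


class NodeSetSketch:
    """Hypothetical model of preparing a new aggregation round."""

    def __init__(self, timeout: float = 1.0) -> None:
        self.handler = RecordingHandler()
        self.federation_nodes: set = set()
        self.pending_models: dict = {}
        self.timeout = timeout
        self._lock = asyncio.Lock()

    async def update_federation_nodes(self, federation_nodes: set) -> None:
        # 1. Tell the update handler which nodes to expect this round.
        await self.handler.set_expected_nodes(federation_nodes)
        # 2. A lock still held from the previous round is released first so
        #    the new round starts from a clean state.
        if self._lock.locked():
            self._lock.release()
        self.federation_nodes = set(federation_nodes)
        self.pending_models.clear()
        # 3. Hold the lock for the new round until its updates are in.
        try:
            await asyncio.wait_for(self._lock.acquire(), self.timeout)
        except asyncio.TimeoutError:
            raise RuntimeError("could not acquire the aggregation lock") from None
```

Calling it twice in a row shows the cleanup path: the second call releases the lock taken by the first and immediately reacquires it for the new node set.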