Skip to content

Documentation for Htstrainingpolicy Module

HTSTrainingPolicy

Bases: TrainingPolicy

Implements a Hybrid Training Strategy (HTS) that combines multiple training policies (e.g., QDS, FRTS) to collaboratively decide on the evaluation and potential pruning of neighbors in a decentralized federated learning scenario.

Attributes:

Name Type Description
TRAINING_POLICY set

Names of training policy classes to instantiate and manage.

Source code in nebula/core/situationalawareness/awareness/satraining/trainingpolicy/htstrainingpolicy.py
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
class HTSTrainingPolicy(TrainingPolicy):
    """
    Implements a Hybrid Training Strategy (HTS) that combines multiple training policies 
    (e.g., QDS, FRTS) to collaboratively decide on the evaluation and potential pruning 
    of neighbors in a decentralized federated learning scenario.

    Attributes:
        TRAINING_POLICY (set): Names of training policy classes to instantiate and manage.
    """

    TRAINING_POLICY = {
        "Quality-Driven Selection",
        "Fast Reboot Training Strategy",
    }

    def __init__(self, config):
        """
        Initializes the HTS policy with the node's address and verbosity level.
        It creates instances of each sub-policy listed in TRAINING_POLICY.

        Args:
            config (dict): Configuration dictionary with keys:
                - 'addr': Node's address
                - 'verbose': Enable verbose logging
        """
        self._addr = config["addr"]
        self._verbose = config["verbose"]
        self._training_policies : set[TrainingPolicy] = set()
        self._training_policies.add([factory_training_policy(x, config) for x in self.TRAINING_POLICY])

    def __str__(self):
        return "HTS"    

    @property
    def tps(self):
        return self._training_policies  

    async def init(self, config):
        for tp in self.tps:
            await tp.init(config)    

    async def update_neighbors(self, node, remove=False):
        pass

    async def get_evaluation_results(self):
        """
        Asynchronously calls the `get_evaluation_results` of each policy,
        and logs the nodes each policy would remove.

        Returns:
            None (future version may merge all evaluations).
        """
        nodes_to_remove = dict()
        for tp in self.tps:
            nodes_to_remove[tp] = await tp.get_evaluation_results()

        for tp, nodes in nodes_to_remove.items():
            logging.info(f"Training Policy: {tp}, nodes to remove: {nodes}")

        return None

__init__(config)

Initializes the HTS policy with the node's address and verbosity level. It creates instances of each sub-policy listed in TRAINING_POLICY.

Parameters:

Name Type Description Default
config dict

Configuration dictionary with keys: - 'addr': Node's address - 'verbose': Enable verbose logging

required
Source code in nebula/core/situationalawareness/awareness/satraining/trainingpolicy/htstrainingpolicy.py
22
23
24
25
26
27
28
29
30
31
32
33
34
35
def __init__(self, config):
    """
    Initializes the HTS policy with the node's address and verbosity level.
    It creates instances of each sub-policy listed in TRAINING_POLICY.

    Args:
        config (dict): Configuration dictionary with keys:
            - 'addr': Node's address
            - 'verbose': Enable verbose logging
    """
    self._addr = config["addr"]
    self._verbose = config["verbose"]
    self._training_policies : set[TrainingPolicy] = set()
    self._training_policies.add([factory_training_policy(x, config) for x in self.TRAINING_POLICY])

get_evaluation_results() async

Asynchronously calls the get_evaluation_results of each policy, and logs the nodes each policy would remove.

Returns:

Type Description

None (future version may merge all evaluations).

Source code in nebula/core/situationalawareness/awareness/satraining/trainingpolicy/htstrainingpolicy.py
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
async def get_evaluation_results(self):
    """
    Asynchronously calls the `get_evaluation_results` of each policy,
    and logs the nodes each policy would remove.

    Returns:
        None (future version may merge all evaluations).
    """
    nodes_to_remove = dict()
    for tp in self.tps:
        nodes_to_remove[tp] = await tp.get_evaluation_results()

    for tp, nodes in nodes_to_remove.items():
        logging.info(f"Training Policy: {tp}, nodes to remove: {nodes}")

    return None