Skip to content

Documentation for Distanceneighborpolicy Module

DistanceNeighborPolicy

Bases: NeighborPolicy

Source code in nebula/core/situationalawareness/awareness/sanetwork/neighborpolicies/distanceneighborpolicy.py
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
class DistanceNeighborPolicy(NeighborPolicy):
    """Neighbor policy that picks and drops neighbors by reported distance.

    Distances arrive through ``GPSEvent`` notifications and are stored in
    ``nodes_distances``. Nodes closer than ``MAX_DISTANCE_THRESHOLD`` are
    candidates to connect to; current neighbors farther than the threshold
    are candidates for removal.
    """

    # INFO: This value may change according to the needs of the federation
    # NOTE(review): units are whatever GPSEvent reports — confirm with the producer.
    MAX_DISTANCE_THRESHOLD = 200

    def __init__(self):
        """Create an empty policy; real state is loaded by ``set_config``."""
        self.max_neighbors = None
        self.nodes_known = set()
        self.neighbors = set()
        self.addr = None
        self.neighbors_lock = Locker(name="neighbors_lock", async_lock=True)
        self.nodes_known_lock = Locker(name="nodes_known_lock", async_lock=True)
        # node id -> (distance, pair of floats); stays None until the first
        # GPSEvent is received. The pair is presumably a position — TODO confirm.
        self.nodes_distances: dict[str, tuple[float, tuple[float, float]]] | None = None
        self.nodes_distances_lock = Locker("nodes_distances_lock", async_lock=True)
        self._verbose = False

    async def set_config(self, config):
        """Load the initial state and subscribe to GPS distance updates.

        Args:
            config[0] -> list of self neighbors
            config[1] -> list of nodes known on federation
            config[2] -> self addr
            config[3] -> stricted_topology (not used by this policy)
        """
        logging.info("Initializing Distance Topology Neighbor Policy")
        async with self.neighbors_lock:
            # Every other method uses set operations on ``neighbors``, so store
            # an owned set regardless of the iterable type received.
            self.neighbors = set(config[0])
        self.nodes_known.update(config[1])
        # The original statement was a bare ``self.addr`` (a no-op), so the own
        # address was never recorded and the self-address guards never matched.
        self.addr = config[2]

        await EventManager.get_instance().subscribe_addonevent(GPSEvent, self._udpate_distances)

    async def _udpate_distances(self, gpsevent: GPSEvent):
        """Replace the stored distances with the payload of ``gpsevent``."""
        async with self.nodes_distances_lock:
            distances = await gpsevent.get_event_data()
            self.nodes_distances = distances

    async def need_more_neighbors(self):
        """Return True if some non-neighbor node is closer than the threshold.

        Returns False while no distances have been received.
        """
        async with self.neighbors_lock:
            async with self.nodes_distances_lock:
                if not self.nodes_distances:
                    return False

                closest_nodes: set[str] = {
                    node_id
                    for node_id, (distance, _) in self.nodes_distances.items()
                    if distance < self.MAX_DISTANCE_THRESHOLD
                }
                available_nodes = closest_nodes.difference(self.neighbors)
                if self._verbose:
                    logging.info(f"Available neighbors based on distance: {available_nodes}")
                return len(available_nodes) > 0

    async def accept_connection(self, source, joining=False):
        """Return True if ``source`` is not already a neighbor.

        ``joining`` is accepted for interface compatibility and is not used.
        """
        async with self.neighbors_lock:
            ac = source not in self.neighbors
        return ac

    async def meet_node(self, node):
        """Record ``node`` as known on the federation, ignoring the own address."""
        async with self.nodes_known_lock:
            if node != self.addr:
                if node not in self.nodes_known:
                    logging.info(f"Update nodes known | addr: {node}")
                self.nodes_known.add(node)

    async def get_nodes_known(self, neighbors_too=False, neighbors_only=False):
        """Return a copy of the known nodes.

        Args:
            neighbors_too: when False (default), current neighbors are excluded.
            neighbors_only: when True, return only a copy of the neighbors and
                ignore ``neighbors_too``.
        """
        if neighbors_only:
            async with self.neighbors_lock:
                return self.neighbors.copy()

        async with self.nodes_known_lock:
            nk = self.nodes_known.copy()
            if not neighbors_too:
                async with self.neighbors_lock:
                    nk = nk - self.neighbors
        return nk

    async def forget_nodes(self, nodes, forget_all=False):
        """Remove ``nodes`` from the known set, or everything if ``forget_all``."""
        async with self.nodes_known_lock:
            if forget_all:
                self.nodes_known.clear()
            else:
                for node in nodes:
                    self.nodes_known.discard(node)

    async def get_actions(self):
        """Return the actions to do in response to a connection.

        Returns a two-element list:
            - First element: addrs argument for a LinkMessage to connect to
            - Second element: the same, for a LinkMessage to disconnect from
        """
        return [await self._connect_to(), await self._disconnect_from()]

    async def _disconnect_from(self):
        """Return the addresses to disconnect from; this policy proposes none."""
        return ""

    async def _connect_to(self):
        """Return the current neighbors joined into one space-separated string."""
        ct = ""
        async with self.neighbors_lock:
            ct = " ".join(self.neighbors)
        return ct

    async def update_neighbors(self, node, remove=False):
        """Add ``node`` to the neighbors, or remove it when ``remove`` is True.

        The own address is ignored. Removing an unknown node is a no-op.
        """
        if node == self.addr:
            return
        async with self.neighbors_lock:
            if remove:
                try:
                    self.neighbors.remove(node)
                    if self._verbose:
                        logging.info(f"Remove neighbor | addr: {node}")
                except KeyError:
                    # Node was not a neighbor; nothing to remove.
                    pass
            else:
                self.neighbors.add(node)
                if self._verbose:
                    logging.info(f"Add neighbor | addr: {node}")

    async def get_posible_neighbors(self):
        """Return the set of possible neighbors to connect to.

        Returns an empty set while no distances have been received.
        """
        async with self.neighbors_lock:
            async with self.nodes_distances_lock:
                # nodes_distances is None until the first GPSEvent arrives.
                if not self.nodes_distances:
                    return set()

                # NOTE(review): the 20-unit reduction looks like a safety margin
                # below the removal threshold — confirm the intent.
                closest_nodes: set[str] = {
                    node_id
                    for node_id, (distance, _) in self.nodes_distances.items()
                    if distance < self.MAX_DISTANCE_THRESHOLD - 20
                }
                if self._verbose:
                    logging.info(f"Closest nodes: {closest_nodes}, neighbors: {self.neighbors}")
                available_nodes = closest_nodes.difference(self.neighbors)
                if self._verbose:
                    logging.info(f"Available neighbors based on distance: {available_nodes}")
                return available_nodes

    async def any_leftovers_neighbors(self):
        """Return True if any current neighbor is farther than the threshold.

        Returns False while no distances have been received.
        """
        distant_nodes = set()
        async with self.neighbors_lock:
            async with self.nodes_distances_lock:
                if not self.nodes_distances:
                    return False

                distant_nodes = {
                    node_id
                    for node_id, (distance, _) in self.nodes_distances.items()
                    if distance > self.MAX_DISTANCE_THRESHOLD
                }
                distant_nodes = self.neighbors.intersection(distant_nodes)
                if self._verbose:
                    logging.info(f"Distant neighbors based on distance: {distant_nodes}")
        return len(distant_nodes) > 0

    async def get_neighbors_to_remove(self):
        """Return the current neighbors that are farther than the threshold.

        Returns an empty set while no distances have been received.
        """
        distant_nodes = set()
        async with self.neighbors_lock:
            async with self.nodes_distances_lock:
                # nodes_distances is None until the first GPSEvent arrives.
                if not self.nodes_distances:
                    return distant_nodes

                distant_nodes = {
                    node_id
                    for node_id, (distance, _) in self.nodes_distances.items()
                    if distance > self.MAX_DISTANCE_THRESHOLD
                }
                distant_nodes = self.neighbors.intersection(distant_nodes)
                if self._verbose:
                    logging.info(f"Remove neighbors based on distance: {distant_nodes}")
        return distant_nodes

    @staticmethod
    def stricted_topology_status(stricted_topology: bool):
        """Accept the strict-topology flag; this policy ignores it.

        Declared as a static method because the original signature had no
        ``self``, which made every call on an instance raise ``TypeError``.
        Calls through the class keep working.
        """
        pass

accept_connection(source, joining=False) async

Return True if the connection is accepted.

Source code in nebula/core/situationalawareness/awareness/sanetwork/neighborpolicies/distanceneighborpolicy.py
62
63
64
65
66
67
68
async def accept_connection(self, source, joining=False):
    """
    Decide whether a connection from ``source`` is accepted.

    The connection is accepted exactly when ``source`` is not already in
    the neighbor set. ``joining`` is not consulted.
    """
    async with self.neighbors_lock:
        return source not in self.neighbors

get_actions() async

Return the list of actions to perform in response to a connection. The first element is the addrs argument for a LinkMessage to connect to; the second element is the same, but for a LinkMessage to disconnect from.

Source code in nebula/core/situationalawareness/awareness/sanetwork/neighborpolicies/distanceneighborpolicy.py
101
102
103
104
105
106
107
async def get_actions(self):
    """
    Build the pair of actions to perform in response to a connection.

    Returns a two-element list:
        - index 0: addrs argument for a LinkMessage to connect to
        - index 1: addrs argument for a LinkMessage to disconnect from
    """
    # Await in the same order as the returned list: connect first, then disconnect.
    connect_addrs = await self._connect_to()
    disconnect_addrs = await self._disconnect_from()
    return [connect_addrs, disconnect_addrs]

get_posible_neighbors() async

Return the set of possible neighbors to connect to.

Source code in nebula/core/situationalawareness/awareness/sanetwork/neighborpolicies/distanceneighborpolicy.py
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
async def get_posible_neighbors(self):
    """Return the set of possible neighbors to connect to.

    A node qualifies when its stored distance is below
    ``MAX_DISTANCE_THRESHOLD - 20`` and it is not already a neighbor.
    Returns an empty set when no distances are stored yet.
    """
    async with self.neighbors_lock:
        async with self.nodes_distances_lock:
            # nodes_distances may be None or empty; without this guard the
            # ``.items()`` call below raises AttributeError on None.
            if not self.nodes_distances:
                return set()

            # NOTE(review): the 20-unit reduction looks like a safety margin
            # below the threshold — confirm the intent.
            closest_nodes: set[str] = {
                node_id
                for node_id, (distance, _) in self.nodes_distances.items()
                if distance < self.MAX_DISTANCE_THRESHOLD - 20
            }
            if self._verbose:
                logging.info(f"Closest nodes: {closest_nodes}, neighbors: {self.neighbors}")
            available_nodes = closest_nodes.difference(self.neighbors)
            if self._verbose:
                logging.info(f"Available neighbors based on distance: {available_nodes}")
            return available_nodes

meet_node(node) async

Update the list of nodes known on federation

Source code in nebula/core/situationalawareness/awareness/sanetwork/neighborpolicies/distanceneighborpolicy.py
70
71
72
73
74
75
76
77
78
async def meet_node(self, node):
    """
    Register ``node`` in the set of nodes known on the federation.

    A node equal to ``self.addr`` is skipped. A log line is emitted only
    the first time a node is seen.
    """
    async with self.nodes_known_lock:
        if node == self.addr:
            return
        is_new = node not in self.nodes_known
        if is_new:
            logging.info(f"Update nodes known | addr: {node}")
        self.nodes_known.add(node)

set_config(config) async

Source code in nebula/core/situationalawareness/awareness/sanetwork/neighborpolicies/distanceneighborpolicy.py
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
async def set_config(self, config):
    """
    Load the initial state and subscribe to GPS distance updates.

    Args:
        config[0] -> list of self neighbors
        config[1] -> list of nodes known on federation
        config[2] -> self addr
        config[3] -> stricted_topology (not read here)
    """
    logging.info("Initializing Distance Topology Neighbor Policy")
    async with self.neighbors_lock:
        # Store an owned set: the rest of the policy uses set operations
        # (add/remove/difference/intersection) on ``neighbors``.
        self.neighbors = set(config[0])
    self.nodes_known.update(config[1])
    # The original statement was a bare ``self.addr`` (a no-op), so the own
    # address was never recorded.
    self.addr = config[2]

    await EventManager.get_instance().subscribe_addonevent(GPSEvent, self._udpate_distances)