9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
class DistanceNeighborPolicy(NeighborPolicy):
    """Neighbor policy that picks neighbors from their reported distance.

    The policy tracks three pieces of state, each protected by its own async
    lock:

    * ``neighbors``       - addresses this node is currently connected to.
    * ``nodes_known``     - addresses known to exist in the federation.
    * ``nodes_distances`` - latest ``{node_id: (distance, extra)}`` mapping
      delivered by ``GPSEvent`` (``None`` until the first event arrives).

    Lock ordering used throughout the class: ``neighbors_lock`` is always
    taken before ``nodes_distances_lock``, and ``nodes_known_lock`` before
    ``neighbors_lock``.
    """

    # INFO: This value may change according to the needs of the federation
    # NOTE(review): the unit of this threshold is whatever unit GPSEvent
    # reports distances in - confirm against the GPS add-on.
    MAX_DISTANCE_THRESHOLD = 200

    # Margin subtracted from MAX_DISTANCE_THRESHOLD when proposing *new*
    # neighbors in get_posible_neighbors().
    # NOTE(review): presumably a hysteresis band so freshly connected nodes
    # are not dropped right away; need_more_neighbors() does not apply it, so
    # it can report True while get_posible_neighbors() is empty - confirm
    # this asymmetry is intended.
    _CANDIDATE_DISTANCE_MARGIN = 20

    def __init__(self):
        self.max_neighbors = None
        self.nodes_known = set()
        self.neighbors = set()
        self.addr = None
        self.neighbors_lock = Locker(name="neighbors_lock", async_lock=True)
        self.nodes_known_lock = Locker(name="nodes_known_lock", async_lock=True)
        # Stays None until the first GPSEvent is processed.
        self.nodes_distances: dict[str, tuple[float, tuple[float, float]]] | None = None
        self.nodes_distances_lock = Locker("nodes_distances_lock", async_lock=True)
        self._verbose = False

    async def set_config(self, config):
        """Initialise the policy and subscribe to GPS distance updates.

        Args:
            config: indexable sequence where
                config[0] -> list of self neighbors
                config[1] -> list of nodes known on federation
                config[2] -> self addr
                config[3] -> stricted_topology (not used by this policy)
        """
        logging.info("Initializing Distance Topology Neighbor Policy")
        async with self.neighbors_lock:
            # Copy into a set: the rest of the class relies on set operations
            # and must not mutate the caller's container.
            self.neighbors = set(config[0])
        async with self.nodes_known_lock:
            self.nodes_known.update(config[1])
        # The original statement here was a bare `self.addr`, which never
        # stored the address. Guard the index so a shorter config that used
        # to be accepted still is.
        if len(config) > 2:
            self.addr = config[2]
        await EventManager.get_instance().subscribe_addonevent(GPSEvent, self._update_distances)

    async def _update_distances(self, gpsevent: GPSEvent):
        """Replace the stored distance table with the data carried by *gpsevent*."""
        async with self.nodes_distances_lock:
            self.nodes_distances = await gpsevent.get_event_data()

    # Backward-compatible alias for the original (misspelled) callback name.
    _udpate_distances = _update_distances

    def _nodes_closer_than(self, limit) -> set[str]:
        """Return ids whose recorded distance is strictly below *limit*.

        The caller must hold ``nodes_distances_lock``. Returns an empty set
        when no distance table has been received yet.
        """
        if not self.nodes_distances:
            return set()
        return {
            node_id
            for node_id, (distance, _) in self.nodes_distances.items()
            if distance < limit
        }

    def _distant_neighbors(self) -> set[str]:
        """Return current neighbors recorded farther than MAX_DISTANCE_THRESHOLD.

        The caller must hold ``neighbors_lock`` and ``nodes_distances_lock``.
        Returns an empty set when no distance table has been received yet.
        """
        if not self.nodes_distances:
            return set()
        too_far = {
            node_id
            for node_id, (distance, _) in self.nodes_distances.items()
            if distance > self.MAX_DISTANCE_THRESHOLD
        }
        return self.neighbors.intersection(too_far)

    async def need_more_neighbors(self) -> bool:
        """Return True if some non-neighbor node is within MAX_DISTANCE_THRESHOLD."""
        async with self.neighbors_lock:
            async with self.nodes_distances_lock:
                if not self.nodes_distances:
                    return False
                available_nodes = self._nodes_closer_than(self.MAX_DISTANCE_THRESHOLD).difference(
                    self.neighbors
                )
                if self._verbose:
                    logging.info(f"Available neighbors based on distance: {available_nodes}")
                return len(available_nodes) > 0

    async def accept_connection(self, source, joining=False) -> bool:
        """
        return true if connection is accepted

        A connection is accepted whenever *source* is not already a neighbor.
        *joining* is accepted for interface compatibility and is not used.
        """
        async with self.neighbors_lock:
            return source not in self.neighbors

    async def meet_node(self, node):
        """
        Update the list of nodes known on federation

        The node's own address is never added.
        """
        async with self.nodes_known_lock:
            if node != self.addr and node not in self.nodes_known:
                logging.info(f"Update nodes known | addr: {node}")
                self.nodes_known.add(node)

    async def get_nodes_known(self, neighbors_too=False, neighbors_only=False):
        """Return a copy of the known nodes.

        Args:
            neighbors_too: when False (default) current neighbors are
                excluded from the result.
            neighbors_only: when True, return a copy of the neighbor set
                instead; takes precedence over *neighbors_too*.
        """
        if neighbors_only:
            async with self.neighbors_lock:
                return self.neighbors.copy()

        async with self.nodes_known_lock:
            known = self.nodes_known.copy()
            if not neighbors_too:
                async with self.neighbors_lock:
                    known = known.difference(self.neighbors)
        return known

    async def forget_nodes(self, nodes, forget_all=False):
        """Drop *nodes* from the known set, or every known node if *forget_all*."""
        async with self.nodes_known_lock:
            if forget_all:
                self.nodes_known.clear()
            else:
                self.nodes_known.difference_update(nodes)

    async def get_actions(self):
        """
        return list of actions to do in response to connection
            - First list represents addrs argument to LinkMessage to connect to
            - Second one represents the same but for disconnect from LinkMessage
        """
        return [await self._connect_to(), await self._disconnect_from()]

    async def _disconnect_from(self) -> str:
        """Return the addresses to disconnect from; this policy never asks for any."""
        return ""

    async def _connect_to(self) -> str:
        """Return the current neighbors as a single space-separated string."""
        async with self.neighbors_lock:
            return " ".join(self.neighbors)

    async def update_neighbors(self, node, remove=False):
        """Add *node* to, or remove it from, the neighbor set.

        The node's own address is ignored. Removing an address that is not a
        neighbor is a silent no-op.
        """
        if node == self.addr:
            return
        async with self.neighbors_lock:
            if remove:
                if node in self.neighbors:
                    self.neighbors.remove(node)
                    if self._verbose:
                        logging.info(f"Remove neighbor | addr: {node}")
            else:
                self.neighbors.add(node)
                if self._verbose:
                    logging.info(f"Add neighbor | addr: {node}")

    async def get_posible_neighbors(self) -> set[str]:
        """Return set of posible neighbors to connect to.

        Candidates are non-neighbor nodes closer than
        ``MAX_DISTANCE_THRESHOLD - _CANDIDATE_DISTANCE_MARGIN``. An empty set
        is returned while no distance table has been received.
        """
        async with self.neighbors_lock:
            async with self.nodes_distances_lock:
                if not self.nodes_distances:
                    # No GPS data yet: previously this raised AttributeError.
                    return set()
                closest_nodes = self._nodes_closer_than(
                    self.MAX_DISTANCE_THRESHOLD - self._CANDIDATE_DISTANCE_MARGIN
                )
                if self._verbose:
                    logging.info(f"Closest nodes: {closest_nodes}, neighbors: {self.neighbors}")
                available_nodes = closest_nodes.difference(self.neighbors)
                if self._verbose:
                    logging.info(f"Available neighbors based on distance: {available_nodes}")
                return available_nodes

    async def any_leftovers_neighbors(self) -> bool:
        """Return True if any current neighbor is farther than MAX_DISTANCE_THRESHOLD."""
        async with self.neighbors_lock:
            async with self.nodes_distances_lock:
                if not self.nodes_distances:
                    return False
                distant_nodes = self._distant_neighbors()
                if self._verbose:
                    logging.info(f"Distant neighbors based on distance: {distant_nodes}")
                return len(distant_nodes) > 0

    async def get_neighbors_to_remove(self) -> set[str]:
        """Return the current neighbors farther than MAX_DISTANCE_THRESHOLD.

        An empty set is returned while no distance table has been received.
        """
        async with self.neighbors_lock:
            async with self.nodes_distances_lock:
                if not self.nodes_distances:
                    # No GPS data yet: previously this raised AttributeError.
                    return set()
                distant_nodes = self._distant_neighbors()
                if self._verbose:
                    logging.info(f"Remove neighbors based on distance: {distant_nodes}")
                return distant_nodes

    def stricted_topology_status(self, stricted_topology: bool = False):
        """Accept the strict-topology flag; this policy ignores it.

        The original declaration lacked ``self``, so calling it on an
        instance with the flag raised ``TypeError``. The default keeps the
        call shapes that worked before (no-argument instance call, or a
        single positional argument through the class) working.
        """
        pass