13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161 | class NebulaGPS(GPSModule):
BROADCAST_IP = "255.255.255.255" # Broadcast IP
BROADCAST_PORT = 50001 # Port used for GPS
INTERFACE = "eth2" # Interface to avoid network conditions
def __init__(self, config, addr, update_interval: float = 5.0, verbose=False):
self._config = config
self._addr = addr
self.update_interval = update_interval # Frequency
self._node_locations = {} # Dictionary for storing node locations
self._broadcast_socket = None
self._nodes_location_lock = Locker("nodes_location_lock", async_lock=True)
self._verbose = verbose
self._running = asyncio.Event()
self._background_tasks = [] # Track background tasks
async def start(self):
"""Starts the GPS service, sending and receiving locations."""
logging.info("Starting NebulaGPS service...")
self._running.set()
# Create broadcast socket
self._broadcast_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self._broadcast_socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
# Bind socket on eth2 to also receive data
self._broadcast_socket.bind(("", self.BROADCAST_PORT))
# Start sending and receiving tasks
self._background_tasks = [
asyncio.create_task(self._send_location_loop(), name="NebulaGPS_send_location"),
asyncio.create_task(self._receive_location_loop(), name="NebulaGPS_receive_location"),
asyncio.create_task(self._notify_geolocs(), name="NebulaGPS_notify_geolocs"),
]
async def stop(self):
"""Stops the GPS service."""
logging.info("🛑 Stopping NebulaGPS service...")
self._running.clear()
logging.info("🛑 NebulaGPS _running event cleared")
# Cancel all background tasks
if self._background_tasks:
logging.info(f"🛑 Cancelling {len(self._background_tasks)} background tasks...")
for task in self._background_tasks:
if not task.done():
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
self._background_tasks.clear()
logging.info("🛑 All background tasks cancelled")
if self._broadcast_socket:
self._broadcast_socket.close()
self._broadcast_socket = None
logging.info("🛑 NebulaGPS broadcast socket closed")
logging.info("✅ NebulaGPS service stopped successfully")
async def is_running(self):
return self._running.is_set()
async def get_geoloc(self):
latitude = self._config.participant["mobility_args"]["latitude"]
longitude = self._config.participant["mobility_args"]["longitude"]
return (latitude, longitude)
async def calculate_distance(self, self_lat, self_long, other_lat, other_long):
distance_m = distance.distance((self_lat, self_long), (other_lat, other_long)).m
return distance_m
async def _send_location_loop(self):
"""Send the geolocation periodically by broadcast."""
while await self.is_running():
# Check if learning cycle has finished
try:
from nebula.core.network.communications import CommunicationsManager
cm = CommunicationsManager.get_instance()
if await cm.learning_finished():
logging.info("GPS: Learning cycle finished, stopping location broadcast")
break
except Exception:
pass # If we can't get the communications manager, continue
latitude, longitude = await self.get_geoloc() # Obtener ubicación actual
message = f"GPS-UPDATE {self._addr} {latitude} {longitude}"
self._broadcast_socket.sendto(message.encode(), (self.BROADCAST_IP, self.BROADCAST_PORT))
if self._verbose:
logging.info(f"Sent GPS location: ({latitude}, {longitude})")
await asyncio.sleep(self.update_interval)
async def _receive_location_loop(self):
"""Listens to and stores geolocations from other nodes."""
while await self.is_running():
# Check if learning cycle has finished
try:
from nebula.core.network.communications import CommunicationsManager
cm = CommunicationsManager.get_instance()
if await cm.learning_finished():
logging.info("GPS: Learning cycle finished, stopping location reception")
break
except Exception:
pass # If we can't get the communications manager, continue
try:
data, addr = await asyncio.get_running_loop().run_in_executor(
None, self._broadcast_socket.recvfrom, 1024
)
message = data.decode().strip()
if message.startswith("GPS-UPDATE"):
_, sender_addr, lat, lon = message.split()
if sender_addr != self._addr:
async with self._nodes_location_lock:
self._node_locations[sender_addr] = (float(lat), float(lon))
if self._verbose:
logging.info(f"Received GPS from {addr[0]}: {lat}, {lon}")
except Exception as e:
logging.exception(f"Error receiving GPS update: {e}")
async def _notify_geolocs(self):
while await self.is_running():
# Check if learning cycle has finished
try:
from nebula.core.network.communications import CommunicationsManager
cm = CommunicationsManager.get_instance()
if await cm.learning_finished():
logging.info("GPS: Learning cycle finished, stopping geolocation notifications")
break
except Exception:
pass # If we can't get the communications manager, continue
await asyncio.sleep(self.update_interval)
await self._nodes_location_lock.acquire_async()
geolocs: dict = self._node_locations.copy()
await self._nodes_location_lock.release_async()
if geolocs:
distances = {}
self_lat, self_long = await self.get_geoloc()
for addr, (lat, long) in geolocs.items():
dist = await self.calculate_distance(self_lat, self_long, lat, long)
distances[addr] = (dist, (lat, long))
self._config.update_nodes_distance(distances)
gpsevent = GPSEvent(distances)
asyncio.create_task(EventManager.get_instance().publish_addonevent(gpsevent))
|