Skip to content

Documentation for Nebuladiscoveryservice Module

NebulaClientProtocol

Bases: DatagramProtocol

Source code in nebula/core/network/externalconnection/nebuladiscoveryservice.py
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
class NebulaClientProtocol(asyncio.DatagramProtocol):
    """
    Datagram protocol that multicasts SSDP-like M-SEARCH requests and forwards
    matching replies to the owning Nebula service.

    The search loop is started as soon as the transport is available and runs
    at most `SEARCH_TRIES` rounds, `SEARCH_INTERVAL` seconds apart. The
    `search_done` event is set when the loop ends or when `stop()` is called.
    """

    BCAST_IP = "239.255.255.250"
    BCAST_PORT = 1900
    SEARCH_TRIES = 3
    SEARCH_INTERVAL = 3

    def __init__(self, nebula_service):
        """
        Args:
            nebula_service: Object whose `response_received(data, addr)` is
                called for every datagram that carries the Nebula service tag.
        """
        self.nebula_service: NebulaConnectionService = nebula_service
        self.transport = None
        self.search_done = asyncio.Event()
        # The event loop only keeps weak references to tasks, so the search
        # task is stored here to keep it alive until it finishes.
        self._search_task = None

    def connection_made(self, transport):
        """
        Store the transport, set the multicast TTL and start the search loop.

        Args:
            transport: The datagram transport handed over by the event loop.
        """
        self.transport = transport
        sock = transport.get_extra_info("socket")
        if sock is not None:
            # Multicast packets are sent with a TTL of 2.
            sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 2)
        self._search_task = asyncio.create_task(self.keep_search())

    async def stop(self):
        """
        Stop the client protocol by setting the search_done event to release any waiting tasks.

        The search loop checks the same event, so it sends no further
        requests once this has been called.
        """
        self.search_done.set()

    async def keep_search(self):
        """
        Periodically broadcast search requests to discover other nodes in the federation.

        The loop runs at most `SEARCH_TRIES` times, each time sending a
        multicast discovery request and then waiting up to `SEARCH_INTERVAL`
        seconds. It ends early if `search_done` is set in the meantime
        (for example by `stop()`).

        When the loop ends, for any reason including cancellation, the
        `search_done` event is set so that waiters are always released.
        """
        logging.info("Federation searching loop started")
        try:
            for _ in range(self.SEARCH_TRIES):
                if self.search_done.is_set():
                    break
                await self.search()
                try:
                    # Acts as the inter-round pause, but wakes up immediately
                    # when the search is stopped.
                    await asyncio.wait_for(self.search_done.wait(), timeout=self.SEARCH_INTERVAL)
                except asyncio.TimeoutError:
                    # Interval elapsed without a stop request: next round.
                    pass
        finally:
            self.search_done.set()

    async def wait_for_search(self):
        """
        Wait for the search phase to complete.

        This coroutine blocks until the `search_done` event is set,
        signaling that the search loop has finished or was stopped.
        """
        await self.search_done.wait()

    async def search(self):
        """
        Send a multicast discovery message to locate other Nebula nodes.

        Constructs an SSDP-like M-SEARCH request and sends it to
        (`BCAST_IP`, `BCAST_PORT`); the HOST header is built from the same
        two constants so it cannot drift from the destination.

        If an error occurs during sending, it is logged as an exception and
        not propagated.
        """
        logging.info("Searching for nodes...")
        try:
            search_request = (
                "M-SEARCH * HTTP/1.1\r\n"
                f"HOST: {self.BCAST_IP}:{self.BCAST_PORT}\r\n"
                'MAN: "ssdp:discover"\r\n'
                "MX: 1\r\n"
                "ST: urn:nebula-service\r\n"
                "TYPE: discover\r\n"
                "\r\n"
            )
            self.transport.sendto(search_request.encode("ASCII"), (self.BCAST_IP, self.BCAST_PORT))
        except Exception as e:
            logging.exception(f"Error sending search request: {e}")

    def datagram_received(self, data, addr):
        """
        Forward datagrams that carry the Nebula service tag to the service.

        Payloads that are not valid UTF-8 are logged and ignored.

        Args:
            data (bytes): Raw datagram payload.
            addr: Sender address as reported by the transport.
        """
        try:
            if "ST: urn:nebula-service" in data.decode("utf-8"):
                self.nebula_service.response_received(data, addr)
        except UnicodeDecodeError:
            logging.warning(f"Received malformed message from {addr}, ignoring.")

keep_search() async

Periodically broadcast search requests to discover other nodes in the federation.

This loop runs a fixed number of times, each time sending a multicast discovery request and waiting for a predefined interval before repeating.

When the loop completes, a synchronization event (search_done) is set to indicate that the search phase is finished.

Source code in nebula/core/network/externalconnection/nebuladiscoveryservice.py
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
async def keep_search(self):
    """
    Periodically broadcast search requests to discover other nodes in the federation.

    The loop runs at most `SEARCH_TRIES` times, each time sending a multicast
    discovery request and then waiting up to `SEARCH_INTERVAL` seconds. It
    ends early if the `search_done` event is set in the meantime (which is
    what `stop()` does), so no further requests are sent after a stop.

    When the loop ends, for any reason including cancellation, the
    `search_done` event is set so that waiters are always released.
    """
    logging.info("Federation searching loop started")
    try:
        for _ in range(self.SEARCH_TRIES):
            if self.search_done.is_set():
                break
            await self.search()
            try:
                # Acts as the inter-round pause, but wakes up immediately
                # when the search is stopped.
                await asyncio.wait_for(self.search_done.wait(), timeout=self.SEARCH_INTERVAL)
            except asyncio.TimeoutError:
                # Interval elapsed without a stop request: next round.
                pass
    finally:
        self.search_done.set()

search() async

Send a multicast discovery message to locate other Nebula nodes.

Constructs and sends an SSDP-like M-SEARCH request targeted to all devices on the local multicast group. This message indicates interest in finding other participants in the Nebula DFL federation.

If an error occurs during sending, it is logged as an exception.

Source code in nebula/core/network/externalconnection/nebuladiscoveryservice.py
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
async def search(self):
    """
    Multicast one discovery request for other Nebula nodes.

    An SSDP-like M-SEARCH message tagged with the Nebula service type is
    encoded as ASCII and sent to (`BCAST_IP`, `BCAST_PORT`).

    Any error raised while building or sending the request is logged with
    its traceback and not propagated.
    """
    logging.info("Searching for nodes...")
    try:
        request_parts = (
            "M-SEARCH * HTTP/1.1\r\n",
            "HOST: 239.255.255.250:1900\r\n",
            'MAN: "ssdp:discover"\r\n',
            "MX: 1\r\n",
            "ST: urn:nebula-service\r\n",
            "TYPE: discover\r\n",
            "\r\n",
        )
        payload = "".join(request_parts).encode("ASCII")
        destination = (self.BCAST_IP, self.BCAST_PORT)
        self.transport.sendto(payload, destination)
    except Exception as e:
        logging.exception(f"Error sending search request: {e}")

stop() async

Stop the client protocol by setting the search_done event to release any waiting tasks.

Source code in nebula/core/network/externalconnection/nebuladiscoveryservice.py
119
120
121
122
123
async def stop(self):
    """
    Mark the search as finished.

    Sets the `search_done` event so that anything waiting on it is released.
    """
    done_flag = self.search_done
    done_flag.set()

wait_for_search() async

Wait for the search phase to complete.

This coroutine blocks until the search_done event is set, signaling that the search loop has finished.

Source code in nebula/core/network/externalconnection/nebuladiscoveryservice.py
141
142
143
144
145
146
147
148
async def wait_for_search(self):
    """
    Block until the search phase is over.

    Suspends the caller until the `search_done` event has been set, then
    returns nothing.
    """
    finished = self.search_done
    await finished.wait()

NebulaServerProtocol

Bases: DatagramProtocol

Source code in nebula/core/network/externalconnection/nebuladiscoveryservice.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
class NebulaServerProtocol(asyncio.DatagramProtocol):
    """
    Datagram protocol that answers Nebula discovery requests and relays
    beacons from other nodes to the owning Nebula service.

    Only messages carrying the `ST: urn:nebula-service` tag are processed;
    everything else received on the socket is ignored.
    """

    BCAST_IP = "239.255.255.250"
    UPNP_PORT = 1900
    DISCOVER_MESSAGE = "TYPE: discover"
    BEACON_MESSAGE = "TYPE: beacon"

    def __init__(self, nebula_service, addr):
        """
        Args:
            nebula_service: Object whose `notify_beacon_received` coroutine is
                awaited for every valid beacon from another node.
            addr: This node's own address, advertised in the LOCATION header
                and used to recognise its own beacons.
        """
        self.nebula_service: NebulaConnectionService = nebula_service
        self.addr = addr
        self.transport = None
        # The event loop only keeps weak references to tasks, so in-flight
        # handler tasks are held here until they complete.
        self._background_tasks = set()

    def connection_made(self, transport):
        """Store the transport handed over by the event loop."""
        self.transport = transport
        logging.info("Nebula UPnP server is listening...")

    def datagram_received(self, data, addr):
        """
        Dispatch an incoming datagram to the matching handler.

        Payloads that are not valid UTF-8 are logged and dropped instead of
        raising inside the event-loop callback. Discovery requests are
        answered via `respond`; beacons go to `handle_beacon_received`.

        Args:
            data (bytes): Raw datagram payload.
            addr: Sender address as reported by the transport.
        """
        try:
            msg = data.decode("utf-8")
        except UnicodeDecodeError:
            logging.warning(f"Received malformed message from {addr}, ignoring.")
            return

        if not self._is_nebula_message(msg):
            return

        if self.DISCOVER_MESSAGE in msg:
            logging.info("Discovery request received, responding...")
            self._spawn(self.respond(addr))
        elif self.BEACON_MESSAGE in msg:
            self._spawn(self.handle_beacon_received(msg))

    def _spawn(self, coro):
        """Schedule `coro` as a task and keep a reference until it is done."""
        task = asyncio.create_task(coro)
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)
        return task

    async def respond(self, addr):
        """
        Send a unicast HTTP-like response message to a given address.

        This method is called when a discovery request is received. The
        response carries the Nebula service tag and this node's address in
        the LOCATION header. Errors while building or sending the response
        are logged and not propagated.

        Args:
            addr (tuple): The address (IP, port) to send the response to.
        """
        try:
            response = (
                "HTTP/1.1 200 OK\r\n"
                "CACHE-CONTROL: max-age=1800\r\n"
                "ST: urn:nebula-service\r\n"
                "TYPE: response\r\n"
                f"LOCATION: {self.addr}\r\n"
                "\r\n"
            )
            self.transport.sendto(response.encode("ASCII"), addr)
        except Exception as e:
            logging.exception(f"Error responding to client: {e}")

    async def handle_beacon_received(self, msg):
        """
        Process a received beacon message from another node.

        Parses the `KEY: value` header lines, drops beacons that have no
        LOCATION, that originate from this node, or whose coordinates are not
        numeric, and otherwise notifies the Nebula service about the neighbor.
        Missing LATITUDE / LONGITUDE headers default to 0.0.

        Args:
            msg (str): The raw message string received via multicast.
        """
        beacon_data = {}
        for line in msg.split("\r\n"):
            key, separator, value = line.partition(": ")
            if separator:
                beacon_data[key] = value

        beacon_addr = beacon_data.get("LOCATION")
        if not beacon_addr:
            # Without a sender address there is no neighbor to report.
            logging.warning("Received beacon without LOCATION, ignoring.")
            return

        # Verify that it is not the beacon itself
        if beacon_addr == self.addr:
            return

        try:
            latitude = float(beacon_data.get("LATITUDE", 0.0))
            longitude = float(beacon_data.get("LONGITUDE", 0.0))
        except ValueError:
            logging.warning(f"Received beacon from {beacon_addr} with malformed coordinates, ignoring.")
            return

        await self.nebula_service.notify_beacon_received(beacon_addr, (latitude, longitude))

    def _is_nebula_message(self, msg):
        """
        Determine if a message corresponds to the Nebula discovery protocol.

        Args:
            msg (str): The raw message string to evaluate.

        Returns:
            bool: True if the message contains the Nebula service tag, False otherwise.
        """
        return "ST: urn:nebula-service" in msg

handle_beacon_received(msg) async

Process a received beacon message from another node.

Extracts and parses the beacon content, validates it is not from this same node, and then notifies the associated Nebula service about the presence of a neighbor.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| msg | str | The raw message string received via multicast. | required |

Source code in nebula/core/network/externalconnection/nebuladiscoveryservice.py
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
async def handle_beacon_received(self, msg):
    """
    Process a received beacon message from another node.

    Parses the `KEY: value` header lines, drops beacons that have no
    LOCATION, that originate from this node, or whose coordinates are not
    numeric, and otherwise notifies the Nebula service about the neighbor.
    Missing LATITUDE / LONGITUDE headers default to 0.0.

    Args:
        msg (str): The raw message string received via multicast.
    """
    beacon_data = {}
    for line in msg.split("\r\n"):
        key, separator, value = line.partition(": ")
        if separator:
            beacon_data[key] = value

    beacon_addr = beacon_data.get("LOCATION")
    if not beacon_addr:
        # Without a sender address there is no neighbor to report.
        logging.warning("Received beacon without LOCATION, ignoring.")
        return

    # Verify that it is not the beacon itself
    if beacon_addr == self.addr:
        return

    try:
        latitude = float(beacon_data.get("LATITUDE", 0.0))
        longitude = float(beacon_data.get("LONGITUDE", 0.0))
    except ValueError:
        logging.warning(f"Received beacon from {beacon_addr} with malformed coordinates, ignoring.")
        return

    await self.nebula_service.notify_beacon_received(beacon_addr, (latitude, longitude))

respond(addr) async

Send a unicast HTTP-like response message to a given address.

This method is typically called when a discovery request is received. It returns metadata indicating that this node is available for participation in a DFL federation.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| addr | tuple | The address (IP, port) to send the response to. | required |

Source code in nebula/core/network/externalconnection/nebuladiscoveryservice.py
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
async def respond(self, addr):
    """
    Reply to a discovery request with a unicast HTTP-like message.

    The reply carries the Nebula service tag and this node's own address in
    the LOCATION header, and is sent ASCII-encoded through the transport.
    Any error raised while building or sending it is logged with its
    traceback and not propagated.

    Args:
        addr (tuple): The address (IP, port) to send the response to.
    """
    try:
        reply_parts = (
            "HTTP/1.1 200 OK\r\n",
            "CACHE-CONTROL: max-age=1800\r\n",
            "ST: urn:nebula-service\r\n",
            "TYPE: response\r\n",
            f"LOCATION: {self.addr}\r\n",
            "\r\n",
        )
        payload = "".join(reply_parts).encode("ASCII")
        self.transport.sendto(payload, addr)
    except Exception as e:
        logging.exception(f"Error responding to client: {e}")