Skip to content

Documentation for Messages Module

MessagesManager

Manages creation, processing, and whenever is neccesary to do forwarding of Nebula protobuf messages. Handles different message types defined in the protocol and coordinates with the CommunicationsManager.

Source code in nebula/core/network/messages.py
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
class MessagesManager:
    """
    Manages creation, processing, and whenever is neccesary to do forwarding of Nebula protobuf messages.
    Handles different message types defined in the protocol and coordinates with the CommunicationsManager.
    """

    def __init__(self, addr, config):
        """
        Initialize MessagesManager with the node address and configuration.

        Args:
            addr (str): The network address of the current node.
            config (dict): Configuration dictionary for the node.
        """
        self.addr = addr
        self.config = config
        self._cm = None
        self._message_templates = {}
        self._define_message_templates()

    @property
    def cm(self):
        """
        Lazy-load and return the singleton instance of CommunicationsManager.

        Returns:
            CommunicationsManager: The communications manager instance.
        """
        if not self._cm:
            from nebula.core.network.communications import CommunicationsManager

            self._cm = CommunicationsManager.get_instance()
            return self._cm
        else:
            return self._cm

    def _define_message_templates(self):
        """
        Define the message templates mapping message types to their parameters and default values.
        This is used to dynamically create messages of different types.
        """
        # Dictionary that maps message types to their required parameters and default values
        self._message_templates = {
            "offer": {
                "parameters": ["action", "n_neighbors", "loss", "parameters", "rounds", "round", "epochs"],
                "defaults": {
                    "parameters": None,
                    "rounds": 1,
                    "round": -1,
                    "epochs": 1,
                },
            },
            "connection": {"parameters": ["action"], "defaults": {}},
            "discovery": {
                "parameters": ["action", "latitude", "longitude"],
                "defaults": {
                    "latitude": 0.0,
                    "longitude": 0.0,
                },
            },
            "control": {
                "parameters": ["action", "log"],
                "defaults": {
                    "log": "Control message",
                },
            },
            "federation": {
                "parameters": ["action", "arguments", "round"],
                "defaults": {
                    "arguments": [],
                    "round": None,
                },
            },
            "model": {
                "parameters": ["round", "parameters", "weight"],
                "defaults": {
                    "weight": 1,
                },
            },
            "reputation": {
                "parameters": ["node_id", "score", "round", "action"],
                "defaults": {
                    "round": None,
                },
            },
            "discover": {"parameters": ["action"], "defaults": {}},
            "link": {"parameters": ["action", "addrs"], "defaults": {}},
            # Add additional message types here
        }

    def get_messages_events(self) -> dict:
        """
        Retrieve the available message event names and their corresponding actions.

        Returns:
            dict: Mapping of message names (excluding 'model') to their available action names.
        """
        message_events = {}
        for message_name in self._message_templates:
            if message_name != "model":
                message_events[message_name] = get_actions_names(message_name)
        return message_events

    async def process_message(self, data, addr_from):
        """
        Asynchronously process an incoming serialized protobuf message.

        Parses the message, verifies source, forwards or handles the message depending on its type,
        and prevents duplicate processing using message hashes.

        Args:
            data (bytes): Serialized protobuf message bytes.
            addr_from (str): Address from which the message was received.
        """
        not_processing_messages = {"control_message", "connection_message"}
        special_processing_messages = {"discovery_message", "federation_message", "model_message"}

        try:
            message_wrapper = nebula_pb2.Wrapper()
            message_wrapper.ParseFromString(data)
            source = message_wrapper.source
            logging.debug(f"📥  handle_incoming_message | Received message from {addr_from} with source {source}")
            if source == self.addr:
                return

            # Extract the active message from the oneof field
            message_type = message_wrapper.WhichOneof("message")
            msg_name = message_type.split("_")[0]
            if not message_type:
                logging.warning("Received message with no active field in the 'oneof'")
                return

            message_data = getattr(message_wrapper, message_type)

            # Not required processing messages
            if message_type in not_processing_messages:
                # await self.cm.handle_message(source, message_type, message_data)
                me = MessageEvent(
                    (msg_name, get_action_name_from_value(msg_name, message_data.action)), source, message_data
                )
                await self.cm.handle_message(me)

            # Message-specific forwarding and processing
            elif message_type in special_processing_messages:
                if await self.cm.include_received_message_hash(hashlib.md5(data).hexdigest(), addr_from):
                    # Forward the message if required
                    if self._should_forward_message(message_type, message_wrapper):
                        await self.cm.forward_message(data, addr_from)

                    if message_type == "model_message":
                        await self.cm.handle_model_message(source, message_data)
                    else:
                        me = MessageEvent(
                            (msg_name, get_action_name_from_value(msg_name, message_data.action)), source, message_data
                        )
                        await self.cm.handle_message(me)
            # Rest of messages
            else:
                # if await self.cm.include_received_message_hash(hashlib.md5(data).hexdigest()):
                me = MessageEvent(
                    (msg_name, get_action_name_from_value(msg_name, message_data.action)), source, message_data
                )
                await self.cm.handle_message(me)
        except Exception as e:
            logging.exception(f"📥  handle_incoming_message | Error while processing: {e}")
            logging.exception(traceback.format_exc())

    def _should_forward_message(self, message_type, message_wrapper):
        """
        Determine if a received message should be forwarded to other nodes.

        Forwarding is enabled for proxy devices or for specific message types
        like initialization model messages or federation start actions.

        Args:
            message_type (str): Type of the message, e.g. 'model_message'.
            message_wrapper (nebula_pb2.Wrapper): Parsed protobuf wrapper message.

        Returns:
            bool: True if the message should be forwarded, False otherwise.
        """
        if self.cm.config.participant["device_args"]["proxy"]:
            return True
        # TODO: Improve the technique. Now only forward model messages if the node is a proxy
        # Need to update the expected model messages receiving during the round
        # Round -1 is the initialization round --> all nodes should receive the model
        if message_type == "model_message" and message_wrapper.model_message.round == -1:
            return True
        if (
            message_type == "federation_message"
            and message_wrapper.federation_message.action
            == nebula_pb2.FederationMessage.Action.Value("FEDERATION_START")
        ):
            return True

    def create_message(self, message_type: str, action: str = "", *args, **kwargs):
        """
        Create and serialize a protobuf message of the given type and action.

        Dynamically maps provided arguments to the protobuf message fields using predefined templates.
        Wraps the message in a Nebula 'Wrapper' message with the node's address as source.

        Args:
            message_type (str): The type of message to create (e.g. 'offer', 'model', etc.).
            action (str, optional): Action name for the message, converted to protobuf enum. Defaults to "".
            *args: Positional arguments for message fields according to the template.
            **kwargs: Keyword arguments for message fields.

        Raises:
            ValueError: If the message_type is invalid.
            AttributeError: If the protobuf message class does not exist.

        Returns:
            bytes: Serialized protobuf 'Wrapper' message bytes ready for transmission.
        """
        # logging.info(f"Creating message | type: {message_type}, action: {action}, positionals: {args}, explicits: {kwargs.keys()}")
        # If an action is provided, convert it to its corresponding enum value using the factory
        message_action = None
        if action:
            message_action = factory_message_action(message_type, action)

        # Retrieve the template for the provided message type
        message_template = self._message_templates.get(message_type)
        if not message_template:
            raise ValueError(f"Invalid message type '{message_type}'")

        # Extract parameters and defaults from the template
        template_params = message_template["parameters"]
        default_values: dict = message_template.get("defaults", {})

        # Dynamically retrieve the class for the protobuf message (e.g., OfferMessage)
        class_name = message_type.capitalize() + "Message"
        message_class = getattr(nebula_pb2, class_name, None)

        if message_class is None:
            raise AttributeError(f"Message type {message_type} not found on the protocol")

        # Set the 'action' parameter if required and if the message_action is available
        if "action" in template_params and message_action is not None:
            kwargs["action"] = message_action

        # Map positional arguments to template parameters
        remaining_params = [param_name for param_name in template_params if param_name not in kwargs]
        if args:
            for param_name, arg_value in zip(remaining_params, args, strict=False):
                if param_name in kwargs:
                    continue
                kwargs[param_name] = arg_value

        # Fill in missing parameters with their default values
        # logging.info(f"kwargs parameters: {kwargs.keys()}")
        for param_name in template_params:
            if param_name not in kwargs:
                # logging.info(f"Filling parameter '{param_name}' with default value: {default_values.get(param_name)}")
                kwargs[param_name] = default_values.get(param_name)

        # Create an instance of the protobuf message class using the constructed kwargs
        message = message_class(**kwargs)

        message_wrapper = nebula_pb2.Wrapper()
        message_wrapper.source = self.addr
        field_name = f"{message_type}_message"
        getattr(message_wrapper, field_name).CopyFrom(message)
        data = message_wrapper.SerializeToString()
        return data

cm property

Lazy-load and return the singleton instance of CommunicationsManager.

Returns:

Name Type Description
CommunicationsManager

The communications manager instance.

__init__(addr, config)

Initialize MessagesManager with the node address and configuration.

Parameters:

Name Type Description Default
addr str

The network address of the current node.

required
config dict

Configuration dictionary for the node.

required
Source code in nebula/core/network/messages.py
16
17
18
19
20
21
22
23
24
25
26
27
28
def __init__(self, addr, config):
    """
    Initialize MessagesManager with the node address and configuration.

    Args:
        addr (str): The network address of the current node.
        config (dict): Configuration dictionary for the node.
    """
    self.addr = addr
    self.config = config
    self._cm = None
    self._message_templates = {}
    self._define_message_templates()

create_message(message_type, action='', *args, **kwargs)

Create and serialize a protobuf message of the given type and action.

Dynamically maps provided arguments to the protobuf message fields using predefined templates. Wraps the message in a Nebula 'Wrapper' message with the node's address as source.

Parameters:

Name Type Description Default
message_type str

The type of message to create (e.g. 'offer', 'model', etc.).

required
action str

Action name for the message, converted to protobuf enum. Defaults to "".

''
*args

Positional arguments for message fields according to the template.

()
**kwargs

Keyword arguments for message fields.

{}

Raises:

Type Description
ValueError

If the message_type is invalid.

AttributeError

If the protobuf message class does not exist.

Returns:

Name Type Description
bytes

Serialized protobuf 'Wrapper' message bytes ready for transmission.

Source code in nebula/core/network/messages.py
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
def create_message(self, message_type: str, action: str = "", *args, **kwargs):
    """
    Create and serialize a protobuf message of the given type and action.

    Dynamically maps provided arguments to the protobuf message fields using predefined templates.
    Wraps the message in a Nebula 'Wrapper' message with the node's address as source.

    Args:
        message_type (str): The type of message to create (e.g. 'offer', 'model', etc.).
        action (str, optional): Action name for the message, converted to protobuf enum. Defaults to "".
        *args: Positional arguments for message fields according to the template.
        **kwargs: Keyword arguments for message fields.

    Raises:
        ValueError: If the message_type is invalid.
        AttributeError: If the protobuf message class does not exist.

    Returns:
        bytes: Serialized protobuf 'Wrapper' message bytes ready for transmission.
    """
    # logging.info(f"Creating message | type: {message_type}, action: {action}, positionals: {args}, explicits: {kwargs.keys()}")
    # If an action is provided, convert it to its corresponding enum value using the factory
    message_action = None
    if action:
        message_action = factory_message_action(message_type, action)

    # Retrieve the template for the provided message type
    message_template = self._message_templates.get(message_type)
    if not message_template:
        raise ValueError(f"Invalid message type '{message_type}'")

    # Extract parameters and defaults from the template
    template_params = message_template["parameters"]
    default_values: dict = message_template.get("defaults", {})

    # Dynamically retrieve the class for the protobuf message (e.g., OfferMessage)
    class_name = message_type.capitalize() + "Message"
    message_class = getattr(nebula_pb2, class_name, None)

    if message_class is None:
        raise AttributeError(f"Message type {message_type} not found on the protocol")

    # Set the 'action' parameter if required and if the message_action is available
    if "action" in template_params and message_action is not None:
        kwargs["action"] = message_action

    # Map positional arguments to template parameters
    remaining_params = [param_name for param_name in template_params if param_name not in kwargs]
    if args:
        for param_name, arg_value in zip(remaining_params, args, strict=False):
            if param_name in kwargs:
                continue
            kwargs[param_name] = arg_value

    # Fill in missing parameters with their default values
    # logging.info(f"kwargs parameters: {kwargs.keys()}")
    for param_name in template_params:
        if param_name not in kwargs:
            # logging.info(f"Filling parameter '{param_name}' with default value: {default_values.get(param_name)}")
            kwargs[param_name] = default_values.get(param_name)

    # Create an instance of the protobuf message class using the constructed kwargs
    message = message_class(**kwargs)

    message_wrapper = nebula_pb2.Wrapper()
    message_wrapper.source = self.addr
    field_name = f"{message_type}_message"
    getattr(message_wrapper, field_name).CopyFrom(message)
    data = message_wrapper.SerializeToString()
    return data

get_messages_events()

Retrieve the available message event names and their corresponding actions.

Returns:

Name Type Description
dict dict

Mapping of message names (excluding 'model') to their available action names.

Source code in nebula/core/network/messages.py
100
101
102
103
104
105
106
107
108
109
110
111
def get_messages_events(self) -> dict:
    """
    Retrieve the available message event names and their corresponding actions.

    Returns:
        dict: Mapping of message names (excluding 'model') to their available action names.
    """
    message_events = {}
    for message_name in self._message_templates:
        if message_name != "model":
            message_events[message_name] = get_actions_names(message_name)
    return message_events

process_message(data, addr_from) async

Asynchronously process an incoming serialized protobuf message.

Parses the message, verifies source, forwards or handles the message depending on its type, and prevents duplicate processing using message hashes.

Parameters:

Name Type Description Default
data bytes

Serialized protobuf message bytes.

required
addr_from str

Address from which the message was received.

required
Source code in nebula/core/network/messages.py
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
async def process_message(self, data, addr_from):
    """
    Asynchronously process an incoming serialized protobuf message.

    Parses the message, verifies source, forwards or handles the message depending on its type,
    and prevents duplicate processing using message hashes.

    Args:
        data (bytes): Serialized protobuf message bytes.
        addr_from (str): Address from which the message was received.
    """
    not_processing_messages = {"control_message", "connection_message"}
    special_processing_messages = {"discovery_message", "federation_message", "model_message"}

    try:
        message_wrapper = nebula_pb2.Wrapper()
        message_wrapper.ParseFromString(data)
        source = message_wrapper.source
        logging.debug(f"📥  handle_incoming_message | Received message from {addr_from} with source {source}")
        if source == self.addr:
            return

        # Extract the active message from the oneof field
        message_type = message_wrapper.WhichOneof("message")
        msg_name = message_type.split("_")[0]
        if not message_type:
            logging.warning("Received message with no active field in the 'oneof'")
            return

        message_data = getattr(message_wrapper, message_type)

        # Not required processing messages
        if message_type in not_processing_messages:
            # await self.cm.handle_message(source, message_type, message_data)
            me = MessageEvent(
                (msg_name, get_action_name_from_value(msg_name, message_data.action)), source, message_data
            )
            await self.cm.handle_message(me)

        # Message-specific forwarding and processing
        elif message_type in special_processing_messages:
            if await self.cm.include_received_message_hash(hashlib.md5(data).hexdigest(), addr_from):
                # Forward the message if required
                if self._should_forward_message(message_type, message_wrapper):
                    await self.cm.forward_message(data, addr_from)

                if message_type == "model_message":
                    await self.cm.handle_model_message(source, message_data)
                else:
                    me = MessageEvent(
                        (msg_name, get_action_name_from_value(msg_name, message_data.action)), source, message_data
                    )
                    await self.cm.handle_message(me)
        # Rest of messages
        else:
            # if await self.cm.include_received_message_hash(hashlib.md5(data).hexdigest()):
            me = MessageEvent(
                (msg_name, get_action_name_from_value(msg_name, message_data.action)), source, message_data
            )
            await self.cm.handle_message(me)
    except Exception as e:
        logging.exception(f"📥  handle_incoming_message | Error while processing: {e}")
        logging.exception(traceback.format_exc())