Skip to content

Documentation for Ringneighborpolicy Module

RINGNeighborPolicy

Bases: NeighborPolicy

Neighbor policy for ring topologies.

This policy maintains a strict limit on the number of neighbors per node, enforcing a ring-like structure. Each node connects to a fixed number of neighbors (by default 2), and excess connections are detected and marked for removal.

The policy ensures
  • No node connects to more than max_neighbors.
  • New connections are accepted only if the node has not reached its limit or the incoming connection is made by a joining node.
  • When a node joins, it's accepted only if not already connected.
  • Excess neighbors (due to dynamic changes) can be identified and pruned, ensuring that newcomers don't get pruned way too fast.

Attributes:

Name Type Description
max_neighbors int

Maximum number of neighbors allowed (default is 2).

nodes_known set[str]

Set of node IDs discovered in the network.

neighbors set[str]

Set of current neighbor node IDs.

neighbors_lock Locker

Lock for thread-safe access to the neighbors set.

nodes_known_lock Locker

Lock for managing access to the known nodes set.

addr str

This node's own address.

_excess_neighbors_removed set[str]

Recently removed nodes due to excess connections.

_excess_neighbors_removed_lock Locker

Lock for accessing the removal tracking set.

_verbose bool

Enables verbose logging.

Source code in nebula/core/situationalawareness/awareness/sanetwork/neighborpolicies/ringneighborpolicy.py
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
class RINGNeighborPolicy(NeighborPolicy):
    """
    Neighbor policy for ring topologies.

    This policy maintains a strict limit on the number of neighbors per node,
    enforcing a ring-like structure. Each node connects to a fixed number of
    neighbors (by default 2), and excess connections are detected and marked
    for removal.

    The policy ensures:
      - No node connects to more than `max_neighbors`.
      - New connections are accepted only if the node has not reached its limit
        or the incoming connection is made by a joining node.
      - When a node joins, it's accepted only if not already connected.
      - Excess neighbors (due to dynamic changes) can be identified and pruned,
        ensuring that newcomers don't get pruned way too fast.

    Attributes:
        max_neighbors (int): Maximum number of neighbors allowed (default is 2).
        nodes_known (set[str]): Set of node IDs discovered in the network.
        neighbors (set[str]): Set of current neighbor node IDs.
        neighbors_lock (Locker): Lock for thread-safe access to the neighbors set.
        nodes_known_lock (Locker): Lock for managing access to the known nodes set.
        addr (str): This node's own address.
        _excess_neighbors_removed (set[str]): Recently removed nodes due to excess connections.
        _excess_neighbors_removed_lock (Locker): Lock for accessing the removal tracking set.
        _ban_expiry_tasks (set[asyncio.Task]): Pending tasks that lift removal bans.
        _verbose (bool): Enables verbose logging.
    """

    # Delay, in seconds (it is passed to asyncio.sleep), before a node pruned
    # as an excess neighbor may be accepted again.
    RECENTLY_REMOVED_BAN_TIME = 20

    def __init__(self):
        """Create an empty policy; the actual state is loaded by `set_config`."""
        self.max_neighbors = 2
        self.nodes_known = set()
        self.neighbors = set()
        self.neighbors_lock = Locker(name="neighbors_lock")
        self.nodes_known_lock = Locker(name="nodes_known_lock")
        self.addr = ""
        self._excess_neighbors_removed = set()
        self._excess_neighbors_removed_lock = Locker("excess_neighbors_removed_lock", async_lock=True)
        # The event loop only keeps weak references to tasks, so the pending
        # ban-expiry tasks are held here until they finish.
        self._ban_expiry_tasks = set()
        self._verbose = False

    async def need_more_neighbors(self):
        """Return True while the node has fewer than `max_neighbors` neighbors."""
        with self.neighbors_lock:
            return len(self.neighbors) < self.max_neighbors

    async def set_config(self, config):
        """
        Load the initial state of the policy.

        Args:
            config[0] -> list of self neighbors
            config[1] -> list of nodes known on federation
            config[2] -> self.addr
            config[3] -> stricted_topology
        """
        logging.info("Initializing Ring Topology Neighbor Policy")
        with self.neighbors_lock:
            if self._verbose:
                logging.info(f"neighbors: {config[0]}")
            # Always store a set: the other methods rely on set operations
            # (add, difference), which break if a list is kept here.
            self.neighbors = set(config[0])
        with self.nodes_known_lock:
            self.nodes_known.update(config[1])
        self.addr = config[2]

    async def accept_connection(self, source, joining=False):
        """
        Decide whether a connection from `source` is accepted.

        Args:
            source: Address of the node requesting the connection.
            joining: True when the request comes from a node joining the network.

        Returns:
            bool: False if `source` was recently removed as an excess neighbor.
            Otherwise, for a joining node, True when it is not already a
            neighbor; for any other node, True when the neighbor limit has not
            been reached.
        """
        if await self._is_recently_removed(source):
            return False

        with self.neighbors_lock:
            if joining:
                return source not in self.neighbors
            return len(self.neighbors) < self.max_neighbors

    async def meet_node(self, node):
        """Record `node` as known, ignoring this node's own address."""
        with self.nodes_known_lock:
            if node != self.addr:
                if node not in self.nodes_known:
                    logging.info(f"Update nodes known | addr: {node}")
                self.nodes_known.add(node)

    async def forget_nodes(self, nodes, forget_all=False):
        """
        Remove nodes from the known set.

        Args:
            nodes: Iterable of node addresses to forget (unused when `forget_all`).
            forget_all: When True, clear the whole known set.
        """
        with self.nodes_known_lock:
            if forget_all:
                self.nodes_known.clear()
            else:
                self.nodes_known.difference_update(nodes)

    async def get_nodes_known(self, neighbors_too=False, neighbors_only=False):
        """
        Return a snapshot of the known nodes.

        Args:
            neighbors_too: When True, current neighbors are kept in the result.
            neighbors_only: When True, return only a copy of the neighbor set
                (takes precedence over `neighbors_too`).

        Returns:
            set: A new set; mutating it does not affect the policy state.
        """
        if neighbors_only:
            with self.neighbors_lock:
                return self.neighbors.copy()

        # Same lock order as before: nodes_known_lock first, then neighbors_lock.
        with self.nodes_known_lock:
            if neighbors_too:
                return self.nodes_known.copy()
            with self.neighbors_lock:
                return self.nodes_known.difference(self.neighbors)

    async def get_actions(self):
        """
        Return the actions to perform in response to a connection.

        Returns:
            list: `[connect_to, disconnect_from]`. Both entries are the same
            randomly chosen neighbor when the node holds exactly
            `max_neighbors` neighbors, and empty strings otherwise.
        """
        ct_actions = ""
        df_actions = ""
        with self.neighbors_lock:
            # The emptiness guard covers max_neighbors == 0, where picking a
            # random element would raise.
            if self.neighbors and len(self.neighbors) == self.max_neighbors:
                node = random.choice(list(self.neighbors))
                ct_actions = node  # connect to
                df_actions = node  # disconnect from
        return [ct_actions, df_actions]

    async def update_neighbors(self, node, remove=False):
        """Add `node` to the neighbor set, or drop it when `remove` is True."""
        with self.neighbors_lock:
            if remove:
                self.neighbors.discard(node)
            else:
                self.neighbors.add(node)

    async def get_posible_neighbors(self):
        """Return set of posible neighbors to connect to."""
        return await self.get_nodes_known(neighbors_too=False)

    async def any_leftovers_neighbors(self):
        """Return True when the node holds more than `max_neighbors` neighbors."""
        with self.neighbors_lock:
            return len(self.neighbors) > self.max_neighbors

    async def get_neighbors_to_remove(self):
        """
        Pick the excess neighbors that should be disconnected.

        A random sample of `len(neighbors) - max_neighbors` neighbors is chosen
        and each of them is banned from reconnecting for
        `RECENTLY_REMOVED_BAN_TIME`.

        Returns:
            set: Neighbors to remove; empty when the limit is not exceeded.
        """
        with self.neighbors_lock:
            excess = len(self.neighbors) - self.max_neighbors
            # random.sample rejects a negative sample size, so only sample
            # when the limit is actually exceeded.
            if excess > 0:
                to_remove = set(random.sample(list(self.neighbors), excess))
            else:
                to_remove = set()
        await self._add_removed_ban(to_remove)
        return to_remove

    async def stricted_topology_status(self, stricted_topology: bool):
        """No-op for this policy."""
        pass

    async def _is_recently_removed(self, source):
        """Return True while `source` is still banned after an excess removal."""
        async with self._excess_neighbors_removed_lock:
            return source in self._excess_neighbors_removed

    async def _add_removed_ban(self, sources):
        """Ban every node in `sources` and schedule the expiry of each ban."""
        async with self._excess_neighbors_removed_lock:
            for source in sources:
                self._excess_neighbors_removed.add(source)
                task = asyncio.create_task(self._clear_ban(source))
                # Keep a strong reference until the task is done.
                self._ban_expiry_tasks.add(task)
                task.add_done_callback(self._ban_expiry_tasks.discard)

    async def _clear_ban(self, source):
        """Lift the ban on `source` after `RECENTLY_REMOVED_BAN_TIME`."""
        # The sleep must be awaited; otherwise the ban is cleared immediately.
        await asyncio.sleep(self.RECENTLY_REMOVED_BAN_TIME)
        async with self._excess_neighbors_removed_lock:
            self._excess_neighbors_removed.discard(source)

accept_connection(source, joining=False) async

return true if connection is accepted

Source code in nebula/core/situationalawareness/awareness/sanetwork/neighborpolicies/ringneighborpolicy.py
75
76
77
78
79
80
81
82
83
84
85
86
87
88
async def accept_connection(self, source, joining=False):
    """
    Tell whether a connection coming from `source` is accepted.

    A recently removed node is always rejected. A joining node is accepted
    when it is not already a neighbor; any other node is accepted while the
    number of neighbors is below `max_neighbors`.
    """
    banned = await self._is_recently_removed(source)
    if banned:
        return False

    with self.neighbors_lock:
        if not joining:
            return len(self.neighbors) < self.max_neighbors
        return source not in self.neighbors

get_actions() async

Return the list of actions to perform in response to a connection: the first element is the addrs argument of the LinkMessage to connect to, and the second element is the same for the LinkMessage to disconnect from.

Source code in nebula/core/situationalawareness/awareness/sanetwork/neighborpolicies/ringneighborpolicy.py
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
async def get_actions(self):
    """
    Return the actions to perform in response to a connection.

    Returns:
        list: `[connect_to, disconnect_from]`. Both entries are the same
        randomly chosen neighbor when the node holds exactly
        `max_neighbors` neighbors, and empty strings otherwise.
    """
    ct_actions = ""
    df_actions = ""
    # The context manager releases the lock even if the body raises.
    with self.neighbors_lock:
        # The emptiness guard covers max_neighbors == 0, where picking a
        # random element would raise.
        if self.neighbors and len(self.neighbors) == self.max_neighbors:
            node = random.choice(list(self.neighbors))
            ct_actions = node  # connect to
            df_actions = node  # disconnect from
    return [ct_actions, df_actions]

get_posible_neighbors() async

Return set of posible neighbors to connect to.

Source code in nebula/core/situationalawareness/awareness/sanetwork/neighborpolicies/ringneighborpolicy.py
150
151
152
async def get_posible_neighbors(self):
    """Give back the known nodes that are not currently neighbors."""
    candidates = await self.get_nodes_known(neighbors_too=False)
    return candidates

set_config(config) async

Source code in nebula/core/situationalawareness/awareness/sanetwork/neighborpolicies/ringneighborpolicy.py
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
async def set_config(self, config):
    """
    Load the initial state of the policy.

    Args:
        config[0] -> list of self neighbors
        config[1] -> list of nodes known on federation
        config[2] -> self.addr
        config[3] -> stricted_topology
    """
    logging.info("Initializing Ring Topology Neighbor Policy")
    # The context manager releases the lock even if the body raises.
    with self.neighbors_lock:
        if self._verbose:
            logging.info(f"neighbors: {config[0]}")
        # Always store a set: the neighbor collection is used with set
        # operations (add, difference), which break if a list is kept here.
        self.neighbors = set(config[0])
    # Guard the known-nodes set with its own lock, like the other mutators.
    with self.nodes_known_lock:
        self.nodes_known.update(config[1])
    self.addr = config[2]