Skip to content

Documentation for the RINGNeighborPolicy module

RINGNeighborPolicy

Bases: NeighborPolicy

Source code in nebula/core/situationalawareness/awareness/sanetwork/neighborpolicies/ringneighborpolicy.py
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
class RINGNeighborPolicy(NeighborPolicy):
    """Neighbor policy for a ring topology.

    The policy tracks the set of current neighbors (capped at
    ``max_neighbors``, which is 2 here), the set of nodes known in the
    federation, and a temporary ban list of neighbors that were recently
    dropped as excess so they are not accepted back immediately.
    """

    # Seconds a neighbor removed as excess stays banned (passed to asyncio.sleep).
    RECENTLY_REMOVED_BAN_TIME = 20

    def __init__(self):
        self.max_neighbors = 2
        self.nodes_known = set()
        self.neighbors = set()
        self.neighbors_lock = Locker(name="neighbors_lock")
        self.nodes_known_lock = Locker(name="nodes_known_lock")
        self.addr = ""
        self._excess_neighbors_removed = set()
        self._excess_neighbors_removed_lock = Locker("excess_neighbors_removed_lock", async_lock=True)
        # Strong references to pending ban-expiry tasks; the event loop only
        # keeps weak references, so unreferenced tasks may be collected early.
        self._ban_tasks = set()
        self._verbose = False

    async def need_more_neighbors(self):
        """Return True while the node has fewer neighbors than ``max_neighbors``."""
        with self.neighbors_lock:
            return len(self.neighbors) < self.max_neighbors

    async def set_config(self, config):
        """
        Initialize the policy state.

        Args:
            config[0] -> list of self neighbors
            config[1] -> list of nodes known on federation
            config[2] -> self.addr
            config[3] -> stricted_topology
        """
        logging.info("Initializing Ring Topology Neighbor Policy")
        with self.neighbors_lock:
            if self._verbose:
                logging.info(f"neighbors: {config[0]}")
            # Stored as a set: other methods rely on set operations
            # (add / discard / difference) on self.neighbors.
            self.neighbors = set(config[0])
        with self.nodes_known_lock:
            self.nodes_known.update(config[1])
        self.addr = config[2]

    async def accept_connection(self, source, joining=False):
        """
        Return True if the connection from ``source`` is accepted.

        Recently removed (banned) sources are always rejected. A joining
        source is accepted unless it is already a neighbor; any other source
        is accepted only while there is room for more neighbors.
        """
        if await self._is_recently_removed(source):
            return False

        with self.neighbors_lock:
            if joining:
                return source not in self.neighbors
            return len(self.neighbors) < self.max_neighbors

    async def meet_node(self, node):
        """Record ``node`` as known, ignoring this node's own address."""
        with self.nodes_known_lock:
            if node != self.addr:
                if node not in self.nodes_known:
                    logging.info(f"Update nodes known | addr: {node}")
                self.nodes_known.add(node)

    async def forget_nodes(self, nodes, forget_all=False):
        """Drop ``nodes`` from the known set, or every known node if ``forget_all``."""
        with self.nodes_known_lock:
            if forget_all:
                self.nodes_known.clear()
            else:
                self.nodes_known.difference_update(nodes)

    async def get_nodes_known(self, neighbors_too=False, neighbors_only=False):
        """
        Return a copy of the known nodes.

        Args:
            neighbors_too: when True, current neighbors are not filtered out.
            neighbors_only: when True, return only a copy of the neighbors.
        """
        if neighbors_only:
            with self.neighbors_lock:
                return self.neighbors.copy()

        with self.nodes_known_lock:
            if neighbors_too:
                return self.nodes_known.copy()
            with self.neighbors_lock:
                return self.nodes_known.difference(self.neighbors)

    async def get_actions(self):
        """
        Return the actions to take in response to a connection.

        The result is ``[connect_to, disconnect_from]``, the address arguments
        for the corresponding LinkMessage. When the neighbor set is exactly
        full, both entries are the same randomly chosen neighbor; otherwise
        both are empty strings.
        """
        with self.neighbors_lock:
            chosen = ""
            if self.neighbors and len(self.neighbors) == self.max_neighbors:
                chosen = random.choice(list(self.neighbors))
        return [chosen, chosen]

    async def update_neighbors(self, node, remove=False):
        """Add ``node`` to the neighbors, or remove it when ``remove`` is True."""
        with self.neighbors_lock:
            if remove:
                self.neighbors.discard(node)
            else:
                self.neighbors.add(node)

    async def get_posible_neighbors(self):
        """Return set of possible neighbors to connect to (known nodes minus neighbors)."""
        return await self.get_nodes_known(neighbors_too=False)

    async def any_leftovers_neighbors(self):
        """Return True if there are more neighbors than ``max_neighbors``."""
        with self.neighbors_lock:
            return len(self.neighbors) > self.max_neighbors

    async def get_neighbors_to_remove(self):
        """
        Pick the excess neighbors at random and ban them temporarily.

        Returns:
            Set of neighbors chosen for removal; empty when the node is at or
            below ``max_neighbors``. The neighbor set itself is not modified.
        """
        with self.neighbors_lock:
            excess = len(self.neighbors) - self.max_neighbors
            # random.sample rejects negative sizes, so only sample on a real excess.
            if excess > 0:
                to_remove = set(random.sample(list(self.neighbors), excess))
            else:
                to_remove = set()
        await self._add_removed_ban(to_remove)
        return to_remove

    async def stricted_topology_status(self, stricted_topology: bool):
        """No-op for this policy."""
        pass

    async def _is_recently_removed(self, source):
        """Return True if ``source`` is currently in the ban list."""
        async with self._excess_neighbors_removed_lock:
            return source in self._excess_neighbors_removed

    async def _add_removed_ban(self, sources):
        """Ban every address in ``sources`` and schedule each ban's expiry."""
        async with self._excess_neighbors_removed_lock:
            for source in sources:
                self._excess_neighbors_removed.add(source)
                task = asyncio.create_task(self._clear_ban(source))
                self._ban_tasks.add(task)
                task.add_done_callback(self._ban_tasks.discard)

    async def _clear_ban(self, source):
        """Lift the ban on ``source`` after ``RECENTLY_REMOVED_BAN_TIME`` seconds."""
        # The sleep must be awaited; otherwise the ban is cleared immediately.
        await asyncio.sleep(self.RECENTLY_REMOVED_BAN_TIME)
        async with self._excess_neighbors_removed_lock:
            self._excess_neighbors_removed.discard(source)

accept_connection(source, joining=False) async

Return True if the connection is accepted.

Source code in nebula/core/situationalawareness/awareness/sanetwork/neighborpolicies/ringneighborpolicy.py
47
48
49
50
51
52
53
54
55
56
57
58
59
60
async def accept_connection(self, source, joining=False):
    """
    Decide whether a connection from ``source`` is accepted.

    Returns False for a recently removed source. Otherwise a joining source
    is accepted when it is not already a neighbor, and a non-joining source
    is accepted when the neighbor count is below ``max_neighbors``.
    """
    banned = await self._is_recently_removed(source)
    if banned:
        return False

    with self.neighbors_lock:
        if not joining:
            return len(self.neighbors) < self.max_neighbors
        return source not in self.neighbors

get_actions() async

Return the actions to take in response to a connection, as a two-element list: the first element is the address argument for the LinkMessage to connect to; the second is the address argument for the LinkMessage to disconnect from.

Source code in nebula/core/situationalawareness/awareness/sanetwork/neighborpolicies/ringneighborpolicy.py
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
async def get_actions(self):
    """
    Build the actions to perform in response to a connection.

    Returns a two-element list:
        - the address argument for the LinkMessage to connect to
        - the address argument for the LinkMessage to disconnect from
    Both are the same randomly picked neighbor when the neighbor count
    equals ``max_neighbors``; otherwise both are empty strings.
    """
    self.neighbors_lock.acquire()
    chosen = ""
    if len(self.neighbors) == self.max_neighbors:
        candidates = list(self.neighbors)
        last_index = len(candidates) - 1
        chosen = candidates[random.randint(0, last_index)]
    self.neighbors_lock.release()
    # The same node is used for both the connect-to and disconnect-from slots.
    return [chosen, chosen]

get_posible_neighbors() async

Return the set of possible neighbors to connect to.

Source code in nebula/core/situationalawareness/awareness/sanetwork/neighborpolicies/ringneighborpolicy.py
122
123
124
async def get_posible_neighbors(self):
    """Return the known nodes, excluding current neighbors, as connection candidates."""
    candidates = await self.get_nodes_known(neighbors_too=False)
    return candidates

set_config(config) async

Source code in nebula/core/situationalawareness/awareness/sanetwork/neighborpolicies/ringneighborpolicy.py
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
async def set_config(self, config):
    """
    Initialize the policy state from ``config``.

    Args:
        config[0] -> list of self neighbors
        config[1] -> list of nodes known on federation
        config[2] -> self.addr
        config[3] -> stricted_topology
    """
    logging.info("Initializing Ring Topology Neighbor Policy")
    with self.neighbors_lock:
        if self._verbose:
            logging.info(f"neighbors: {config[0]}")
        # Normalize to a set: the neighbors are documented as a list here,
        # but the policy's other methods use set operations on them.
        self.neighbors = set(config[0])
    # Guard nodes_known with its own lock, like the other mutators do.
    with self.nodes_known_lock:
        self.nodes_known.update(config[1])
    self.addr = config[2]