Skip to content

Documentation for Database Module

add_user(user, password, role)

Adds a new user to the users database with a hashed password.

Parameters:

Name Type Description Default
user str

The username to add (stored in uppercase).

required
password str

The plain text password to hash and store.

required
role str

The role assigned to the user.

required
Source code in nebula/controller/database.py
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
def add_user(user, password, role):
    """
    Insert a new account into the users database, storing only a hash of the password.

    Args:
        user (str): Username of the new account; it is upper-cased before being stored.
        password (str): Plain text password. It is passed through ``PasswordHasher.hash``
            and only the resulting hash is written to the database.
        role (str): Role to associate with the account.
    """
    hasher = PasswordHasher()
    with sqlite3.connect(user_db_file_location) as connection:
        cursor = connection.cursor()
        # Values are bound positionally, so the table is expected to have exactly
        # three columns in the order (user, password, role).
        new_row = (user.upper(), hasher.hash(password), role)
        cursor.execute("INSERT INTO users VALUES (?, ?, ?)", new_row)

check_scenario_federation_completed(scenario_name)

Check if all nodes in a given scenario have completed the required federation rounds.

Parameters:

Name Type Description Default
scenario_name str

The unique name identifier of the scenario to check.

required

Returns:

Name Type Description
bool

True if all nodes have completed the total rounds specified for the scenario, False otherwise or if an error occurs.

Behavior
  • Retrieves the total number of rounds defined for the scenario.
  • Fetches the current round progress of all nodes in that scenario.
  • Returns True only if every node has reached the total rounds.
  • Handles database errors and missing scenario cases gracefully.
Source code in nebula/controller/database.py
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
def check_scenario_federation_completed(scenario_name):
    """
    Tell whether every node of a scenario has reached the scenario's configured round count.

    Parameters:
        scenario_name (str): The unique name identifier of the scenario to check.

    Returns:
        bool: True only when the scenario exists, has at least one node, and every node's
        ``round`` value equals the scenario's ``rounds`` value (compared as strings).
        False in every other case, including when an error occurs.

    Behavior:
        - Reads ``rounds`` for the scenario from the scenario database.
        - Reads ``round`` for each of the scenario's nodes from the node database.
        - Any exception (including the "not found" ValueError raised below) is printed
          and turned into a False result.
    """
    try:
        # Step 1: how many rounds is this scenario configured to run?
        with sqlite3.connect(scenario_db_file_location) as scenario_conn:
            scenario_conn.row_factory = sqlite3.Row
            scenario_cursor = scenario_conn.cursor()
            scenario_cursor.execute("SELECT rounds FROM scenarios WHERE name = ?;", (scenario_name,))
            scenario_row = scenario_cursor.fetchone()

            if not scenario_row:
                raise ValueError(f"Scenario '{scenario_name}' not found.")

            # Both sides are compared as strings, so "3" and 3 are treated alike.
            expected_round = str(scenario_row["rounds"])

        # Step 2: which round has each node of the scenario reached?
        with sqlite3.connect(node_db_file_location) as node_conn:
            node_conn.row_factory = sqlite3.Row
            node_cursor = node_conn.cursor()
            node_cursor.execute("SELECT round FROM nodes WHERE scenario = ?;", (scenario_name,))
            node_rows = node_cursor.fetchall()

        # A scenario without registered nodes is never considered completed.
        if not node_rows:
            return False

        for node_row in node_rows:
            if str(node_row["round"]) != expected_round:
                return False
        return True

    except sqlite3.Error as e:
        print(f"Database error: {e}")
        return False
    except Exception as e:
        print(f"An error occurred: {e}")
        return False

check_scenario_with_role(role, scenario_name)

Verify if a scenario exists with a specific role and name.

Parameters:

Name Type Description Default
role str

The role associated with the scenario (e.g., "admin", "user").

required
scenario_name str

The unique name identifier of the scenario.

required

Returns:

Name Type Description
bool

True if a scenario with the given role and name exists, False otherwise.

Source code in nebula/controller/database.py
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
def check_scenario_with_role(role, scenario_name):
    """
    Report whether the scenarios table holds a row matching both a role and a name.

    Parameters:
        role (str): The role stored with the scenario (e.g., "admin", "user").
        scenario_name (str): The unique name identifier of the scenario.

    Returns:
        bool: True when at least one scenario has exactly this role and name, False otherwise.
    """
    query = "SELECT * FROM scenarios WHERE role = ? AND name = ?;"
    with sqlite3.connect(scenario_db_file_location) as connection:
        connection.row_factory = sqlite3.Row
        match = connection.execute(query, (role, scenario_name)).fetchone()

    # fetchone() yields None when the query matched nothing.
    return match is not None

delete_user_from_db(user)

Deletes a user record from the users database.

Parameters:

Name Type Description Default
user str

The username of the user to be deleted.

required
Source code in nebula/controller/database.py
374
375
376
377
378
379
380
381
382
383
def delete_user_from_db(user):
    """
    Remove the row for a given username from the users database.

    The username is matched exactly as passed in; no case normalization is applied here.
    Deleting a username that does not exist is a no-op.

    Args:
        user (str): The username of the user to be deleted.
    """
    connection = sqlite3.connect(user_db_file_location)
    # Using the connection as a context manager commits the DELETE on success.
    with connection:
        connection.cursor().execute("DELETE FROM users WHERE user = ?", (user,))

ensure_columns(conn, table_name, desired_columns) async

Ensures that a table contains all the desired columns, adding any that are missing.

This function
  • Retrieves the current columns of the specified table.
  • Compares them with the desired columns.
  • Adds any missing columns to the table using ALTER TABLE statements.

Parameters:

Name Type Description Default
conn Connection

Active connection to the SQLite database.

required
table_name str

Name of the table to check and modify.

required
desired_columns dict

Dictionary mapping column names to their SQL definitions.

required
Note

This operation is committed immediately after all changes are applied.

Source code in nebula/controller/database.py
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
async def ensure_columns(conn, table_name, desired_columns):
    """
    Add to a table every desired column that it does not have yet.

    Steps:
        - Read the table's current column names via ``PRAGMA table_info``.
        - Work out which keys of ``desired_columns`` are absent.
        - Issue one ``ALTER TABLE ... ADD COLUMN`` per absent column.
        - Commit once at the end (also when nothing had to be added).

    Args:
        conn (aiosqlite.Connection): Active connection to the SQLite database.
        table_name (str): Name of the table to check and modify. It is interpolated
            into the SQL text, so it must be a trusted identifier.
        desired_columns (dict): Dictionary mapping column names to their SQL definitions.
            Names and definitions are interpolated into the SQL text as well.
    """
    table_info = await conn.execute(f"PRAGMA table_info({table_name});")
    # Field 1 of each table_info row is the column name.
    present = {info[1] for info in await table_info.fetchall()}

    missing = {name: definition for name, definition in desired_columns.items() if name not in present}
    for name, definition in missing.items():
        await conn.execute(f"ALTER TABLE {table_name} ADD COLUMN {name} {definition};")

    await conn.commit()

get_all_scenarios(username, role, sort_by='start_time')

Retrieve all scenarios from the database filtered by user role and sorted by a specified field.

Parameters:

Name Type Description Default
username str

The username of the requesting user.

required
role str

The role of the user, e.g., "admin" or regular user.

required
sort_by str

The field name to sort the results by. Defaults to "start_time".

'start_time'

Returns:

Type Description

list[sqlite3.Row]: A list of scenario records as SQLite Row objects.

Behavior
  • Admin users retrieve all scenarios.
  • Non-admin users retrieve only scenarios associated with their username.
  • Sorting by "start_time" applies custom datetime ordering.
  • Other sort fields are applied directly in the ORDER BY clause.
Source code in nebula/controller/database.py
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
def get_all_scenarios(username, role, sort_by="start_time"):
    """
    Retrieve all scenarios from the database filtered by user role and sorted by a specified field.

    Parameters:
        username (str): The username of the requesting user.
        role (str): The role of the user, e.g., "admin" or regular user.
        sort_by (str, optional): Name of the scenarios column to sort by. Defaults to "start_time".

    Returns:
        list[sqlite3.Row]: A list of scenario records as SQLite Row objects.

    Behavior:
        - Admin users retrieve all scenarios.
        - Non-admin users retrieve only scenarios associated with their username.
        - Sorting by "start_time" applies custom datetime ordering.
        - Any other ``sort_by`` must be an existing column of the scenarios table; it is then
          used as the ORDER BY column. An unknown value falls back to the "start_time"
          ordering instead of raising, so existing callers keep getting a result.
    """
    # start_time is text with the day at chars 1-2, the month at 4-5, the year at 7-10
    # and the time from char 12 on; rebuild an ISO-like string so it sorts chronologically.
    start_time_order = (
        "strftime('%Y-%m-%d %H:%M:%S', substr(start_time, 7, 4) || '-' || substr(start_time, 4, 2)"
        " || '-' || substr(start_time, 1, 2) || ' ' || substr(start_time, 12, 8))"
    )

    with sqlite3.connect(scenario_db_file_location) as conn:
        conn.row_factory = sqlite3.Row
        c = conn.cursor()

        order_clause = start_time_order
        if sort_by != "start_time":
            # A bound parameter in ORDER BY is a constant expression and does not sort
            # anything, and identifiers cannot be bound. So the column name is checked
            # against the real table columns and then embedded as a quoted identifier.
            c.execute("PRAGMA table_info(scenarios);")
            known_columns = {row["name"] for row in c.fetchall()}
            if sort_by in known_columns:
                order_clause = '"' + sort_by.replace('"', '""') + '"'

        if role == "admin":
            c.execute(f"SELECT * FROM scenarios ORDER BY {order_clause};")
        else:
            c.execute(
                f"SELECT * FROM scenarios WHERE username = ? ORDER BY {order_clause};",
                (username,),
            )
        result = c.fetchall()

    return result

get_all_scenarios_and_check_completed(username, role, sort_by='start_time')

Retrieve all scenarios with detailed fields and update the status of running scenarios if their federation is completed.

Parameters:

Name Type Description Default
username str

The username of the requesting user.

required
role str

The role of the user, e.g., "admin" or regular user.

required
sort_by str

The field name to sort the results by. Defaults to "start_time".

'start_time'

Returns:

Type Description

list[sqlite3.Row]: A list of scenario records including name, username, title, start_time, model, dataset, rounds, and status.

Behavior
  • Admin users retrieve all scenarios.
  • Non-admin users retrieve only scenarios associated with their username.
  • Scenarios are sorted by start_time with special handling for null or empty values.
  • For scenarios with status "running", checks if federation is completed:
    • If completed, updates the scenario status to "completed".
    • Refreshes the returned scenario list after updates.
Source code in nebula/controller/database.py
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
def _fetch_scenario_summaries(cursor, username, role, sort_by):
    """Run the scenario summary SELECT for a user/role with a validated ORDER BY and return all rows."""
    # Rows without a start_time sort last; the rest are ordered chronologically by rebuilding
    # an ISO-like string from the stored text (day at chars 1-2, month 4-5, year 7-10, time from 12).
    order_clause = (
        "CASE WHEN start_time IS NULL OR start_time = '' THEN 1 ELSE 0 END, "
        "strftime('%Y-%m-%d %H:%M:%S', substr(start_time, 7, 4) || '-' || substr(start_time, 4, 2)"
        " || '-' || substr(start_time, 1, 2) || ' ' || substr(start_time, 12, 8))"
    )
    if sort_by != "start_time":
        # Identifiers cannot be bound as parameters (ORDER BY ? sorts by a constant), so the
        # name is validated against the real table columns and embedded as a quoted identifier.
        cursor.execute("PRAGMA table_info(scenarios);")
        known_columns = {row[1] for row in cursor.fetchall()}
        if sort_by in known_columns:
            order_clause = '"' + sort_by.replace('"', '""') + '"'

    select = "SELECT name, username, title, start_time, model, dataset, rounds, status FROM scenarios"
    if role == "admin":
        cursor.execute(f"{select} ORDER BY {order_clause};")
    else:
        cursor.execute(f"{select} WHERE username = ? ORDER BY {order_clause};", (username,))
    return cursor.fetchall()


def get_all_scenarios_and_check_completed(username, role, sort_by="start_time"):
    """
    Retrieve all scenarios with detailed fields and update the status of running scenarios if their federation is completed.

    Parameters:
        username (str): The username of the requesting user.
        role (str): The role of the user, e.g., "admin" or regular user.
        sort_by (str, optional): Name of the scenarios column to sort by. Defaults to "start_time".

    Returns:
        list[sqlite3.Row]: A list of scenario records including name, username, title, start_time, model, dataset, rounds, and status.

    Behavior:
        - Admin users retrieve all scenarios.
        - Non-admin users retrieve only scenarios associated with their username.
        - With sort_by="start_time", scenarios are sorted chronologically and rows with a
          null or empty start_time come last.
        - Any other ``sort_by`` must be an existing column of the scenarios table; an unknown
          value falls back to the "start_time" ordering.
        - For scenarios with status "running", checks if federation is completed:
            - If completed, updates the scenario status to "completed".
            - After all checks, the list is re-read once with the same columns and ordering,
              so the returned rows have the same shape whether or not anything was updated.
    """
    with sqlite3.connect(scenario_db_file_location) as conn:
        conn.row_factory = sqlite3.Row
        c = conn.cursor()

        result = _fetch_scenario_summaries(c, username, role, sort_by)

        any_completed = False
        for scenario in result:
            if scenario["status"] == "running" and check_scenario_federation_completed(scenario["name"]):
                scenario_set_status_to_completed(scenario["name"])
                any_completed = True

        if any_completed:
            # Re-read so the returned rows reflect the status changes made above.
            result = _fetch_scenario_summaries(c, username, role, sort_by)

    return result

get_completed_scenario()

Retrieve a single scenario with status "completed" from the database.

Returns:

Type Description

sqlite3.Row: A scenario record with status "completed", or None if no such scenario exists.

Behavior
  • Fetches the first scenario found with status "completed".
Source code in nebula/controller/database.py
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
def get_completed_scenario():
    """
    Fetch one scenario whose status is "completed".

    Returns:
        sqlite3.Row: A scenario record with status "completed", or None if no such scenario exists.

    Behavior:
        - The query has no ORDER BY; whichever matching row SQLite yields first is returned.
    """
    connection = sqlite3.connect(scenario_db_file_location)
    with connection:
        connection.row_factory = sqlite3.Row
        cursor = connection.cursor()
        cursor.execute("SELECT * FROM scenarios WHERE status = ?;", ("completed",))
        return cursor.fetchone()

get_notes(scenario)

Retrieve notes associated with a specific scenario.

Parameters:

Name Type Description Default
scenario str

The unique identifier of the scenario.

required

Returns:

Type Description

sqlite3.Row or None: The notes record for the given scenario, or None if no notes exist.

Source code in nebula/controller/database.py
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
def get_notes(scenario):
    """
    Look up the notes row stored for a scenario.

    Parameters:
        scenario (str): The unique identifier of the scenario.

    Returns:
        sqlite3.Row or None: The notes record for the given scenario, or None if no notes exist.
    """
    with sqlite3.connect(notes_db_file_location) as connection:
        connection.row_factory = sqlite3.Row
        notes_row = connection.execute("SELECT * FROM notes WHERE scenario = ?;", (scenario,)).fetchone()

    return notes_row

get_running_scenario(username=None, get_all=False)

Retrieve running scenarios from the database, optionally filtered by username.

Parameters:

Name Type Description Default
username str

The username to filter scenarios by. If None, no user filter is applied.

None
get_all bool

If True, returns all matching scenarios; otherwise returns only one. Defaults to False.

False

Returns:

Type Description

sqlite3.Row or list[sqlite3.Row]: A single scenario record or a list of scenario records matching the criteria.

Behavior
  • Filters scenarios with status "running".
  • Applies username filter if provided.
  • Returns either one or all matching records depending on get_all; when a username is given, get_all is ignored and a single record is returned.
Source code in nebula/controller/database.py
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
def get_running_scenario(username=None, get_all=False):
    """
    Retrieve scenarios whose status is "running", optionally restricted to one user.

    Parameters:
        username (str, optional): The username to filter scenarios by. If None (or empty), no user filter is applied.
        get_all (bool, optional): If True, returns all running scenarios; otherwise returns only one.
            Only honored when no username is given. Defaults to False.

    Returns:
        sqlite3.Row or list[sqlite3.Row]: A single scenario record (None when nothing matches),
        or a list of records when ``get_all`` is True and no username is given.

    Behavior:
        - Only scenarios with status "running" are considered.
        - With a username, a single record for that user is returned regardless of ``get_all``.
        - Without a username, ``get_all`` selects between one record and the full list.
    """
    with sqlite3.connect(scenario_db_file_location) as connection:
        connection.row_factory = sqlite3.Row
        cursor = connection.cursor()

        if not username:
            cursor.execute("SELECT * FROM scenarios WHERE status = ?;", ("running",))
            return cursor.fetchall() if get_all else cursor.fetchone()

        # Per-user lookup: always a single row, even when get_all is True.
        command = """
                SELECT * FROM scenarios
                WHERE (status = ?) AND username = ?;
            """
        cursor.execute(command, ("running", username))
        return cursor.fetchone()

get_scenario_by_name(scenario_name)

Retrieve a scenario record by its unique name.

Parameters:

Name Type Description Default
scenario_name str

The unique name identifier of the scenario.

required

Returns:

Type Description

sqlite3.Row: The scenario record matching the given name, or None if not found.

Source code in nebula/controller/database.py
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
def get_scenario_by_name(scenario_name):
    """
    Fetch the full scenario row that has the given name.

    Parameters:
        scenario_name (str): The unique name identifier of the scenario.

    Returns:
        sqlite3.Row: The scenario record matching the given name, or None if not found.
    """
    lookup = "SELECT * FROM scenarios WHERE name = ?;"
    connection = sqlite3.connect(scenario_db_file_location)
    with connection:
        connection.row_factory = sqlite3.Row
        scenario_row = connection.execute(lookup, (scenario_name,)).fetchone()

    return scenario_row

get_user_by_scenario_name(scenario_name)

Retrieve the username associated with a given scenario name.

Parameters:

Name Type Description Default
scenario_name str

The unique name identifier of the scenario.

required

Returns:

Name Type Description
str

The username linked to the specified scenario, or None if not found.

Source code in nebula/controller/database.py
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
def get_user_by_scenario_name(scenario_name):
    """
    Retrieve the username associated with a given scenario name.

    Parameters:
        scenario_name (str): The unique name identifier of the scenario.

    Returns:
        str: The username linked to the specified scenario, or None if not found.
    """
    with sqlite3.connect(scenario_db_file_location) as conn:
        conn.row_factory = sqlite3.Row
        c = conn.cursor()
        c.execute("SELECT username FROM scenarios WHERE name = ?;", (scenario_name,))
        result = c.fetchone()

    # fetchone() returns None when no scenario has this name; subscripting it would raise
    # TypeError, so return None as the documented "not found" value.
    if result is None:
        return None
    return result["username"]

get_user_info(user)

Fetches detailed information for a specific user from the users database.

Parameters:

Name Type Description Default
user str

The username to retrieve information for.

required

Returns:

Type Description

sqlite3.Row or None: A row containing the user's information if found, otherwise None.

Source code in nebula/controller/database.py
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
def get_user_info(user):
    """
    Load the stored record of a single user from the users database.

    The username is matched exactly as passed in; no case normalization is applied here.

    Args:
        user (str): The username to retrieve information for.

    Returns:
        sqlite3.Row or None: A row containing the user's information if found, otherwise None.
    """
    query = "SELECT * FROM users WHERE user = ?"
    with sqlite3.connect(user_db_file_location) as connection:
        connection.row_factory = sqlite3.Row
        cursor = connection.cursor()
        cursor.execute(query, (user,))
        return cursor.fetchone()

initialize_databases(databases_dir) async

Initializes all required SQLite databases and their corresponding tables for the system.

This function
  • Defines paths for user, node, scenario, and notes databases based on the provided directory.
  • Sets up each database with appropriate PRAGMA settings.
  • Creates necessary tables if they do not exist.
  • Ensures all expected columns are present in each table, adding any missing ones.
  • Creates a default admin user if no users are present.

Parameters:

Name Type Description Default
databases_dir str

Path to the directory where the database files will be created or accessed.

required
Note

Default credentials (username and password) are taken from the environment variables NEBULA_DEFAULT_USER and NEBULA_DEFAULT_PASSWORD.

Source code in nebula/controller/database.py
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
async def initialize_databases(databases_dir):
    """
    Initializes all required SQLite databases and their corresponding tables for the system.

    This function:
        - Defines paths for user, node, scenario, and notes databases based on the provided directory
          and stores them in module-level globals.
        - Calls ``setup_database`` on each database file.
        - Creates necessary tables if they do not exist.
        - Ensures all expected columns are present in each table, adding any missing ones.
        - Creates a default admin user if no users are present.

    Args:
        databases_dir (str): Path to the directory where the database files will be created or accessed.

    Note:
        Default credentials (username and password) are taken from environment variables:
        - NEBULA_DEFAULT_USER
        - NEBULA_DEFAULT_PASSWORD
        Both fall back to "admin" when the variable is not set.
    """
    # The paths are published as module globals; the synchronous helpers in this module
    # (e.g. add_user, get_all_scenarios) read them when opening their own connections.
    global user_db_file_location, node_db_file_location, scenario_db_file_location, notes_db_file_location

    user_db_file_location = os.path.join(databases_dir, "users.db")
    node_db_file_location = os.path.join(databases_dir, "nodes.db")
    scenario_db_file_location = os.path.join(databases_dir, "scenarios.db")
    notes_db_file_location = os.path.join(databases_dir, "notes.db")

    # setup_database is defined elsewhere in this module.
    # NOTE(review): presumably it applies the PRAGMA settings mentioned in the docstring — confirm.
    await setup_database(user_db_file_location)
    await setup_database(node_db_file_location)
    await setup_database(scenario_db_file_location)
    await setup_database(notes_db_file_location)

    # --- users.db: one row per account (username, password hash, role) ---
    async with aiosqlite.connect(user_db_file_location) as conn:
        await conn.execute(
            """
            CREATE TABLE IF NOT EXISTS users (
                user TEXT PRIMARY KEY,
                password TEXT,
                role TEXT
            );
            """
        )
        # For a pre-existing table, ensure_columns adds whichever of these columns is missing.
        desired_columns = {"user": "TEXT PRIMARY KEY", "password": "TEXT", "role": "TEXT"}
        await ensure_columns(conn, "users", desired_columns)

    # --- nodes.db: one row per node; every column is stored as TEXT ---
    async with aiosqlite.connect(node_db_file_location) as conn:
        await conn.execute(
            """
            CREATE TABLE IF NOT EXISTS nodes (
                uid TEXT PRIMARY KEY,
                idx TEXT,
                ip TEXT,
                port TEXT,
                role TEXT,
                neighbors TEXT,
                latitude TEXT,
                longitude TEXT,
                timestamp TEXT,
                federation TEXT,
                round TEXT,
                scenario TEXT,
                hash TEXT,
                malicious TEXT
            );
            """
        )
        desired_columns = {
            "uid": "TEXT PRIMARY KEY",
            "idx": "TEXT",
            "ip": "TEXT",
            "port": "TEXT",
            "role": "TEXT",
            "neighbors": "TEXT",
            "latitude": "TEXT",
            "longitude": "TEXT",
            "timestamp": "TEXT",
            "federation": "TEXT",
            "round": "TEXT",
            "scenario": "TEXT",
            "hash": "TEXT",
            "malicious": "TEXT",
        }
        await ensure_columns(conn, "nodes", desired_columns)

    # --- scenarios.db: one row per scenario; every column is stored as TEXT ---
    async with aiosqlite.connect(scenario_db_file_location) as conn:
        await conn.execute(
            """
            CREATE TABLE IF NOT EXISTS scenarios (
                name TEXT PRIMARY KEY,
                start_time TEXT,
                end_time TEXT,
                title TEXT,
                description TEXT,
                deployment TEXT,
                federation TEXT,
                topology TEXT,
                nodes TEXT,
                nodes_graph TEXT,
                n_nodes TEXT,
                matrix TEXT,
                random_topology_probability TEXT,
                dataset TEXT,
                iid TEXT,
                partition_selection TEXT,
                partition_parameter TEXT,
                model TEXT,
                agg_algorithm TEXT,
                rounds TEXT,
                logginglevel TEXT,
                report_status_data_queue TEXT,
                accelerator TEXT,
                network_subnet TEXT,
                network_gateway TEXT,
                epochs TEXT,
                attack_params TEXT,
                reputation TEXT,
                random_geo TEXT,
                latitude TEXT,
                longitude TEXT,
                mobility TEXT,
                mobility_type TEXT,
                radius_federation TEXT,
                scheme_mobility TEXT,
                round_frequency TEXT,
                mobile_participants_percent TEXT,
                additional_participants TEXT,
                schema_additional_participants TEXT,
                status TEXT,
                role TEXT,
                username TEXT,
                gpu_id TEXT
            );
            """
        )
        # NOTE: the CREATE TABLE above lists gpu_id last, while this dict lists it right after
        # accelerator. ensure_columns only tests for membership and appends missing columns,
        # so the dict order does not change the column order of a freshly created table.
        desired_columns = {
            "name": "TEXT PRIMARY KEY",
            "start_time": "TEXT",
            "end_time": "TEXT",
            "title": "TEXT",
            "description": "TEXT",
            "deployment": "TEXT",
            "federation": "TEXT",
            "topology": "TEXT",
            "nodes": "TEXT",
            "nodes_graph": "TEXT",
            "n_nodes": "TEXT",
            "matrix": "TEXT",
            "random_topology_probability": "TEXT",
            "dataset": "TEXT",
            "iid": "TEXT",
            "partition_selection": "TEXT",
            "partition_parameter": "TEXT",
            "model": "TEXT",
            "agg_algorithm": "TEXT",
            "rounds": "TEXT",
            "logginglevel": "TEXT",
            "report_status_data_queue": "TEXT",
            "accelerator": "TEXT",
            "gpu_id": "TEXT",
            "network_subnet": "TEXT",
            "network_gateway": "TEXT",
            "epochs": "TEXT",
            "attack_params": "TEXT",
            "reputation": "TEXT",
            "random_geo": "TEXT",
            "latitude": "TEXT",
            "longitude": "TEXT",
            "mobility": "TEXT",
            "mobility_type": "TEXT",
            "radius_federation": "TEXT",
            "scheme_mobility": "TEXT",
            "round_frequency": "TEXT",
            "mobile_participants_percent": "TEXT",
            "additional_participants": "TEXT",
            "schema_additional_participants": "TEXT",
            "status": "TEXT",
            "role": "TEXT",
            "username": "TEXT",
        }
        await ensure_columns(conn, "scenarios", desired_columns)

    # --- notes.db: free-form notes keyed by scenario ---
    async with aiosqlite.connect(notes_db_file_location) as conn:
        await conn.execute(
            """
            CREATE TABLE IF NOT EXISTS notes (
                scenario TEXT PRIMARY KEY,
                scenario_notes TEXT
            );
            """
        )
        desired_columns = {"scenario": "TEXT PRIMARY KEY", "scenario_notes": "TEXT"}
        await ensure_columns(conn, "notes", desired_columns)

    # Bootstrap the default admin account. Both values fall back to "admin" when the
    # environment variables are unset.
    username = os.environ.get("NEBULA_DEFAULT_USER", "admin")
    password = os.environ.get("NEBULA_DEFAULT_PASSWORD", "admin")
    # list_users, verify_hash_algorithm and update_user are defined elsewhere in this module.
    if not list_users():
        add_user(username, password, "admin")
    # NOTE(review): presumably verify_hash_algorithm is falsy when the stored hash was made
    # with a different algorithm, in which case the account is rewritten — confirm.
    if not verify_hash_algorithm(username):
        update_user(username, password, "admin")

list_nodes(scenario_name=None, sort_by='idx')

Retrieves a list of nodes from the nodes database, optionally filtered by scenario and sorted.

Parameters:

Name Type Description Default
scenario_name str

Name of the scenario to filter nodes by. If None, returns all nodes.

None
sort_by str

Column name to sort the results by. Defaults to "idx".

'idx'

Returns:

Type Description

list or None: A list of sqlite3.Row objects representing nodes, or None if an error occurs.

Source code in nebula/controller/database.py
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
def list_nodes(scenario_name=None, sort_by="idx"):
    """
    Retrieves a list of nodes from the nodes database, optionally filtered by scenario and sorted.

    Args:
        scenario_name (str, optional): Name of the scenario to filter nodes by. If None (or empty), returns all nodes.
        sort_by (str): Column name to sort the results by, optionally followed by ``ASC`` or ``DESC``.
            Defaults to "idx".

    Returns:
        list or None: A list of sqlite3.Row objects representing nodes, or None if ``sort_by`` is not a
        plain column name or a database error occurs.
    """
    # An ORDER BY target cannot be bound as a "?" parameter, so it has to be spliced
    # into the statement text. Accept only a bare ASCII identifier (plus an optional
    # direction keyword) so that arbitrary SQL can never reach the query.
    order_terms = sort_by.split() if isinstance(sort_by, str) else []
    is_safe_order = (
        1 <= len(order_terms) <= 2
        and order_terms[0].isascii()
        and order_terms[0].isidentifier()
        and (len(order_terms) == 1 or order_terms[1].upper() in ("ASC", "DESC"))
    )
    if not is_safe_order:
        # Same failure contract as a database error: report and return None.
        print(f"Error occurred while listing nodes: invalid sort column {sort_by!r}")
        return None
    order_clause = " ".join(order_terms)

    try:
        with sqlite3.connect(node_db_file_location) as conn:
            conn.row_factory = sqlite3.Row
            c = conn.cursor()

            if scenario_name:
                command = "SELECT * FROM nodes WHERE scenario = ? ORDER BY " + order_clause + ";"
                c.execute(command, (scenario_name,))
            else:
                command = "SELECT * FROM nodes ORDER BY " + order_clause + ";"
                c.execute(command)

            return c.fetchall()
    except sqlite3.Error as e:
        print(f"Error occurred while listing nodes: {e}")
        return None

list_nodes_by_scenario_name(scenario_name)

Fetches all nodes associated with a specific scenario, ordered by their index as integers.

Parameters:

Name Type Description Default
scenario_name str

The name of the scenario to filter nodes by.

required

Returns:

Type Description

list or None: A list of sqlite3.Row objects for nodes in the scenario, or None if an error occurs.

Source code in nebula/controller/database.py
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
def list_nodes_by_scenario_name(scenario_name):
    """
    Return every node row belonging to one scenario, sorted by ``idx`` cast to an integer.

    Args:
        scenario_name (str): Scenario whose nodes are requested.

    Returns:
        list or None: sqlite3.Row objects for the matching nodes (ascending index order),
        or None when a sqlite3 error is raised.
    """
    query = "SELECT * FROM nodes WHERE scenario = ? ORDER BY CAST(idx AS INTEGER) ASC;"
    try:
        with sqlite3.connect(node_db_file_location) as conn:
            # Rows come back addressable by column name.
            conn.row_factory = sqlite3.Row
            return conn.execute(query, (scenario_name,)).fetchall()
    except sqlite3.Error as e:
        print(f"Error occurred while listing nodes by scenario name: {e}")
        return None

list_users(all_info=False)

Retrieves a list of users from the users database.

Parameters:

Name Type Description Default
all_info bool

If True, returns full user records; otherwise, returns only usernames. Default is False.

False

Returns:

Name Type Description
list

A list of usernames or full user records depending on the all_info flag.

Source code in nebula/controller/database.py
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
def list_users(all_info=False):
    """
    Read the users table and return either full rows or just the usernames.

    Args:
        all_info (bool): When True the complete sqlite3.Row records are returned;
            when False (default) only the value of each row's ``user`` column.

    Returns:
        list: Full user records or plain usernames, depending on ``all_info``.
    """
    with sqlite3.connect(user_db_file_location) as conn:
        conn.row_factory = sqlite3.Row
        rows = conn.execute("SELECT * FROM users").fetchall()

    if all_info:
        return rows
    return [row["user"] for row in rows]

remove_all_nodes()

Deletes all node records from the nodes database.

This operation removes every entry in the nodes table.

Returns:

Type Description

None

Source code in nebula/controller/database.py
577
578
579
580
581
582
583
584
585
586
587
588
589
def remove_all_nodes():
    """
    Empty the nodes table.

    Every row in ``nodes`` is deleted; the connection context manager commits
    the deletion when the block exits without an error.

    Returns:
        None
    """
    with sqlite3.connect(node_db_file_location) as conn:
        conn.execute("DELETE FROM nodes;")

remove_nodes_by_scenario_name(scenario_name)

Deletes all nodes associated with a specific scenario from the database.

Parameters:

Name Type Description Default
scenario_name str

The name of the scenario whose nodes should be removed.

required

Returns:

Type Description

None

Source code in nebula/controller/database.py
592
593
594
595
596
597
598
599
600
601
602
603
604
605
def remove_nodes_by_scenario_name(scenario_name):
    """
    Delete the node rows that belong to a single scenario.

    Args:
        scenario_name (str): Scenario whose node records are removed.

    Returns:
        None
    """
    delete_sql = "DELETE FROM nodes WHERE scenario = ?;"
    with sqlite3.connect(node_db_file_location) as conn:
        # The connection context manager commits on a clean exit.
        conn.execute(delete_sql, (scenario_name,))

remove_note(scenario)

Delete the note associated with a specific scenario.

Parameters:

Name Type Description Default
scenario str

The unique identifier of the scenario whose note should be removed.

required
Source code in nebula/controller/database.py
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
def remove_note(scenario):
    """
    Remove the stored note of one scenario from the notes database.

    Parameters:
        scenario (str): Identifier of the scenario whose note row is deleted.
    """
    with sqlite3.connect(notes_db_file_location) as conn:
        conn.execute("DELETE FROM notes WHERE scenario = ?;", (scenario,))
        conn.commit()

remove_scenario_by_name(scenario_name)

Delete a scenario from the database by its unique name.

Parameters:

Name Type Description Default
scenario_name str

The unique name identifier of the scenario to be removed.

required
Behavior
  • Removes the scenario record matching the given name.
  • Commits the deletion to the database.
Source code in nebula/controller/database.py
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
def remove_scenario_by_name(scenario_name):
    """
    Remove the scenario row whose ``name`` equals the given value.

    Parameters:
        scenario_name (str): Name of the scenario to delete.

    Behavior:
        - Issues a parameterized DELETE against the scenarios table.
        - Commits explicitly before the connection block ends.
    """
    delete_sql = "DELETE FROM scenarios WHERE name = ?;"
    with sqlite3.connect(scenario_db_file_location) as conn:
        conn.row_factory = sqlite3.Row
        conn.execute(delete_sql, (scenario_name,))
        conn.commit()

save_notes(scenario, notes)

Save or update notes associated with a specific scenario.

Parameters:

Name Type Description Default
scenario str

The unique identifier of the scenario.

required
notes str

The textual notes to be saved for the scenario.

required
Behavior
  • Inserts new notes if the scenario does not exist in the database.
  • Updates existing notes if the scenario already has notes saved.
  • Handles SQLite integrity and general database errors gracefully.
Source code in nebula/controller/database.py
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
def save_notes(scenario, notes):
    """
    Store the notes of a scenario, replacing any notes already saved for it.

    Parameters:
        scenario (str): Identifier of the scenario the notes belong to.
        notes (str): Note text to persist.

    Behavior:
        - Uses a single upsert: a new row is inserted, or the existing row's
          ``scenario_notes`` is overwritten when the scenario is already present.
        - sqlite3 errors are printed rather than propagated; integrity errors
          get their own message.
    """
    upsert_sql = """
                INSERT INTO notes (scenario, scenario_notes) VALUES (?, ?)
                ON CONFLICT(scenario) DO UPDATE SET scenario_notes = excluded.scenario_notes;
                """
    try:
        with sqlite3.connect(notes_db_file_location) as conn:
            conn.execute(upsert_sql, (scenario, notes))
            conn.commit()
    except sqlite3.Error as e:
        # IntegrityError is a subclass of sqlite3.Error; report it separately.
        if isinstance(e, sqlite3.IntegrityError):
            print(f"SQLite integrity error: {e}")
        else:
            print(f"SQLite error: {e}")

scenario_set_all_status_to_finished()

Set the status of all currently running scenarios to "finished" and update their end time to the current datetime.

Behavior
  • Finds all scenarios with status "running".
  • Updates their status to "finished".
  • Sets the end_time to the current timestamp.
  • Commits the changes to the database.
Source code in nebula/controller/database.py
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
def scenario_set_all_status_to_finished():
    """
    Mark every scenario that is currently "running" as "finished".

    Behavior:
        - Selects rows by ``status = 'running'``.
        - Writes ``status = 'finished'`` and stamps ``end_time`` with the string
          form of the current local datetime.
        - Commits the update.
    """
    update_sql = "UPDATE scenarios SET status = 'finished', end_time = ? WHERE status = 'running';"
    with sqlite3.connect(scenario_db_file_location) as conn:
        conn.row_factory = sqlite3.Row
        finished_at = str(datetime.datetime.now())
        conn.execute(update_sql, (finished_at,))
        conn.commit()

scenario_set_status_to_completed(scenario_name)

Set the status of a specific scenario to "completed".

Parameters:

Name Type Description Default
scenario_name str

The unique name identifier of the scenario to update.

required
Behavior
  • Updates the scenario's status to "completed".
  • Commits the change to the database.
Source code in nebula/controller/database.py
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
def scenario_set_status_to_completed(scenario_name):
    """
    Flag one scenario as "completed".

    Parameters:
        scenario_name (str): Name of the scenario whose status is changed.

    Behavior:
        - Sets ``status = 'completed'`` on the row with the matching name.
        - Commits the update.
    """
    update_sql = "UPDATE scenarios SET status = 'completed' WHERE name = ?;"
    with sqlite3.connect(scenario_db_file_location) as conn:
        conn.row_factory = sqlite3.Row
        conn.execute(update_sql, (scenario_name,))
        conn.commit()

scenario_set_status_to_finished(scenario_name)

Set the status of a specific scenario to "finished" and update its end time to the current datetime.

Parameters:

Name Type Description Default
scenario_name str

The unique name identifier of the scenario to update.

required
Behavior
  • Updates the scenario's status to "finished".
  • Sets the end_time to the current timestamp.
  • Commits the update to the database.
Source code in nebula/controller/database.py
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
def scenario_set_status_to_finished(scenario_name):
    """
    Flag one scenario as "finished" and record when that happened.

    Parameters:
        scenario_name (str): Name of the scenario whose status is changed.

    Behavior:
        - Sets ``status = 'finished'`` on the row with the matching name.
        - Stamps ``end_time`` with the string form of the current local datetime.
        - Commits the update.
    """
    update_sql = "UPDATE scenarios SET status = 'finished', end_time = ? WHERE name = ?;"
    with sqlite3.connect(scenario_db_file_location) as conn:
        conn.row_factory = sqlite3.Row
        finished_at = str(datetime.datetime.now())
        conn.execute(update_sql, (finished_at, scenario_name))
        conn.commit()

scenario_update_record(name, start_time, end_time, scenario, status, role, username)

Insert a new scenario record or update an existing one in the database based on the scenario name.

Parameters:

Name Type Description Default
name str

The unique name identifier of the scenario.

required
start_time str

The start time of the scenario.

required
end_time str

The end time of the scenario.

required
scenario object

An object containing detailed scenario attributes.

required
status str

The current status of the scenario.

required
role str

The role of the user performing the operation.

required
username str

The username of the user performing the operation.

required
Behavior
  • Checks if a scenario with the given name exists.
  • If not, inserts a new record with all scenario details.
  • If exists, updates the existing record with the provided data.
  • Commits the transaction to persist changes.
Source code in nebula/controller/database.py
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
def _scenario_record_fields(start_time, end_time, scenario, status, role, username):
    """Map every non-key ``scenarios`` column to the value to store, in table order."""
    # Structured attributes are serialized with json.dumps; everything else is
    # stored as provided.
    return {
        "start_time": start_time,
        "end_time": end_time,
        "title": scenario.scenario_title,
        "description": scenario.scenario_description,
        "deployment": scenario.deployment,
        "federation": scenario.federation,
        "topology": scenario.topology,
        "nodes": json.dumps(scenario.nodes),
        "nodes_graph": json.dumps(scenario.nodes_graph),
        "n_nodes": scenario.n_nodes,
        "matrix": json.dumps(scenario.matrix),
        "random_topology_probability": scenario.random_topology_probability,
        "dataset": scenario.dataset,
        "iid": scenario.iid,
        "partition_selection": scenario.partition_selection,
        "partition_parameter": scenario.partition_parameter,
        "model": scenario.model,
        "agg_algorithm": scenario.agg_algorithm,
        "rounds": scenario.rounds,
        "logginglevel": scenario.logginglevel,
        "report_status_data_queue": scenario.report_status_data_queue,
        "accelerator": scenario.accelerator,
        "gpu_id": json.dumps(scenario.gpu_id),
        "network_subnet": scenario.network_subnet,
        "network_gateway": scenario.network_gateway,
        "epochs": scenario.epochs,
        "attack_params": json.dumps(scenario.attack_params),
        "reputation": json.dumps(scenario.reputation),
        "random_geo": scenario.random_geo,
        "latitude": scenario.latitude,
        "longitude": scenario.longitude,
        "mobility": scenario.mobility,
        "mobility_type": scenario.mobility_type,
        "radius_federation": scenario.radius_federation,
        "scheme_mobility": scenario.scheme_mobility,
        "round_frequency": scenario.round_frequency,
        "mobile_participants_percent": scenario.mobile_participants_percent,
        "additional_participants": json.dumps(scenario.additional_participants),
        "schema_additional_participants": scenario.schema_additional_participants,
        "status": status,
        "role": role,
        "username": username,
    }


def scenario_update_record(name, start_time, end_time, scenario, status, role, username):
    """
    Insert a new scenario record or update an existing one in the database based on the scenario name.

    Parameters:
        name (str): The unique name identifier of the scenario.
        start_time (str): The start time of the scenario.
        end_time (str): The end time of the scenario.
        scenario (object): An object containing detailed scenario attributes.
        status (str): The current status of the scenario.
        role (str): The role of the user performing the operation.
        username (str): The username of the user performing the operation.

    Behavior:
        - Checks if a scenario with the given name exists.
        - If not, inserts a new record with all scenario details.
        - If exists, updates the existing record with the provided data.
        - Commits the transaction to persist changes.
    """
    # One ordered column -> value mapping drives both statements, so the INSERT
    # and UPDATE column lists cannot drift apart.
    fields = _scenario_record_fields(start_time, end_time, scenario, status, role, username)
    columns = list(fields)
    values = list(fields.values())

    with sqlite3.connect(scenario_db_file_location) as conn:
        c = conn.cursor()

        # Only existence matters here, so there is no need to fetch the whole row.
        c.execute("SELECT 1 FROM scenarios WHERE name = ?;", (name,))
        already_stored = c.fetchone() is not None

        # Column names below are the hard-coded keys of the helper's mapping,
        # never caller input; all values are bound as parameters.
        if already_stored:
            assignments = ", ".join(f"{column} = ?" for column in columns)
            c.execute(
                f"UPDATE scenarios SET {assignments} WHERE name = ?;",
                (*values, name),
            )
        else:
            column_list = ", ".join(["name", *columns])
            placeholders = ", ".join(["?"] * (len(columns) + 1))
            c.execute(
                f"INSERT INTO scenarios ({column_list}) VALUES ({placeholders});",
                (name, *values),
            )

        conn.commit()

setup_database(db_file_location) async

Initializes the SQLite database with the required PRAGMA settings.

This function
  • Connects asynchronously to the specified SQLite database file.
  • Applies a predefined list of PRAGMA settings to configure the database.
  • Commits the changes after applying the settings.

Parameters:

Name Type Description Default
db_file_location str

Path to the SQLite database file.

required

Raises:

Type Description
PermissionError

Logged if the application lacks permission to create or modify the database file.

Exception

Logs any other unexpected error that occurs during setup.

Source code in nebula/controller/database.py
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
async def setup_database(db_file_location):
    """
    Apply the module's PRAGMA settings to a SQLite database file.

    Steps:
        - Open an aiosqlite connection to ``db_file_location``.
        - Execute each statement in ``PRAGMA_SETTINGS`` in order.
        - Commit once all statements have run.

    Args:
        db_file_location (str): Path to the SQLite database file.

    Error handling:
        Nothing is raised to the caller. A PermissionError is logged at info
        level; any other exception is logged with its traceback.
    """
    try:
        async with aiosqlite.connect(db_file_location) as connection:
            for statement in PRAGMA_SETTINGS:
                await connection.execute(statement)
            await connection.commit()
    except PermissionError:
        logging.info("No permission to create the databases. Change the default databases directory")
    except Exception as e:
        logging.exception(f"An error has ocurred during setup_database: {e}")

update_node_record(node_uid, idx, ip, port, role, neighbors, latitude, longitude, timestamp, federation, federation_round, scenario, run_hash, malicious) async

Inserts or updates a node record in the database for a given scenario, ensuring thread-safe access.

Parameters:

Name Type Description Default
node_uid str

Unique identifier of the node.

required
idx str

Index or identifier within the scenario.

required
ip str

IP address of the node.

required
port str

Port used by the node.

required
role str

Role of the node in the federation.

required
neighbors str

Neighbors of the node (serialized).

required
latitude str

Geographic latitude of the node.

required
longitude str

Geographic longitude of the node.

required
timestamp str

Timestamp of the last update.

required
federation str

Federation identifier the node belongs to.

required
federation_round str

Current federation round.

required
scenario str

Scenario name the node is part of.

required
run_hash str

Hash of the current run/state.

required
malicious str

Indicator if the node is malicious.

required

Returns:

Type Description

dict or None: The updated or inserted node record as a dictionary, or None if insertion/update failed.

Source code in nebula/controller/database.py
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
async def update_node_record(
    node_uid,
    idx,
    ip,
    port,
    role,
    neighbors,
    latitude,
    longitude,
    timestamp,
    federation,
    federation_round,
    scenario,
    run_hash,
    malicious,
):
    """
    Create or refresh the row of a node within a scenario and return the stored row.

    The whole read-modify-write sequence runs while holding the module-level
    ``_node_lock``, so concurrent coroutines do not interleave on the same table.

    Args:
        node_uid (str): Unique identifier of the node.
        idx (str): Index or identifier within the scenario.
        ip (str): IP address of the node.
        port (str): Port used by the node.
        role (str): Role of the node in the federation.
        neighbors (str): Neighbors of the node (serialized).
        latitude (str): Geographic latitude of the node.
        longitude (str): Geographic longitude of the node.
        timestamp (str): Timestamp of the last update.
        federation (str): Federation identifier the node belongs to.
        federation_round (str): Current federation round (stored in column ``round``).
        scenario (str): Scenario name the node is part of.
        run_hash (str): Hash of the current run/state (stored in column ``hash``).
        malicious (str): Indicator if the node is malicious.

    Returns:
        dict or None: The row as read back after the commit, or None if no row
        matches ``(node_uid, scenario)`` at that point.
    """
    lookup_sql = "SELECT * FROM nodes WHERE uid = ? AND scenario = ?;"
    row_key = (node_uid, scenario)

    # Leading run of columns shared, in the same order, by INSERT and UPDATE.
    shared_values = (
        idx,
        ip,
        port,
        role,
        neighbors,
        latitude,
        longitude,
        timestamp,
        federation,
        federation_round,
    )

    async with _node_lock, aiosqlite.connect(node_db_file_location) as conn:
        conn.row_factory = aiosqlite.Row
        cursor = await conn.cursor()

        # Decide between INSERT and UPDATE based on whether the row is present.
        await cursor.execute(lookup_sql, row_key)
        existing = await cursor.fetchone()

        if existing is not None:
            await cursor.execute(
                "UPDATE nodes SET idx = ?, ip = ?, port = ?, role = ?, neighbors = ?, latitude = ?, longitude = ?, timestamp = ?, federation = ?, round = ?, hash = ?, malicious = ? WHERE uid = ? AND scenario = ?;",
                (*shared_values, run_hash, malicious, node_uid, scenario),
            )
        else:
            await cursor.execute(
                "INSERT INTO nodes (uid, idx, ip, port, role, neighbors, latitude, longitude, timestamp, federation, round, scenario, hash, malicious) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);",
                (node_uid, *shared_values, scenario, run_hash, malicious),
            )

        await conn.commit()

        # Read the row back so the caller receives exactly what is stored.
        await cursor.execute(lookup_sql, row_key)
        stored = await cursor.fetchone()
        if not stored:
            return None
        return dict(stored)

update_user(user, password, role)

Updates the password and role of an existing user in the users database.

Parameters:

Name Type Description Default
user str

The username to update (case-insensitive, stored as uppercase).

required
password str

The new plain text password to hash and store.

required
role str

The new role to assign to the user.

required
Source code in nebula/controller/database.py
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
def update_user(user, password, role):
    """
    Replace the password hash and role of a user row.

    Args:
        user (str): Username of the row to change; it is uppercased before the lookup.
        password (str): New plain text password; only its hash is written.
        role (str): New role value for the user.
    """
    hasher = PasswordHasher()
    update_sql = "UPDATE users SET password = ?, role = ? WHERE user = ?"
    with sqlite3.connect(user_db_file_location) as conn:
        # The connection context manager commits on a clean exit.
        params = (hasher.hash(password), role, user.upper())
        conn.execute(update_sql, params)

verify(user, password)

Verifies whether the provided password matches the stored hashed password for a user.

Parameters:

Name Type Description Default
user str

The username to verify.

required
password str

The plain text password to check against the stored hash.

required

Returns:

Name Type Description
bool

True if the password is correct, False otherwise.

Source code in nebula/controller/database.py
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
def verify(user, password):
    """
    Verifies whether the provided password matches the stored hashed password for a user.

    Args:
        user (str): The username to verify (case-insensitive, converted to uppercase
            like the other user helpers in this module).
        password (str): The plain text password to check against the stored hash.

    Returns:
        bool: True if the password is correct, False if the user does not exist or
        the hash check fails for any reason.
    """
    ph = PasswordHasher()
    with sqlite3.connect(user_db_file_location) as conn:
        c = conn.cursor()

        # Usernames are written uppercased by add_user/update_user, so normalize
        # here too; otherwise a lowercase login can never match its row.
        c.execute("SELECT password FROM users WHERE user = ?", (user.upper(),))
        result = c.fetchone()

    if not result:
        return False

    try:
        return ph.verify(result[0], password)
    except Exception:
        # The hasher signals a mismatch or an unreadable hash by raising; any
        # such failure means "not authenticated". Unlike a bare ``except`` this
        # lets KeyboardInterrupt and SystemExit propagate.
        return False

verify_hash_algorithm(user)

Checks if the stored password hash for a user uses a supported Argon2 algorithm.

Parameters:

Name Type Description Default
user str

The username to check (case-insensitive, converted to uppercase).

required

Returns:

Name Type Description
bool

True if the password hash starts with a valid Argon2 prefix, False otherwise.

Source code in nebula/controller/database.py
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
def verify_hash_algorithm(user):
    """
    Tell whether a user's stored password hash carries an accepted Argon2 prefix.

    Args:
        user (str): Username to look up; it is uppercased before the query.

    Returns:
        bool: True when the stored hash starts with "$argon2i$" or "$argon2id$";
        False when it does not or when no row exists for the user.
    """
    username = user.upper()

    with sqlite3.connect(user_db_file_location) as conn:
        conn.row_factory = sqlite3.Row
        row = conn.execute("SELECT password FROM users WHERE user = ?", (username,)).fetchone()

    if not row:
        return False
    return row["password"].startswith(("$argon2i$", "$argon2id$"))