
Documentation for Controller Module

TermEscapeCodeFormatter

Bases: Formatter

Custom logging formatter that removes ANSI terminal escape codes from log messages.

This formatter is useful when you want to clean up log outputs by stripping out any terminal color codes or formatting sequences before logging them to a file or other non-terminal output.

Attributes:

  fmt (str): Format string for the log message.
  datefmt (str): Format string for the date in the log message.
  style (str): Formatting style (default is '%').
  validate (bool): Whether to validate the format string.

Methods:

  format: Strips ANSI escape codes from the log message and formats it.

Source code in nebula/controller/controller.py
class TermEscapeCodeFormatter(logging.Formatter):
    """
    Custom logging formatter that removes ANSI terminal escape codes from log messages.

    This formatter is useful when you want to clean up log outputs by stripping out
    any terminal color codes or formatting sequences before logging them to a file
    or other non-terminal output.

    Attributes:
        fmt (str): Format string for the log message.
        datefmt (str): Format string for the date in the log message.
        style (str): Formatting style (default is '%').
        validate (bool): Whether to validate the format string.

    Methods:
        format(record): Strips ANSI escape codes from the log message and formats it.
    """

    def __init__(self, fmt=None, datefmt=None, style="%", validate=True):
        """
        Initializes the TermEscapeCodeFormatter.

        Args:
            fmt (str, optional): The format string for the log message.
            datefmt (str, optional): The format string for the date.
            style (str, optional): The formatting style. Defaults to '%'.
            validate (bool, optional): Whether to validate the format string. Defaults to True.
        """
        super().__init__(fmt, datefmt, style, validate)

    def format(self, record):
        """
        Formats the specified log record, stripping out any ANSI escape codes.

        Args:
            record (logging.LogRecord): The log record to be formatted.

        Returns:
            str: The formatted log message with escape codes removed.
        """
        escape_re = re.compile(r"\x1b\[[0-9;]*m")
        record.msg = re.sub(escape_re, "", str(record.msg))
        return super().format(record)
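
To see the formatter's effect in isolation, the sketch below re-creates it as a standalone class (StripAnsiFormatter is a hypothetical name, not part of NEBULA) using the same regular expression, and formats a single hand-built LogRecord.

```python
import logging
import re

# Same pattern as TermEscapeCodeFormatter: matches SGR sequences such as "\x1b[31m" or "\x1b[1;32m"
ANSI_SGR_RE = re.compile(r"\x1b\[[0-9;]*m")


class StripAnsiFormatter(logging.Formatter):
    """Hypothetical stand-in for TermEscapeCodeFormatter, for illustration only."""

    def format(self, record):
        # Strip escape codes from the raw message before %-style interpolation happens
        record.msg = ANSI_SGR_RE.sub("", str(record.msg))
        return super().format(record)


formatter = StripAnsiFormatter("[%(levelname)s] %(message)s")
record = logging.LogRecord(
    name="demo",
    level=logging.INFO,
    pathname="demo.py",
    lineno=0,
    msg="\x1b[32mScenario started\x1b[0m",
    args=None,
    exc_info=None,
)
print(formatter.format(record))  # prints "[INFO] Scenario started"
```

The pattern only matches SGR sequences (colors and text styles); other ANSI codes, such as line clearing (`\x1b[2K`), pass through unchanged. Also, only `record.msg` is cleaned, so escape codes supplied through %-style arguments (`record.args`) survive, exactly as in the original formatter.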

__init__(fmt=None, datefmt=None, style='%', validate=True)

Initializes the TermEscapeCodeFormatter.

Parameters:

  fmt (str, default None): The format string for the log message.
  datefmt (str, default None): The format string for the date.
  style (str, default '%'): The formatting style.
  validate (bool, default True): Whether to validate the format string.

format(record)

Formats the specified log record, stripping out any ANSI escape codes.

Parameters:

  record (LogRecord, required): The log record to be formatted.

Returns:

  str: The formatted log message with escape codes removed.

add_user_controller(user=Body(...), password=Body(...), role=Body(...)) async

Endpoint to add a new user to the database.

Body Parameters:

  • user: Username.
  • password: Password for the new user.
  • role: Role assigned to the user (e.g., "admin", "user").

Returns a success message or an error if the user could not be added.

Source code in nebula/controller/controller.py
@app.post("/user/add")
async def add_user_controller(user: str = Body(...), password: str = Body(...), role: str = Body(...)):
    """
    Endpoint to add a new user to the database.

    Body Parameters:
    - user: Username.
    - password: Password for the new user.
    - role: Role assigned to the user (e.g., "admin", "user").

    Returns a success message or an error if the user could not be added.
    """
    from nebula.controller.database import add_user

    try:
        add_user(user, password, role)
        return {"detail": "User added successfully"}
    except Exception as e:
        logging.exception(f"Error adding user: {e}")
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Error adding user: {e}")
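
Because add_user_controller declares three Body(...) parameters, FastAPI reads them from a single JSON object keyed by parameter name. The client-side sketch below builds such a request with the standard library only; CONTROLLER_URL is a placeholder, not the controller's documented address, and the request is not sent.

```python
import json
import urllib.request

# Placeholder base URL (assumption); substitute the controller's actual host and port
CONTROLLER_URL = "http://localhost:8000"


def build_add_user_request(user: str, password: str, role: str) -> urllib.request.Request:
    # With several Body(...) parameters, FastAPI expects one JSON object keyed by parameter name
    payload = {"user": user, "password": password, "role": role}
    return urllib.request.Request(
        f"{CONTROLLER_URL}/user/add",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


req = build_add_user_request("alice", "s3cret", "user")
# Sending it requires a running controller:
# with urllib.request.urlopen(req) as resp:
#     print(json.load(resp))  # on success the endpoint returns {"detail": "User added successfully"}
```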

check_scenario(role, scenario_name) async

Checks if a scenario is allowed for a specific role.

Parameters:

  role (str, required): Role to validate.
  scenario_name (str, required): Name of the scenario.

Returns:

  dict: Whether the scenario is allowed for the role, as {"allowed": bool}.

Source code in nebula/controller/controller.py
@app.get("/scenarios/check/{role}/{scenario_name}")
async def check_scenario(
    role: Annotated[str, Path(regex="^[a-zA-Z0-9_-]+$", min_length=1, max_length=50, description="Valid role")],
    scenario_name: Annotated[
        str, Path(regex="^[a-zA-Z0-9_-]+$", min_length=1, max_length=50, description="Valid scenario name")
    ],
):
    """
    Checks if a scenario is allowed for a specific role.

    Args:
        role (str): Role to validate.
        scenario_name (str): Name of the scenario.

    Returns:
        dict: Whether the scenario is allowed for the role.
    """
    from nebula.controller.database import check_scenario_with_role

    try:
        allowed = check_scenario_with_role(role, scenario_name)
        return {"allowed": allowed}
    except Exception as e:
        logging.exception(f"Error checking scenario with role: {e}")
        raise HTTPException(status_code=500, detail="Internal server error")
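
Every path parameter in this module carries the same constraint: Path(regex="^[a-zA-Z0-9_-]+$", min_length=1, max_length=50). FastAPI answers a request that violates it with HTTP 422 before the handler runs (newer FastAPI releases spell this parameter `pattern` and keep `regex` as a deprecated alias). The sketch below reproduces the check client-side, for example to validate a scenario name before calling the API; the helper name is hypothetical.

```python
import re

# Same character class as the Path(...) constraints above, without the anchors
SEGMENT_RE = re.compile(r"[a-zA-Z0-9_-]+")


def is_valid_path_segment(value: str) -> bool:
    # fullmatch requires the whole string to match; in Python, "$" would also
    # accept a trailing newline, which this sketch deliberately rejects
    return 1 <= len(value) <= 50 and SEGMENT_RE.fullmatch(value) is not None


print(is_valid_path_segment("scenario_01-test"))  # True
print(is_valid_path_segment("bad name"))          # False
```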

configure_logger(controller_log)

Configures the logging system for the controller.

  • Sets a format for console and file logging.
  • Creates a console handler with INFO level.
  • Creates a file handler writing to the controller_log path with INFO level.
  • Configures specific Uvicorn loggers to use the file handler without duplicating log messages.
Source code in nebula/controller/controller.py
def configure_logger(controller_log):
    """
    Configures the logging system for the controller.

    - Sets a format for console and file logging.
    - Creates a console handler with INFO level.
    - Creates a file handler writing to the controller_log path with INFO level.
    - Configures specific Uvicorn loggers to use the file handler
      without duplicating log messages.
    """
    log_console_format = "[%(asctime)s] [%(name)s] [%(levelname)s] %(message)s"
    console_handler = logging.StreamHandler()
    console_handler.setLevel(logging.INFO)
    console_handler.setFormatter(TermEscapeCodeFormatter(log_console_format))
    console_handler_file = logging.FileHandler(controller_log, mode="w")
    console_handler_file.setLevel(logging.INFO)
    console_handler_file.setFormatter(logging.Formatter("[%(asctime)s] [%(name)s] [%(levelname)s] %(message)s"))
    logging.basicConfig(
        level=logging.DEBUG,
        handlers=[
            console_handler,
            console_handler_file,
        ],
    )
    uvicorn_loggers = ["uvicorn", "uvicorn.error", "uvicorn.access"]
    for logger_name in uvicorn_loggers:
        logger = logging.getLogger(logger_name)
        logger.handlers = []  # Remove existing handlers
        logger.propagate = False  # Prevent duplicate logs
        handler = logging.FileHandler(controller_log, mode="a")
        handler.setFormatter(logging.Formatter("[%(asctime)s] [%(name)s] [%(levelname)s] %(message)s"))
        logger.addHandler(handler)
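
Setting `propagate = False` is what keeps Uvicorn records from also reaching the handlers installed by basicConfig, so each message is written once. The sketch below demonstrates the effect with in-memory streams; the logger names are hypothetical, and a "demo" parent logger stands in for the root logger so the example does not touch global logging state.

```python
import io
import logging

parent_stream = io.StringIO()
dedicated_stream = io.StringIO()

# Stands in for the root logger configured by basicConfig
parent = logging.getLogger("demo")
parent.setLevel(logging.INFO)
parent.addHandler(logging.StreamHandler(parent_stream))
parent.propagate = False  # keep this demo out of the real root logger

# Mirrors the Uvicorn loggers: a dedicated handler and no propagation
access = logging.getLogger("demo.access")
access.handlers = []  # remove existing handlers
access.propagate = False  # records stop here and never reach the parent's handlers
access.addHandler(logging.StreamHandler(dedicated_stream))

access.warning("GET /status 200")
logging.getLogger("demo.app").warning("scenario started")  # propagates normally

print(repr(dedicated_stream.getvalue()))  # 'GET /status 200\n'
print(repr(parent_stream.getvalue()))     # 'scenario started\n'
```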

discover_vpn() async

Calls the Tailscale CLI to fetch the current status in JSON format, extracts all IPv4 addresses (by filtering out any address containing “:”), and returns them as a JSON object {"ips": [...]}.

Source code in nebula/controller/controller.py
@app.get("/discover-vpn")
async def discover_vpn():
    """
    Calls the Tailscale CLI to fetch the current status in JSON format,
    extracts all IPv4 addresses (by filtering out any address containing “:”),
    and returns them as a JSON object {"ips": [...]}.
    """
    try:
        # 1) Launch the `tailscale status --json` subprocess
        proc = await asyncio.create_subprocess_exec(
            "tailscale", "status", "--json",
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )

        # 2) Wait for it to finish and capture stdout/stderr
        out, err = await proc.communicate()
        if proc.returncode != 0:
            # If the CLI returned an error, raise to be caught below
            raise RuntimeError(err.decode())

        # 3) Parse the JSON output
        data = json.loads(out.decode())

        # 4) Collect only the IPv4 addresses from each peer
        ips = []
        for peer in data.get("Peer", {}).values():
            for ip in peer.get("TailscaleIPs", []):
                if ":" not in ip:  
                    # Skip IPv6 entries (they contain colons)
                    ips.append(ip)

        # 5) Return the list of IPv4s
        return {"ips": ips}

    except Exception as e:
        # 6) Log any failure and respond with HTTP 500
        logging.error(f"Error discovering VPN devices: {e}")
        raise HTTPException(status_code=500, detail="No devices discovered")
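
The parsing step can be tried without Tailscale installed. The JSON below is an abbreviated, hypothetical sample that keeps only the fields the endpoint reads; real `tailscale status --json` output contains many more. The sketch swaps the colon test for the standard library's `ipaddress` parser, which gives the same result for well-formed addresses but raises `ValueError` on malformed ones.

```python
import ipaddress
import json

# Abbreviated, hypothetical output of `tailscale status --json`
SAMPLE_STATUS = """
{
  "Peer": {
    "nodekey:aaa": {"TailscaleIPs": ["100.64.0.2", "fd7a:115c:a1e0::2"]},
    "nodekey:bbb": {"TailscaleIPs": ["100.64.0.3", "fd7a:115c:a1e0::3"]}
  }
}
"""


def extract_ipv4(status: dict) -> list:
    ips = []
    for peer in status.get("Peer", {}).values():
        for ip in peer.get("TailscaleIPs", []):
            # Keep IPv4 only; equivalent to the endpoint's `":" not in ip` check for valid addresses
            if ipaddress.ip_address(ip).version == 4:
                ips.append(ip)
    return ips


print({"ips": extract_ipv4(json.loads(SAMPLE_STATUS))})
# {'ips': ['100.64.0.2', '100.64.0.3']}
```

Like the endpoint, this only walks the `Peer` entries, so the local machine's own addresses (which Tailscale reports separately, under `Self`) are not included.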

get_available_gpu() async

Get the list of GPUs with memory usage below 5%.

Returns:

  dict: A dictionary with a list of GPU indices that are mostly free (usage < 5%).

Source code in nebula/controller/controller.py
@app.get("/available_gpus/")
async def get_available_gpu():
    """
    Get the list of GPUs with memory usage below 5%.

    Returns:
        dict: A dictionary with a list of GPU indices that are mostly free (usage < 5%).
    """
    available_gpus = []

    if importlib.util.find_spec("pynvml") is not None:
        try:
            import pynvml

            await asyncio.to_thread(pynvml.nvmlInit)
            devices = await asyncio.to_thread(pynvml.nvmlDeviceGetCount)

            # Obtain GPU info
            for i in range(devices):
                handle = await asyncio.to_thread(pynvml.nvmlDeviceGetHandleByIndex, i)
                memory_info = await asyncio.to_thread(pynvml.nvmlDeviceGetMemoryInfo, handle)
                memory_used_percent = (memory_info.used / memory_info.total) * 100

                # Obtain available GPUs
                if memory_used_percent < 5:
                    available_gpus.append(i)

        except Exception:  # noqa: S110
            # NVML missing or failing: report no available GPUs instead of returning None
            pass

    return {
        "available_gpus": available_gpus,
    }
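
The 5% rule can be expressed as a pure function, which makes the threshold testable without NVIDIA hardware. MemInfo below is a hypothetical stand-in that only mimics the `used` and `total` fields of the object returned by pynvml's nvmlDeviceGetMemoryInfo.

```python
from typing import NamedTuple


class MemInfo(NamedTuple):
    # Hypothetical stand-in for pynvml's memory-info object (bytes)
    used: int
    total: int


def free_gpu_indices(mem_infos, threshold_percent=5.0):
    # A GPU counts as available when strictly less than threshold_percent of its memory is in use
    return [i for i, m in enumerate(mem_infos) if (m.used / m.total) * 100 < threshold_percent]


gib = 1024 ** 3
gpus = [MemInfo(used=gib // 4, total=16 * gib), MemInfo(used=8 * gib, total=16 * gib)]
print({"available_gpus": free_gpu_indices(gpus)})  # {'available_gpus': [0]}
```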

get_least_memory_gpu() async

Identify the GPU with the highest memory usage above a threshold (50%).

Note:

  Despite the name, this function returns the GPU using the most memory above 50% usage.

Returns:

  dict: {"gpu_with_least_memory_index": index}, where index is the GPU using the most memory above the threshold, or None if no such GPU is found.

Source code in nebula/controller/controller.py
@app.get("/least_memory_gpu")
async def get_least_memory_gpu():
    """
    Identify the GPU with the highest memory usage above a threshold (50%).

    Note:
        Despite the name, this function returns the GPU using the **most**
        memory above 50% usage.

    Returns:
        dict: A dictionary with the index of the GPU using the most memory above the threshold,
              or None if no such GPU is found.
    """
    gpu_with_least_memory_index = None

    if importlib.util.find_spec("pynvml") is not None:
        max_memory_used_percent = 50
        try:
            import pynvml

            await asyncio.to_thread(pynvml.nvmlInit)
            devices = await asyncio.to_thread(pynvml.nvmlDeviceGetCount)

            # Obtain GPU info
            for i in range(devices):
                handle = await asyncio.to_thread(pynvml.nvmlDeviceGetHandleByIndex, i)
                memory_info = await asyncio.to_thread(pynvml.nvmlDeviceGetMemoryInfo, handle)
                memory_used_percent = (memory_info.used / memory_info.total) * 100

                # Obtain GPU with less memory available
                if memory_used_percent > max_memory_used_percent:
                    max_memory_used_percent = memory_used_percent
                    gpu_with_least_memory_index = i

        except Exception:  # noqa: S110
            pass

    return {
        "gpu_with_least_memory_index": gpu_with_least_memory_index,
    }
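
The selection rule, isolated as a pure function over precomputed usage percentages (the helper name busiest_gpu_index is hypothetical). It makes the naming mismatch noted above explicit: the result is the GPU with the highest usage strictly above the threshold.

```python
def busiest_gpu_index(used_percentages, threshold_percent=50.0):
    # Mirrors get_least_memory_gpu: track the highest usage seen above the threshold
    best_index, best_usage = None, threshold_percent
    for i, usage in enumerate(used_percentages):
        if usage > best_usage:
            best_index, best_usage = i, usage
    return best_index


print(busiest_gpu_index([10.0, 72.5, 64.0]))  # 1
print(busiest_gpu_index([10.0, 40.0]))        # None
```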

get_notes_by_scenario_name(scenario_name) async

Endpoint to retrieve notes associated with a scenario.

Path Parameters:

  • scenario_name: Name of the scenario.

Returns the notes or raises an HTTPException on error.

Source code in nebula/controller/controller.py
@app.get("/notes/{scenario_name}")
async def get_notes_by_scenario_name(
    scenario_name: Annotated[
        str, Path(regex="^[a-zA-Z0-9_-]+$", min_length=1, max_length=50, description="Valid scenario name")
    ],
):
    """
    Endpoint to retrieve notes associated with a scenario.

    Path Parameters:
    - scenario_name: Name of the scenario.

    Returns the notes or raises an HTTPException on error.
    """
    from nebula.controller.database import get_notes

    try:
        notes = get_notes(scenario_name)
    except Exception as e:
        logging.exception(f"Error obtaining notes for scenario {scenario_name}: {e}")
        raise HTTPException(status_code=500, detail="Internal server error")

    return notes

get_physical_node_state(ip) async

Query a single Raspberry Pi (or other node) for its training state.

Parameters:

  ip (str): IP address or hostname of the node.

Returns:

  dict: On HTTP 200, the node's own JSON, expected to be {"running": bool}. Otherwise:
    • running (bool): False.
    • error (str): Error message when the node is unreachable or returns a non-200 HTTP status.

Source code in nebula/controller/controller.py
@app.get("/physical/state/{ip}", tags=["physical"])
async def get_physical_node_state(ip: str):
    """
    Query a single Raspberry Pi (or other node) for its training state.

    Parameters
    ----------
    ip : str
        IP address or hostname of the node.

    Returns
    -------
    dict
        • running (bool) – True if a training process is active.  
        • error   (str)  – Optional error message when the node is unreachable
                            or returns a non-200 HTTP status.
    """
    # Short global timeout so a dead node doesn't block the whole request
    timeout = aiohttp.ClientTimeout(total=3)            # seconds

    try:
        async with aiohttp.ClientSession(timeout=timeout) as session:
            async with session.get(f"http://{ip}/state/") as resp:
                if resp.status == 200:
                    # Forward the node's own JSON, expected to be {"running": bool}
                    return await resp.json()
                # Node responded but with an HTTP error code
                return {"running": False,
                        "error": f"HTTP {resp.status}"}
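    except Exception as exc:
        # Network errors, timeouts, DNS failures, …
        # Some exceptions (e.g. asyncio timeouts) have an empty message; fall back to the
        # class name so "error" is never empty (an empty string would count as "no error")
        return {"running": False, "error": str(exc) or type(exc).__name__}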
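
get_physical_node_state never raises: every failure is folded into a {"running": False, "error": ...} dictionary. The sketch below reproduces that pattern with asyncio.wait_for instead of aiohttp's ClientTimeout, using a hypothetical `fetch` coroutine so it runs without a network. It falls back to the exception's class name because some exceptions, asyncio timeouts among them, stringify to an empty message.

```python
import asyncio


async def probe_with_fallback(fetch, ip, timeout=3.0):
    # `fetch` is a hypothetical coroutine function returning the node's state dict
    try:
        return await asyncio.wait_for(fetch(ip), timeout=timeout)
    except Exception as exc:  # timeouts, connection errors, DNS failures, ...
        # str(exc) can be "" (e.g. for timeouts), so fall back to the exception's class name
        return {"running": False, "error": str(exc) or type(exc).__name__}


async def healthy(ip):
    return {"running": True}


async def hung(ip):
    await asyncio.sleep(10)  # simulates a node that never answers
    return {"running": True}


print(asyncio.run(probe_with_fallback(healthy, "10.0.0.1")))  # {'running': True}
print(asyncio.run(probe_with_fallback(hung, "10.0.0.2", timeout=0.05)))
```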

get_physical_scenario_state(scenario_name) async

Check the training state of every physical node assigned to a scenario.

Parameters:

  scenario_name (str): Scenario identifier.

Returns:

  dict:
    • running (bool): True if at least one node is training.
    • nodes_state (dict): Maps each node IP to the result of its /state/ call.
    • all_available (bool): True if every node responded without an error and none is training.

Raises HTTPException (404) if the scenario does not exist or has no nodes.

Source code in nebula/controller/controller.py
@app.get("/physical/scenario-state/{scenario_name}", tags=["physical"])
async def get_physical_scenario_state(scenario_name: str):
    """
    Check the training state of *every* physical node assigned to a scenario.

    Parameters
    ----------
    scenario_name : str
        Scenario identifier.

    Returns
    -------
    dict
        {
          "running":       bool,            # True  ⇢ at least one node is training
          "nodes_state":   { ip: {...} },   # result from each /state/ call
          "all_available": bool             # True  ⇢ every node responded and
                                            #          none is training
        }
    """
    # 1) Retrieve scenario metadata and node list from the DB
    scenario = await get_scenario_by_name(scenario_name)
    if not scenario:
        raise HTTPException(status_code=404, detail="Scenario not found")

    nodes = await list_nodes_by_scenario_name(scenario_name)
    if not nodes:
        raise HTTPException(status_code=404, detail="No nodes found for scenario")

    # 2) Probe all nodes concurrently
    ips   = [n["ip"] for n in nodes]
    tasks = [get_physical_node_state(ip) for ip in ips]
    states = await asyncio.gather(*tasks)               # parallel HTTP calls

    # 3) Aggregate results
    nodes_state  = dict(zip(ips, states))
    any_running  = any(s.get("running") for s in states)
    # 'all_available' is true only if *every* node answered with running=False
    # *and* without an error field.
    all_available = all(
        (not s.get("running")) and (not s.get("error")) for s in states
    )

    return {
        "running": any_running,
        "nodes_state": nodes_state,
        "all_available": all_available,
    }
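
The aggregation rules can be exercised offline by replacing the HTTP probe with a fake one. In the sketch below, fake_probe is a hypothetical stand-in for get_physical_node_state; asyncio.gather returns results in the same order as its arguments, which is what makes `zip(ips, states)` safe.

```python
import asyncio


def aggregate(ips, states):
    # Same rules as get_physical_scenario_state
    return {
        "running": any(s.get("running") for s in states),
        "nodes_state": dict(zip(ips, states)),
        "all_available": all((not s.get("running")) and (not s.get("error")) for s in states),
    }


async def fake_probe(ip):
    # Hypothetical stand-in: one node answers with an HTTP error
    await asyncio.sleep(0)
    if ip == "10.0.0.3":
        return {"running": False, "error": "HTTP 503"}
    return {"running": False}


async def main():
    ips = ["10.0.0.1", "10.0.0.2", "10.0.0.3"]
    states = await asyncio.gather(*(fake_probe(ip) for ip in ips))  # concurrent, order preserved
    return aggregate(ips, states)


result = asyncio.run(main())
print(result["running"], result["all_available"])  # False False
```

Since `all()` of an empty sequence is True, an empty node list would report every node as available; the endpoint avoids this by returning 404 before aggregating.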

get_resources() async

Get system resource usage including RAM and GPU memory usage.

Returns:

  dict: A dictionary containing:
    • gpus (int): Number of GPUs detected.
    • memory_percent (float): Percentage of used RAM.
    • gpu_memory_percent (List[float]): List of GPU memory usage percentages.

Source code in nebula/controller/controller.py
@app.get("/resources")
async def get_resources():
    """
    Get system resource usage including RAM and GPU memory usage.

    Returns:
        dict: A dictionary containing:
            - gpus (int): Number of GPUs detected.
            - memory_percent (float): Percentage of used RAM.
            - gpu_memory_percent (List[float]): List of GPU memory usage percentages.
    """
    devices = 0
    gpu_memory_percent = []

    # Obtain available RAM
    memory_info = await asyncio.to_thread(psutil.virtual_memory)

    if importlib.util.find_spec("pynvml") is not None:
        try:
            import pynvml

            await asyncio.to_thread(pynvml.nvmlInit)
            devices = await asyncio.to_thread(pynvml.nvmlDeviceGetCount)

            # Obtain GPU info
            for i in range(devices):
                handle = await asyncio.to_thread(pynvml.nvmlDeviceGetHandleByIndex, i)
                memory_info_gpu = await asyncio.to_thread(pynvml.nvmlDeviceGetMemoryInfo, handle)
                memory_used_percent = (memory_info_gpu.used / memory_info_gpu.total) * 100
                gpu_memory_percent.append(memory_used_percent)

        except Exception:  # noqa: S110
            pass

    return {
        # "cpu_percent": psutil.cpu_percent(),
        "gpus": devices,
        "memory_percent": memory_info.percent,
        "gpu_memory_percent": gpu_memory_percent,
    }

get_running_scenario(get_all=False) async

Retrieves the currently running scenario(s).

Parameters:

  get_all (bool, default False): If True, retrieves all running scenarios.

Returns:

  dict or list: Running scenario(s) information.

Source code in nebula/controller/controller.py
@app.get("/scenarios/running")
async def get_running_scenario(get_all: bool = False):
    """
    Retrieves the currently running scenario(s).

    Args:
        get_all (bool): If True, retrieves all running scenarios.

    Returns:
        dict or list: Running scenario(s) information.
    """
    from nebula.controller.database import get_running_scenario

    try:
        return get_running_scenario(get_all=get_all)
    except Exception as e:
        logging.exception(f"Error obtaining running scenario: {e}")
        raise HTTPException(status_code=500, detail="Internal server error")
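
get_all is an ordinary query parameter, so a client requests every running scenario by appending `?get_all=true`; FastAPI converts string values such as "true" and "false" to a bool. A sketch of building that URL (the base URL is a placeholder assumption):

```python
from urllib.parse import urlencode

# Placeholder base URL (assumption); substitute the controller's actual host and port
CONTROLLER_URL = "http://localhost:8000"


def running_scenarios_url(get_all: bool = False) -> str:
    # Query parameters travel as strings, so the bool is serialized as "true"/"false"
    query = urlencode({"get_all": str(get_all).lower()})
    return f"{CONTROLLER_URL}/scenarios/running?{query}"


print(running_scenarios_url(True))  # http://localhost:8000/scenarios/running?get_all=true
```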

get_scenario_by_name(scenario_name) async

Fetches a scenario by its name.

Parameters:

  scenario_name (str, required): The name of the scenario.

Returns:

  dict: The scenario data.

Source code in nebula/controller/controller.py
@app.get("/scenarios/{scenario_name}")
async def get_scenario_by_name(
    scenario_name: Annotated[
        str, Path(regex="^[a-zA-Z0-9_-]+$", min_length=1, max_length=50, description="Valid scenario name")
    ],
):
    """
    Fetches a scenario by its name.

    Args:
        scenario_name (str): The name of the scenario.

    Returns:
        dict: The scenario data.
    """
    from nebula.controller.database import get_scenario_by_name

    try:
        scenario = get_scenario_by_name(scenario_name)
    except Exception as e:
        logging.exception(f"Error obtaining scenario {scenario_name}: {e}")
        raise HTTPException(status_code=500, detail="Internal server error")

    return scenario

get_scenarios(user, role) async

Retrieves all scenarios associated with a given user and role.

Parameters:

  user (str, required): Username to filter scenarios.
  role (str, required): Role of the user (e.g., "admin").

Returns:

  dict: The list of scenarios under "scenarios" and the currently running scenario under "scenario_running".

Source code in nebula/controller/controller.py
@app.get("/scenarios/{user}/{role}")
async def get_scenarios(
    user: Annotated[str, Path(regex="^[a-zA-Z0-9_-]+$", min_length=1, max_length=50, description="Valid username")],
    role: Annotated[str, Path(regex="^[a-zA-Z0-9_-]+$", min_length=1, max_length=50, description="Valid role")],
):
    """
    Retrieves all scenarios associated with a given user and role.

    Args:
        user (str): Username to filter scenarios.
        role (str): Role of the user (e.g., "admin").

    Returns:
        dict: A list of scenarios and the currently running scenario.
    """
    from nebula.controller.database import get_all_scenarios_and_check_completed, get_running_scenario

    try:
        scenarios = get_all_scenarios_and_check_completed(username=user, role=role)
        if role == "admin":
            scenario_running = get_running_scenario()
        else:
            scenario_running = get_running_scenario(username=user)
    except Exception as e:
        logging.exception(f"Error obtaining scenarios: {e}")
        raise HTTPException(status_code=500, detail="Internal server error")

    return {"scenarios": scenarios, "scenario_running": scenario_running}

get_status() async

Check the status of the NEBULA Controller API.

Returns:

  dict: A status message confirming the API is running.

Source code in nebula/controller/controller.py
@app.get("/status")
async def get_status():
    """
    Check the status of the NEBULA Controller API.

    Returns:
        dict: A status message confirming the API is running.
    """
    return {"status": "NEBULA Controller API is running"}

get_user_by_scenario_name(scenario_name) async

Endpoint to retrieve the user assigned to a scenario.

Path Parameters:

  • scenario_name: Name of the scenario.

Returns user info or raises an HTTPException on error.

Source code in nebula/controller/controller.py
@app.get("/user/{scenario_name}")
async def get_user_by_scenario_name(
    scenario_name: Annotated[
        str, Path(regex="^[a-zA-Z0-9_-]+$", min_length=1, max_length=50, description="Valid scenario name")
    ],
):
    """
    Endpoint to retrieve the user assigned to a scenario.

    Path Parameters:
    - scenario_name: Name of the scenario.

    Returns user info or raises an HTTPException on error.
    """
    from nebula.controller.database import get_user_by_scenario_name

    try:
        user = get_user_by_scenario_name(scenario_name)
    except Exception as e:
        logging.exception(f"Error obtaining user for scenario {scenario_name}: {e}")
        raise HTTPException(status_code=500, detail="Internal server error")

    return user

list_nodes_by_scenario_name(scenario_name) async

Lists all nodes associated with a specific scenario.

Parameters:

  scenario_name (str, required): Name of the scenario.

Returns:

  list: List of nodes.

Source code in nebula/controller/controller.py
@app.get("/nodes/{scenario_name}")
async def list_nodes_by_scenario_name(
    scenario_name: Annotated[
        str, Path(regex="^[a-zA-Z0-9_-]+$", min_length=1, max_length=50, description="Valid scenario name")
    ],
):
    """
    Lists all nodes associated with a specific scenario.

    Args:
        scenario_name (str): Name of the scenario.

    Returns:
        list: List of nodes.
    """
    from nebula.controller.database import list_nodes_by_scenario_name

    try:
        nodes = list_nodes_by_scenario_name(scenario_name)
    except Exception as e:
        logging.exception(f"Error obtaining nodes: {e}")
        raise HTTPException(status_code=500, detail="Internal server error")

    return nodes

list_users_controller(all_info=False) async

Endpoint to list all users in the database.

Query Parameters:

  • all_info (bool): If True, returns full user info as dictionaries.

Returns a list of users or raises an HTTPException on error.

Source code in nebula/controller/controller.py
@app.get("/user/list")
async def list_users_controller(all_info: bool = False):
    """
    Endpoint to list all users in the database.

    Query Parameters:
    - all_info (bool): If True, returns full user info as dictionaries.

    Returns a list of users or raises an HTTPException on error.
    """
    from nebula.controller.database import list_users

    try:
        user_list = list_users(all_info)
        if all_info:
            # Convert each sqlite3.Row to a dictionary so that it is JSON serializable.
            user_list = [dict(user) for user in user_list]
        return {"users": user_list}
    except Exception as e:
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Error retrieving users: {e}")
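
The `dict(user)` conversion is needed because sqlite3.Row objects are not JSON serializable, while `dict(row)` works since Row exposes `keys()`. The sketch below reproduces this with an in-memory database; the users table and its columns are hypothetical, not NEBULA's actual schema.

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row  # rows support both index and column-name access
conn.execute("CREATE TABLE users (name TEXT, role TEXT)")  # hypothetical schema
conn.execute("INSERT INTO users VALUES ('alice', 'admin')")
rows = conn.execute("SELECT name, role FROM users").fetchall()

# json.dumps(rows) would raise TypeError; converting each Row to a dict fixes that
payload = {"users": [dict(row) for row in rows]}
print(json.dumps(payload))  # {"users": [{"name": "alice", "role": "admin"}]}
```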

node_done(scenario_name, request) async

Endpoint to forward node status to the frontend.

Receives a JSON payload and forwards it to the frontend's /node/done route for the given scenario.

Parameters:

  • scenario_name: Name of the scenario.
  • request: HTTP request with JSON body.

Returns the response from the frontend or raises an HTTPException if it fails.

Source code in nebula/controller/controller.py
@app.post("/nodes/{scenario_name}/done")
async def node_done(
    scenario_name: Annotated[
        str,
        Path(regex="^[a-zA-Z0-9_-]+$", min_length=1, max_length=50, description="Valid scenario name"),
    ],
    request: Request,
):
    """
    Endpoint to forward node status to the frontend.

    Receives a JSON payload and forwards it to the frontend's /node/done route
    for the given scenario.

    Parameters:
    - scenario_name: Name of the scenario.
    - request: HTTP request with JSON body.

    Returns the response from the frontend or raises an HTTPException if it fails.
    """
    url = f"http://{os.environ['NEBULA_CONTROLLER_NAME']}_nebula-frontend/platform/dashboard/{scenario_name}/node/done"

    data = await request.json()

    async with aiohttp.ClientSession() as session:
        async with session.post(url, json=data) as response:
            if response.status == 200:
                return await response.json()
            # Propagate the frontend's error status to the caller
            raise HTTPException(status_code=response.status, detail="Error posting data")
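
The sketch below isolates how node_done builds the frontend URL from the NEBULA_CONTROLLER_NAME environment variable; the value "nebula" is only an example. Because the variable is read with `os.environ[...]`, an unset variable surfaces as a KeyError, which FastAPI turns into a generic 500 response rather than a descriptive error.

```python
import os


def frontend_node_done_url(scenario_name: str) -> str:
    # Mirrors node_done: the frontend host is "<NEBULA_CONTROLLER_NAME>_nebula-frontend"
    controller_name = os.environ["NEBULA_CONTROLLER_NAME"]  # raises KeyError if unset
    return f"http://{controller_name}_nebula-frontend/platform/dashboard/{scenario_name}/node/done"


os.environ["NEBULA_CONTROLLER_NAME"] = "nebula"  # example value only
print(frontend_node_done_url("scenario_01"))
# http://nebula_nebula-frontend/platform/dashboard/scenario_01/node/done
```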

read_root() async

Root endpoint of the NEBULA Controller API.

Returns:

  dict: A welcome message indicating the API is accessible.

Source code in nebula/controller/controller.py
@app.get("/")
async def read_root():
    """
    Root endpoint of the NEBULA Controller API.

    Returns:
        dict: A welcome message indicating the API is accessible.
    """
    return {"message": "Welcome to the NEBULA Controller API"}

remove_nodes_by_scenario_name(scenario_name=Body(..., embed=True)) async

Endpoint to remove all nodes associated with a scenario.

Body Parameters:

  • scenario_name: Name of the scenario whose nodes should be removed.

Returns a success message or an error if something goes wrong.

Source code in nebula/controller/controller.py
@app.post("/nodes/remove")
async def remove_nodes_by_scenario_name(scenario_name: str = Body(..., embed=True)):
    """
    Endpoint to remove all nodes associated with a scenario.

    Body Parameters:
    - scenario_name: Name of the scenario whose nodes should be removed.

    Returns a success message or an error if something goes wrong.
    """
    from nebula.controller.database import remove_nodes_by_scenario_name

    try:
        remove_nodes_by_scenario_name(scenario_name)
    except Exception as e:
        logging.exception(f"Error removing nodes: {e}")
        raise HTTPException(status_code=500, detail="Internal server error")

    return {"message": f"Nodes for scenario {scenario_name} removed successfully"}
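Because scenario_name is declared with Body(..., embed=True), FastAPI expects it wrapped in a JSON object keyed by the parameter name, i.e. {"scenario_name": "..."}, rather than as a bare JSON string. Below is a minimal client-side sketch using only the standard library. The helper name and the base URL used in the usage note are hypothetical.

```python
import json
import urllib.request


def build_remove_nodes_request(base_url: str, scenario_name: str) -> urllib.request.Request:
    """Build (but do not send) a POST /nodes/remove request for one scenario."""
    # embed=True: the single body field is wrapped in an object keyed by its name
    payload = json.dumps({"scenario_name": scenario_name}).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url.rstrip('/')}/nodes/remove",
        data=payload,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
```

Sending it is then `urllib.request.urlopen(build_remove_nodes_request("http://controller.example:8000", "demo"))`, with a hypothetical host. The same body shape applies to /notes/remove and /scenarios/remove, and to /user/delete with the key "user".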

remove_notes_by_scenario_name(scenario_name=Body(..., embed=True)) async

Endpoint to remove notes associated with a scenario.

Body Parameters:

- scenario_name: Name of the scenario.

Returns a success message or an error if something goes wrong.

Source code in nebula/controller/controller.py
@app.post("/notes/remove")
async def remove_notes_by_scenario_name(scenario_name: str = Body(..., embed=True)):
    """
    Endpoint to remove notes associated with a scenario.

    Body Parameters:
    - scenario_name: Name of the scenario.

    Returns a success message or an error if something goes wrong.
    """
    from nebula.controller.database import remove_note

    try:
        remove_note(scenario_name)
    except Exception as e:
        logging.exception(f"Error removing notes: {e}")
        raise HTTPException(status_code=500, detail="Internal server error")

    return {"message": f"Notes for scenario {scenario_name} removed successfully"}

remove_scenario(scenario_name=Body(..., embed=True)) async

Removes a scenario from the database by its name.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| scenario_name | str | Name of the scenario to remove. | Body(..., embed=True) |

Returns:

| Type | Description |
| --- | --- |
| dict | A message indicating successful removal. |

Source code in nebula/controller/controller.py
@app.post("/scenarios/remove")
async def remove_scenario(
    scenario_name: str = Body(..., embed=True),
):
    """
    Removes a scenario from the database by its name.

    Args:
        scenario_name (str): Name of the scenario to remove.

    Returns:
        dict: A message indicating successful removal.
    """
    from nebula.controller.database import remove_scenario_by_name
    from nebula.controller.scenarios import ScenarioManagement

    try:
        remove_scenario_by_name(scenario_name)
        ScenarioManagement.remove_files_by_scenario(scenario_name)
    except Exception as e:
        logging.exception(f"Error removing scenario {scenario_name}: {e}")
        raise HTTPException(status_code=500, detail="Internal server error")

    return {"message": f"Scenario {scenario_name} removed successfully"}

remove_user_controller(user=Body(..., embed=True)) async

Controller endpoint that deletes a user from the database.

Parameters:

- user: The username of the user to delete.

Returns a success message if the user is deleted, or an HTTP error if an exception occurs.

Source code in nebula/controller/controller.py
@app.post("/user/delete")
async def remove_user_controller(user: str = Body(..., embed=True)):
    """
    Controller endpoint that deletes a user from the database.

    Parameters:
    - user: The username of the user to delete.

    Returns a success message if the user is deleted, or an HTTP error if an exception occurs.
    """
    from nebula.controller.database import delete_user_from_db

    try:
        delete_user_from_db(user)
        return {"detail": "User deleted successfully"}
    except Exception as e:
        logging.exception(f"Error deleting user: {e}")
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Error deleting user: {e}")

run_scenario(scenario_data=Body(..., embed=True), role=Body(..., embed=True), user=Body(..., embed=True)) async

Launches a new scenario based on the provided configuration.

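Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| scenario_data | dict | The complete configuration of the scenario to be executed. | Body(..., embed=True) |
| role | str | The role of the user initiating the scenario. | Body(..., embed=True) |
| user | str | The username of the user initiating the scenario. | Body(..., embed=True) |

Returns:

| Type | Description |
| --- | --- |
| str | The name of the scenario that was started, or None if starting the nodes raises subprocess.CalledProcessError (the error is logged, not re-raised). |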

Source code in nebula/controller/controller.py
@app.post("/scenarios/run")
async def run_scenario(
    scenario_data: dict = Body(..., embed=True), role: str = Body(..., embed=True), user: str = Body(..., embed=True)
):
    """
    Launches a new scenario based on the provided configuration.

    Args:
        scenario_data (dict): The complete configuration of the scenario to be executed.
        role (str): The role of the user initiating the scenario.
        user (str): The username of the user initiating the scenario.

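    Returns:
        str: The name of the scenario that was started, or None if starting the nodes
            raises subprocess.CalledProcessError (the error is logged, not re-raised).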
    """

    import subprocess

    from nebula.controller.scenarios import ScenarioManagement

    validate_physical_fields(scenario_data)

    # Manager for the actual scenario
    scenarioManagement = ScenarioManagement(scenario_data, user)

    await update_scenario(
        scenario_name=scenarioManagement.scenario_name,
        start_time=scenarioManagement.start_date_scenario,
        end_time="",
        scenario=scenario_data,
        status="running",
        role=role,
        username=user,
    )

    # Run the actual scenario
    try:
        if scenarioManagement.scenario.mobility:
            additional_participants = scenario_data["additional_participants"]
            schema_additional_participants = scenario_data["schema_additional_participants"]
            await scenarioManagement.load_configurations_and_start_nodes(
                additional_participants, schema_additional_participants
            )
        else:
            await scenarioManagement.load_configurations_and_start_nodes()
    except subprocess.CalledProcessError as e:
        logging.exception(f"Error docker-compose up: {e}")
        return

    return scenarioManagement.scenario_name
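All three parameters are embedded body fields, so a client sends one JSON object with the keys scenario_data, role, and user. For mobility scenarios the endpoint also reads scenario_data["additional_participants"] and scenario_data["schema_additional_participants"], so a missing key raises KeyError inside the handler. The sketch below checks for those keys before sending. It assumes the mobility flag is stored under a top-level "mobility" key of scenario_data; the source reads the flag from the parsed Scenario object instead. The helper name is hypothetical.

```python
from typing import Any, Dict

# Keys the source reads from scenario_data when mobility is enabled
MOBILITY_KEYS = ("additional_participants", "schema_additional_participants")


def build_run_payload(scenario_data: Dict[str, Any], role: str, user: str) -> Dict[str, Any]:
    """Build the JSON body for POST /scenarios/run (hypothetical client helper)."""
    # Assumption: the mobility flag is stored as scenario_data["mobility"]
    if scenario_data.get("mobility"):
        missing = [key for key in MOBILITY_KEYS if key not in scenario_data]
        if missing:
            raise ValueError(f"mobility scenario is missing: {', '.join(missing)}")
    # Each Body(..., embed=True) parameter becomes a top-level key
    return {"scenario_data": scenario_data, "role": role, "user": user}
```

Failing on the client with a ValueError gives a clearer message than the unhandled KeyError the handler would otherwise raise mid-launch.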

set_scenario_status_to_finished(scenario_name=Body(..., embed=True), all=Body(False, embed=True)) async

Sets the status of a scenario (or all scenarios) to 'finished'.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| scenario_name | str | Name of the scenario to mark as finished. | Body(..., embed=True) |
| all | bool | If True, sets all scenarios to finished. | Body(False, embed=True) |

Returns:

| Type | Description |
| --- | --- |
| dict | A message confirming the operation. |

Source code in nebula/controller/controller.py
@app.post("/scenarios/set_status_to_finished")
async def set_scenario_status_to_finished(
    scenario_name: str = Body(..., embed=True), all: bool = Body(False, embed=True)
):
    """
    Sets the status of a scenario (or all scenarios) to 'finished'.

    Args:
        scenario_name (str): Name of the scenario to mark as finished.
        all (bool): If True, sets all scenarios to finished.

    Returns:
        dict: A message confirming the operation.
    """
    from nebula.controller.database import scenario_set_all_status_to_finished, scenario_set_status_to_finished

    try:
        if all:
            scenario_set_all_status_to_finished()
        else:
            scenario_set_status_to_finished(scenario_name)
    except Exception as e:
        logging.exception(f"Error setting scenario {scenario_name} to finished: {e}")
        raise HTTPException(status_code=500, detail="Internal server error")

    return {"message": f"Scenario {scenario_name} status set to finished successfully"}
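Because scenario_name is declared with Body(...), it is required even when all is true. The all field defaults to False and can be omitted. A hypothetical client helper that follows those two rules:

```python
import json


def build_finish_payload(scenario_name: str, finish_all: bool = False) -> bytes:
    """JSON body for POST /scenarios/set_status_to_finished (hypothetical helper)."""
    # scenario_name is required by the endpoint even when every scenario is being finished
    body = {"scenario_name": scenario_name}
    if finish_all:
        # "all" defaults to False on the server, so it is only sent when set
        body["all"] = True
    return json.dumps(body).encode("utf-8")
```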

stop_scenario(scenario_name=Body(..., embed=True), username=Body(..., embed=True), all=Body(False, embed=True)) async

Stops the execution of a federated learning scenario and performs cleanup operations.

This endpoint:
  • Stops all participant containers associated with the specified scenario.
  • Removes Docker containers and network resources tied to the scenario and user.
  • Sets the scenario's status to "finished" in the database.
  • Optionally finalizes all active scenarios if the 'all' flag is set.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| scenario_name | str | Name of the scenario to stop. | Body(..., embed=True) |
| username | str | User who initiated the stop operation. | Body(..., embed=True) |
| all | bool | Whether to stop all running scenarios instead of just one. | Body(False, embed=True) |

Raises:

| Type | Description |
| --- | --- |
| HTTPException | Raised with a 500 status code if any step fails. |

Note:

This function does not currently trigger statistics generation.

Source code in nebula/controller/controller.py
@app.post("/scenarios/stop")
async def stop_scenario(
    scenario_name: str = Body(..., embed=True),
    username: str = Body(..., embed=True),
    all: bool = Body(False, embed=True),
):
    """
    Stops the execution of a federated learning scenario and performs cleanup operations.

    This endpoint:
        - Stops all participant containers associated with the specified scenario.
        - Removes Docker containers and network resources tied to the scenario and user.
        - Sets the scenario's status to "finished" in the database.
        - Optionally finalizes all active scenarios if the 'all' flag is set.

    Args:
        scenario_name (str): Name of the scenario to stop.
        username (str): User who initiated the stop operation.
        all (bool): Whether to stop all running scenarios instead of just one (default: False).

    Raises:
        HTTPException: Returns a 500 status code if any step fails.

    Note:
        This function does not currently trigger statistics generation.
    """
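    # Imported locally, as in set_scenario_status_to_finished; without this the calls below raise NameError
    from nebula.controller.database import scenario_set_all_status_to_finished, scenario_set_status_to_finished
    from nebula.controller.scenarios import ScenarioManagement  # noqa: F401 (only used by the disabled call below)

    # ScenarioManagement.stop_participants(scenario_name)
    DockerUtils.remove_containers_by_prefix(f"{os.environ.get('NEBULA_CONTROLLER_NAME')}_{username}-participant")
    DockerUtils.remove_docker_network(
        f"{os.environ.get('NEBULA_CONTROLLER_NAME')}_{str(username).lower()}-nebula-net-scenario"
    )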
    try:
        if all:
            scenario_set_all_status_to_finished()
        else:
            scenario_set_status_to_finished(scenario_name)
    except Exception as e:
        logging.exception(f"Error setting scenario {scenario_name} to finished: {e}")
        raise HTTPException(status_code=500, detail="Internal server error")
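The Docker cleanup targets resources by user, not by scenario. Containers are matched by the prefix <NEBULA_CONTROLLER_NAME>_<username>-participant, which, as the helper's name suggests, catches all of that user's participant containers. The network name lower-cases the username, while the container prefix keeps it as given. The sketch below reproduces both names from the f-strings in the source; the function name is hypothetical. Because the source uses os.environ.get, an unset NEBULA_CONTROLLER_NAME yields names starting with "None_".

```python
from typing import Optional, Tuple


def stop_resource_names(controller_name: Optional[str], username: str) -> Tuple[str, str]:
    """Return (container_prefix, network_name) as built in stop_scenario."""
    # Container prefix keeps the username exactly as received
    container_prefix = f"{controller_name}_{username}-participant"
    # Network name lower-cases the username
    network_name = f"{controller_name}_{str(username).lower()}-nebula-net-scenario"
    return container_prefix, network_name
```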

update_nodes(scenario_name, request) async

Updates the configuration of a node in the database and notifies the frontend.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| scenario_name | str | The scenario to which the node belongs. | required |
| request | Request | The HTTP request containing the node data. | required |

Returns:

| Type | Description |
| --- | --- |
| dict | Confirmation or response from the frontend. |

Source code in nebula/controller/controller.py
@app.post("/nodes/{scenario_name}/update")
async def update_nodes(
    scenario_name: Annotated[
        str,
        Path(regex="^[a-zA-Z0-9_-]+$", min_length=1, max_length=50, description="Valid scenario name"),
    ],
    request: Request,
):
    """
    Updates the configuration of a node in the database and notifies the frontend.

    Args:
        scenario_name (str): The scenario to which the node belongs.
        request (Request): The HTTP request containing the node data.

    Returns:
        dict: Confirmation or response from the frontend.
    """
    from nebula.controller.database import update_node_record

    try:
        config = await request.json()
        timestamp = datetime.datetime.now()
        # Update the node in database
        await update_node_record(
            str(config["device_args"]["uid"]),
            str(config["device_args"]["idx"]),
            str(config["network_args"]["ip"]),
            str(config["network_args"]["port"]),
            str(config["device_args"]["role"]),
            str(config["network_args"]["neighbors"]),
            str(config["mobility_args"]["latitude"]),
            str(config["mobility_args"]["longitude"]),
            str(timestamp),
            str(config["scenario_args"]["federation"]),
            str(config["federation_args"]["round"]),
            str(config["scenario_args"]["name"]),
            str(config["tracking_args"]["run_hash"]),
            str(config["device_args"]["malicious"]),
        )
    except Exception as e:
        logging.exception(f"Error updating nodes: {e}")
        raise HTTPException(status_code=500, detail="Internal server error")

    url = (
        f"http://{os.environ['NEBULA_CONTROLLER_NAME']}_nebula-frontend/platform/dashboard/{scenario_name}/node/update"
    )

    config["timestamp"] = str(timestamp)

    async with aiohttp.ClientSession() as session:
        async with session.post(url, json=config) as response:
            if response.status == 200:
                return await response.json()
            else:
                raise HTTPException(status_code=response.status, detail="Error posting data")

    return {"message": "Nodes updated successfully in the database"}
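The endpoint flattens a nested node configuration into fourteen string arguments for update_node_record. A server-side timestamp is inserted right after the coordinates. The scenario_name path parameter must match ^[a-zA-Z0-9_-]+$ and be 1 to 50 characters long. The sketch below reproduces both steps with the standard library only. The function names are hypothetical, and the sample values in the check are made up for illustration.

```python
import re
from datetime import datetime
from typing import Any, Dict, List

SCENARIO_NAME_RE = re.compile(r"^[a-zA-Z0-9_-]+$")

# (section, key) pairs in the order the source passes them to update_node_record;
# the timestamp is inserted after mobility_args.longitude (position 8)
NODE_FIELDS = [
    ("device_args", "uid"),
    ("device_args", "idx"),
    ("network_args", "ip"),
    ("network_args", "port"),
    ("device_args", "role"),
    ("network_args", "neighbors"),
    ("mobility_args", "latitude"),
    ("mobility_args", "longitude"),
    ("scenario_args", "federation"),
    ("federation_args", "round"),
    ("scenario_args", "name"),
    ("tracking_args", "run_hash"),
    ("device_args", "malicious"),
]


def is_valid_scenario_name(name: str) -> bool:
    """Mirror the Path(...) constraints: allowed characters and length 1..50."""
    return 1 <= len(name) <= 50 and SCENARIO_NAME_RE.fullmatch(name) is not None


def node_record_args(config: Dict[str, Dict[str, Any]], timestamp: datetime) -> List[str]:
    """Flatten a node config into the positional string arguments of update_node_record."""
    values = [str(config[section][key]) for section, key in NODE_FIELDS]
    values.insert(8, str(timestamp))  # timestamp follows the coordinates
    return values
```

A missing section or key raises KeyError here. In the endpoint, the surrounding except clause turns that into a 500.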

update_notes_by_scenario_name(scenario_name=Body(..., embed=True), notes=Body(..., embed=True)) async

Endpoint to update notes for a given scenario.

Body Parameters:

- scenario_name: Name of the scenario.
- notes: Text content to store as notes.

Returns a success message or an error if something goes wrong.

Source code in nebula/controller/controller.py
@app.post("/notes/update")
async def update_notes_by_scenario_name(scenario_name: str = Body(..., embed=True), notes: str = Body(..., embed=True)):
    """
    Endpoint to update notes for a given scenario.

    Body Parameters:
    - scenario_name: Name of the scenario.
    - notes: Text content to store as notes.

    Returns a success message or an error if something goes wrong.
    """
    from nebula.controller.database import save_notes

    try:
        save_notes(scenario_name, notes)
    except Exception as e:
        logging.exception(f"Error updating notes: {e}")
        raise HTTPException(status_code=500, detail="Internal server error")

    return {"message": f"Notes for scenario {scenario_name} updated successfully"}

update_scenario(scenario_name=Body(..., embed=True), start_time=Body(..., embed=True), end_time=Body(..., embed=True), scenario=Body(..., embed=True), status=Body(..., embed=True), role=Body(..., embed=True), username=Body(..., embed=True)) async

Updates the status and metadata of a scenario.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| scenario_name | str | Name of the scenario. | Body(..., embed=True) |
| start_time | str | Start time of the scenario. | Body(..., embed=True) |
| end_time | str | End time of the scenario. | Body(..., embed=True) |
| scenario | dict | Scenario configuration. | Body(..., embed=True) |
| status | str | New status of the scenario (e.g., "running", "finished"). | Body(..., embed=True) |
| role | str | Role associated with the scenario. | Body(..., embed=True) |
| username | str | User performing the update. | Body(..., embed=True) |

Returns:

| Type | Description |
| --- | --- |
| dict | A message confirming the update. |

Source code in nebula/controller/controller.py
@app.post("/scenarios/update")
async def update_scenario(
    scenario_name: str = Body(..., embed=True),
    start_time: str = Body(..., embed=True),
    end_time: str = Body(..., embed=True),
    scenario: dict = Body(..., embed=True),
    status: str = Body(..., embed=True),
    role: str = Body(..., embed=True),
    username: str = Body(..., embed=True),
):
    """
    Updates the status and metadata of a scenario.

    Args:
        scenario_name (str): Name of the scenario.
        start_time (str): Start time of the scenario.
        end_time (str): End time of the scenario.
        scenario (dict): Scenario configuration.
        status (str): New status of the scenario (e.g., "running", "finished").
        role (str): Role associated with the scenario.
        username (str): User performing the update.

    Returns:
        dict: A message confirming the update.
    """
    from nebula.controller.database import scenario_update_record
    from nebula.controller.scenarios import Scenario

    try:
        scenario = Scenario.from_dict(scenario)
        scenario_update_record(scenario_name, start_time, end_time, scenario, status, role, username)
    except Exception as e:
        logging.exception(f"Error updating scenario {scenario_name}: {e}")
        raise HTTPException(status_code=500, detail="Internal server error")

    return {"message": f"Scenario {scenario_name} updated successfully"}

update_user_controller(user=Body(...), password=Body(...), role=Body(...)) async

Controller endpoint that updates an existing user in the database.

Parameters:

- user: The username of the user.
- password: The user's password.
- role: The role of the user.

Returns a success message if the user is updated, or an HTTP error if an exception occurs.

Source code in nebula/controller/controller.py
@app.post("/user/update")
async def update_user_controller(user: str = Body(...), password: str = Body(...), role: str = Body(...)):
    """
    Controller endpoint that updates an existing user in the database.

    Parameters:
    - user: The username of the user.
    - password: The user's password.
    - role: The role of the user.

    Returns a success message if the user is updated, or an HTTP error if an exception occurs.
    """
    from nebula.controller.database import update_user

    try:
        update_user(user, password, role)
        return {"detail": "User updated successfully"}
    except Exception as e:
        logging.exception(f"Error updating user: {e}")
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Error updating user: {e}")
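Unlike the embed=True endpoints, user, password, and role are declared with plain Body(...). When a FastAPI path operation has more than one body parameter, each one is still read from a top-level key named after the parameter, so the request body is a single JSON object. A hypothetical client helper follows; the same shape, without role, applies to /user/verify.

```python
import json


def build_update_user_body(user: str, password: str, role: str) -> bytes:
    """JSON body for POST /user/update (hypothetical helper)."""
    # Multiple Body() parameters are read from top-level keys of one JSON object
    return json.dumps({"user": user, "password": password, "role": role}).encode("utf-8")
```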

verify_user_controller(user=Body(...), password=Body(...)) async

Endpoint to verify user credentials.

Body Parameters:

- user: Username.
- password: Password.

Returns the user role on success or raises an error on failure.

Source code in nebula/controller/controller.py
@app.post("/user/verify")
async def verify_user_controller(user: str = Body(...), password: str = Body(...)):
    """
    Endpoint to verify user credentials.

    Body Parameters:
    - user: Username.
    - password: Password.

    Returns the user role on success or raises an error on failure.
    """
    from nebula.controller.database import get_user_info, list_users, verify

    try:
        user_submitted = user.upper()
        if (user_submitted in list_users()) and verify(user_submitted, password):
            user_info = get_user_info(user_submitted)
            return {"user": user_submitted, "role": user_info[2]}
        else:
            raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED)
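    except HTTPException:
        # Re-raise the 401 unchanged so the generic handler below does not turn it into a 500
        raise
    except Exception as e: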
        logging.exception(f"Error verifying user: {e}")
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Error verifying user: {e}")
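HTTPException is itself an Exception. A catch-all `except Exception` around the credential check would therefore convert the 401 for bad credentials into a 500, unless HTTPException is re-raised first. The sketch below shows that ordering under several assumptions. HTTPException is a minimal stand-in so it runs without FastAPI. The in-memory USERS table and its plain password comparison are hypothetical; the controller delegates to list_users, verify, and get_user_info. The role sits at index 2, mirroring user_info[2].

```python
from typing import Dict, Optional, Tuple


class HTTPException(Exception):
    """Minimal stand-in for fastapi.HTTPException so the sketch runs without FastAPI."""

    def __init__(self, status_code: int, detail: Optional[str] = None):
        super().__init__(detail)
        self.status_code = status_code
        self.detail = detail


# Hypothetical user table: upper-cased username -> (username, password, role).
# Plain comparison is for the sketch only; the controller calls verify() instead.
USERS: Dict[str, Tuple[str, str, str]] = {"ADMIN": ("ADMIN", "admin-pass", "admin")}


def verify_user(user: str, password: str) -> Dict[str, str]:
    try:
        user_submitted = user.upper()  # usernames are compared in upper case
        record = USERS.get(user_submitted)
        if record is None or record[1] != password:
            raise HTTPException(status_code=401)
        return {"user": user_submitted, "role": record[2]}
    except HTTPException:
        # Re-raise first: otherwise the catch-all below would mask the 401 as a 500
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error verifying user: {e}")
```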