Documentation for the API module

delete_certs()

Remove all certificate files from the system.

Source code in nebula/physical/api.py, lines 226-234
@app.route("/certs/", methods=["DELETE"])
def delete_certs():
    """Remove **all** certificate files from the system."""
    certs_files = _find_x_files(CERTS_FOLDER, ".cert")
    removed: Dict[str, str] = {}
    for fn in certs_files:
        os.remove(fn)
        removed[fn] = "deleted"
    return jsonify(removed)
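
Example client call, using the third-party requests library; the host, port, and the path in the sample output are assumptions, not part of the API:

import requests

BASE = "http://nebula-device:5000"  # hypothetical host and port

# DELETE /certs/ removes every .cert file and reports each one
resp = requests.delete(f"{BASE}/certs/")
print(resp.json())  # e.g. {"/app/certs/node.cert": "deleted"}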

delete_config()

Remove the config .json from the given run directory.

Source code in nebula/physical/api.py, lines 111-124
@app.route("/config/", methods=["DELETE"])
def delete_config():
    """Remove the config *.json* from the given run directory."""
    path = request.args.get("path", "")
    if _LFI_sentry(path):
        _json_abort(404, "Item not found")

    json_files = _find_x_files(os.path.join(CONFIG_FOLDER, path))
    if len(json_files) != CONFIG_FILE_COUNT:
        _json_abort(404, "Item not found")

    fn = json_files.pop()
    os.remove(fn)
    return jsonify(filename=fn)
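
A sketch of the matching client call; "run_01" and the host are illustrative:

import requests

# The run directory is passed in the "path" query parameter
resp = requests.delete("http://nebula-device:5000/config/",
                       params={"path": "run_01"})
resp.raise_for_status()  # 404 if the path fails the LFI check or holds no config
print(resp.json()["filename"])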

delete_dataset()

Delete both dataset .h5 files from the specified run directory.

Source code in nebula/physical/api.py, lines 177-192
@app.route("/dataset/", methods=["DELETE"])
def delete_dataset():
    """Delete both dataset *.h5* files from the specified run directory."""
    path = request.args.get("path", "")
    if _LFI_sentry(path):
        _json_abort(404, "Item not found")

    data_files = _find_x_files(os.path.join(CONFIG_FOLDER, path), ".h5")
    if len(data_files) != DATASET_FILE_COUNT:
        _json_abort(404, "Item not found")

    removed: Dict[str, str] = {}
    for fn in data_files:
        os.remove(fn)
        removed[fn] = "deleted"
    return jsonify(removed)
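
Client-side, the response maps each removed .h5 file to "deleted"; the host, run name, and sample path below are assumptions:

import requests

resp = requests.delete("http://nebula-device:5000/dataset/",
                       params={"path": "run_01"})
for fn, state in resp.json().items():
    print(fn, state)  # e.g. /app/config/run_01/train.h5 deleted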

delete_logs()

Delete the main .log for the requested run.

Source code in nebula/physical/api.py, lines 256-269
@app.route("/get_logs/", methods=["DELETE"])
def delete_logs():
    """Delete the main *.log* for the requested run."""
    path = request.args.get("path", "")
    if _LFI_sentry(path):
        _json_abort(404, "Item not found")

    log_files = _find_x_files(os.path.join(LOGS_FOLDER, path), ".log")
    if not log_files:
        _json_abort(404, "Log file not found")

    target = min(log_files, key=lambda x: len(os.path.basename(x)))
    os.remove(target)
    return jsonify(filename=target)
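
The min(...) call keeps the log whose basename is shortest, which favours the main run log over derived logs; a small illustration with made-up filenames:

import os

log_files = ["/logs/run/participant_0.log",
             "/logs/run/participant_0_training.log"]  # made-up paths
# The shorter basename wins, so the main log is the one removed
target = min(log_files, key=lambda x: len(os.path.basename(x)))
print(target)  # /logs/run/participant_0.log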

get_certs()

Download every .cert file in a ZIP archive.

Source code in nebula/physical/api.py, lines 196-210
@app.route("/certs/", methods=["GET"])
def get_certs():
    """Download every *.cert* file in a ZIP archive."""
    certs_files = _find_x_files(CERTS_FOLDER, ".cert")
    if not certs_files:
        _json_abort(404, "No cert files found")

    buf = io.BytesIO()
    with zipfile.ZipFile(buf, "w", zipfile.ZIP_DEFLATED) as zf:
        for f in certs_files:
            zf.write(f, arcname=os.path.basename(f))
    buf.seek(0)

    return send_file(buf, mimetype="application/zip",
                     download_name="certs.zip", as_attachment=True)

get_config()

Return the single .json config file for the requested run.

Source code in nebula/physical/api.py, lines 77-89
@app.route("/config/", methods=["GET"])
def get_config():
    """Return the single *.json* config file for the requested run."""
    path = request.args.get("path", "")
    if _LFI_sentry(path):
        _json_abort(404, "Item not found")

    json_files = _find_x_files(os.path.join(CONFIG_FOLDER, path))
    if len(json_files) != CONFIG_FILE_COUNT:
        _json_abort(404, "Item not found")

    return send_file(json_files.pop(), mimetype="application/json",
                     as_attachment=True)
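
A possible client call; the host and run name are illustrative:

import requests

resp = requests.get("http://nebula-device:5000/config/",
                    params={"path": "run_01"})
if resp.status_code == 404:
    print("No config for that run")
else:
    with open("participant.json", "wb") as fh:  # local name is arbitrary
        fh.write(resp.content)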

get_dataset()

Deliver both .h5 datasets as a single ZIP archive.

Returning a single payload simplifies transfer, cache-control and client code compared to sending two independent responses.

Source code in nebula/physical/api.py, lines 128-151
@app.route("/dataset/", methods=["GET"])
def get_dataset():
    """
    Deliver both *.h5* datasets as a single ZIP archive.

    Returning a single payload simplifies transfer, cache-control and client
    code compared to sending two independent responses.
    """
    path = request.args.get("path", "")
    if _LFI_sentry(path):
        _json_abort(404, "Item not found")

    h5_files = _find_x_files(os.path.join(CONFIG_FOLDER, path), ".h5")
    if len(h5_files) != DATASET_FILE_COUNT:
        _json_abort(404, "Item not found")

    buf = io.BytesIO()
    with zipfile.ZipFile(buf, "w", zipfile.ZIP_DEFLATED) as zf:
        for f in h5_files:
            zf.write(f, arcname=os.path.basename(f))
    buf.seek(0)

    return send_file(buf, mimetype="application/zip",
                     download_name="dataset.zip", as_attachment=True)

get_logs()

Download the main .log produced during training.

Source code in nebula/physical/api.py, lines 247-253
@app.route("/get_logs/", methods=["GET"])
def get_logs():
    """Download the main *.log* produced during training."""
    path = request.args.get("path", "")
    if _LFI_sentry(path):
        _json_abort(404, "Item not found")
    return _send_single_log(os.path.join(LOGS_FOLDER, path), ".log")

get_metrics()

Bundle every file under METRICS_FOLDER into a ZIP archive.

Source code in nebula/physical/api.py, lines 318-332
@app.route("/metrics/", methods=["GET"])
def get_metrics():
    """Bundle every file under *METRICS_FOLDER* into a ZIP archive."""
    metric_files = _find_x_files(METRICS_FOLDER, "")
    if not metric_files:
        _json_abort(404, "No metric files found")

    buf = io.BytesIO()
    with zipfile.ZipFile(buf, "w", zipfile.ZIP_DEFLATED) as zf:
        for f in metric_files:
            zf.write(f, arcname=os.path.basename(f))
    buf.seek(0)

    return send_file(buf, mimetype="application/zip",
                     download_name="metrics.zip", as_attachment=True)

run()

Spawn the federated training process (once).

Returns

JSON {pid, state}

Source code in nebula/physical/api.py, lines 336-356
@app.route("/run/", methods=["GET"])
def run():
    """
    Spawn the federated training process (once).

    Returns
    -------
    JSON {pid, state}
    """
    json_files = _find_x_files(CONFIG_FOLDER)
    if len(json_files) != CONFIG_FILE_COUNT:
        _json_abort(404, "Config file not found")

    global TRAINING_PROC
    if TRAINING_PROC and TRAINING_PROC.poll() is None:
        _json_abort(409, "Training already running")

    cmd = ["python", "/home/dietpi/prueba/nebula/nebula/node.py", json_files[0]]
    TRAINING_PROC = subprocess.Popen(cmd)

    return jsonify(pid=TRAINING_PROC.pid, state="running")
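
Starting a run from a client; only the host is an assumption:

import requests

resp = requests.get("http://nebula-device:5000/run/")
if resp.status_code == 409:
    print("Training already running")
else:
    info = resp.json()
    print(info["pid"], info["state"])  # e.g. 1234 running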

set_cert()

Upload one .cert file to the global certificates folder.

Source code in nebula/physical/api.py, lines 213-223
@app.route("/certs/", methods=["PUT"])
def set_cert():
    """Upload one *.cert* file to the global certificates folder."""
    if "cert" not in request.files:
        _json_abort(400, "Missing file field 'cert'")

    uploaded = request.files["cert"]
    dst = os.path.join(CERTS_FOLDER, uploaded.filename)
    uploaded.save(dst)

    return jsonify(filename=uploaded.filename), 201
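
Uploading a certificate; the local file name and host are illustrative:

import requests

# The multipart field must be named "cert"
with open("node.cert", "rb") as fh:
    resp = requests.put("http://nebula-device:5000/certs/",
                        files={"cert": fh})
print(resp.status_code, resp.json())  # 201 {"filename": "node.cert"}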

set_config()

Upload a config .json for the provided run directory.

Source code in nebula/physical/api.py, lines 92-108
@app.route("/config/", methods=["PUT"])
def set_config():
    """Upload a config *.json* for the provided run directory."""
    if "config" not in request.files:
        _json_abort(400, "Missing file field 'config'")

    path = request.args.get("path", "")
    if _LFI_sentry(path):
        _json_abort(404, "Item not found")

    os.makedirs(os.path.join(CONFIG_FOLDER, path), exist_ok=True)

    uploaded = request.files["config"]
    dst = os.path.join(CONFIG_FOLDER, path, uploaded.filename)
    uploaded.save(dst)

    return jsonify(filename=uploaded.filename), 201
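
The upload mirrors get_config but with a multipart field named "config"; host, run name, and file name are assumptions:

import requests

with open("participant.json", "rb") as fh:
    resp = requests.put("http://nebula-device:5000/config/",
                        params={"path": "run_01"},
                        files={"config": fh})
print(resp.json())  # {"filename": "participant.json"}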

set_dataset()

Upload the pair of train/test .h5 files for a run.

Source code in nebula/physical/api.py, lines 154-174
@app.route("/dataset/", methods=["PUT"])
def set_dataset():
    """Upload the pair of train/test *.h5* files for a run."""
    missing = [fld for fld in ("dataset", "dataset_p") if fld not in request.files]
    if missing:
        _json_abort(400, f"Missing file field(s): {', '.join(missing)}")

    path = request.args.get("path", "")
    if _LFI_sentry(path):
        _json_abort(404, "Item not found")

    os.makedirs(os.path.join(CONFIG_FOLDER, path), exist_ok=True)
    stored: List[str] = []

    for fld in ("dataset", "dataset_p"):
        up = request.files[fld]
        dst = os.path.join(CONFIG_FOLDER, path, up.filename)
        up.save(dst)
        stored.append(up.filename)

    return jsonify(stored), 201
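
Both multipart fields are mandatory; the file names and host below are illustrative:

import requests

with open("train.h5", "rb") as tr, open("test.h5", "rb") as te:
    resp = requests.put("http://nebula-device:5000/dataset/",
                        params={"path": "run_01"},
                        files={"dataset": tr, "dataset_p": te})
print(resp.json())  # JSON list of the stored filenames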

setup_new_run()

Prepare a new federated-learning round.

Expected multipart-form fields

  • config – JSON with scenario, network and security arguments
  • global_test – shared evaluation dataset (*.h5)
  • train_set – participant-specific training dataset (*.h5)

The function rewrites paths inside config, validates neighbour IPs through Tailscale, deletes previous artefacts and finally stores the new trio of files.

Source code in nebula/physical/api.py, lines 375-485
@app.route("/setup/", methods=["PUT"])
def setup_new_run():
    """
    Prepare a **new** federated-learning round.

    Expected multipart-form fields
    -------------------------------
    * **config**     – JSON with scenario, network and security arguments  
    * **global_test** – shared evaluation dataset (`*.h5`)  
    * **train_set**   – participant-specific training dataset (`*.h5`)

    The function rewrites paths inside *config*, validates neighbour IPs
    through Tailscale, deletes previous artefacts and finally stores the new
    trio of files.
    """
    # 1 · Refuse while a training task is still running
    global TRAINING_PROC
    if TRAINING_PROC and TRAINING_PROC.poll() is None:
        _json_abort(409, "Training already running; pause or stop it first.")

    # 2 · Check field presence
    missing = [x for x in ("config", "global_test", "train_set")
               if x not in request.files]
    if missing:
        _json_abort(400, f"Missing file field(s): {', '.join(missing)}")

    config_up   = request.files["config"]
    global_test = request.files["global_test"]
    train_set   = request.files["train_set"]

    # 3 · Extension sanity
    if not config_up.filename.endswith(".json"):
        _json_abort(400, f"`{config_up.filename}` must have a .json extension.")
    for ds in (global_test, train_set):
        if not ds.filename.endswith(".h5"):
            _json_abort(400, f"`{ds.filename}` must have a .h5 extension.")

    # 4 · Parse + patch JSON
    try:
        original_cfg = json.load(config_up)
    except Exception as exc:  # broad – any parsing failure should abort
        _json_abort(400, f"Invalid JSON file: {exc}")

    # Update tracking / security paths to local folders
    tracking = original_cfg.get("tracking_args", {})
    tracking["log_dir"]    = LOGS_FOLDER.rstrip("/")
    tracking["config_dir"] = CONFIG_FOLDER.rstrip("/")
    original_cfg["tracking_args"] = tracking

    sec = original_cfg.get("security_args", {})
    for key in ("certfile", "keyfile", "cafile"):
        if key in sec and sec[key]:
            sec[key] = os.path.join(CERTS_FOLDER.rstrip("/"),
                                    os.path.basename(sec[key]))
    original_cfg["security_args"] = sec

    # 5 · (May be removed) Check neighbour reachability via Tailscale
    neigh_str = original_cfg.get("network_args", {}).get("neighbors", "").strip()
    requested_ips: Set[str] = {n.split(":")[0] for n in neigh_str.split() if n}

    if requested_ips:
        try:
            ts_out = subprocess.run(
                ["tailscale", "status", "--json"],
                capture_output=True, text=True, check=True,
            )
            ts_status = json.loads(ts_out.stdout)
            reachable: Set[str] = set(ts_status.get("Self", {}).get("TailscaleIPs", []))
            for peer in ts_status.get("Peer", {}).values():
                reachable.update(peer.get("TailscaleIPs", []))
        except Exception as exc:
            _json_abort(400, f"Could not verify neighbours via Tailscale: {exc}")

        missing = sorted(ip for ip in requested_ips if ip not in reachable)
        if missing:
            _json_abort(400, f"Neighbour IP(s) not reachable: {', '.join(missing)}")

    # 6 · Clean previous JSON/H5 artefacts
    for fn in os.listdir(CONFIG_FOLDER):
        if fn.endswith((".json", ".h5")):
            try:
                os.remove(os.path.join(CONFIG_FOLDER, fn))
            except OSError:
                pass
    if any(fn.endswith((".json", ".h5")) for fn in os.listdir(CONFIG_FOLDER)):
        _json_abort(400, "Could not delete old JSON/H5 files.")

    # 7 · Persist patched JSON
    json_dest = os.path.join(CONFIG_FOLDER, config_up.filename)
    with open(json_dest, "wb") as dst:
        dst.write(json.dumps(original_cfg, indent=2).encode())

    # 8 · Persist datasets
    saved = [config_up.filename]
    for up in (global_test, train_set):
        dst = os.path.join(CONFIG_FOLDER, up.filename)
        up.save(dst)
        saved.append(up.filename)

    # 9 · Purge previous log files
    for root, _, files in os.walk(LOGS_FOLDER):
        for fn in files:
            if fn.endswith(".log"):
                try:
                    os.remove(os.path.join(root, fn))
                except OSError:
                    pass
    if any(fn.endswith(".log") for _, _, fns in os.walk(LOGS_FOLDER) for fn in fns):
        _json_abort(400, "Could not delete old log files.")

    return jsonify(saved), 201
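
A sketch of a complete setup call with the three required fields; all file names and the host are assumptions:

import requests

files = {
    "config":      open("scenario.json", "rb"),
    "global_test": open("global_test.h5", "rb"),
    "train_set":   open("train_set.h5", "rb"),
}
try:
    resp = requests.put("http://nebula-device:5000/setup/", files=files)
    print(resp.status_code, resp.json())  # 201 and the stored filenames
finally:
    for fh in files.values():
        fh.close()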

stop()

Terminate the running training process (SIGTERM) and wait for it.

Source code in nebula/physical/api.py, lines 359-371
@app.route("/stop/", methods=["GET"])
def stop():
    """Terminate the running training process (SIGTERM) and wait for it."""
    global TRAINING_PROC
    if not TRAINING_PROC or TRAINING_PROC.poll() is not None:
        _json_abort(404, "No training running")

    TRAINING_PROC.send_signal(signal.SIGTERM)
    TRAINING_PROC.wait()
    pid = TRAINING_PROC.pid
    TRAINING_PROC = None

    return jsonify(pid=pid, state="stopped")
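
Stopping a run from a client; only the host is an assumption:

import requests

resp = requests.get("http://nebula-device:5000/stop/")
if resp.status_code == 404:
    print("No training running")
else:
    print(resp.json())  # {"pid": ..., "state": "stopped"}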