class Trustworthiness():
def __init__(self, engine: Engine, config: Config):
config.reset_logging_configuration()
print_msg_box(
msg=f"Name Trustworthiness Module\nRole: {engine.rb.get_role_name()}",
indent=2,
)
self._engine = engine
self._config = config
self._trust_config = self._config.participant["trust_args"]["scenario"]
self._experiment_name = self._config.participant["scenario_args"]["name"]
self._trust_dir_files = f"/nebula/app/logs/{self._experiment_name}/trustworthiness"
self._emissions_file = 'emissions.csv'
self._role: Role = engine.rb.get_role()
self._idx = self._config.participant["device_args"]["idx"]
self._trust_workload: TrustWorkload = self._factory_trust_workload(self._role, self._engine, self._idx, self._trust_dir_files)
# EmissionsTracker from codecarbon to measure the emissions during the aggregation step in the server
self._tracker= EmissionsTracker(tracking_mode='process', log_level='error', save_to_file=False)
@property
def tw(self):
"""TrustWorkload depending on the node Role"""
return self._trust_workload
async def start(self):
await self._create_trustworthiness_directory()
await self.tw.init(self._experiment_name)
await EventManager.get_instance().subscribe_node_event(ExperimentFinishEvent, self._process_experiment_finish_event)
self._tracker.start()
async def _create_trustworthiness_directory(self):
import os
trust_dir = os.path.join(os.environ.get("NEBULA_LOGS_DIR"), self._experiment_name, "trustworthiness")
# Create a directory to save files to calcutate trust
os.makedirs(trust_dir, exist_ok=True)
os.chmod(trust_dir, 0o777)
async def _process_experiment_finish_event(self, efe: ExperimentFinishEvent):
from nebula.addons.trustworthiness.utils import save_class_count_per_participant
class_counter = self._engine.trainer.datamodule.get_samples_per_label()
save_class_count_per_participant(self._experiment_name, class_counter, self._idx)
await self.tw.finish_experiment_role_pre_actions()
last_loss, last_accuracy = self.tw.get_metrics()
# Get bytes send/received from reporter
bytes_sent = self._engine.reporter.acc_bytes_sent
bytes_recv = self._engine.reporter.acc_bytes_recv
# Get TrustWorkload info
workload = self.tw.get_workload()
sample_size = self.tw.get_sample_size()
# Last operations
save_results_csv(self._experiment_name, self._idx, bytes_sent, bytes_recv, last_loss, last_accuracy)
stop_emissions_tracking_and_save(self._tracker, self._trust_dir_files, self._emissions_file, self._role.value, workload, sample_size)
await self.tw.finish_experiment_role_post_actions(self._trust_config, self._experiment_name)
def _factory_trust_workload(self, role: Role, engine: Engine, idx, trust_files_route) -> TrustWorkload:
trust_workloads = {
Role.TRAINER: TrustWorkloadTrainer,
Role.SERVER: TrustWorkloadServer
}
trust_workload = trust_workloads.get(role)
if trust_workload:
return trust_workload(engine, idx, trust_files_route)
else:
raise TrustWorkloadException(f"Trustworthiness workload for role {role} not defined")