import contextlib
import importlib.metadata
import json
import os
import shutil
import warnings
from datetime import datetime
from pathlib import Path

import shortuuid

from ... import config
from ...exceptions import BsbProvenanceUpgradeWarning
from ...services import MPILock
from ..decorators import on_main_until
from ..interfaces import Engine, NoopLock
from ..interfaces import StorageNode as IStorageNode
from ..provenance import build_root_metadata
from .file_store import FileStore, _atomic_write_bytes  # noqa: F401

# Name of the JSON file, directly under the storage root, holding the
# provenance bundle.
_METADATA_FILENAME = "metadata.json"
# Name of the older versions file that the engine reads as a fallback and
# removes once ``metadata.json`` has been written.
_LEGACY_VERSIONS_FILENAME = "versions.txt"


def _metadata_path(root: str) -> Path:
    """
    Return the location of the metadata file for a storage root.

    :param root: Path of the storage root directory.
    :returns: ``<root>/metadata.json`` as a :class:`pathlib.Path`.
    """
    return Path(root).joinpath(_METADATA_FILENAME)


def _legacy_versions_path(root: str) -> Path:
    """
    Return the location of the legacy versions file for a storage root.

    :param root: Path of the storage root directory.
    :returns: ``<root>/versions.txt`` as a :class:`pathlib.Path`.
    """
    return Path(root).joinpath(_LEGACY_VERSIONS_FILENAME)


def _write_metadata(root: str, payload: dict) -> None:
    """
    Serialise ``payload`` as key-sorted JSON and store it as the root metadata.

    :param root: Path of the storage root directory.
    :param payload: JSON-serialisable provenance bundle to persist.
    """
    # Go through the same atomic writer the file store uses, so there is a
    # single race-safe write path for everything under the root.
    serialised = json.dumps(payload, sort_keys=True)
    target = _metadata_path(root)
    _atomic_write_bytes(target, serialised.encode("utf-8"), root)
class FileSystemEngine(Engine):
    """
    Storage engine identified as ``"fs"`` that keeps its provenance in files
    under a root directory.

    The provenance bundle lives in ``metadata.json`` inside the root. Roots
    that only have the older ``versions.txt`` file are upgraded when the
    engine is constructed.

    NOTE(review): ``exists()`` and ``_write()`` are used below but are not
    defined in the visible part of this class; they are assumed to be provided
    elsewhere (``_write()`` is used as a context manager).
    """

    def __init__(self, root, comm):
        """
        Set up the engine and upgrade a legacy storage root if needed.

        :param root: Storage root, forwarded to the ``Engine`` base class.
        :param comm: Communicator, forwarded to the ``Engine`` base class.
        """
        super().__init__(root, comm)
        self._lock = MPILock.sync()
        self._readonly = False
        self._upgrade_if_needed()

    @property
    def root_slug(self):
        """
        The storage root expressed relative to the current working directory.
        """
        return os.path.relpath(self._root)

    @property
    def versions(self):
        """
        Version information of the storage.

        :returns: A dict with the keys ``bsb``, ``engine`` and ``version`` when
          a metadata bundle is available; otherwise the parsed content of the
          legacy versions file; otherwise an empty dict.
        """
        md = self.metadata
        if md:
            return {
                "bsb": md.get("bsb_core_version"),
                "engine": "fs",
                "version": md.get("engine_version") or md.get("bsb_core_version"),
            }
        # Pre-upgrade legacy fallback. Read first and handle the missing file,
        # instead of checking `exists()` beforehand: the upgrade path unlinks
        # this file, so it can disappear between a check and the read.
        legacy = _legacy_versions_path(self._root)
        try:
            return json.loads(legacy.read_text())
        except FileNotFoundError:
            return {}

    @property
    def metadata(self) -> dict:
        """
        The provenance bundle stored in ``metadata.json``.

        :returns: The parsed bundle, or an empty dict when the file is
          missing, unreadable, not valid UTF-8 encoded JSON, or does not
          contain a JSON object.
        """
        path = _metadata_path(self._root)
        try:
            # The writer always emits UTF-8, so decode with the same encoding
            # rather than the locale default.
            payload = json.loads(path.read_text(encoding="utf-8"))
        except (OSError, ValueError):
            # `ValueError` covers both `json.JSONDecodeError` and
            # `UnicodeDecodeError`; `OSError` covers a missing file.
            return {}
        # Callers use `.get()` and item assignment, so never hand out anything
        # but a dict.
        return payload if isinstance(payload, dict) else {}

    def _bump_state(self) -> None:
        """
        Increment the ``state_id`` counter of the stored metadata by one.

        Does nothing when the engine is read-only or when no metadata could be
        loaded.
        """
        if self._readonly:
            return
        # Serialise the read-modify-write through the engine lock so concurrent
        # ranks can't clobber each other's metadata.json.
        with self._write():
            md = self.metadata
            if not md:
                return
            md["state_id"] = int(md.get("state_id", 0)) + 1
            _write_metadata(self._root, md)

    def _upgrade_if_needed(self):
        """
        Write a fresh metadata bundle for a storage root that has none yet.

        Does nothing when the engine is read-only, when the storage does not
        exist, or when ``metadata.json`` is already present. Otherwise a new
        bundle with ``state_id`` 1 is written, the legacy versions file is
        removed on a best-effort basis, and a
        ``BsbProvenanceUpgradeWarning`` is emitted.
        """
        if self._readonly or not self.exists():
            return
        if _metadata_path(self._root).exists():
            return
        with self._write():
            # Re-check under the lock: another rank may have upgraded already,
            # so only the first rank in writes the bundle and warns.
            if _metadata_path(self._root).exists():
                return
            bundle = build_root_metadata(
                engine_name="fs",
                engine_version=importlib.metadata.version("bsb-core"),
                mpi_size=self.comm.get_size(),
            )
            bundle["state_id"] = 1
            _write_metadata(self._root, bundle)
            legacy = _legacy_versions_path(self._root)
            if legacy.exists():
                # Failing to delete the legacy file must not fail the upgrade.
                with contextlib.suppress(OSError):
                    legacy.unlink()
            # When reached through `__init__`, stacklevel=3 attributes the
            # warning to the code that constructed the engine.
            warnings.warn(
                "Auto-upgraded legacy FS storage with a fresh storage_id and "
                "provenance bundle.",
                category=BsbProvenanceUpgradeWarning,
                stacklevel=3,
            )
@config.node
class StorageNode(IStorageNode):
    """
    Configuration node that adds a ``root`` attribute to the storage node
    interface.
    """

    # String attribute. NOTE(review): `_get_default_root` is defined outside
    # the visible source; with `call_default=True` the default is presumably
    # obtained by calling it rather than by using the function object itself
    # -- confirm against `config.attr`.
    root = config.attr(type=str, default=_get_default_root, call_default=True)
    """ Path to the filesystem storage file. """