Source code for rcds.project.assets

import io
import json
import os
import pathlib
import shutil
from dataclasses import dataclass
from pathlib import Path
from typing import (
    TYPE_CHECKING,
    BinaryIO,
    ByteString,
    Callable,
    Dict,
    Iterable,
    Set,
    Tuple,
    Union,
    cast,
)
from warnings import warn

if TYPE_CHECKING:
    import rcds

    from .project import Project


File = Union[BinaryIO, Path, bytes]
"""
Something that the asset manager can interpret as a file (contents only)

Valid types:

- A :class:`pathlib.Path` object referring to a file that already exists on-disk

- Any :class:`typing.BinaryIO` object that is seekable

- A :class:`typing.ByteString` object containing the contents of the file (internally
  this is converted to a :class:`io.BytesIO`)
"""


def _is_valid_name(name: str):
    return (
        len(pathlib.PurePosixPath(name).parts) == 1
        and len(pathlib.PureWindowsPath(name).parts) == 1
    )


[docs] class AssetManagerTransaction: """ A transaction within an :class:`AssetManagerContext` This class manages declarative transactional updates to a context, allowing you to declare the files that should exist in the context, the last time that file was modified, and a callable to run to get the file, should it be out-of-date in the cache. The transaction starts in a blank state; without adding anything by calling :meth:`add`, :meth:`commit` will clear the context. No actions are performed until :meth:`commit` is called. This classs is not meant to be constructed directly, use :meth:`AssetManagerContext.transaction` """ _asset_manager_context: "AssetManagerContext" _is_active: bool @dataclass class _FileEntry: """ :meta private: """ mtime: float # Callable is wrapped in a tuple because otherwise, mypy thinks the field is a # class method (related to python/mypy#708) get_contents: Tuple[Callable[[], File]] _files: Dict[str, _FileEntry] def __init__(self, asset_manager_context: "AssetManagerContext"): """ :meta private: """ self._asset_manager_context = asset_manager_context self._is_active = True self._files = dict()
[docs] def add( self, name: str, mtime: float, contents: Union[File, Callable[[], File]] ) -> None: """ Add a file to the context :param str name: The name of the asset to add :param float mtime: The time the asset to add was modified (:attr:`os.stat_result.st_mtime`) :param contents: The contents of the file - this can either be the contents directly as a :const:`File`, or a thunk function that, when calls, returns the contents :type contents: :const:`File` or :obj:`Callable[[], File]` :raises RuntimeError: if the transaction has already been committed :raises ValueError: if the asset name is not valid """ if not self._is_active: raise RuntimeError("This transaction has already been committed") self._asset_manager_context._assert_valid_name(name) get_contents: Callable[[], File] if callable(contents): get_contents = contents else: def get_contents() -> File: return cast(File, contents) self._files[name] = self._FileEntry(mtime=mtime, get_contents=(get_contents,))
[docs] def add_file(self, name: str, file: Path): """ Add an already-existing file on disk to the context This wraps :meth:`add` :param str name: The name of the asset to add :param Path file: The path to the asset on disk """ if not file.is_file(): raise ValueError(f"Provided file does not exist: '{str(file)}'") self.add(name, file.stat().st_mtime, lambda: file)
def _create(self, fpath: Path, fentry: _FileEntry) -> None: """ Create / overwrite the asset in the cache :meta private: """ contents = fentry.get_contents[0]() if isinstance(contents, Path): if not contents.is_file(): raise ValueError(f"Provided file does not exist: '{str(contents)}'") fpath.symlink_to(contents) else: if isinstance(contents, ByteString): contents = io.BytesIO(contents) assert isinstance(contents, io.IOBase) with fpath.open("wb") as ofd: shutil.copyfileobj(contents, ofd) os.utime(fpath, (fentry.mtime, fentry.mtime))
[docs] def commit(self) -> None: """ Commit the transaction. This transaction can no longer be used after :meth:`commit` is called. """ self._is_active = False self._asset_manager_context._is_transaction_active = False files_to_delete = set(self._asset_manager_context.ls()) for name, file_entry in self._files.items(): fpath = self._asset_manager_context._get(name) try: files_to_delete.remove(name) except KeyError: pass if self._asset_manager_context.exists(name): cache_mtime = self._asset_manager_context.get_mtime(name) if not file_entry.mtime > cache_mtime: continue self._create(fpath, file_entry) self._asset_manager_context._add(name, force=True) for name in files_to_delete: fpath = self._asset_manager_context.get(name) fpath.unlink() self._asset_manager_context._rm(name) self._asset_manager_context.sync(check=True)
[docs] class AssetManagerContext: """ A subcontext within an :class:`AssetManager` Represents a namespace within the :class:`AssetManager`, essentially a subdirectory. The context holds assets for a challenge with the same id This class is not meant to be constructed directly, use :meth:`AssetManager.create_context` """ _asset_manager: "AssetManager" _name: str _root: Path _files: Set[str] _files_root: Path _manifest_file: Path _is_transaction_active: bool def __init__(self, asset_manager: "AssetManager", name: str): """ :meta private: """ self._asset_manager = asset_manager self._name = name self._files = set() self._is_transaction_active = False self._root = self._asset_manager.root / name self._root.mkdir(parents=True, exist_ok=True) self._files_root = self._root / "files" self._files_root.mkdir(exist_ok=True) self._manifest_file = self._root / "manifest.json" try: with self._manifest_file.open("r") as fd: manifest = json.load(fd) self._files = set(manifest["files"]) for fn in list(self._files): f = self._get(fn) if f.is_symlink() and not f.exists(): # Broken symlink; remove it self._files.remove(fn) f.unlink() self.sync() except FileNotFoundError: pass def _assert_valid_name(self, name: str) -> None: if not _is_valid_name(name): raise ValueError(f"Invalid asset name '{name}'")
[docs] def transaction(self) -> AssetManagerTransaction: """ Create a :class:`AssetManagerTransaction`. Only one transaction can be created at a time. :returns: The transaction :raises RuntimeError: when attempting to create a transaction while one already exists """ # TODO: better locking mechanism? if self._is_transaction_active: raise RuntimeError( "Attempted to create transaction while one is already in progress" ) self._is_transaction_active = True return AssetManagerTransaction(self)
[docs] def sync(self, *, check: bool = True): """ Syncs the manifest for this context to disk :param bool check: If true (default), check to make sure all files in the manifest exist, and that there are no extra files """ if check: disk = set(self._files_root.iterdir()) files = {self._files_root / f for f in self._files} for extra in disk - files: warn( RuntimeWarning( f"Unexpected item found in cache: '{str(extra)}'; removing..." ) ) if extra.is_dir(): shutil.rmtree(extra) else: extra.unlink() for missing in files - disk: raise RuntimeError(f"Cache item missing: '{str(missing)}'") with self._manifest_file.open("w") as fd: json.dump({"files": sorted(self._files)}, fd)
def _add(self, name: str, *, force: bool = False) -> None: """ Add an asset to the manifest :meta private: :param str name: The name of the asset :param bool force: If true, do not error if the asset already exists """ self._assert_valid_name(name) if not force and name in self._files: raise FileExistsError(f"Asset already exists: '{name}'") self._files.add(name) def _rm(self, name: str, *, force: bool = False) -> None: """ Remove an asset from the manifest :meta private: :param str name: The name of the asset :param bool force: If true, do not error if the asset does not exist """ self._assert_valid_name(name) try: self._files.remove(name) except KeyError: if not force: raise FileNotFoundError(f"Asset not found: '{name}'")
[docs] def ls(self) -> Iterable[str]: """ List all files within this context :returns: The list of asset names """ return self._files
def _get(self, name: str) -> Path: """ Retrieves the path for an asset with the given name, even if it does not already exist :meta private: """ self._assert_valid_name(name) return self._files_root / name
[docs] def exists(self, name: str) -> bool: """ Queries if an asset exists :param str name: The name of the asset :returns: Whether or not it exists """ self._assert_valid_name(name) return name in self._files
[docs] def get(self, name: str) -> Path: """ Retrieves the asset :param str name: The name of the asset :returns: The asset """ if not self.exists(name): raise FileNotFoundError(f"Asset not found: '{name}'") return self._get(name)
[docs] def get_mtime(self, name: str) -> float: """ Retrieves the time an asset was modified :param str name: The name of the asset :returns: The time the asset was modified (:attr`os.stat_result.st_mtime`) """ return self.get(name).stat().st_mtime
[docs] def clear(self) -> None: """ Clear all files in this context """ for f in self.ls(): self.get(f).unlink() self._files = set() self.sync(check=True)
[docs] class AssetManager: """ Class for managing assets from challenges that are provided to competitors This class manages all assets under a given project. """ project: "Project" root: Path def __init__(self, project: "rcds.Project"): self.project = project self.root = self.project.root / ".rcds-cache" / "assets" self.root.mkdir(parents=True, exist_ok=True)
[docs] def create_context(self, name: str) -> AssetManagerContext: """ Create a subcontext within the :class:`AssetManager` :param str name: The name of the context (challenge id) :raises ValueError: if the context name is not valid """ if not _is_valid_name(name): raise ValueError(f"Invalid context name '{name}'") return AssetManagerContext(self, name)
[docs] def list_context_names(self) -> Iterable[str]: """ List the names of all subcontexts within this :class:`AssetManager` :returns: The contexts' names. Call :meth:`create_context` on a name to obtain a :class:`AssetManagerContext` object """ for d in self.root.iterdir(): if not d.is_dir(): raise RuntimeError(f"Unexpected item found in cache: '{str(d)}'") yield d.name