Source code for rcds.challenge.config

import re
from copy import deepcopy
from itertools import tee
from pathlib import Path
from typing import (
    TYPE_CHECKING,
    Any,
    Dict,
    Iterable,
    Optional,
    Pattern,
    Tuple,
    Union,
    cast,
)
from warnings import warn

import jsonschema  # type: ignore

from rcds import errors

from ..util import deep_merge, load_any
from ..util.jsonschema import DefaultValidatingDraft7Validator

if TYPE_CHECKING:
    from rcds import Project


config_schema = load_any(Path(__file__).parent / "challenge.schema.yaml")


[docs] class TargetNotFoundError(errors.ValidationError): pass
[docs] class TargetFileNotFoundError(TargetNotFoundError): target: Path def __init__(self, message: str, target: Path): super().__init__(message) self.target = target
[docs] class InvalidFlagError(errors.ValidationError): pass
[docs] class ConfigLoader: """ Object that manages loading challenge config files """ project: "Project" config_schema: Dict[str, Any] config_schema_validator: Any _flag_regex: Optional[Pattern[str]] = None def __init__(self, project: "Project"): """ :param rcds.Project project: project context to use """ self.project = project self.config_schema = deepcopy(config_schema) # Load flag regex if present if "flagFormat" in self.project.config: self._flag_regex = re.compile(f"^{self.project.config['flagFormat']}$") # Backend config patching for backend in [ self.project.container_backend, self.project.scoreboard_backend, ]: if backend is not None: backend.patch_challenge_schema(self.config_schema) self.config_schema_validator = DefaultValidatingDraft7Validator( schema=self.config_schema, format_checker=jsonschema.draft7_format_checker ) def _apply_defaults(self, config: Dict[str, Any]) -> Dict[str, Any]: """ Apply project-level defaults """ try: root_defaults = deepcopy(self.project.config["defaults"]) except KeyError: # No defaults present return config container_defaults = root_defaults.pop("containers", None) expose_defaults = root_defaults.pop("expose", None) # Array types with no sensible defaults - ignore them root_defaults.pop("provide", None) config = deep_merge(root_defaults, config) if container_defaults is not None and "containers" in config: for container_name, container_config in config["containers"].items(): config["containers"][container_name] = deep_merge( dict(), container_defaults, container_config ) if expose_defaults is not None and "expose" in config: for expose_config in config["expose"].values(): for i, expose_port in enumerate(expose_config): expose_config[i] = deep_merge(dict(), expose_defaults, expose_port) return config
[docs] def parse_config( self, config_file: Path ) -> Iterable[Union[errors.ValidationError, Dict[str, Any]]]: """ Load and validate a config file, returning both the config and any errors encountered. :param pathlib.Path config_file: The challenge config to load :returns: Iterable containing any errors (all instances of :class:`rcds.errors.ValidationError`) and the parsed config. The config will always be last. """ root = config_file.parent relative_path = root.resolve().relative_to(self.project.root.resolve()) config = load_any(config_file) config.setdefault("id", root.name) # derive id from parent directory name config = self._apply_defaults(config) if len(relative_path.parts) >= 2: # default category name is the parent of the challenge directory config.setdefault("category", relative_path.parts[-2]) schema_errors: Iterable[errors.SchemaValidationError] = ( errors.SchemaValidationError(str(e), e) for e in self.config_schema_validator.iter_errors(config) ) # Make a duplicate to check whethere there are errors returned schema_errors, schema_errors_dup = tee(schema_errors) # This is the same test as used in Validator.is_valid if next(schema_errors_dup, None) is not None: yield from schema_errors else: if "expose" in config: if "containers" not in config: yield TargetNotFoundError( "Cannot expose ports without containers defined" ) else: for key, expose_objs in config["expose"].items(): if key not in config["containers"]: yield TargetNotFoundError( f'`expose` references container "{key}" but ' f"it is not defined in `containers`" ) else: for expose_obj in expose_objs: if ( expose_obj["target"] not in config["containers"][key]["ports"] ): yield TargetNotFoundError( f"`expose` references port " f'{expose_obj["target"]} on container ' f'"{key}" which is not defined' ) if "provide" in config: for f in config["provide"]: if isinstance(f, str): f = Path(f) else: f = Path(f["file"]) if not (root / f).is_file(): yield TargetFileNotFoundError( f'`provide` references file "{str(f)}" which does not ' f"exist", f, ) if "flag" in config: if isinstance(config["flag"], dict): if "file" in config["flag"]: f = Path(config["flag"]["file"]) f_resolved = root / f if f_resolved.is_file(): with f_resolved.open("r") as fd: flag = fd.read().strip() config["flag"] = flag else: yield TargetFileNotFoundError( f'`flag.file` references file "{str(f)}" which does ' f"not exist", f, ) if isinstance(config["flag"], str): if self._flag_regex is not None and not self._flag_regex.match( config["flag"] ): yield InvalidFlagError( f'Flag "{config["flag"]}" does not match the flag format' ) if config["flag"].count("\n") > 0: warn( RuntimeWarning( "Flag contains multiple lines; is this intended?" ) ) yield config
[docs] def check_config( self, config_file: Path ) -> Tuple[Optional[Dict[str, Any]], Optional[Iterable[errors.ValidationError]]]: """ Load and validate a config file, returning any errors encountered. If the config file is valid, the tuple returned contains the loaded config as the first element, and the second element is None. Otherwise, the second element is an iterable of errors that occurred during validation This method wraps :meth:`parse_config`. :param pathlib.Path config_file: The challenge config to load """ load_data = self.parse_config(config_file) load_data, load_data_dup = tee(load_data) first = next(load_data_dup) if isinstance(first, errors.ValidationError): validation_errors = cast( Iterable[errors.ValidationError], filter(lambda v: isinstance(v, errors.ValidationError), load_data), ) return (None, validation_errors) else: return (first, None)
[docs] def load_config(self, config_file: Path) -> Dict[str, Any]: """ Loads a config file, or throw an exception if it is not valid This method wraps :meth:`check_config`, and throws the first error returned if there are any errors. :param pathlib.Path config_file: The challenge config to load :returns: The loaded config """ config, errors = self.check_config(config_file) if errors is not None: raise next(iter(errors)) # errors is None assert config is not None return config