import base64
import collections.abc
import hashlib
import json
from pathlib import Path, PurePosixPath
from typing import TYPE_CHECKING, Any, Dict, Iterable, Iterator, Type, Union, cast
import docker # type: ignore
import pathspec # type: ignore
if TYPE_CHECKING:
from ..project import Project
from .challenge import Challenge
def flatten(i: Iterable[Union[str, Iterable[str]]]) -> Iterable[str]:
    """
    Lazily flatten one level of nesting in an iterable of strings

    Strings are yielded unchanged (they are never split into characters). Any
    other iterable element is expanded and its items are yielded in order;
    elements that are not iterable at all are passed through as-is. Only a single
    level of nesting is expanded.

    :param i: Iterable mixing plain strings and iterables of strings
    :returns: A generator over the flattened items
    """
    for element in i:
        is_nested = not isinstance(element, str) and isinstance(
            element, collections.abc.Iterable
        )
        if not is_nested:
            yield element
            continue
        for inner in element:
            yield inner
[docs]
def _dockerignore_patterns(lines: Iterable[str]) -> Iterator[str]:
    """
    Translate raw ``.dockerignore`` lines into patterns suitable for pathspec

    pathspec's behavior with negated patterns is different than that of docker
    (and its own behavior with non-negated patterns) in that patterns ending with
    ``/`` will match files in subdirectories, but not a file with the same name,
    and patterns not ending in ``/`` will only match files, but not files in
    subdirectories. For example, the pattern ``!/a`` will exclude ``a``, but not
    ``a/b``, and the pattern ``!/a/`` will exclude ``a/b``, but not ``a``. Since
    docker treats these interchangeably, every negated pattern is emitted in both
    forms (``!/a`` followed by ``!/a/``), regardless of which one was written.

    :param lines: Raw lines of the ignore file, with or without line terminators
    :returns: A generator over the patterns, without line terminators
    """
    # FIXME: normalize / parse the lines better to support e.g. comments
    for raw_line in lines:
        # Strip the terminator before inspecting the pattern so that a final line
        # without a trailing newline (or a CRLF-terminated line) is handled the
        # same as any other line instead of losing its last character.
        pattern = raw_line.rstrip("\r\n")
        if not pattern.startswith("!"):
            yield pattern
            continue
        base = pattern[:-1] if pattern.endswith("/") else pattern
        yield base
        yield base + "/"


def get_context_files(root: Path) -> Iterator[Path]:
    """
    Generate a list of all files in the build context of the specified Dockerfile

    All regular files below ``root`` are returned (directories are skipped). If
    ``root`` contains a ``.dockerignore`` file, paths matched by its rules are
    left out.

    :param pathlib.Path root: Path to the containing directory of the Dockerfile to
        analyze
    :returns: A lazy iterator over the paths of the files in the build context
    """
    files: Iterator[Path] = root.rglob("*")
    dockerignore = root / ".dockerignore"
    if dockerignore.exists():
        with dockerignore.open("r") as fd:
            # from_lines consumes the generator here, while the file is still open
            spec = pathspec.PathSpec.from_lines(
                "gitwildmatch", _dockerignore_patterns(fd)
            )
        files = filter(lambda p: not spec.match_file(p.relative_to(root)), files)
    return filter(lambda p: p.is_file(), files)
[docs]
def generate_sum(root: Path) -> str:
    """
    Generate a checksum of all files in the build context of the specified directory

    The files reported by :func:`get_context_files` are processed in the order of
    their path relative to ``root`` (compared as strings). For each file, the
    relative path and then the file's contents are fed into a single SHA-256 hash.

    :param pathlib.Path root: Path to the containing directory of the Dockerfile to
        analyze
    :returns: The hexadecimal digest of the hash
    """
    digest = hashlib.sha256()
    ordered_files = sorted(
        get_context_files(root), key=lambda path: str(path.relative_to(root))
    )
    for path in ordered_files:
        digest.update(bytes(path.relative_to(root)))
        with path.open("rb") as stream:
            # Read in 512 KiB pieces so large files are never loaded in one go
            while True:
                block = stream.read(524288)
                if block == b"":
                    break
                digest.update(block)
    return digest.hexdigest()
[docs]
class Container:
    """
    A single container

    This base class represents a container that runs a pre-existing image named
    in its configuration; it has nothing to build.
    """

    manager: "ContainerManager"
    challenge: "Challenge"
    project: "Project"
    name: str
    config: Dict[str, Any]

    IS_BUILDABLE: bool = False

    def __init__(self, *, container_manager: "ContainerManager", name: str) -> None:
        """
        :param ContainerManager container_manager: The manager owning this
            container; its challenge and per-container config are read from it
        :param str name: The key of this container in the manager's config
        """
        self.name = name
        self.manager = container_manager
        self.challenge = container_manager.challenge
        self.project = self.challenge.project
        self.config = self.manager.config[name]

    def get_full_tag(self) -> str:
        """
        Get the full image tag (e.g. ``k8s.gcr.io/etcd:3.4.3-0``) for this container

        :returns: The image tag
        """
        image_tag = self.config["image"]
        return image_tag

    def is_built(self) -> bool:
        """
        If the container is buildable (:const:`IS_BUILDABLE` is `True`), this method
        returns whether or not the container is already built (and up-to-date). For
        non-buildable containers, this method always returns `True`.

        :returns: Whether or not the container is built
        """
        return True

    def build(self, force: bool = False) -> None:
        """
        Build the container if applicable and necessary.

        For containers that are not buildable (:const:`IS_BUILDABLE` is False), this
        method is a no-op

        :param bool force: Force a rebuild of this container even if it is up-to-date
        """
        return None
[docs]
class BuildableContainer(Container):
    """
    A container that is built from source

    The image is tagged with a checksum of its build context, so a change to any
    file in the context results in a new tag.
    """

    root: Path
    dockerfile: str
    buildargs: Dict[str, str]
    content_hash: str
    image: str

    IS_BUILDABLE: bool = True

    def __init__(self, **kwargs) -> None:
        """
        Read the ``build`` entry of the container's config, which is either a
        string (the build context directory, relative to the challenge root) or a
        dict with a ``context`` key and optional ``dockerfile`` and ``args`` keys.

        :param kwargs: Forwarded unchanged to :class:`Container`
        :raises TypeError: If the ``build`` entry is neither a string nor a dict
        """
        super().__init__(**kwargs)

        build = self.config.get("build", None)
        assert build is not None
        if isinstance(build, str):
            self.root = self.challenge.root / Path(build)
            self.dockerfile = "Dockerfile"
            self.buildargs = dict()
        elif isinstance(build, dict):
            build = cast(Dict[str, Any], build)
            self.root = self.challenge.root / Path(build["context"])
            self.dockerfile = build.get("dockerfile", "Dockerfile")
            self.buildargs = cast(Dict[str, str], build.get("args", dict()))
        else:
            # Fail here with a clear message rather than later with an
            # AttributeError on the never-assigned ``self.root``.
            raise TypeError(
                f"Unsupported 'build' configuration for container {self.name!r}: "
                f"expected str or dict, got {type(build).__name__}"
            )

        self.content_hash = generate_sum(self.root)
        self.image = self.manager.get_docker_image(self)

    def _build(self) -> None:
        """
        Build the image from the build context and push it, tagged with the
        content hash, using the manager's auth config.
        """
        self.project.docker_client.images.build(
            path=str(self.root),
            tag=f"{self.image}:{self.content_hash}",
            dockerfile=self.dockerfile,
            buildargs=self.buildargs,
            pull=True,
            rm=True,
        )
        # NOTE(review): the value returned by push() is not inspected here;
        # confirm whether push failures need to be detected from its output.
        self.project.docker_client.images.push(
            self.image, tag=self.content_hash, auth_config=self.manager._auth_config
        )

    def get_full_tag(self) -> str:
        """
        Get the full image tag for this container, in the form
        ``<image>:<content hash>``

        :returns: The image tag
        """
        return f"{self.image}:{self.content_hash}"

    def is_built(self) -> bool:
        """
        Checks if an image built from a build context with a matching hash exists
        by requesting the registry data for the full tag.

        :returns: Whether or not the image was found
        """
        try:
            self.project.docker_client.images.get_registry_data(
                self.get_full_tag(), auth_config=self.manager._auth_config
            )
            return True
        except docker.errors.NotFound:
            pass  # continue
        return False

    def build(self, force: bool = False) -> None:
        """
        Build and push the image unless an up-to-date one already exists

        :param bool force: Force a rebuild of this container even if it is up-to-date
        """
        if force or not self.is_built():
            self._build()
class _AuthCfgCache:
    """
    Cache of registry auth configurations, keyed by registry name

    The backing dict is a class attribute, so every instance shares the same
    entries.
    """

    _cache: Dict[str, Dict[str, str]] = {}  # class-level

    def get_auth_config(self, registry: str, api_client) -> Dict[str, str]:
        """
        Return the auth config for ``registry``, looking it up on first use

        On a cache miss the auth header is obtained through
        ``docker.auth.get_config_header`` and decoded (URL-safe base64, then
        JSON). A missing header is cached as ``None``.

        :param str registry: The registry to look up
        :param api_client: Client object handed to ``docker.auth.get_config_header``
        :returns: The cached auth config (``None`` if no header was available)
        """
        try:
            return self._cache[registry]
        except KeyError:
            pass

        header = docker.auth.get_config_header(api_client, registry)
        if header is None:
            auth_config = None
        else:
            decoded = base64.urlsafe_b64decode(header).decode("ascii")
            auth_config = json.loads(decoded)
        self._cache[registry] = auth_config
        return self._cache[registry]


# Module-wide cache instance used by ContainerManager
_auth_cfg_cache = _AuthCfgCache()
[docs]
class ContainerManager:
    """
    Object managing all containers defined by a given :class:`rcds.Challenge`
    """

    challenge: "Challenge"
    project: "Project"
    config: Dict[str, Dict[str, Any]]
    containers: Dict[str, Container]
    _auth_config: Dict[str, str]

    def __init__(self, challenge: "Challenge"):
        """
        :param rcds.Challenge challenge: The challenge that this ContainerManager
            belongs to
        """
        self.challenge = challenge
        self.project = self.challenge.project
        self.containers = dict()
        self.config = cast(
            Dict[str, Dict[str, Any]], self.challenge.config.get("containers", dict())
        )
        # Resolved before any container is created
        self._auth_config = self._get_auth_config()

        for name, container_config in self.config.items():
            # Containers with a "build" entry are built from source
            container_type: Type[Container] = (
                BuildableContainer if "build" in container_config else Container
            )
            container = container_type(container_manager=self, name=name)
            self.containers[name] = container
            # Write the resolved tag back into the container's config
            container_config["image"] = container.get_full_tag()

    def get_docker_image(self, container: Container) -> str:
        """
        Compute the image name (without tag) for a container

        The project's image template is rendered with the challenge config and a
        copy of the container config (extended with the container's ``name``); the
        result is appended to the project's image prefix.

        :param Container container: The container to compute the image name for
        :returns: The prefixed image name
        """
        template = self.project.jinja_env.from_string(
            self.project.config["docker"]["image"]["template"]
        )
        rendered = template.render(
            {
                "challenge": self.challenge.config,
                # Copy so that adding "name" does not touch the real config
                "container": {**container.config, "name": container.name},
            }
        )
        # FIXME: better implementation than abusing PosixPath?
        prefix = PurePosixPath(self.project.config["docker"]["image"]["prefix"])
        return str(prefix / rendered)

    def _get_auth_config(self) -> Dict[str, str]:
        """
        Look up the auth config for the registry named by the project's image
        prefix, going through the module-level auth config cache.

        :returns: The auth config as returned by the cache
        """
        image_prefix = self.project.config["docker"]["image"]["prefix"]
        registry, _ = docker.auth.resolve_repository_name(image_prefix)
        return _auth_cfg_cache.get_auth_config(registry, self.project.docker_client.api)