# Source code for remoteappmanager.docker.container_manager

import os
import string
from urllib.parse import urlparse
import uuid
import random

from docker.errors import APIError, NotFound
from escapism import escape
from remoteappmanager.docker.async_docker_client import AsyncDockerClient
from remoteappmanager.docker.container import Container
from remoteappmanager.docker.docker_labels import SIMPHONY_NS_RUNINFO

from remoteappmanager.docker.image import Image
from remoteappmanager.logging.logging_mixin import LoggingMixin
from remoteappmanager.utils import (
    url_path_join,
    without_end_slash)


from tornado import gen
from traitlets import (
    Int,
    Dict,
    Set,
    Instance,
    Unicode,
    default)


#: Set of characters that are safe to use and should not be escaped by escapism

_CONTAINER_SAFE_CHARS = set(string.ascii_letters + string.digits + '-.')
_CONTAINER_ESCAPE_CHAR = '_'


class OperationInProgress(Exception):
    """Exception raised when the operation for the requested image or
    container is already in progress.
    """
class MultipleResultsFound(Exception):
    """Raised when we are asking for a specific container, but more than
    one result is found.
    """
class ContainerManager(LoggingMixin):
    """Starts, stops and looks up docker containers through an
    AsyncDockerClient.

    The ``find_*`` lookups and the removal of containers are restricted to
    containers labelled with this manager's realm.
    """

    #: The container (not host) port. We decided it's 8888 by default. It will
    #: be mapped to a random port on the host, so that our reverse proxy can
    #: refer to it.
    container_port = Int(8888)

    #: The docker client configuration
    docker_config = Dict()

    # Docker realm this container manager handles. Useful to have
    # different instances of simphony-remote access the same docker.
    realm = Unicode("remoteexec")

    #: Tracks if a given mapping id is starting up.
    _start_pending = Set()

    #: Tracks if a given container id is stopping down.
    _stop_pending = Set()

    #: The asynchronous docker client.
    _docker_client = Instance(AsyncDockerClient)

    def __init__(self, docker_config, *args, **kwargs):
        """Initializes the Container manager.

        Parameters
        ----------
        docker_config: Dict
            A dictionary containing the keywords for the configuration of
            the docker client in agreement to docker py documentation.
        realm: Unicode
            The docker realm
        """
        self.docker_config = docker_config
        super().__init__(*args, **kwargs)

    @gen.coroutine
    def start_container(self, user_name, image_name, mapping_id,
                        base_urlpath, volumes, environment=None):
        """Starts a container using the given image name.

        Parameters
        ----------
        user_name: string
            The name of the user
        image_name: string
            A string identifying the image name.
        mapping_id: str
            A generic id used to recognize the container. it is expected to
            be unique (and persistent) for a specific combination of docker
            image (i.e. application) and setup (i.e. configuration).
        base_urlpath: str
            The base urlpath for the current user.
        volumes: dict or None
            {volume_source: {'bind': volume_target, 'mode': volume_mode}
        environment: dict or None
            Contains additional keyvalue pairs that will be exported as
            environment variables inside the container.

        Return
        ------
        A container object containing information about the started
        container.

        Raises
        ------
        OperationInProgress:
            if the requested mapping id is already scheduled for addition
        """
        if mapping_id in self._start_pending:
            raise OperationInProgress("start {}".format(mapping_id))

        if environment is None:
            environment = {}

        # Register the id before entering the try block, so that the
        # cleanup in ``finally`` only ever removes an id we actually added.
        self._start_pending.add(mapping_id)
        try:
            result = yield self._start_container(user_name,
                                                 image_name,
                                                 mapping_id,
                                                 base_urlpath,
                                                 volumes,
                                                 environment)
        finally:
            self._start_pending.remove(mapping_id)

        return result

    @gen.coroutine
    def stop_and_remove_container(self, container_id):
        """Idempotent removal of a container by id.
        If the container is there, it will be removed. If it's not
        there, the unexpected conditions will be logged.

        Parameters
        ----------
        container_id : str
            A string containing the container identifier.

        Raises
        ------
        OperationInProgress:
            if the requested container id is already scheduled for removal.
        """
        if container_id in self._stop_pending:
            raise OperationInProgress("stop {}".format(container_id))

        # Register the id before entering the try block, so that the
        # cleanup in ``finally`` only ever removes an id we actually added.
        self._stop_pending.add(container_id)
        try:
            yield self._stop_and_remove_container(container_id)
        finally:
            self._stop_pending.remove(container_id)

    @gen.coroutine
    def containers_with_labels(self, labels):
        """Returns the containers carrying all the given labels.

        Parameters
        ----------
        labels: dict
            Maps label names to the values they must have. Each pair is
            turned into a ``name=value`` docker label filter.

        Return
        ------
        The result of containers_from_filters for the built label filter.
        """
        filters = {
            'label': ['{0}={1}'.format(k, v) for k, v in labels.items()]}

        containers = yield self.containers_from_filters(filters)
        return containers

    @gen.coroutine
    def containers_from_filters(self, filters):
        """Returns the currently running containers for a given filter

        Parameters
        ----------
        filters: dict
            A dictionary of filters as in dockerpy

        Return
        ------
        A list of Container objects, or an empty list if nothing is found.
        Entries that cannot be parsed, or whose ip/port cannot be retrieved,
        are logged and left out of the result.
        """
        containers = []

        infos = yield self._docker_client.containers(filters=filters)
        for info in infos:
            try:
                container = Container.from_docker_dict(info)
            except ValueError:
                self.log.exception("Unable to parse container info.")
                continue

            # override the ip and port obtained by the docker info with the
            # appropriate ip and port, considering that we might be using a
            # separate docker machine
            try:
                # _get_ip_and_port is a plain generator (it is not wrapped
                # by gen.coroutine), hence the delegation via ``yield from``.
                ip, port = yield from self._get_ip_and_port(
                    container.docker_id)
            except RuntimeError:
                self.log.exception(
                    "Unable to retrieve ip/port "
                    "for container {}".format(container.docker_id))
                continue

            container.ip = ip
            container.port = port
            containers.append(container)

        return containers

    @gen.coroutine
    def find_containers(self, *,
                        url_id=None, mapping_id=None, user_name=None):
        """Finds and returns containers matching all the specified arguments.

        The search is always restricted to the realm of this manager;
        arguments left to None are not used as search criteria.
        """
        labels = {
            SIMPHONY_NS_RUNINFO.realm: self.realm
        }

        if url_id is not None:
            labels[SIMPHONY_NS_RUNINFO.url_id] = url_id

        if mapping_id is not None:
            labels[SIMPHONY_NS_RUNINFO.mapping_id] = mapping_id

        if user_name is not None:
            labels[SIMPHONY_NS_RUNINFO.user] = user_name

        containers = yield self.containers_with_labels(labels)
        return containers

    @gen.coroutine
    def find_container(self, *,
                       url_id=None, mapping_id=None, user_name=None):
        """Find and returns a container matching the specified arguments.
        Returns the found container or None. If multiple containers match the
        query, it will raise MultipleResultsFound"""
        containers = yield self.find_containers(url_id=url_id,
                                                mapping_id=mapping_id,
                                                user_name=user_name)

        if len(containers) > 1:
            raise MultipleResultsFound(
                "Found {} results for request {}, {}, {}".format(
                    len(containers), url_id, mapping_id, user_name
                ))

        if len(containers) == 1:
            return containers[0]

        return None

    @gen.coroutine
    def image(self, image_id_or_name):
        """Returns the Image object associated to a given id or name,
        or None if docker reports the image as not found.
        """
        try:
            image_dict = yield self._docker_client.inspect_image(
                image_id_or_name)
        except NotFound:
            return None

        return Image.from_docker_dict(image_dict)

    # Private

    @gen.coroutine
    def _start_container(self,
                         user_name,
                         image_name,
                         mapping_id,
                         base_urlpath,
                         volumes,
                         environment):
        """Helper method that performs the physical operation of starting
        the container. If successful, returns a Container object.
        If any exception occurs, it logs it and re-raises an exception.
        """
        try:
            image_info = yield self._docker_client.inspect_image(image_name)
            image_id = image_info["Id"]
        except NotFound:
            self.log.error('Could not find requested image {}'.format(
                image_name))
            raise
        except Exception:
            self.log.exception("Could not inspect image {}".format(
                image_name
            ))
            raise

        self.log.info('Got container image: {}'.format(image_name))

        # Check if the container is present.
        container = yield self.find_container(
            user_name=user_name, mapping_id=mapping_id)

        if container is not None:
            # Make sure we stop and remove it if by any chance is already
            # there. This will guarantee a fresh start every time.
            self.log.info('Container for image {} '
                          'already present. Stopping.'.format(image_name))
            yield self.stop_and_remove_container(container.docker_id)

        # Data volume binding to be used with Docker Client
        # volumes = {volume_source: {'bind': volume_target,
        #                            'mode': volume_mode}
        volumes = volumes if volumes else {}

        # Filter away the volume sources that do not exist,
        # otherwise Docker would create non-existing host directory
        # See Docker PR #21666
        filtered_volumes = {source: volumes[source]
                            for source in volumes
                            if os.path.exists(source)}
        volume_targets = [binding['bind']
                          for binding in filtered_volumes.values()]

        # Log the paths that are not being mounted
        missing_sources = volumes.keys() - filtered_volumes.keys()
        if missing_sources:
            self.log.error('Path(s) does not exist, not mounting:\n%s',
                           '\n'.join(missing_sources))

        self.log.info(
            'Mounting these volumes: \n%s',
            '\n'.join('{0} -> {1}'.format(source, target['bind'])
                      for source, target in filtered_volumes.items()))

        container_url_id = _generate_container_url_id(image_name)
        container_urlpath = without_end_slash(
            url_path_join(base_urlpath,
                          "containers",
                          container_url_id))
        container_name = _generate_container_name(self.realm,
                                                  user_name,
                                                  mapping_id)

        create_kwargs = dict(
            image=image_name,
            name=container_name,
            environment=_get_container_env(user_name,
                                           container_url_id,
                                           environment,
                                           base_urlpath),
            volumes=volume_targets,
            labels=_get_container_labels(user_name,
                                         mapping_id,
                                         container_url_id,
                                         container_urlpath,
                                         self.realm))

        # build the dictionary of keyword arguments for host_config
        host_config = dict(
            port_bindings={
                self.container_port: None
            },
            binds=filtered_volumes
        )

        self.log.debug("Starting host with config: %s", host_config)

        host_config = yield self._docker_client.create_host_config(
            **host_config)

        # Get the host_config configuration in create_kwargs.
        # If it's not there, create an empty one.
        # Then update it with the current configuration.
        create_kwargs.setdefault('host_config', {}).update(host_config)

        resp = yield self._docker_client.create_container(**create_kwargs)

        container_id = resp['Id']

        self.log.info("Created container '%s' (id: %s) from image %s",
                      container_name, container_id, image_name)

        # start the container
        try:
            yield self._docker_client.start(container_id)
        except Exception:
            self.log.exception("Could not start container {}".format(
                container_id))
            yield self.stop_and_remove_container(container_id)
            raise

        try:
            # _get_ip_and_port is a plain generator (it is not wrapped by
            # gen.coroutine), hence the delegation via ``yield from``.
            ip, port = yield from self._get_ip_and_port(container_id)
        except Exception:
            self.log.exception(
                "Could not retrieve ip/port information "
                "for container {}".format(container_id))
            yield self.stop_and_remove_container(container_id)
            raise

        # NOTE(review): the url_id of the returned Container carries a
        # "_<port>" suffix, while the url_id label, the URL_ID environment
        # variable and the urlpath use the bare container_url_id.
        # Confirm that this difference is intended.
        extended_id = f"{container_url_id}_{port}"
        container = Container(
            docker_id=container_id,
            name=container_name,
            image_name=image_name,
            image_id=image_id,
            mapping_id=mapping_id,
            ip=ip,
            port=port,
            url_id=extended_id,
            urlpath=container_urlpath,
        )

        self.log.info(
            ("Started container '{}' (id: {}). "
             "Exported port reachable at {}:{}").format(
                container_name,
                container_id,
                ip,
                port
            )
        )

        return container

    def _get_ip_and_port(self, container_id):
        """Returns the ip and port where the container service can be reached.
        Note that this is _not_ equivalent to docker port. docker port
        may return 0.0.0.0 as ip when the binding is on all interfaces
        of the docker container. We want the public ip of the exported port
        of a given container.
        If the container is no longer present, raises RuntimeError.

        This is a plain generator, not a tornado coroutine: callers inside
        a coroutine delegate to it with ``yield from``.

        Parameters
        ----------
        container_id: str
            The container id

        Return
        ------
        A tuple (ip, port)

        Raises
        ------
        RuntimeError:
            If for some reason it cannot retrieve the information
        """
        # retrieve the actual port binding
        try:
            resp = yield self._docker_client.port(container_id,
                                                  self.container_port)
        except Exception as e:
            raise RuntimeError("Failed to get port info for {}. "
                               "Exception: {}.".format(container_id,
                                                       str(e))) from e

        if resp is None:
            raise RuntimeError("Failed to get port info for {}. "
                               "Port response was None.".format(container_id))

        # We assume we are running on linux without any additional docker
        # machine. The container will therefore be reachable at 127.0.0.1.
        # If we instead have a docker machine configuration, we use the
        # docker url to extract the ip.
        ip = '127.0.0.1'
        base_url = self.docker_config.get("base_url")
        if base_url:
            url = urlparse(base_url)
            if url.scheme != 'unix':
                ip = url.hostname

        try:
            port = int(resp[0]['HostPort'])
        except (KeyError, IndexError, ValueError, TypeError) as e:
            raise RuntimeError("Failed to get port info for {}. "
                               "Exception: {}.".format(container_id,
                                                       str(e))) from e

        return ip, port

    @gen.coroutine
    def _get_container_info(self, container_id):
        """Retrieves the information about the given container id or name,
        and returns it. If the container is not available anymore,
        returns None.

        Parameters
        ----------
        container_id: str
            The container id. The name is accepted as well

        Return
        ------
        The container information as a dictionary, or None if not found
        (docker answered 404) or if docker answered with a 500 error.
        Any other APIError is re-raised.
        """
        self.log.debug("Getting container '%s'", container_id)
        try:
            container_info = yield self._docker_client.inspect_container(
                container_id
            )
        except APIError as e:
            if e.response.status_code == 404:
                self.log.error("Container '%s' is gone", container_id)
                container_info = None
            elif e.response.status_code == 500:
                self.log.error("Container '%s' is on unhealthy node",
                               container_id)
                container_info = None
            else:
                raise

        return container_info

    @gen.coroutine
    def _stop_and_remove_container(self, container_id):
        """Idempotent removal of a container by id.
        If the container is there, it will be removed. If it's not
        there, the unexpected conditions will be logged.

        Note: The container is only stopped if it belongs to the same realm.
        """
        self.log.info("Stopping container {}".format(container_id))

        container_info = yield self._get_container_info(container_id)
        if container_info is None:
            self.log.error('Could not find requested container {} '
                           'during removal'.format(container_id))
            return

        container = Container.from_docker_dict(container_info)

        if container.realm != self.realm:
            self.log.error(
                'Container {} belongs to realm {} '
                'instead of {}. Refusing to stop.'.format(
                    container_id, container.realm, self.realm)
            )
            return

        # Technically, we have a race condition here, but it's pretty much
        # impossible to solve, and would only affect us if the container
        # id is identical.

        # Stop the container. A failure here is logged only: the removal
        # below is attempted regardless.
        try:
            yield self._docker_client.stop(container_id)
        except APIError:
            self.log.exception(
                "Container '{}' could not be stopped.".format(
                    container_id,
                )
            )
        else:
            self.log.info("Container '{}' is stopped.".format(container_id))

        # Remove the container from docker
        try:
            yield self._docker_client.remove_container(container_id)
        except NotFound:
            self.log.error('Could not find requested container {} '
                           'during removal'.format(container_id))
        except APIError:
            self.log.exception(
                "Removal failed for container '{}'.".format(container_id)
            )
        else:
            self.log.info("Container '{}' is removed.".format(container_id))

    @default("_docker_client")
    def _docker_client_default(self):
        """Builds the default docker client from the docker_config trait."""
        return AsyncDockerClient(**self.docker_config)


def _get_container_env(user_name, url_id, environment, base_urlpath):
    """Introduces the environment variables that are available
    at container startup time.

    Parameters
    ----------
    user_name: str
        The user name
    url_id: str
        A string containing the container identifier that will be used
        in the user-exposed URL.
    environment: dict
        Additional environment keys to add to the final result.
        Note that these will not take precedence.
    base_urlpath: str
        the user's base urlpath

    Return
    ------
    a dictionary containing the envvars to export.
    """
    result = {}

    # The caller-provided entries go in first, so that the fixed keys set
    # below overwrite any entry with the same name.
    if environment:
        result.update(environment)

    result.update(dict(
        # Username used to login to jupyterhub. Generally an email address.
        JPY_USER=user_name,
        # The base url. We use this one because the JPY username might
        # have been escaped.
        JPY_BASE_USER_URL=without_end_slash(base_urlpath),
        # A unix username. used in the container to create the user.
        USER=_unix_user(user_name),
        # The identifier that will be used for the URL.
        URL_ID=url_id,
    ))

    return result


def _get_container_labels(user_name, mapping_id, url_id, urlpath, realm):
    """Returns a dictionary that will become container run-time labels.
    Each of these labels must be namespaced in reverse DNS style, in agreement
    to docker guidelines."""
    return {
        SIMPHONY_NS_RUNINFO.user: user_name,
        SIMPHONY_NS_RUNINFO.mapping_id: mapping_id,
        SIMPHONY_NS_RUNINFO.url_id: url_id,
        SIMPHONY_NS_RUNINFO.urlpath: urlpath,
        SIMPHONY_NS_RUNINFO.realm: realm
    }


def _generate_container_name(realm, user_name, mapping_id):
    """Generates a proper name for the container.
    It combines the realm, user name and mapping id after escaping.

    Parameters
    ----------
    realm : string
        The docker realm
    user_name: string
        the user name
    mapping_id: string
        the mapping id

    Return
    ------
    A string combining the three parameters in an appropriate
    container name, plus a random token to prevent collisions with a
    similarly named rogue container.

    NOTE: the container name is not meant for parsing. It's only for
    human consumption in the docker list. All information and all searching
    should be extracted from labels.
    """
    escaped_realm = escape(realm,
                           safe=_CONTAINER_SAFE_CHARS,
                           escape_char=_CONTAINER_ESCAPE_CHAR)
    escaped_user_name = escape(user_name,
                               safe=_CONTAINER_SAFE_CHARS,
                               escape_char=_CONTAINER_ESCAPE_CHAR)
    escaped_mapping_id = escape(mapping_id,
                                safe=_CONTAINER_SAFE_CHARS,
                                escape_char=_CONTAINER_ESCAPE_CHAR)
    random_token = ''.join(random.choice(string.ascii_lowercase)
                           for _ in range(10))

    return "{}-{}-{}-{}".format(escaped_realm,
                                escaped_user_name,
                                escaped_mapping_id,
                                random_token
                                )


def _generate_container_url_id(image_name=None):
    """If a complete image name is provided, parses and returns the image
    name, otherwise generates a unique string to identify the container.

    Parameters
    ----------
    image_name : str, default None
        The image name, in the format simphony-remote/simphony-application.

    Returns
    -------
    str
        The container identifier: the part of image_name following the last
        "/", or a random uuid4 hex string when image_name is empty or None.
    """
    if image_name:
        return image_name.split("/")[-1]

    return uuid.uuid4().hex


def _unix_user(username):
    """Converts a username from the jupyterhub login to a proper username
    for the docker virtual machine.

    Returns the part of the username preceding the first '@'. If there is
    no '@', or nothing precedes it, returns the string 'user'.
    """
    if '@' in username:
        local_part = username.split('@')[0]
        # An empty local part (e.g. "@domain") is not a usable unix user
        # name: fall through to the generic name below.
        if local_part:
            return local_part

    # The username in the container is not critical, it's mostly a gimmick.
    # If it's not an email address (which is what we use to authenticate
    # users, we just prevent it to return a potentially invalid username
    # and just return the string user, which is good enough
    return 'user'