Source code for remoteappmanager.cli.remoteappdb.__main__

#!/usr/bin/env python
"""Script to perform operations on the database of our application."""
import os
import sys
import uuid
from requests.exceptions import ConnectionError

import sqlalchemy.exc
import sqlalchemy.orm.exc
import click
import tabulate

from remoteappmanager.db import orm
from remoteappmanager.utils import parse_volume_string


def sqlite_url_to_path(url):
    """Extract the disk path from a sqlalchemy sqlite url.

    Parameters
    ----------
    url: str
        A url starting with "sqlite:///".

    Returns
    -------
    str
        Everything that follows the "sqlite:///" prefix.

    Raises
    ------
    ValueError
        If the url does not start with "sqlite:///".
    """
    prefix = "sqlite:///"
    if url.startswith(prefix):
        # Strip the scheme; what is left is the path as given in the url.
        return url[len(prefix):]

    raise ValueError("Cannot find sqlite")
def normalise_to_url(url_or_path):
    """Normalises a disk path to a sqlalchemy url.

    Parameters
    ----------
    url_or_path: str
        A sqlalchemy url or a disk path.

    Returns
    -------
    str
        The argument unchanged when it already is a url, otherwise a
        "sqlite:///" url pointing at the (user-expanded) path.
    """
    # A sqlalchemy url always contains the "://" separator after the
    # dialect name. Testing for a bare ":" would misclassify plain paths
    # that happen to contain a colon (e.g. a Windows drive such as
    # "C:\\data\\app.db") as urls.
    if "://" in url_or_path:
        return url_or_path

    return "sqlite:///" + os.path.expanduser(url_or_path)
def database(db_url):
    """Build the orm.Database object for the given db url.

    Parameters
    ----------
    db_url : str
        A sqlalchemy database url.

    Returns
    -------
    orm.Database
        A new instance constructed with ``url=db_url``.
    """
    db = orm.Database(url=db_url)
    return db
def get_docker_client():
    """Create a docker client from the local environment variables.

    Returns
    -------
    The ``api`` attribute of the client built by ``docker.from_env()``
    (the original documentation describes it as a docker.APIClient).

    Raises
    ------
    ImportError
        If the docker package cannot be imported.
    ConnectionError
        If ``client.info()`` cannot reach the docker daemon.
    """
    # docker-py is an optional dependency of this script, so it is only
    # imported when a client is actually requested.
    try:
        import docker
    except ImportError:
        print_error('docker-py is not installed. '
                    'Try pip install docker-py')
        raise

    docker_client = docker.from_env()

    # Probe the daemon right away so that connection problems surface here.
    try:
        docker_client.info()
    except ConnectionError:
        # ConnectionError occurs, say, if the docker machine is not running
        # or if the shell is not in a docker VM (for Mac/Windows)
        print_error('docker client fails to connect.')
        raise

    return docker_client.api
def is_sqlitedb_url(db_url):
    """Tell whether the url points at a sqlite database.

    Returns True when ``db_url`` starts with "sqlite:///", False otherwise.
    """
    sqlite_prefix = "sqlite:///"
    return db_url.startswith(sqlite_prefix)
def sqlitedb_present(db_url):
    """Check whether the file behind a sqlite db url exists on disk.

    Only "sqlite:///" urls are accepted; remote urls are always assumed
    to be present and must not be passed here.

    Returns
    -------
    bool
        True if the path extracted from the url exists.

    Raises
    ------
    ValueError
        If ``db_url`` does not start with "sqlite:///".
    """
    prefix = "sqlite:///"
    if db_url.startswith(prefix):
        # Same extraction as sqlite_url_to_path: drop the scheme prefix.
        db_path = db_url[len(prefix):]
        return os.path.exists(db_path)

    raise ValueError("db_url {} does not refer to a "
                     "sqlite database.".format(db_url))
class RemoteAppDBContext(object):
    """Object stored on the click context and shared by every command.

    Attributes
    ----------
    db
        The database object built from the normalised db url.
    session
        Not set here; the ``cli`` group attaches it after construction.
    """

    def __init__(self, db_url):
        # Accept either a plain disk path or a full sqlalchemy url.
        db_url = normalise_to_url(db_url)
        self.db = database(db_url)


# NOTE: the docstrings of the click commands below are shown to the user as
# help text, so they are kept short and developer notes are written as
# comments instead.

@click.group()
@click.argument("db", type=click.STRING, default="sqlite:///sqlite.db")
@click.pass_context
def cli(ctx, db):
    """Remote application database manager.
    Performs administrative operations on the database contents."""
    ctx.obj = RemoteAppDBContext(db_url=db)
    ctx.obj.session = ctx.obj.db.create_session()

    # Close the session when click tears the context down.
    @ctx.call_on_close
    def close_session():
        ctx.obj.session.close()


@cli.command()
@click.pass_context
def init(ctx):
    """Initializes the database."""
    db = ctx.obj.db
    db_url = db.url

    # Check if the database already exists
    if is_sqlitedb_url(db_url) and sqlitedb_present(db_url):
        raise click.UsageError("Refusing to overwrite database "
                               "at {}".format(db_url))
    db.reset()


# -------------------------------------------------------------------------
# User commands


@cli.group()
@click.pass_context
def user(ctx):
    """Subcommand to manage users."""
    db = ctx.obj.db
    db_url = db.url

    # sqlite driver for sqlalchemy creates an empty file on commit as a side
    # effect. We don't want this creation to happen, so before attempting
    # the creation we stop short if we already find out that the file is
    # missing and cannot possibly be initialized.
    if is_sqlitedb_url(db_url) and not sqlitedb_present(db_url):
        raise click.UsageError("Could not find database at {}".format(db_url))


@user.command()
@click.argument("user")
@click.pass_context
def create(ctx, user):  # noqa: F811
    """Creates a user USER in the database."""
    session = ctx.obj.session
    orm_user = orm.User(name=user)

    try:
        with orm.transaction(session):
            session.add(orm_user)
    except sqlalchemy.exc.IntegrityError:
        print_error("User {} already exists".format(user))
    else:
        # Print out the id, so that we can use it if desired.
        print(orm_user.id)


@user.command()
@click.argument("user")
@click.pass_context
def remove(ctx, user):  # noqa: F811
    """Removes a user."""
    session = ctx.obj.session

    try:
        with orm.transaction(session):
            orm_user = session.query(orm.User).filter(
                orm.User.name == user).one()
            session.delete(orm_user)
    except sqlalchemy.orm.exc.NoResultFound:
        print_error("Could not find user {}".format(user))


@user.command()
@click.option('--no-decoration', is_flag=True,
              help="Disable table decorations")
@click.option('--show-apps', is_flag=True,
              help="Shows the applications each user "
                   "is allowed to run")
@click.pass_context
def list(ctx, no_decoration, show_apps):  # noqa: F811
    """Show a list of the available users."""
    # One header per value emitted for each application entry below.
    app_headers = ["App", "License", "Home", "View", "Common",
                   "Vol. Source", "Vol. Target", "Vol. Mode",
                   "Allow Startup Data"]

    if no_decoration:
        tablefmt = "plain"
        headers = []
    else:
        tablefmt = "simple"
        headers = ["ID", "Name"]
        if show_apps:
            headers += app_headers

    session = ctx.obj.session

    table = []
    with orm.transaction(session):
        for orm_user in session.query(orm.User).all():
            cur = [orm_user.id, orm_user.name]
            table.append(cur)

            if not show_apps:
                continue

            apps = [[entry.application.image,
                     entry.application_policy.app_license,
                     entry.application_policy.allow_home,
                     entry.application_policy.allow_view,
                     entry.application_policy.allow_common,
                     entry.application_policy.volume_source,
                     entry.application_policy.volume_target,
                     entry.application_policy.volume_mode,
                     entry.application_policy.allow_startup_data]
                    for entry in orm.accounting_for_user(session, orm_user)]

            if not apps:
                # Users without applications still get a full-width row:
                # one blank per application column.
                apps = [[''] * len(app_headers)]

            # The first application shares the row with the user id and
            # name; any further application gets its own row with blank
            # user columns.
            cur.extend(apps[0])
            for app_row in apps[1:]:
                table.append(['', ''] + app_row)

    print(tabulate.tabulate(table, headers=headers, tablefmt=tablefmt))


# -------------------------------------------------------------------------
# App commands


@cli.group()
@click.pass_context
def app(ctx):
    """Subcommand to manage applications."""
    db = ctx.obj.db
    db_url = db.url

    # Same guard as in the user group: stop before a missing sqlite file
    # can be touched.
    if is_sqlitedb_url(db_url) and not sqlitedb_present(db_url):
        raise click.UsageError("Could not find database at {}".format(db_url))


@app.command()  # noqa
@click.argument("image")
@click.option('--verify/--no-verify',
              default=True,
              help="Verify image name against docker.")
@click.pass_context
def create(ctx, image, verify):  # noqa: F811
    """Creates a new application for image IMAGE."""

    # Verify if `image` is an existing docker image
    # in this machine
    if verify:
        msg = ('{error}. You may consider skipping verifying '
               'image name against docker with --no-verify.')
        try:
            client = get_docker_client()
            client.inspect_image(image)
        except Exception as exception:
            # Any failure (missing docker-py, unreachable daemon, unknown
            # image) is reported as a bad IMAGE parameter.
            raise click.BadParameter(msg.format(error=str(exception)),
                                     ctx=ctx)

    session = ctx.obj.session
    try:
        with orm.transaction(session):
            orm_app = orm.Application(image=image)
            session.add(orm_app)
    except sqlalchemy.exc.IntegrityError:
        print_error("Application for image {} already exists".format(image))
    else:
        print(orm_app.id)


@app.command()  # noqa
@click.argument("image")
@click.pass_context
def remove(ctx, image):  # noqa: F811
    """Removes an application from the list."""
    session = ctx.obj.session

    try:
        with orm.transaction(session):
            orm_app = session.query(orm.Application).filter(
                orm.Application.image == image).one()
            session.delete(orm_app)
    except sqlalchemy.orm.exc.NoResultFound:
        print_error("Could not find application for image {}".format(image))


@app.command()  # noqa
@click.option('--no-decoration', is_flag=True,
              help="Disable table decorations")
@click.pass_context
def list(ctx, no_decoration):  # noqa: F811
    """List all registered applications."""
    if no_decoration:
        tablefmt = "plain"
        headers = []
    else:
        tablefmt = "simple"
        headers = ["ID", "Image"]

    table = []
    session = ctx.obj.session
    with orm.transaction(session):
        for orm_app in session.query(orm.Application).all():
            table.append([orm_app.id, orm_app.image])

    print(tabulate.tabulate(table, headers=headers, tablefmt=tablefmt))


@app.command()
@click.argument("image")
@click.argument("user")
@click.option("--app_license",
              type=click.STRING,
              help="Application license (if required)")
@click.option("--allow-home",
              is_flag=True,
              help="Enable mounting of home directory")
@click.option("--allow-view",
              is_flag=True,
              help="Enable third-party visibility of the running container.")
@click.option("--volume", type=click.STRING,
              help="Application data volume, format=SOURCE:TARGET:MODE, "
                   "where mode is 'ro' or 'rw'.")
@click.option("--allow-startup-data",
              is_flag=True,
              help="Allow user to provide a file for the container to load "
                   "at startup.")
@click.pass_context
def grant(ctx, image, user, app_license, allow_home, allow_view, volume,
          allow_startup_data):
    """Grants access to application identified by IMAGE
    to a specific user USER and specified access policy."""
    # allow_common is not a command line flag: it is switched on whenever
    # a volume is specified.
    allow_common = False
    source = target = mode = None

    if volume is not None:
        allow_common = True
        try:
            source, target, mode = parse_volume_string(volume)
        except ValueError as e:
            raise click.BadOptionUsage("volume", str(e))

    session = ctx.obj.session
    with orm.transaction(session):
        orm_app = session.query(orm.Application).filter(
            orm.Application.image == image).one_or_none()

        if orm_app is None:
            raise click.BadParameter("Unknown application image {}".format(
                image), param_hint="image")

        orm_user = session.query(orm.User).filter(
            orm.User.name == user).one_or_none()

        if orm_user is None:
            raise click.BadParameter("Unknown user {}".format(user),
                                     param_hint="user")

        # Reuse an existing policy row with exactly these settings, and
        # create one only when none matches.
        orm_policy = session.query(orm.ApplicationPolicy).filter(
            orm.ApplicationPolicy.app_license == app_license,
            orm.ApplicationPolicy.allow_home == allow_home,
            orm.ApplicationPolicy.allow_common == allow_common,
            orm.ApplicationPolicy.allow_view == allow_view,
            orm.ApplicationPolicy.volume_source == source,
            orm.ApplicationPolicy.volume_target == target,
            orm.ApplicationPolicy.volume_mode == mode,
            orm.ApplicationPolicy.allow_startup_data == allow_startup_data
        ).one_or_none()

        if orm_policy is None:
            orm_policy = orm.ApplicationPolicy(
                app_license=app_license,
                allow_home=allow_home,
                allow_common=allow_common,
                allow_view=allow_view,
                volume_source=source,
                volume_target=target,
                volume_mode=mode,
                allow_startup_data=allow_startup_data,
            )
            session.add(orm_policy)

        # Check if we already have the entry
        acc = session.query(orm.Accounting).filter(
            orm.Accounting.user == orm_user,
            orm.Accounting.application == orm_app,
            orm.Accounting.application_policy == orm_policy
        ).one_or_none()

        if acc is None:
            # Accounting rows are keyed by a random hex identifier.
            accounting_id = uuid.uuid4().hex
            accounting = orm.Accounting(
                id=accounting_id,
                user=orm_user,
                application=orm_app,
                application_policy=orm_policy,
            )
            session.add(accounting)


@app.command()
@click.argument("image")
@click.argument("user")
@click.option("--revoke-all",
              is_flag=True,
              help="revoke all grants for that specific user and application, "
                   "regardless of policy.")
@click.option("--app_license",
              type=click.STRING,
              help="Application license (if required)")
@click.option("--allow-home",
              is_flag=True,
              help="Policy for mounting of home directory")
@click.option("--allow-view",
              is_flag=True,
              help="Policy for third-party visibility "
                   "of the running container.")
@click.option("--volume", type=click.STRING,
              help="Application data volume, format=SOURCE:TARGET:MODE, "
                   "where mode is 'ro' or 'rw'.")
@click.option("--allow-startup-data",
              is_flag=True,
              help="Allow user to provide a file for the container to load "
                   "at startup.")
@click.pass_context
def revoke(ctx, image, user, revoke_all, app_license, allow_home,
           allow_view, volume, allow_startup_data):
    """Revokes access to application identified by IMAGE
    to a specific user USER and specified parameters."""
    # Mirrors grant: a volume implies allow_common.
    allow_common = False
    source = target = mode = None

    if volume is not None:
        allow_common = True
        try:
            source, target, mode = parse_volume_string(volume)
        except ValueError as e:
            raise click.BadOptionUsage("volume", str(e))

    session = ctx.obj.session
    with orm.transaction(session):
        # Unknown image/user are reported the same way grant reports them,
        # instead of surfacing a NoResultFound traceback.
        orm_app = session.query(orm.Application).filter(
            orm.Application.image == image).one_or_none()

        if orm_app is None:
            raise click.BadParameter("Unknown application image {}".format(
                image), param_hint="image")

        orm_user = session.query(orm.User).filter(
            orm.User.name == user).one_or_none()

        if orm_user is None:
            raise click.BadParameter("Unknown user {}".format(user),
                                     param_hint="user")

        if revoke_all:
            # Drop every grant for this user/application pair, whatever
            # policy it refers to.
            session.query(orm.Accounting).filter(
                orm.Accounting.application == orm_app,
                orm.Accounting.user == orm_user,
            ).delete()
        else:
            orm_policy = session.query(orm.ApplicationPolicy).filter(
                orm.ApplicationPolicy.app_license == app_license,
                orm.ApplicationPolicy.allow_home == allow_home,
                orm.ApplicationPolicy.allow_common == allow_common,
                orm.ApplicationPolicy.allow_view == allow_view,
                orm.ApplicationPolicy.volume_source == source,
                orm.ApplicationPolicy.volume_target == target,
                orm.ApplicationPolicy.volume_mode == mode,
                orm.ApplicationPolicy.allow_startup_data == allow_startup_data
            ).one_or_none()

            if orm_policy is None:
                # Without a matching policy there is no grant to revoke.
                raise click.UsageError(
                    "Could not find a policy matching the specified options")

            session.query(orm.Accounting).filter(
                orm.Accounting.application == orm_app,
                orm.Accounting.user == orm_user,
                orm.Accounting.application_policy == orm_policy,
            ).delete()
def main():
    """Script entry point: invoke the click command group.

    An empty dict is passed as the initial context object.
    """
    initial_obj = {}
    cli(obj=initial_obj)
# Run the command line interface when the module is executed as a script.
if __name__ == '__main__':
    main()