Source code for towerlib.entities.team

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# File: team.py
#
# Copyright 2018 Costas Tyfoxylos
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
#  of this software and associated documentation files (the "Software"), to
#  deal in the Software without restriction, including without limitation the
#  rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
#  sell copies of the Software, and to permit persons to whom the Software is
#  furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
#  all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
#  IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
#  FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
#  AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
#  LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
#  FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
#  DEALINGS IN THE SOFTWARE.
#

"""
Main code for team.

.. _Google Python Style Guide:
   http://google.github.io/styleguide/pyguide.html

"""

import logging

from towerlib.towerlibexceptions import (InvalidUser,
                                         PermissionNotFound,
                                         InvalidProject,
                                         InvalidJobTemplate,
                                         InvalidInventory,
                                         InvalidCredential,
                                         InvalidValue,
                                         InvalidOrganization)
from .core import (Entity,
                   EntityManager,
                   validate_max_length)

__author__ = '''Costas Tyfoxylos <ctyfoxylos@schubergphilis.com>'''
__docformat__ = '''google'''
__date__ = '''2018-01-03'''
__copyright__ = '''Copyright 2018, Costas Tyfoxylos'''
__credits__ = ["Costas Tyfoxylos"]
__license__ = '''MIT'''
__maintainer__ = '''Costas Tyfoxylos'''
__email__ = '''<ctyfoxylos@schubergphilis.com>'''
__status__ = '''Development'''  # "Prototype", "Development", "Production".

# This is the main prefix used for logging
LOGGER_BASENAME = '''team'''
LOGGER = logging.getLogger(LOGGER_BASENAME)
LOGGER.addHandler(logging.NullHandler())


[docs]class Team(Entity): """Models the team entity of ansible tower.""" def __init__(self, tower_instance, data): Entity.__init__(self, tower_instance, data) self._object_roles = None @property def name(self): """The name of the team. Returns: string: The name of the team. """ return self._data.get('name') @name.setter def name(self, value): """Update the name of the team. Returns: None: """ max_characters = 512 conditions = [validate_max_length(value, max_characters)] if all(conditions): self._update_values('name', value) else: raise InvalidValue(f'{value} is invalid. Condition max_characters must be less or equal to ' f'{max_characters}') @property def description(self): """The description of the team. Returns: string: The description of the team. """ return self._data.get('description') @description.setter def description(self, value): """Update the description of the team. Returns: None: """ self._update_values('description', value) @property def organization(self): """The Organization of the team. Returns: Organization: The organization of the team. """ return self._tower.get_organization_by_id(self._data.get('organization')) @organization.setter def organization(self, value): """Update the organization of the team. Returns: None: """ organization = self._tower.get_organization_by_name(value) if not organization: raise InvalidOrganization(value) self._update_values('organization', organization.id) @property def roles(self): """The roles. Returns: EntityManager: EntityManager of the roles. """ url = self._data.get('related', {}).get('roles') return EntityManager(self._tower, entity_object='Role', primary_match_field='name', url=url) @property def object_roles(self): """The object roles. Returns: EntityManager: EntityManager of the object roles supported. """ if not self._object_roles: url = self._data.get('related', {}).get('object_roles') self._object_roles = EntityManager(self._tower, entity_object='ObjectRole', primary_match_field='name', url=url) return self._object_roles @property def object_role_names(self): """The names of the object roles. Returns: list: A list of strings for the object_roles. """ return (object_role.name for object_role in self.object_roles) @property def users(self): """The users of the team. Returns: EntityManager: EntityManager of the users. """ url = self._data.get('related', {}).get('users') return EntityManager(self._tower, entity_object='User', primary_match_field='username', url=url) @property def credentials(self): """The credentials of the team. Returns: EntityManager: EntityManager of the credentials. """ url = self._data.get('related', {}).get('credentials') return EntityManager(self._tower, entity_object='Credential', primary_match_field='name', url=url) @property def projects(self): """The projects of the team. Returns: EntityManager: EntityManager of the projects. """ url = self._data.get('related', {}).get('projects') return EntityManager(self._tower, entity_object='Project', primary_match_field='name', url=url)
[docs] def get_user_by_username(self, username): """Retrieves a user of the team by its username. Args: username: The username of the user to retrieve. Returns: user (User) on success, None otherwise. """ return next((user for user in self.users if user.username.lower() == username.lower()), None)
[docs] def add_user_as_member(self, username): """Adds a user as a member of the team. Args: username: The username of the user to add. Returns: True on success, False otherwise. """ return self._post_user_with_permission(username, 'member')
[docs] def remove_user_as_member(self, username): """Removes a user as a member of the team. Args: username: The username of the user to remove. Returns: True on success, False otherwise. """ return self._post_user_with_permission(username, 'member', remove=True)
[docs] def add_user_as_admin(self, username): """Adds a user as an admin of the team. Args: username: The username of the user to add. Returns: True on success, False otherwise. """ return self._post_user_with_permission(username, 'admin')
[docs] def remove_user_as_admin(self, username): """Removes a user as an admin of the team. Args: username: The username of the user to remove. Returns: True on success, False otherwise. """ return self._post_user_with_permission(username, 'admin', remove=True)
@staticmethod def _get_permission(role_name, object_roles): permission = next((role for role in object_roles if role.name.lower() == role_name.lower()), None) if not permission: raise PermissionNotFound(role_name) return permission def _post_user_with_permission(self, username, role_name, remove=False): permission = self._get_permission(role_name, self.object_roles) user = self._tower.get_user_by_username(username) if not user: raise InvalidUser(username) url = f'{self._tower.api}/users/{user.id}/roles/' payload = {'id': permission.id} if remove: roles_ids = [role.id for role in user.roles] if permission.id not in roles_ids: self._logger.warning('"%s" is not part of the team', username) return False payload['disassociate'] = True response = self._tower.session.post(url, json=payload) if not response.ok: self._logger.error('Error posting to url "%s", response was: "%s"', url, response.text) return response.ok
[docs] def add_project_permission_admin(self, project_name): """Adds a project with admin permissions. Args: project_name: The name of the project to add. Returns: True on success, False otherwise. """ return self._post_project_permission(project_name, 'admin')
[docs] def remove_project_permission_admin(self, project_name): """Removes a project with admin permissions. Args: project_name: The name of the project to remove. Returns: True on success, False otherwise. """ return self._post_project_permission(project_name, 'admin', remove=True)
[docs] def add_project_permission_update(self, project_name): """Adds a project with update permissions. Args: project_name: The name of the project to add. Returns: True on success, False otherwise. """ return self._post_project_permission(project_name, 'update')
[docs] def remove_project_permission_update(self, project_name): """Removes a project with update permissions. Args: project_name: The name of the project to remove. Returns: True on success, False otherwise. """ return self._post_project_permission(project_name, 'update', remove=True)
[docs] def add_project_permission_use(self, project_name): """Adds a project with use permissions. Args: project_name: The name of the project to add. Returns: True on success, False otherwise. """ return self._post_project_permission(project_name, 'use')
[docs] def remove_project_permission_use(self, project_name): """Removes a project with use permissions. Args: project_name: The name of the project to remove. Returns: True on success, False otherwise. """ return self._post_project_permission(project_name, 'use', remove=True)
[docs] def add_job_template_permission_admin(self, job_template_name): """Adds a job template with admin permissions. Args: job_template_name: The name of the job template to add. Returns: True on success, False otherwise. """ return self._post_job_template_permission(job_template_name, 'admin')
[docs] def remove_job_template_permission_admin(self, job_template_name): """Removes a job template with admin permissions. Args: job_template_name: The name of the job template to remove. Returns: True on success, False otherwise. """ return self._post_job_template_permission(job_template_name, 'admin', remove=True)
[docs] def add_job_template_permission_execute(self, job_template_name): """Adds a job template with execute permissions. Args: job_template_name: The name of the job template to add. Returns: True on success, False otherwise. """ return self._post_job_template_permission(job_template_name, 'execute')
[docs] def remove_job_template_permission_execute(self, job_template_name): """Removes a job template with execute permissions. Args: job_template_name: The name of the job template to remove. Returns: True on success, False otherwise. """ return self._post_job_template_permission(job_template_name, 'execute', remove=True)
[docs] def add_inventory_permission_admin(self, inventory_name): """Adds an inventory with admin permissions. Args: inventory_name: The name of the inventory to add. Returns: True on success, False otherwise. """ return self._post_inventory_permission(inventory_name, 'admin')
[docs] def remove_inventory_permission_admin(self, inventory_name): """Removes an inventory with admin permissions. Args: inventory_name: The name of the inventory to remove. Returns: True on success, False otherwise. """ return self._post_inventory_permission(inventory_name, 'admin', remove=True)
[docs] def add_inventory_permission_use(self, inventory_name): """Adds an inventory with use permissions. Args: inventory_name: The name of the inventory to add. Returns: True on success, False otherwise. """ return self._post_inventory_permission(inventory_name, 'use')
[docs] def remove_inventory_permission_use(self, inventory_name): """Removes an inventory with use permissions. Args: inventory_name: The name of the inventory to remove. Returns: True on success, False otherwise. """ return self._post_inventory_permission(inventory_name, 'use', remove=True)
[docs] def add_inventory_permission_update(self, inventory_name): """Adds an inventory with update permissions. Args: inventory_name: The name of the inventory to add. Returns: True on success, False otherwise. """ return self._post_inventory_permission(inventory_name, 'update')
[docs] def remove_inventory_permission_update(self, inventory_name): """Removes an inventory with update permissions. Args: inventory_name: The name of the inventory to remove. Returns: True on success, False otherwise. """ return self._post_inventory_permission(inventory_name, 'update', remove=True)
[docs] def add_inventory_permission_ad_hoc(self, inventory_name): """Adds an inventory with ad hoc permissions. Args: inventory_name: The name of the inventory to add. Returns: True on success, False otherwise. """ return self._post_inventory_permission(inventory_name, 'ad hoc')
[docs] def remove_inventory_permission_ad_hoc(self, inventory_name): """Removes an inventory with ad hoc permissions. Args: inventory_name: The name of the inventory to remove. Returns: True on success, False otherwise. """ return self._post_inventory_permission(inventory_name, 'ad hoc', remove=True)
[docs] def add_credential_permission_admin(self, credential_name, credential_type): """Adds a credential with admin permissions. Args: credential_name: The name of the credential to add. credential_type (str): The type of the credential to use Returns: True on success, False otherwise. """ return self._post_credential_permission(credential_name, credential_type, 'admin')
[docs] def remove_credential_permission_admin(self, credential_name, credential_type): """Removes a credential with admin permissions. Args: credential_name: The name of the credential to remove. credential_type (str): The type of the credential to use Returns: True on success, False otherwise. """ return self._post_credential_permission(credential_name, credential_type, 'admin', remove=True)
[docs] def add_credential_permission_use(self, credential_name, credential_type): """Adds a credential with admin permissions. Args: credential_name: The name of the credential to add. credential_type (str): The type of the credential to use Returns: True on success, False otherwise. """ return self._post_credential_permission(credential_name, credential_type, 'use')
[docs] def remove_credential_permission_use(self, credential_name, credential_type): """Removes a credential with use permissions. Args: credential_name: The name of the credential to remove. credential_type (str): The type of the credential to use Returns: True on success, False otherwise. """ return self._post_credential_permission(credential_name, credential_type, 'use', remove=True)
[docs] def add_organization_role_by_name(self, organization_name, role_name): """Adds an organization role to the team. Args: organization_name (str): The name of the organization to search roles for. role_name (str): The name of the role to add. Returns: True on success, False otherwise. """ return self._post_organization_role(organization_name, role_name)
[docs] def remove_organization_role_by_name(self, organization_name, role_name): """Removes an organization role from the team. Args: organization_name (str): The name of the organization to search roles for. role_name (str): The name of the role to add. Returns: True on success, False otherwise. """ return self._post_organization_role(organization_name, role_name, remove=True)
def _post_organization_role(self, organization_name, role_name, remove=False): organization = self._tower.get_organization_by_name(organization_name) if not organization: raise InvalidOrganization(organization_name) return self._post_permission(organization.object_roles, role_name, remove) def _post_project_permission(self, project_name, permission_name, remove=False): project = self.organization.get_project_by_name(project_name) if not project: raise InvalidProject(project_name) return self._post_permission(project.object_roles, permission_name, remove) def _post_job_template_permission(self, job_template_name, permission_name, remove=False): job_template = self._tower.get_job_template_by_name(job_template_name) if not job_template: raise InvalidJobTemplate(job_template_name) return self._post_permission(job_template.object_roles, permission_name, remove) def _post_inventory_permission(self, inventory_name, permission_name, remove=False): inventory = self.organization.get_inventory_by_name(inventory_name) if not inventory: raise InvalidInventory(inventory_name) return self._post_permission(inventory.object_roles, permission_name, remove) def _post_credential_permission(self, credential_name, credential_type, permission_name, remove=False): credential = self.organization.get_credential_by_name(credential_name, credential_type) if not credential: raise InvalidCredential(credential_name) return self._post_permission(credential.object_roles, permission_name, remove) def _post_permission(self, roles, permission_name, remove=False): permission = self._get_permission(permission_name, roles) if remove: url = f'{self._tower.api}/roles/{permission.id}/teams/' payload = {'id': self.id, 'disassociate': True} else: url = f'{self._tower.api}/teams/{self.id}/roles/' payload = {'id': permission.id} response = self._tower.session.post(url, json=payload) if not response.ok: self._logger.error('Error posting to url "%s", response was "%s"', url, response.text) return response.ok