Source code for towerlib.entities.credential

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# File: credentials.py
#
# Copyright 2018 Costas Tyfoxylos
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
#  of this software and associated documentation files (the "Software"), to
#  deal in the Software without restriction, including without limitation the
#  rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
#  sell copies of the Software, and to permit persons to whom the Software is
#  furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
#  all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
#  IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
#  FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
#  AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
#  LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
#  FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
#  DEALINGS IN THE SOFTWARE.
#

"""
Main code for credentials.

.. _Google Python Style Guide:
   http://google.github.io/styleguide/pyguide.html

"""

import importlib
import logging

from towerlib.towerlibexceptions import (InvalidCredentialType,
                                         InvalidOrganization, InvalidValue)

from .core import Entity, EntityManager, validate_max_length

__author__ = '''Costas Tyfoxylos <ctyfoxylos@schubergphilis.com>'''
__docformat__ = '''google'''
__date__ = '''2018-01-03'''
__copyright__ = '''Copyright 2018, Costas Tyfoxylos'''
__credits__ = ["Costas Tyfoxylos"]
__license__ = '''MIT'''
__maintainer__ = '''Costas Tyfoxylos'''
__email__ = '''<ctyfoxylos@schubergphilis.com>'''
__status__ = '''Development'''  # "Prototype", "Development", "Production".

# This is the main prefix used for logging
LOGGER_BASENAME = '''credentials'''
LOGGER = logging.getLogger(LOGGER_BASENAME)
LOGGER.addHandler(logging.NullHandler())


[docs]class CredentialType(Entity): """Models the credential_type entity of ansible tower.""" def __init__(self, tower_instance, data): Entity.__init__(self, tower_instance, data) @property def name(self): """The name of the credential type. Returns: string: The name of the credential type. """ return self._data.get('name') @name.setter def name(self, value): """Update the name of the credential type. Returns: None: """ max_characters = 512 conditions = [validate_max_length(value, max_characters)] if all(conditions): self._update_values('name', value) else: raise InvalidValue(f'{value} is invalid. Condition max_characters must be less or equal ' f'{max_characters}') @property def description(self): """The description of the credential type. Returns: string: The description of the credential type. """ return self._data.get('description') @description.setter def description(self, value): """Set the description of the credential type. Returns: None. """ self._update_values('description', value) @property def kind(self): """The kind of the credential type. Accepted values are : (u'scm', u'ssh', u'vault', u'net', u'cloud', u'insights'). Returns: string: The kind of the credential type. """ return self._data.get('kind') @property def managed_by_tower(self): """Flag indicating whether the credential is internal to tower. Returns: bool: True if credential is internal to tower, False if it is user created. """ return self._data.get('managed_by_tower') @property def inputs(self): """The inputs of the credential type. Returns: dictionary: A structure of the credential type supported inputs. """ return self._data.get('inputs') @inputs.setter def inputs(self, value): """Update the inputs of the credential type. Returns: None: """ if isinstance(value, dict): self._update_values('inputs', value) else: raise InvalidValue(f'Value is not valid dictionary received: {value}') @property def injectors(self): """The injectors of the credential type. Returns: dictionary: A structure of the credential type supported injectors. """ return self._data.get('injectors') @injectors.setter def injectors(self, value): """Update the injectors of the credential type. Returns: None: """ if isinstance(value, dict): self._update_values('injectors', value) else: raise InvalidValue(f'Value is not valid dictionary received: {value}') @property def input_sources(self): """The input sources of the credential. Returns: dictionary: A structure of the credential input sources. """ response = self._tower.session.get(f'{self.url}input_sources') if not response.ok: self._logger.exception(f'Could not retrieve the input sources for credential {self.name}') response.raise_for_status() return next(iter(response.json().get('results', [])), None)
[docs]class Credential: """Credential factory to handle the different credential types returned.""" def __new__(cls, tower_instance, data): type_id = data.get('credential_type') if not type_id: raise InvalidCredentialType(f'Could not get credential type from the data provided: {data}.') try: credential_type = tower_instance.get_credential_type_by_id(type_id) if not credential_type: LOGGER.warning(f'Could not get credential with type "{type_id}" from tower, trying a generic one.') return GenericCredential(tower_instance, data) credential_type_name = f'{"".join(credential_type.name.split())}Credential' credential_class = getattr(importlib.import_module('towerlib.entities.credential'), credential_type_name) credential = credential_class(tower_instance, data) except (AttributeError, ImportError): LOGGER.warning(f'Could not dynamically load credential with type : "{type_id}", trying a generic one.') credential = GenericCredential(tower_instance, data) return credential
[docs]class GenericCredential(Entity): """Models the credential entity of ansible tower.""" def __init__(self, tower_instance, data): Entity.__init__(self, tower_instance, data) self._object_roles = None @property def host(self): """The host of the credential. Returns: dictionary: The host structure of the credential. """ return self._data.get('summary_fields', {}).get('host') @property def project(self): """The project of the credential. Returns: dictionary: The project structure of the credential. """ return self._data.get('summary_fields', {}).get('project') @property def created_by(self): """The user that created the credential. Returns: User: The user that created the credential. """ url = self._data.get('related', {}).get('created_by') return self._tower._get_object_by_url('User', url) # pylint: disable=protected-access @property def modified_by(self): """The person that modified the credential last. Returns: User: The user that modified the credential in tower last. """ url = self._data.get('related', {}).get('modified_by') return self._tower._get_object_by_url('User', url) # pylint: disable=protected-access @property def object_roles(self): """The object roles. Returns: EntityManager: EntityManager of the object roles supported. """ if not self._object_roles: url = self._data.get('related', {}).get('object_roles') self._object_roles = EntityManager(self._tower, entity_object='ObjectRole', primary_match_field='name', url=url) return self._object_roles @property def owner_users(self): """The owners of the credential. Returns: EntityManager: EntityManager of the users that are owners. """ url = self._data.get('related', {}).get('owner_users') return EntityManager(self._tower, entity_object='User', primary_match_field='username', url=url) @property def owner_teams(self): """The owner teams of the credential. Returns: EntityManager: EntityManager of the teams that are owners. """ url = self._data.get('related', {}).get('owner_teams') return EntityManager(self._tower, entity_object='Team', primary_match_field='name', url=url) @property def name(self): """The name of the credential. Returns: string: The name of the credential. """ return self._data.get('name') @name.setter def name(self, value): """Update the name of the credential. Returns: None: """ max_characters = 512 conditions = [validate_max_length(value, max_characters)] if all(conditions): self._update_values('name', value) else: raise InvalidValue(f'{value} is invalid. Condition max_characters must be less or equal to ' f'{max_characters}') @property def description(self): """The description of the credential. Returns: string: The description of the credential. """ return self._data.get('description') @description.setter def description(self, value): """Set the description of the credential. Returns: None. """ self._update_values('description', value) @property def organization(self): """The organization the credential is part of. Returns: Organization: The organization the credential is part of. """ return self._tower.get_organization_by_id(self._data.get('organization')) @organization.setter def organization(self, value): """Set the organization of the credential. Returns: None. """ organization = self._tower.get_organization_by_name(value) if not organization: raise InvalidOrganization(value) self._update_values('organization', organization.id) @property def credential_type(self): """The type of the credential. Returns: CredentialType: The type of the credential. """ return self._tower.get_credential_type_by_id(self._data.get('credential_type')) @credential_type.setter def credential_type(self, value): """Set the credential_type of the credential. Returns: None. """ credential_type = self._tower.get_credential_type_by_name(value) if not credential_type: raise InvalidCredentialType(value) self._update_values('credential_type', credential_type.id) @property def inputs(self): """The inputs of the credential. Returns: dictionary: A structure of the credential supported inputs. """ return self._data.get('inputs') @inputs.setter def inputs(self, value): """Update the inputs of the credential. Returns: None: """ if isinstance(value, dict): self._update_values('inputs', value) else: raise InvalidValue(f'Value is not valid json received: {value}')
[docs]class MachineCredential(GenericCredential): """Models the machine credential.""" def __init__(self, tower_instance, data): GenericCredential.__init__(self, tower_instance, data) @property def username(self): """The username that is set in the credential. Returns: basestring: The username that is set in the credential. """ return self._data.get('inputs', {}).get('username') @username.setter def username(self, value): """Set the username of the credential. Returns: None. """ self._update_values('username', value, parent_attribute='inputs') @property def password(self): """The password that is set in the credential. Returns: basestring: The password that is set in the credential. """ return self._data.get('inputs', {}).get('password') @password.setter def password(self, value): """Set the password of the credential. Returns: None. """ self._update_values('password', value, parent_attribute='inputs')
[docs]class HashicorpVaultCredential(GenericCredential): """Models the hashicorp vault credential.""" def __init__(self, tower_instance, data): GenericCredential.__init__(self, tower_instance, data) @property def token(self): """The token that is set in the credential. Returns: basestring: The token that is set in the credential. """ return self._data.get('inputs', {}).get('hashi_vault_token') @token.setter def token(self, value): """Set the token of the credential. Returns: None. """ self._update_values('hashi_vault_token', value, parent_attribute='inputs') @property def vault_address(self): """The vault address that is set in the credential. Returns: basestring: The vault address that is set in the credential. """ return self._data.get('inputs', {}).get('hashi_vault_addr') @vault_address.setter def vault_address(self, value): """Set the password of the credential. Returns: None. """ self._update_values('hashi_vault_addr', value, parent_attribute='inputs') @property def ca_host_verify(self): """The ca host verify setting that is set in the credential. Returns: basestring: The vault address that is set in the credential. """ return self._data.get('inputs', {}).get('hashi_vault_pre_python_279_cahostverify') @ca_host_verify.setter def ca_host_verify(self, value): """Set the ca host verify of the credential. Returns: None. """ if value.lower() not in ['no', 'yes']: raise ValueError('Value should be either no/yes') self._update_values('hashi_vault_pre_python_279_cahostverify', value, parent_attribute='inputs')
[docs]class AmazonWebServicesCredential(GenericCredential): """Models the Amazon Web Services credential.""" def __init__(self, tower_instance, data): GenericCredential.__init__(self, tower_instance, data) @property def access_key(self): """The access_key that is set in the credential. Returns: basestring: The access_key that is set in the credential. """ return self._data.get('inputs', {}).get('username') @access_key.setter def access_key(self, value): """Set the access_key of the credential. Returns: None. """ self._update_values('username', value, parent_attribute='inputs') @property def secret_key(self): """The secret_key that is set in the credential. Returns: basestring: The secret_key that is set in the credential. """ return self._data.get('inputs', {}).get('password') @secret_key.setter def secret_key(self, value): """Set the secret_key of the credential. Returns: None. """ self._update_values('password', value, parent_attribute='inputs')