diff --git a/plugins/module_utils/gpg_utils.py b/plugins/module_utils/gpg_utils.py index 05313d8..71c5d23 100644 --- a/plugins/module_utils/gpg_utils.py +++ b/plugins/module_utils/gpg_utils.py @@ -79,7 +79,7 @@ class SecretStore: self.gpg = self.__gpg def __load(self, slug: str) -> str: - file = ( + file = Path( (self.password_store_path / (slug + self.file_extension)) .expanduser() .absolute() diff --git a/plugins/modules/gpg_secretstore.py b/plugins/modules/gpg_secretstore.py index 8f57102..c105b33 100644 --- a/plugins/modules/gpg_secretstore.py +++ b/plugins/modules/gpg_secretstore.py @@ -8,6 +8,7 @@ from __future__ import absolute_import, division, print_function __metaclass__ = type +import hashlib import traceback ANSIBLE_METADATA = { @@ -24,6 +25,7 @@ author: - Lars Kaiser (@lrsksr) requirements: - PyYAML >= 6.0 + - filelock >= 3.0.12 - python >= 3.7 - python-gnupg >= 0.4.7 short_description: Save and retrieve secrets from pass compatible files @@ -174,6 +176,7 @@ from ansible.utils.display import Display LIB_IMP_ERR = None try: + from filelock import FileLock import gnupg HAS_LIB = True @@ -302,44 +305,24 @@ def main(): recrypt = module.params["recrypt"] secret_type = module.params["secret_type"] - if overwrite or recrypt: - try: - secret = store.get(slug=password_slug, data_type=data_type) - if recrypt and not overwrite: - logging.v("Recrypt mode: Secret will be reused") - secretGenerator = SecretGenerator( - secret_type="user_supplied", user_supplied_secret=secret - ) - else: - logging.v("Overwrite mode: New secret will be generated") - result["action"] = "update" - except (PasswordDecodeError, FileNotFoundError): - result["action"] = "add" - try: - if secretGenerator.secret_type == "binary": - save_as_data_type = "plain" - else: - save_as_data_type = data_type - store.put( - slug=password_slug, - data=secretGenerator.getSecret(), - data_type=save_as_data_type, - ) - except (FileNotFoundError, PasswordStoreException): - failed = True - result[ - "msg" - ] = "Not a pass-compatible secret database! Couldn't find .gpg-id file." - else: - result["changed"] = True + lock = FileLock( + (Path("/tmp/") / hashlib.md5(password_slug.encode()).hexdigest()).as_posix() + ) - # Try to read the password from storage - try: - result["secret"] = store.get(slug=password_slug, data_type=data_type) - except (PasswordDecodeError, FileNotFoundError) as e: - if isinstance(e, FileNotFoundError): - logging.v("No secret found, new secret will be generated") - result["secret"] = secretGenerator.getSecret() + with lock: + if overwrite or recrypt: + try: + secret = store.get(slug=password_slug, data_type=data_type) + if recrypt and not overwrite: + logging.v("Recrypt mode: Secret will be reused") + secretGenerator = SecretGenerator( + secret_type="user_supplied", user_supplied_secret=secret + ) + else: + logging.v("Overwrite mode: New secret will be generated") + result["action"] = "update" + except (PasswordDecodeError, FileNotFoundError): + result["action"] = "add" try: if secretGenerator.secret_type == "binary": save_as_data_type = "plain" @@ -356,13 +339,40 @@ def main(): "msg" ] = "Not a pass-compatible secret database! Couldn't find .gpg-id file." else: - result["secret"] = store.get(slug=password_slug, data_type=data_type) result["changed"] = True - result["action"] = "add" - if isinstance(e, PasswordDecodeError): - logging.v(e.__str__()) - result["msg"] = "Secret decoding failed, is the data type correct?" - failed = True + + # Try to read the password from storage + try: + result["secret"] = store.get(slug=password_slug, data_type=data_type) + except (PasswordDecodeError, FileNotFoundError) as e: + if isinstance(e, FileNotFoundError): + logging.v("No secret found, new secret will be generated") + result["secret"] = secretGenerator.getSecret() + try: + if secretGenerator.secret_type == "binary": + save_as_data_type = "plain" + else: + save_as_data_type = data_type + store.put( + slug=password_slug, + data=secretGenerator.getSecret(), + data_type=save_as_data_type, + ) + except (FileNotFoundError, PasswordStoreException): + failed = True + result[ + "msg" + ] = "Not a pass-compatible secret database! Couldn't find .gpg-id file." + else: + result["secret"] = store.get( + slug=password_slug, data_type=data_type + ) + result["changed"] = True + result["action"] = "add" + if isinstance(e, PasswordDecodeError): + logging.v(e.__str__()) + result["msg"] = "Secret decoding failed, is the data type correct?" + failed = True if failed: module.fail_json(**result)