feat(gpg_secretstore): add file locking

This commit is contained in:
Jadyn Emma Jaeger 2021-09-07 09:32:00 +02:00 committed by Jan Christian Grünhage
parent 06b6402b00
commit d22db019f4
No known key found for this signature in database
GPG key ID: EEC1170CE56FA2ED
2 changed files with 54 additions and 44 deletions

View file

@ -79,7 +79,7 @@ class SecretStore:
self.gpg = self.__gpg
def __load(self, slug: str) -> str:
file = (
file = Path(
(self.password_store_path / (slug + self.file_extension))
.expanduser()
.absolute()

View file

@ -8,6 +8,7 @@ from __future__ import absolute_import, division, print_function
__metaclass__ = type
import hashlib
import traceback
ANSIBLE_METADATA = {
@ -24,6 +25,7 @@ author:
- Lars Kaiser (@lrsksr)
requirements:
- PyYAML >= 6.0
- filelock >= 3.0.12
- python >= 3.7
- python-gnupg >= 0.4.7
short_description: Save and retrieve secrets from pass compatible files
@ -174,6 +176,7 @@ from ansible.utils.display import Display
LIB_IMP_ERR = None
try:
from filelock import FileLock
import gnupg
HAS_LIB = True
@ -302,44 +305,24 @@ def main():
recrypt = module.params["recrypt"]
secret_type = module.params["secret_type"]
if overwrite or recrypt:
try:
secret = store.get(slug=password_slug, data_type=data_type)
if recrypt and not overwrite:
logging.v("Recrypt mode: Secret will be reused")
secretGenerator = SecretGenerator(
secret_type="user_supplied", user_supplied_secret=secret
)
else:
logging.v("Overwrite mode: New secret will be generated")
result["action"] = "update"
except (PasswordDecodeError, FileNotFoundError):
result["action"] = "add"
try:
if secretGenerator.secret_type == "binary":
save_as_data_type = "plain"
else:
save_as_data_type = data_type
store.put(
slug=password_slug,
data=secretGenerator.getSecret(),
data_type=save_as_data_type,
)
except (FileNotFoundError, PasswordStoreException):
failed = True
result[
"msg"
] = "Not a pass-compatible secret database! Couldn't find .gpg-id file."
else:
result["changed"] = True
lock = FileLock(
(Path("/tmp/") / hashlib.md5(password_slug.encode()).hexdigest()).as_posix()
)
# Try to read the password from storage
try:
result["secret"] = store.get(slug=password_slug, data_type=data_type)
except (PasswordDecodeError, FileNotFoundError) as e:
if isinstance(e, FileNotFoundError):
logging.v("No secret found, new secret will be generated")
result["secret"] = secretGenerator.getSecret()
with lock:
if overwrite or recrypt:
try:
secret = store.get(slug=password_slug, data_type=data_type)
if recrypt and not overwrite:
logging.v("Recrypt mode: Secret will be reused")
secretGenerator = SecretGenerator(
secret_type="user_supplied", user_supplied_secret=secret
)
else:
logging.v("Overwrite mode: New secret will be generated")
result["action"] = "update"
except (PasswordDecodeError, FileNotFoundError):
result["action"] = "add"
try:
if secretGenerator.secret_type == "binary":
save_as_data_type = "plain"
@ -356,13 +339,40 @@ def main():
"msg"
] = "Not a pass-compatible secret database! Couldn't find .gpg-id file."
else:
result["secret"] = store.get(slug=password_slug, data_type=data_type)
result["changed"] = True
result["action"] = "add"
if isinstance(e, PasswordDecodeError):
logging.v(e.__str__())
result["msg"] = "Secret decoding failed, is the data type correct?"
failed = True
# Try to read the password from storage
try:
result["secret"] = store.get(slug=password_slug, data_type=data_type)
except (PasswordDecodeError, FileNotFoundError) as e:
if isinstance(e, FileNotFoundError):
logging.v("No secret found, new secret will be generated")
result["secret"] = secretGenerator.getSecret()
try:
if secretGenerator.secret_type == "binary":
save_as_data_type = "plain"
else:
save_as_data_type = data_type
store.put(
slug=password_slug,
data=secretGenerator.getSecret(),
data_type=save_as_data_type,
)
except (FileNotFoundError, PasswordStoreException):
failed = True
result[
"msg"
] = "Not a pass-compatible secret database! Couldn't find .gpg-id file."
else:
result["secret"] = store.get(
slug=password_slug, data_type=data_type
)
result["changed"] = True
result["action"] = "add"
if isinstance(e, PasswordDecodeError):
logging.v(e.__str__())
result["msg"] = "Secret decoding failed, is the data type correct?"
failed = True
if failed:
module.fail_json(**result)