helpers.py 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157
  1. import fnmatch
  2. import functools
  3. import os
  4. import re
  5. import sys
  6. import traceback
  7. import warnings
  8. from contextlib import contextmanager
  9. from cryptography.hazmat.backends import default_backend
  10. from cryptography.hazmat.primitives import serialization as crypt_serialization
  11. from cryptography.hazmat.primitives.asymmetric import rsa
  12. import six
  13. def generate_key_pair():
  14. """
  15. This method generates a keypair and returns it as a tuple
  16. of (public, private) keys.
  17. The public key format is OpenSSH and private key format is PEM.
  18. """
  19. key_pair = rsa.generate_private_key(
  20. backend=default_backend(),
  21. public_exponent=65537,
  22. key_size=2048)
  23. private_key = key_pair.private_bytes(
  24. crypt_serialization.Encoding.PEM,
  25. crypt_serialization.PrivateFormat.PKCS8,
  26. crypt_serialization.NoEncryption()).decode('utf-8')
  27. public_key = key_pair.public_key().public_bytes(
  28. crypt_serialization.Encoding.OpenSSH,
  29. crypt_serialization.PublicFormat.OpenSSH).decode('utf-8')
  30. return public_key, private_key
  31. def filter_by(prop_name, kwargs, objs):
  32. """
  33. Utility method for filtering a list of objects by a property.
  34. If the given property has a non empty value in kwargs, then
  35. the list of objs is filtered by that value. Otherwise, the
  36. list of objs is returned as is.
  37. """
  38. prop_val = kwargs.pop(prop_name, None)
  39. if prop_val:
  40. if isinstance(prop_val, six.string_types):
  41. regex = fnmatch.translate(prop_val)
  42. results = [o for o in objs
  43. if getattr(o, prop_name)
  44. and re.search(regex, getattr(o, prop_name))]
  45. else:
  46. results = [o for o in objs
  47. if getattr(o, prop_name) == prop_val]
  48. return results
  49. else:
  50. return objs
  51. def generic_find(filter_names, kwargs, objs):
  52. """
  53. Utility method for filtering a list of objects by a list of filters.
  54. """
  55. matches = objs
  56. for name in filter_names:
  57. matches = filter_by(name, kwargs, matches)
  58. # All kwargs should have been popped at this time.
  59. if len(kwargs) > 0:
  60. raise TypeError(
  61. "Unrecognised parameters for search: %s. Supported attributes: %s"
  62. % (kwargs, filter_names))
  63. return matches
  64. @contextmanager
  65. def cleanup_action(cleanup_func):
  66. """
  67. Context manager to carry out a given
  68. cleanup action after carrying out a set
  69. of tasks, or when an exception occurs.
  70. If any errors occur during the cleanup
  71. action, those are ignored, and the original
  72. traceback is preserved.
  73. :params func: This function is called if
  74. an exception occurs or at the end of the
  75. context block. If any exceptions raised
  76. by func are ignored.
  77. Usage:
  78. with cleanup_action(lambda e: print("Oops!")):
  79. do_something()
  80. """
  81. try:
  82. yield
  83. except Exception:
  84. ex_class, ex_val, ex_traceback = sys.exc_info()
  85. try:
  86. cleanup_func()
  87. except Exception as e:
  88. print("Error during exception cleanup: {0}".format(e))
  89. traceback.print_exc()
  90. six.reraise(ex_class, ex_val, ex_traceback)
  91. try:
  92. cleanup_func()
  93. except Exception as e:
  94. print("Error during cleanup: {0}".format(e))
  95. traceback.print_exc()
  96. def get_env(varname, default_value=None):
  97. """
  98. Return the value of the environment variable or default_value.
  99. This is a helper method that wraps ``os.environ.get`` to ensure type
  100. compatibility across py2 and py3. For py2, any value obtained from an
  101. environment variable, ensure ``unicode`` type and ``str`` for py3. The
  102. casting is done only for string variables.
  103. :type varname: ``str``
  104. :param varname: Name of the environment variable for which to check.
  105. :param default_value: Return this value is the env var is not found.
  106. Defaults to ``None``.
  107. :return: Value of the supplied environment if found; value of
  108. ``default_value`` otherwise.
  109. """
  110. value = os.environ.get(varname, default_value)
  111. if isinstance(value, six.string_types) and not isinstance(
  112. value, six.text_type):
  113. return six.u(value)
  114. return value
  115. # Alias deprication decorator, following:
  116. # https://stackoverflow.com/questions/49802412/
  117. # how-to-implement-deprecation-in-python-with-argument-alias
  118. def deprecated_alias(**aliases):
  119. def deco(f):
  120. @functools.wraps(f)
  121. def wrapper(*args, **kwargs):
  122. rename_kwargs(f.__name__, kwargs, aliases)
  123. return f(*args, **kwargs)
  124. return wrapper
  125. return deco
  126. def rename_kwargs(func_name, kwargs, aliases):
  127. for alias, new in aliases.items():
  128. if alias in kwargs:
  129. if new in kwargs:
  130. raise TypeError('{} received both {} and {}'.format(
  131. func_name, alias, new))
  132. warnings.warn('{} is deprecated; use {}'.format(alias, new),
  133. DeprecationWarning)
  134. kwargs[new] = kwargs.pop(alias)