x509_certificate.py 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129
  1. #!/usr/bin/python
  2. DOCUMENTATION = """
  3. ---
  4. module: x509_certificate
  5. short_description: Create X.509 certificates.
  6. requirements:
  7. - cryptography (python 2)
  8. author: Fabian Peter Hammerle
  9. """
  10. from ansible.module_utils.basic import AnsibleModule
  11. from cryptography import utils
  12. from cryptography import x509
  13. from cryptography.hazmat.backends import default_backend
  14. from cryptography.hazmat.primitives import hashes
  15. from cryptography.hazmat.primitives import serialization
  16. from cryptography.hazmat.primitives.asymmetric import rsa
  17. from cryptography.x509.oid import NameOID
  18. import datetime
  19. import os
  20. def random_serial_number():
  21. """ https://cryptography.io/en/latest/_modules/cryptography/x509/base/#random_serial_number """
  22. return utils.int_from_bytes(os.urandom(20), "big") >> 1
  23. def create_key(path):
  24. return rsa.generate_private_key(
  25. public_exponent = 65537,
  26. key_size = 4096,
  27. backend = default_backend(),
  28. )
  29. def save_key(path, key):
  30. with open(path, 'wb') as f:
  31. f.write(key.private_bytes(
  32. encoding = serialization.Encoding.PEM,
  33. format = serialization.PrivateFormat.TraditionalOpenSSL,
  34. encryption_algorithm = serialization.NoEncryption(),
  35. ))
  36. os.chmod(path, 0600)
  37. def save_cert(path, cert):
  38. with open(path, 'wb') as f:
  39. f.write(cert.public_bytes(
  40. encoding = serialization.Encoding.PEM,
  41. ))
  42. os.chmod(path, 0600)
  43. def load_key(path):
  44. with open(path, 'rb') as f:
  45. return serialization.load_pem_private_key(
  46. data = f.read(),
  47. password = None,
  48. backend = default_backend(),
  49. )
  50. def load_cert(path):
  51. with open(path, 'rb') as f:
  52. return x509.load_pem_x509_certificate(
  53. data = f.read(),
  54. backend = default_backend(),
  55. )
  56. def create_name(common_name, organization_name = None):
  57. attr = []
  58. if organization_name:
  59. attr.append(x509.NameAttribute(NameOID.ORGANIZATION_NAME, organization_name))
  60. attr.append(x509.NameAttribute(NameOID.COMMON_NAME, common_name))
  61. return x509.Name(attr)
  62. def main(argv):
  63. module = AnsibleModule(
  64. argument_spec = {
  65. 'cert_path': {'required': True, 'type': 'str'},
  66. 'common_name': {'required': True, 'type': 'str'},
  67. 'key_path': {'required': True, 'type': 'str'},
  68. 'organization_name': {'required': False, 'type': 'str', 'default': None},
  69. }
  70. )
  71. changed = False
  72. if os.path.exists(module.params['key_path']):
  73. key = load_key(module.params['key_path'])
  74. else:
  75. key = create_key(module.params['key_path'])
  76. save_key(path = module.params['key_path'], key = key)
  77. changed = True
  78. if os.path.exists(module.params['cert_path']):
  79. cert = load_cert(module.params['cert_path'])
  80. else:
  81. issuer = create_name(
  82. common_name = module.params['common_name'].decode('utf-8'),
  83. organization_name = module.params['organization_name'].decode('utf-8')
  84. if module.params['organization_name'] else None,
  85. )
  86. cert_builder = (
  87. x509.CertificateBuilder()
  88. .subject_name(issuer)
  89. .issuer_name(issuer)
  90. .public_key(key.public_key())
  91. .serial_number(random_serial_number())
  92. .not_valid_before(datetime.datetime.utcnow())
  93. .not_valid_after(datetime.datetime.utcnow() + datetime.timedelta(days = 356 * 10))
  94. )
  95. cert = cert_builder.sign(
  96. private_key = key,
  97. algorithm = hashes.SHA256(),
  98. backend = default_backend(),
  99. )
  100. save_cert(path = module.params['cert_path'], cert = cert)
  101. changed = True
  102. module.exit_json(
  103. changed = changed,
  104. cert_path = module.params['cert_path'],
  105. key_path = module.params['key_path'],
  106. subject_common_name = cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value,
  107. )
  108. return 0
  109. if __name__ == "__main__":
  110. import sys
  111. sys.exit(main(sys.argv[1:]))