x509_certificate.py 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148
  1. #!/usr/bin/python
  2. DOCUMENTATION = """
  3. ---
  4. module: x509_certificate
  5. short_description: Create X.509 certificates.
  6. requirements:
  7. - cryptography (python 2)
  8. author: Fabian Peter Hammerle
  9. """
  10. from ansible.module_utils.basic import AnsibleModule
  11. from cryptography import utils
  12. from cryptography import x509
  13. from cryptography.hazmat.backends import default_backend
  14. from cryptography.hazmat.primitives import hashes
  15. from cryptography.hazmat.primitives import serialization
  16. from cryptography.hazmat.primitives.asymmetric import rsa
  17. from cryptography.x509.oid import NameOID
  18. import datetime
  19. import os
  20. def random_serial_number():
  21. """ https://cryptography.io/en/latest/_modules/cryptography/x509/base/#random_serial_number """
  22. return utils.int_from_bytes(os.urandom(20), "big") >> 1
  23. def create_key(path):
  24. return rsa.generate_private_key(
  25. public_exponent = 65537,
  26. key_size = 4096,
  27. backend = default_backend(),
  28. )
  29. def save_key(path, key):
  30. with open(path, 'wb') as f:
  31. f.write(key.private_bytes(
  32. encoding = serialization.Encoding.PEM,
  33. format = serialization.PrivateFormat.TraditionalOpenSSL,
  34. encryption_algorithm = serialization.NoEncryption(),
  35. ))
  36. os.chmod(path, 0600)
  37. def save_cert(path, cert):
  38. with open(path, 'wb') as f:
  39. f.write(cert.public_bytes(
  40. encoding = serialization.Encoding.PEM,
  41. ))
  42. os.chmod(path, 0644)
  43. def load_key(path):
  44. with open(path, 'rb') as f:
  45. return serialization.load_pem_private_key(
  46. data = f.read(),
  47. password = None,
  48. backend = default_backend(),
  49. )
  50. def load_cert(path):
  51. with open(path, 'rb') as f:
  52. return x509.load_pem_x509_certificate(
  53. data = f.read(),
  54. backend = default_backend(),
  55. )
  56. def create_name(common_name, organization_name = None):
  57. attr = []
  58. if organization_name:
  59. attr.append(x509.NameAttribute(NameOID.ORGANIZATION_NAME, organization_name))
  60. attr.append(x509.NameAttribute(NameOID.COMMON_NAME, common_name))
  61. return x509.Name(attr)
  62. def main(argv):
  63. module = AnsibleModule(
  64. argument_spec = {
  65. 'cert_path': {'required': True, 'type': 'str'},
  66. 'common_name': {'required': True, 'type': 'str'},
  67. 'key_path': {'required': True, 'type': 'str'},
  68. 'organization_name': {'required': False, 'type': 'str', 'default': None},
  69. 'recreate_cert': {'required': False, 'type': 'bool', 'default': False},
  70. }
  71. )
  72. changed = False
  73. if os.path.exists(module.params['key_path']) and not module.params['recreate_cert']:
  74. key = load_key(module.params['key_path'])
  75. else:
  76. key = create_key(module.params['key_path'])
  77. save_key(path = module.params['key_path'], key = key)
  78. changed = True
  79. if os.path.exists(module.params['cert_path']):
  80. cert = load_cert(module.params['cert_path'])
  81. else:
  82. subject = create_name(
  83. common_name = module.params['common_name'].decode('utf-8'),
  84. organization_name = module.params['organization_name'].decode('utf-8')
  85. if module.params['organization_name'] else None,
  86. )
  87. cert_builder = (
  88. x509.CertificateBuilder()
  89. .subject_name(subject)
  90. .issuer_name(subject)
  91. .public_key(key.public_key())
  92. .serial_number(random_serial_number())
  93. .not_valid_before(datetime.datetime.utcnow())
  94. .not_valid_after(datetime.datetime.utcnow() + datetime.timedelta(days = 356 * 10))
  95. # The cA boolean indicates whether the certified public key may be used
  96. # to verify certificate signatures.
  97. # https://tools.ietf.org/html/rfc5280.html#section-4.2.1.9
  98. .add_extension(
  99. x509.BasicConstraints(ca = True, path_length = None),
  100. critical = False,
  101. )
  102. # To facilitate certification path construction, this extension MUST
  103. # appear in all conforming CA certificates, that is, all certificates
  104. # including the basic constraints extension
  105. # https://tools.ietf.org/html/rfc5280.html#section-4.2.1.2
  106. .add_extension(
  107. x509.SubjectKeyIdentifier.from_public_key(key.public_key()),
  108. critical = False,
  109. )
  110. )
  111. cert = cert_builder.sign(
  112. private_key = key,
  113. algorithm = hashes.SHA256(),
  114. backend = default_backend(),
  115. )
  116. save_cert(path = module.params['cert_path'], cert = cert)
  117. changed = True
  118. organization_name_attrs = cert.subject.get_attributes_for_oid(NameOID.ORGANIZATION_NAME)
  119. module.exit_json(
  120. changed = changed,
  121. cert_path = module.params['cert_path'],
  122. key_path = module.params['key_path'],
  123. subject_common_name = cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value,
  124. organization_name = organization_name_attrs[0].value if len(organization_name_attrs) > 0 else None,
  125. )
  126. return 0
  127. if __name__ == "__main__":
  128. import sys
  129. sys.exit(main(sys.argv[1:]))