#!/usr/bin/env python3 """ This script checks for old SHA1-signed x509 certificates in Lighthouse and replaces them with SHA256-signed equivalents. Only Lighthouse VPN and node client certificates are handled. Updated client certificates are pushed to the node via SSH. This requires nodes to still be connected to Lighthouse (using their existing certificate), so the nodes must be on an appropriate firmware version that still supports SHA1-signed certificates. For OGCS this means 4.x, for NGCS this means 22.11 or earlier. In practice, there are unlikely to be any NGCS nodes in the field with SHA1-signed client certificates. NOTES: - This script may take a while to run, depending on the number of nodes enrolled. Nodes are processed sequentially (no background jobs etc). - Any disconnected nodes will be skipped after a timeout period. Run the script again to retry these nodes later. - Replacing the LHVPN server certificate will cause ALL nodes to temporarily disconnect, even if they have a SHA256-signed client certificate. - This script is intended to be run on Lighthouse 23.04 instances. """ import argparse import io from pathlib import Path import sys from typing import List, Tuple import paramiko from cryptography import x509 from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives import hashes, serialization from cryptography.hazmat.primitives.asymmetric import rsa from sqlalchemy.orm.session import Session import lipy import lipy.cli.common from lipy import models from lipy.configurators.ssh import FILE_LH_KNOWN_NODES UPGRADE_OVERRIDE_FILE="/mnt/nvram/.allow_upgrade_with_old_ca_cert" THIS_FILE = Path(__file__).name """ Partial Lua script for updating LHVPN certificate details on NGCS nodes. Values are set through ogconfig, and files get updated by configurators. See also (on node): /var/www/localhost/wsapi/og-rest-api/commands/Enrollment.lua """ NGCS_LUA = r""" package.path = '/var/www/localhost/wsapi/og-rest-api/backend/?.lua;' .. package.path local json = require "json" local o = require "libogobject_lua" local Config = require "libogconfig_helper" local handle = Config.connect_to_server() function update_tunnel_cert(lighthouse_names, cert, key, handle) local config = Config:open(handle) local tlist = o.cfg_lhvpn_tunnel_list_new (handle) if not tlist then return false end -- Iterate over each Lighthouse tunnel local list_size = o.cfg_lhvpn_tunnel_list_count (tlist) local tunnelUpdated = false for i = 0, (list_size - 1) do local tobj = o.cfg_lhvpn_tunnel_list_get_tunnel (tlist, i) if tobj == nil then return false end -- Update tunnel if its lighthouse name is a match name = o.cfg_lhvpn_tunnel_get_lighthouse_name (tobj) if lighthouse_names[name] then print("Updating tunnel for '" .. name .. "', VPN address " .. o.cfg_lhvpn_tunnel_get_server_vpn_address(tobj)) o.cfg_lhvpn_tunnel_set_cert(tobj, cert, string.len(cert)) o.cfg_lhvpn_tunnel_set_key(tobj, key, string.len(key)) o.cfg_lhvpn_tunnel_save (tobj) tunnelUpdated = true end o.cfg_lhvpn_tunnel_free (tobj) end o.cfg_lhvpn_tunnel_list_free (tlist) if not tunnelUpdated then io.stderr:write("Tunnel not updated") return false end local ok, _, error = config:commit() if not ok then io.stderr:write(json.encode(error)) return false end return true end if update_tunnel_cert(lighthouse_names, new_cert, new_key, handle) then os.exit(0) else os.exit(1) end """ """ Partial Lua script for updating LHVPN certificate details on OGCS nodes. Unlike NGCS, values are written directly to the appropriate config files. See also (on node): /home/httpd/cgi-bin/api/commands/Enrollment_b.lua """ OGCS_LUA = r""" require "luaconfig" function update_tunnel_cert(lighthouse_names, cert, key, handle) local mgr = luaconfig.ConfigManager () if mgr == nil then return false end mgr:read () local prefix = "config.lhvpn.tunnels" local count = mgr:getSize (prefix .. ".total") local tunnelUpdated = false -- find and update each tunnel that matches the parameter for i = 1, count do local tun = luaconfig.LighthouseVPNTunnel () local path = prefix .. ".tunnel" .. tostring(i) tun:load (mgr, path) if lighthouse_names[tun.lighthouse_name] then print("Updating tunnel for '" .. tun.lighthouse_name .. "', VPN address " .. tun.server_vpn_address) server = mgr:getElement(path ..".server") file_prefix = "/etc/config/lhvpn/"..server.."."..tun.uuid cert_file = file_prefix..".cert" key_file = file_prefix..".key" ca_file = file_prefix..".ca" print(" cert file " .. cert_file) cert_f = io.open (cert_file, "w") cert_f:write(cert .. "\n") cert_f:close() print(" key file " .. key_file) key_f = io.open (key_file, "w") key_f:write(key .. "\n") key_f:close() tunnelUpdated = true end end if not tunnelUpdated then io.stderr:write("Tunnel not updated") return false end mgr:schedule ("lhvpn_tunnel") if not mgr:run () then io.stderr:write("ConfigManager run failed") return false end return true end if update_tunnel_cert(lighthouse_names, new_cert, new_key, handle) then os.exit(0) else os.exit(1) end """ def lua_code(cert: models.Certificate, lighthouse_names: List[str], is_ngcs: bool) -> str: """ Returns Lua script for updating LHVPN certificate and key on nodes. The appropriate code is returned for NGCS or OGCS devices. """ # lighthouse_names are stored as dictionary-style table in Lua lh_names = ",\n".join([f'\t["{n}"] = true' for n in lighthouse_names]) header = f""" local new_cert = [[{cert.certificate.strip().decode("utf-8")}]] local new_key = [[{cert.private_key.strip().decode("utf-8")}]] local lighthouse_names = {{{lh_names}}} """ if is_ngcs: return header + NGCS_LUA return header + OGCS_LUA def renew_x509(ca_certificate: bytes, ca_private_key: bytes, certbytes: bytes, node_extensions: bool = True, sha1_sign: bool = False) -> Tuple[bytes, bytes]: """ Creates a replacement x509 certificate and key, copying details from an existing x509 certificate. """ old_x509 = x509.load_pem_x509_certificate(certbytes, default_backend()) private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048, backend=default_backend()) csr = x509.CertificateSigningRequestBuilder().subject_name(old_x509.subject).sign(private_key, hashes.SHA256(), default_backend()) builder = ( x509.CertificateBuilder() .subject_name(csr.subject) .issuer_name(ca_certificate.subject) .public_key(csr.public_key()) .serial_number(old_x509.serial_number) .not_valid_before(old_x509.not_valid_before) .not_valid_after(old_x509.not_valid_after) ) if node_extensions: builder = builder.add_extension( models.Certificate.key_usage(), True, ).add_extension( x509.ExtendedKeyUsage([x509.ExtendedKeyUsageOID.CLIENT_AUTH]), False, ) if sha1_sign: algorithm = hashes.SHA1() else: algorithm = hashes.SHA256() x509_cert = builder.sign(private_key=ca_private_key, algorithm=algorithm, backend=default_backend()) private_key_bytes = private_key.private_bytes( encoding=serialization.Encoding.PEM, format=serialization.PrivateFormat.TraditionalOpenSSL, encryption_algorithm=serialization.NoEncryption() ) certificate_bytes = x509_cert.public_bytes( encoding=serialization.Encoding.PEM, ) return certificate_bytes, private_key_bytes def is_sha1_signed(certbytes: bytes) -> bool: x509_cert = x509.load_pem_x509_certificate(certbytes, default_backend()) return isinstance(x509_cert.signature_hash_algorithm, hashes.SHA1) def sha1_signed_node_certs(dbsession: Session, ignore_sign_alg: bool = False) -> List[models.Node]: node_certs = dbsession.query(models.Certificate).filter(models.Certificate.common_name.like('nodes-%')).filter(models.Certificate.revoke_time == None) if ignore_sign_alg: return node_certs.all() else: return [cert for cert in node_certs if is_sha1_signed(cert.certificate)] def write_upgrade_override_files(dbsession, root_private_key): success = True with open(UPGRADE_OVERRIDE_FILE, 'w') as override_file: override_file.write(f"Upgrade override file written by {THIS_FILE}") print(f"Created upgrade override file '{UPGRADE_OVERRIDE_FILE}' to unblock upgrading the Lighthouse.") secondaries = ( dbsession.query(models.LighthouseConfiguration) .filter(models.LighthouseConfiguration.role == models.constants.LighthouseConfigurationRoleEnum.Secondary) .all() ) for secondary in secondaries: lhvpn_address = str(secondary.instance_lhvpn_address) try: print(f"Attempting to create upgrade override file on secondary: {secondary.uuid} at {lhvpn_address} ... ", end='', flush=True) with paramiko.SSHClient() as client: client.load_system_host_keys(FILE_LH_KNOWN_NODES) client.connect(lhvpn_address, username="root", pkey=root_private_key, timeout=20) stdin, stdout, _ = client.exec_command(f"echo 'Upgrade override file written from the primary instance by {THIS_FILE}' > {UPGRADE_OVERRIDE_FILE}", timeout=20) output = stdout.readlines() print("OK") except Exception: success = False print("\033[0;31mFAIL\033[0m ") print(f"\033[0;31m Failed to create the upgrade override file on secondary: {secondary.uuid}.\033[0m\n") continue return success def main(args=None) -> int: parser = argparse.ArgumentParser( description=( "Check for old SHA1-signed x509 certificates in Lighthouse " "and replaces them with SHA256-signed equivalents. " "Only Lighthouse VPN and node client certificates are handled." ) ) lipy.cli.common.add_common_cli_arguments(parser) parser.add_argument( "--dry-run", dest="dry_run", action="store_true", help="Dry Run (makes no changes to Lighthouse or nodes)", ) parser.add_argument( "--server-only", dest="server_only", action="store_true", help="Only consider LHVPN server certificate (ignore node certificates)", ) parser.add_argument( "--sha1-sign", dest="sha1_sign", action="store_true", help="Use SHA1 to sign new certificates (FOR TESTING ONLY)", ) parser.add_argument( "--ignore-signature-algorithm", dest="ignore_sign_alg", action="store_true", help="Ignore signature algorithm of existing certificates (FOR TESTING ONLY)", ) parser.add_argument( "--ignore-version", dest="ignore_version", action="store_true", help="Ignore Lighthouse version check", ) parsed_args = parser.parse_args(sys.argv[1:]) session_factory, loader = lipy.cli.common.get_session_factory_and_loader(parsed_args) with open("/etc/version") as version_file: lighthouse_version = version_file.read() if not (parsed_args.ignore_version or lighthouse_version.startswith("23.04.")): print("This script should be run on Lighthouse version 23.04") return 1 with models.get_wrapped_session(session_factory) as dbsession: root = dbsession.query(models.Root).one() lighthouse_names = [lh.system_lighthouse_name for lh in dbsession.query(models.LighthouseConfiguration).all()] root_user = dbsession.query(models.User).filter(models.User.username == "root").one() root_private_key = paramiko.rsakey.RSAKey.from_private_key(io.StringIO(root_user.ssh_keys_rsa_private.decode("utf-8"))) ca_private_key = serialization.load_pem_private_key( root.services_ca_key, password=None, backend=default_backend(), ) ca_certificate = x509.load_pem_x509_certificate(root.services_ca_certificate, default_backend()) changes = 0 if not parsed_args.server_only: for cert in sha1_signed_node_certs(dbsession, parsed_args.ignore_sign_alg): node = dbsession.query(models.Node).filter_by(id=models.Node.uuid_to_id(cert.common_name)).first() lhvpn_address = str(node.current_lhvpn_address.address) print(f"Node {cert.common_name} certificate requires update") if parsed_args.dry_run: print(" Dry run (no change made to node)\n") continue certificate_bytes, private_key_bytes = renew_x509(ca_certificate, ca_private_key, cert.certificate, sha1_sign=parsed_args.sha1_sign) cert.certificate = certificate_bytes cert.private_key = private_key_bytes output = [] try: with paramiko.SSHClient() as client: client.load_system_host_keys(FILE_LH_KNOWN_NODES) client.connect(lhvpn_address, username="root", pkey=root_private_key, timeout=20) stdin, stdout, _ = client.exec_command("lua -", timeout=20) stdin.channel.send(lua_code(cert, lighthouse_names, node.is_next_gen_opengear)) stdin.channel.shutdown_write() output = stdout.readlines() except Exception: print("\033[0;31m Failed to update node. The node certificate will remain unchanged.\033[0m\n") dbsession.expire(cert) continue changes = changes + 1 dbsession.commit() print(" Node certificate updated") print("\033[1;34m ", end='') print(" ".join(output)) print("\033[0m") if parsed_args.ignore_sign_alg or is_sha1_signed(root.services_lhvpn_server_certificate): print("LHVPN server certificate requires update") if parsed_args.dry_run: print(" Dry run (no change made to LHVPN server certificate)\n") else: certificate_bytes, private_key_bytes = renew_x509( ca_certificate, ca_private_key, root.services_lhvpn_server_certificate, node_extensions=False, sha1_sign=parsed_args.sha1_sign ) root.services_lhvpn_server_certificate = certificate_bytes root.services_lhvpn_server_certificate_key = private_key_bytes changes = changes + 1 dbsession.commit() print(" LHVPN server certificate updated") print(f"\n{changes} certificate{'s'*(changes!=1)} updated") # Only create the upgrade override file if the options used allow it. if (not parsed_args.server_only) and (not parsed_args.sha1_sign) and (not parsed_args.ignore_sign_alg) and changes > 0: if not write_upgrade_override_files(dbsession, root_private_key): print("\033[0;31mERROR:\033[0m the upgrade override file could not be created on one or more secondaries.") print("Do NOT upgrade the primary lighthouse until this problem is manually resolved.\n") print("The file can be created on a secondary by running the command below:") print(f"\n\ttouch {UPGRADE_OVERRIDE_FILE}\n") else: print(f"INFO: upgrade override file NOT created.") return 0 if __name__ == "__main__": sys.exit(main())