用户要配置自动化监控:证书到期提前预警,就得构建完整的SSL证书监控预警系统,确保在证书过期前及时发现并处理。下面就是具体步骤和架构。

一、核心监控架构设计

1.  分层监控体系

text

┌─────────────────────────────────────┐

│                  监控可视化层                                │

    ┌─────────┐    ┌─────────┐                    │

│    │  Grafana  │    │  Kibana    │                    │

│    └─────────┘    └─────────┘                    │

├─────────────────────────────────────┤

│                  监控分析层                                    │

│    ┌─────────┐    ┌─────────┐                    │

│    │Prometheus│  │  ELK  Stack│                  │

│    └─────────┘    └─────────┘                    │

├─────────────────────────────────────┤

│                  数据采集层                                    │

│    ┌─────────┐    ┌─────────┐                    │

│    │cert-exporter│  │黑盒监控│                  │

│    └─────────┘    └─────────┘                    │

├─────────────────────────────────────┤

│                  证书源层                                        │

│    ┌─────────┐    ┌─────────┐                    │

│    │Let'sEncrypt│  │  自签CA  │                    │

│    └─────────┘    └─────────┘                    │

└─────────────────────────────────────┘

二、Prometheus  +  cert-exporter方案

1.  部署cert-exporter

yaml

cert-exporter.yaml

apiVersion:  apps/v1

kind:  Deployment

metadata:

    name:  cert-exporter

    namespace:  monitoring

    labels:

        app:  cert-exporter

spec:

    replicas:  2

    selector:

        matchLabels:

            app:  cert-exporter

    template:

        metadata:

            labels:

                app:  cert-exporter

            annotations:

                prometheus.io/scrape:  "true"

                prometheus.io/port:  "9117"

        spec:

            containers:

            -  name:  cert-exporter

                image:  enix/cert-exporter:latest

                ports:

                -  containerPort:  9117

                    name:  metrics

                env:

                -  name:  CERT_FILES

                    value:  "/certs/*.crt,/certs/*.pem"

                -  name:  CERT_DIRECTORIES

                    value:  "/etc/ssl/certs"

                -  name:  WATCH_NAMESPACES

                    value:  "default,production"

                volumeMounts:

                -  name:  certs

                    mountPath:  /certs

                    readOnly:  true

                -  name:  ssl-certs

                    mountPath:  /etc/ssl/certs

                    readOnly:  true

                resources:

                    requests:

                        memory:  "64Mi"

                        cpu:  "100m"

                    limits:

                        memory:  "128Mi"

                        cpu:  "200m"

            volumes:

            -  name:  certs

                hostPath:

                    path:  /etc/ssl/certs

                    type:  Directory

            -  name:  ssl-certs

                hostPath:

                    path:  /etc/ssl

                    type:  Directory

---

apiVersion:  v1

kind:  Service

metadata:

    name:  cert-exporter

    namespace:  monitoring

    labels:

        app:  cert-exporter

spec:

    ports:

    -  port:  9117

        targetPort:  9117

        name:  metrics

    selector:

        app:  cert-exporter

    type:  ClusterIP

2.  Prometheus配置

yaml

prometheus-cert-rules.yaml

apiVersion:  monitoring.coreos.com/v1

kind:  PrometheusRule

metadata:

    name:  certificate-alerts

    namespace:  monitoring

spec:

    groups:

    -  name:  certificate.rules

        interval:  5m

        rules:

        -  record:  certificate_expiry_days

            expr:  |

                time()  -  ssl_certificate_not_after{job="cert-exporter"}

                

        告警规则

        -  alert:  CertificateExpiresSoon

            expr:  |

                ssl_certificate_not_after{job="cert-exporter"}  -  time()  <  86400  *  30

            for:  5m

            labels:

                severity:  warning

                category:  certificate

            annotations:

                summary:  "证书即将过期  ({{  $labels.instance  }})"

                description:  |

                    证书  {{  $labels.subject  }}  将在  {{  $value  |  humanizeDuration  }}  后过期。

                    颁发者:  {{  $labels.issuer  }}

                    域名:  {{  $labels.dns_names  }}

                runbook_url:  "https://wiki.example.com/certificate-renewal"

                

        -  alert:  CertificateExpiresCritical

            expr:  |

                ssl_certificate_not_after{job="cert-exporter"}  -  time()  <  86400  *  7

            for:  2m

            labels:

                severity:  critical

                category:  certificate

            annotations:

                summary:  "证书即将过期!  ({{  $labels.instance  }})"

                description:  |

                    紧急!证书  {{  $labels.subject  }}  将在  {{  $value  |  humanizeDuration  }}  后过期。

                slack_channel:  "#alerts-critical"

                

        -  alert:  CertificateExpired

            expr:  |

                ssl_certificate_not_after{job="cert-exporter"}  -  time()  <=  0

            for:  1m

            labels:

                severity:  critical

                category:  certificate

            annotations:

                summary:  "证书已过期!  ({{  $labels.instance  }})"

                description:  |

                    证书  {{  $labels.subject  }}  已过期!

                    必须立即处理!

                pagerduty_key:  "certificate-expired"

                

        -  alert:  CertificateChainIncomplete

            expr:  |

                ssl_certificate_chain_info{job="cert-exporter"}  ==  0

            for:  10m

            labels:

                severity:  warning

                category:  certificate

            annotations:

                summary:  "证书链不完整  ({{  $labels.instance  }})"

                

        -  alert:  WeakCertificateAlgorithm

            expr:  |

                ssl_certificate_info{job="cert-exporter",signature_algorithm=~"sha1.*|md5.*"}

            for:  5m

            labels:

                severity:  warning

            annotations:

                summary:  "使用弱签名算法的证书  ({{  $labels.instance  }})"

                description:  |

                    证书  {{  $labels.subject  }}  使用了不安全的签名算法  {{  $labels.signature_algorithm  }}

三、多维度监控脚本

1.  综合监控脚本

python

!/usr/bin/env  python3

证书综合监控脚本

支持多种证书源:文件系统、Kubernetes  Secrets、Let's  Encrypt、Vault等

import  os

import  sys

import  json

import  ssl

import  socket

import  smtplib

import  logging

from  datetime  import  datetime,  timedelta

from  pathlib  import  Path

from  typing  import  Dict,  List,  Tuple,  Optional

from  dataclasses  import  dataclass

import  yaml

from  kubernetes  import  client,  config

import  requests

from  email.mime.text  import  MIMEText

from  email.mime.multipart  import  MIMEMultipart

配置日志

logging.basicConfig(

        level=logging.INFO,

        format='%(asctime)s  -  %(name)s  -  %(levelname)s  -  %(message)s',

        handlers=[

                logging.FileHandler('/var/log/cert-monitor.log'),

                logging.StreamHandler(sys.stdout)

        ]

)

logger  =  logging.getLogger(__name__)


@dataclass

class  CertificateInfo:

        """证书信息"""

        subject:  str

        issuer:  str

        sans:  List[str]

        not_before:  datetime

        not_after:  datetime

        key_size:  int

        signature_algorithm:  str

        source:  str

        location:  str

        days_remaining:  int


class  CertificateMonitor:

        def  __init__(self,  config_path:  str  =  '/etc/cert-monitor/config.yaml'):

                self.config  =  self.load_config(config_path)

                self.certificates:  List[CertificateInfo]  =  []

                self.alerts:  List[Dict]  =  []

                

        def  load_config(self,  config_path:  str)  ->  Dict:

                加载配置文件

                with  open(config_path,  'r')  as  f:

                        return  yaml.safe_load(f)

        

        def  discover_certificates(self):

                发现所有需要监控的证书

                logger.info("开始证书发现...")

                1.  文件系统证书

                for  path  in  self.config.get('filesystem_paths',  []):

                        self.scan_filesystem(Path(path))

                2.  Kubernetes  Secrets

                if  self.config.get('kubernetes',  {}).get('enabled',  False):

                        self.scan_kubernetes_secrets()

                3.  Let's  Encrypt  certificates

                if  self.config.get('letsencrypt',  {}).get('enabled',  False):

                        self.scan_letsencrypt()

                4.  HTTP/HTTPS  endpoints

                for  endpoint  in  self.config.get('endpoints',  []):

                        self.check_endpoint_certificate(endpoint)

                logger.info(f"发现  {len(self.certificates)}  个证书")

                def  scan_filesystem(self,  base_path:  Path):

                扫描文件系统中的证书

                try:

                        for  cert_file  in  base_path.rglob('*.pem'):

                                self.process_cert_file(cert_file,  'filesystem')

                        for  cert_file  in  base_path.rglob('*.crt'):

                                self.process_cert_file(cert_file,  'filesystem')

                except  Exception  as  e:

                        logger.error(f"扫描文件系统失败  {base_path}:  {e}")

               def  scan_kubernetes_secrets(self):

                扫描Kubernetes  Secrets中的证书

                try:

                        加载kubeconfig

                        config.load_incluster_config()

                        v1  =  client.CoreV1Api()

                        获取所有命名空间的TLS类型Secret

                        namespaces  =  self.config['kubernetes'].get('namespaces',  ['default'])

                        for  namespace  in  namespaces:

                                secrets  =  v1.list_namespaced_secret(

                                        namespace=namespace,

                                        label_selector=self.config['kubernetes'].get('label_selector',  '')

                                )

                                

                                for  secret  in  secrets.items:

                                        if  secret.type  ==  'kubernetes.io/tls':

                                                self.process_kubernetes_secret(secret,  namespace)

                                                

                except  Exception  as  e:

                        logger.error(f"扫描Kubernetes证书失败:  {e}")

        

        def  process_cert_file(self,  cert_path:  Path,  source:  str):

                处理单个证书文件

                try:

                        with  open(cert_path,  'rb')  as  f:

                                cert_data  =  f.read()

                        

                        解析证书

                        cert  =  ssl.PEM_cert_to_DER_cert(cert_data.decode('utf-8'))

                        x509  =  ssl.DER_cert_to_PEM_cert(cert)

                        

                        创建SSL上下文解析证书

                        context  =  ssl.create_default_context()

                        cert_info  =  ssl._ssl._test_decode_cert(cert_path)

                        

                        cert_info  =  CertificateInfo(

                                subject=cert_info['subject'],

                                issuer=cert_info['issuer'],

                                sans=cert_info.get('subjectAltName',  []),

                                not_before=datetime.strptime(cert_info['notBefore'],  '%b  %d  %H:%M:%S  %Y  %Z'),

                                not_after=datetime.strptime(cert_info['notAfter'],  '%b  %d  %H:%M:%S  %Y  %Z'),

                                key_size=cert_info.get('keySize',  0),

                                signature_algorithm=cert_info['signatureAlgorithm'],

                                source=source,

                                location=str(cert_path),

                                days_remaining=(datetime.strptime(cert_info['notAfter'],  '%b  %d  %H:%M:%S  %Y  %Z')  -  datetime.now()).days

                        )

                        

                        self.certificates.append(cert_info)

                        

                except  Exception  as  e:

                        logger.warning(f"无法解析证书文件  {cert_path}:  {e}")

        

        def  check_endpoint_certificate(self,  endpoint:  Dict):

                检查HTTP/HTTPS端点的证书

                try:

                        hostname  =  endpoint['hostname']

                        port  =  endpoint.get('port',  443)

                        

                        context  =  ssl.create_default_context()

                        

                        with  socket.create_connection((hostname,  port),  timeout=10)  as  sock:

                                with  context.wrap_socket(sock,  server_hostname=hostname)  as  ssock:

                                        cert  =  ssock.getpeercert()

                                        

                                        cert_info  =  CertificateInfo(

                                                subject=str(cert.get('subject',  [])),

                                                issuer=str(cert.get('issuer',  [])),

                                                sans=self.extract_sans(cert),

                                                not_before=self.parse_cert_time(cert['notBefore']),

                                                not_after=self.parse_cert_time(cert['notAfter']),

                                                key_size=self.get_key_size(cert),

                                                signature_algorithm=cert.get('signatureAlgorithm',  ''),

                                                source='endpoint',

                                                location=f"{hostname}:{port}",

                                                days_remaining=(self.parse_cert_time(cert['notAfter'])  -  datetime.now()).days

                                        )

                                        

                                        self.certificates.append(cert_info)

                                        

                except  Exception  as  e:

                        logger.error(f"检查端点证书失败  {endpoint.get('hostname')}:  {e}")

        

        def  evaluate_certificates(self):

                评估所有证书状态

                logger.info("评估证书状态...")

                

                for  cert  in  self.certificates:

                        检查过期时间

                        if  cert.days_remaining  <  self.config['alerts']['critical_days']:

                                self.create_alert(

                                        cert,  

                                        'critical',  

                                        f"证书将在  {cert.days_remaining}  天后过期"

                                )

                        elif  cert.days_remaining  <  self.config['alerts']['warning_days']:

                                self.create_alert(

                                        cert,  

                                        'warning',  

                                        f"证书将在  {cert.days_remaining}  天后过期"

                                )

                        

                        检查密钥长度

                        if  cert.key_size  <  2048:

                                self.create_alert(

                                        cert,

                                        'warning',

                                        f"证书密钥长度不足  ({cert.key_size}  <  2048)"

                                )

                        

                        检查签名算法

                        if  'sha1'  in  cert.signature_algorithm.lower():

                                self.create_alert(

                                        cert,

                                        'critical',

                                        "使用不安全的SHA1签名算法"

                                )

        

        def  create_alert(self,  cert:  CertificateInfo,  severity:  str,  message:  str):

                创建告警

                alert  =  {

                        'timestamp':  datetime.now().isoformat(),

                        'severity':  severity,

                        'certificate':  cert.subject,

                        'location':  cert.location,

                        'source':  cert.source,

                        'expires_in_days':  cert.days_remaining,

                        'expiry_date':  cert.not_after.isoformat(),

                        'message':  message,

                        'sans':  cert.sans,

                        'issuer':  cert.issuer

                }

                

                self.alerts.append(alert)

                logger.warning(f"{severity.upper()}:  {cert.subject}  -  {message}")

        

        def  send_alerts(self):

                发送告警通知

                if  not  self.alerts:

                        logger.info("没有需要发送的告警")

                        return

                分组告警

                critical_alerts  =  [a  for  a  in  self.alerts  if  a['severity']  ==  'critical']

                warning_alerts  =  [a  for  a  in  self.alerts  if  a['severity']  ==  'warning']

                发送邮件

                if  self.config['notifications'].get('email',  {}).get('enabled',  False):

                        self.send_email_alerts(critical_alerts,  warning_alerts)

                发送Slack通知

                if  self.config['notifications'].get('slack',  {}).get('enabled',  False):

                        self.send_slack_alerts(critical_alerts,  warning_alerts)

                        发送Webhook

                if  self.config['notifications'].get('webhook',  {}).get('enabled',  False):

                        self.send_webhook_alerts()

            def  send_email_alerts(self,  critical_alerts:  List,  warning_alerts:  List):

                发送邮件告警

                try:

                        smtp_config  =  self.config['notifications']['email']

                        msg  =  MIMEMultipart('alternative')

                        msg['Subject']  =  f"证书监控告警  -  {len(critical_alerts)}个严重,  {len(warning_alerts)}个警告"

                        msg['From']  =  smtp_config['from']

                        msg['To']  =  ',  '.join(smtp_config['to'])

                        创建HTML内容

                        html  =  self.create_html_report(critical_alerts,  warning_alerts)

                        msg.attach(MIMEText(html,  'html'))

                        发送邮件

                        with  smtplib.SMTP(smtp_config['smtp_server'],  smtp_config['smtp_port'])  as  server:

                                if  smtp_config.get('tls',  False):

                                        server.starttls()

                                if  smtp_config.get('username'):

                                        server.login(smtp_config['username'],  smtp_config['password'])

                                server.send_message(msg)

                        logger.info("邮件告警已发送")

                        except  Exception  as  e:

                        logger.error(f"发送邮件告警失败:  {e}")

              def  create_html_report(self,  critical_alerts:  List,  warning_alerts:  List)  ->  str:

                创建HTML报告

                html  =  f"""

                <html>

                <head>

                        <style>

                                body  {{  font-family:  Arial,  sans-serif;  }}

                                .critical  {{  color:  #d9534f;  }}

                                .warning  {{  color:  #f0ad4e;  }}

                                table  {{  border-collapse:  collapse;  width:  100%;  }}

                                th,  td  {{  border:  1px  solid  #ddd;  padding:  8px;  text-align:  left;  }}

                                th  {{  background-color:  #f2f2f2;  }}

                                .badge  {{  padding:  3px  8px;  border-radius:  3px;  font-size:  12px;  }}

                                .badge-critical  {{  background-color:  #d9534f;  color:  white;  }}

                                .badge-warning  {{  background-color:  #f0ad4e;  color:  white;  }}

                        </style>

                </head>

                <body>

                        <h2>证书监控报告</h2>

                        <p>生成时间:  {datetime.now().strftime('%Y-%m-%d  %H:%M:%S')}</p>

                        <h3>摘要</h3>

                        <p>

                                <span  class="badge  badge-critical">严重告警:  {len(critical_alerts)}</span>

                                <span  class="badge  badge-warning">警告告警:  {len(warning_alerts)}</span>

                        </p>

                """

                if  critical_alerts:

                        html  +=  """

                        <h3>严重告警</h3>

                        <table>

                                <tr>

                                        <th>证书</th>

                                        <th>位置</th>

                                        <th>过期时间</th>

                                        <th>剩余天数</th>

                                        <th>问题描述</th>

                                </tr>

                        """

                        for  alert  in  critical_alerts:

                                html  +=  f"""

                                <tr  class="critical">

                                        <td>{alert['certificate']}</td>

                                        <td>{alert['location']}</td>

                                        <td>{alert['expiry_date']}</td>

                                        <td>{alert['expires_in_days']}  天</td>

                                        <td>{alert['message']}</td>

                                </tr>

                                """

                        html  +=  "</table>"

                if  warning_alerts:

                        html  +=  """

                        <h3>⚠️  警告告警</h3>

                        <table>

                                <tr>

                                        <th>证书</th>

                                        <th>位置</th>

                                        <th>过期时间</th>

                                        <th>剩余天数</th>

                                        <th>问题描述</th>

                                </tr>

                        """

                        for  alert  in  warning_alerts:

                                html  +=  f"""

                                <tr  class="warning">

                                        <td>{alert['certificate']}</td>

                                        <td>{alert['location']}</td>

                                        <td>{alert['expiry_date']}</td>

                                        <td>{alert['expires_in_days']}  天</td>

                                        <td>{alert['message']}</td>

                                </tr>

                                """

                        html  +=  "</table>"

                

                html  +=  """

                </body>

                </html>

                ""

                return  html

        

        def  generate_metrics(self):

                生成Prometheus格式的指标

                metrics  =  []

                for  cert  in  self.certificates:

                        labels  =  {

                                'subject':  cert.subject,

                                'issuer':  cert.issuer,

                                'source':  cert.source,

                                'location':  cert.location

                        }

                        过期时间戳

                        metrics.append(

                                f'ssl_certificate_expiry_timestamp{{{self.format_labels(labels)}}}  '

                                f'{int(cert.not_after.timestamp())}'

                        )

                        剩余天数

                        metrics.append(

                                f'ssl_certificate_days_remaining{{{self.format_labels(labels)}}}  '

                                f'{cert.days_remaining}'

                        )

                        密钥长度

                        metrics.append(

                                f'ssl_certificate_key_size{{{self.format_labels(labels)}}}  '

                                f'{cert.key_size}'

                        )

                写入指标文件

                metrics_file  =  self.config.get('metrics_file',  '/var/lib/node-exporter/cert-metrics.prom')

                with  open(metrics_file,  'w')  as  f:

                        f.write('\n'.join(metrics))

                logger.info(f"指标已写入  {metrics_file}")

        

        def  format_labels(self,  labels:  Dict)  ->  str:

                格式化Prometheus标签

                return  ',  '.join([f'{k}="{v}"'  for  k,  v  in  labels.items()])

                def  run(self):

                运行监控

                logger.info("启动证书监控...")

                self.discover_certificates()

                self.evaluate_certificates()

                self.generate_metrics()

                if  self.alerts:

                        self.send_alerts()

                logger.info("证书监控完成")

if  __name__  ==  "__main__":

        monitor  =  CertificateMonitor()

        monitor.run()

2.  配置文件

yaml

  /etc/cert-monitor/config.yaml

证书监控配置文件

监控间隔(秒)

check_interval:  3600

证书发现配置

discovery:

    filesystem_paths:

        -  /etc/ssl/certs

        -  /etc/letsencrypt/live

        -  /docker/certs

        -  /usr/local/share/ca-certificates

        kubernetes:

        enabled:  true

        namespaces:

            -  default

            -  production

            -  staging

        label_selector:  ""

        letsencrypt:

        enabled:  true

        path:  /etc/letsencrypt

        endpoints:

        -  hostname:  "example.com"

            port:  443

        -  hostname:  "api.example.com"

            port:  443

        -  hostname:  "www.example.com"

            port:  443

告警阈值

alerts:

    warning_days:  30        #  提前30天警告

    critical_days:  7        #  提前7天严重告警

    expired_critical:  true

通知配置

notifications:

    email:

        enabled:  true

        smtp_server:  "smtp.example.com"

        smtp_port:  587

        tls:  true

        username:  "alert@example.com"

        password:  "your-password"

        from:  "cert-monitor@example.com"

        to:

            -  "admin@example.com"

            -  "devops@example.com"

        slack:

        enabled:  true

        webhook_url:  "https://hooks.slack.com/services/XXX/XXX/XXX"

        channel:  "#alerts"

        username:  "证书监控"

        icon_emoji:  ":shield:"

        webhook:

        enabled:  false

        url:  "https://alert-manager.example.com/api/alerts"

        timeout:  10

        pagerduty:

        enabled:  false

        integration_key:  "your-pagerduty-key"

    

    telegram:

        enabled:  false

        bot_token:  "your-bot-token"

        chat_id:  "your-chat-id"

指标导出

metrics:

    enabled:  true

    file:  "/var/lib/node-exporter/cert-metrics.prom"

    port:  9100

日志配置

logging:

    level:  "INFO"

    file:  "/var/log/cert-monitor.log"

    max_size_mb:  100

    backup_count:  10

四、Blackbox黑盒监控

1.  Blackbox  Exporter配置

yaml

blackbox-certificate-check.yaml

apiVersion:  v1

kind:  ConfigMap

metadata:

    name:  blackbox-exporter-cert-config

    namespace:  monitoring

data:

    blackbox.yaml:  |

        modules:

            http_2xx:

                prober:  http

                timeout:  10s

                http:

                    valid_http_versions:  ["HTTP/1.1",  "HTTP/2.0"]

                    valid_status_codes:  [200]

                    no_follow_redirects:  false

                    fail_if_ssl:  false

                    fail_if_not_ssl:  true

                    tls_config:

                        insecure_skip_verify:  false

                        server_name:  ""

                    preferred_ip_protocol:  "ip4"

                    ip_protocol_fallback:  false

            ssl_certificate:

                prober:  http

                timeout:  15s

                http:

                    fail_if_not_ssl:  true

                    tls_config:

                        insecure_skip_verify:  false

                        server_name:  ""

                    preferred_ip_protocol:  "ip4"

                    ip_protocol_fallback:  false

                ssl_certificate:

                    days_until_expiry_warning:  30

                    days_until_expiry_critical:  7

                    check_chain:  true

                tcp_ssl_certificate:

                prober:  tcp

                timeout:  15s

                tcp:

                    tls:  true

                    tls_config:

                        insecure_skip_verify:  false

                        server_name:  ""

                    query_response:

                    -  expect:  "^.*"

                ssl_certificate:

                    days_until_expiry_warning:  30

                    days_until_expiry_critical:  7

                    check_chain:  true

2.  SSL证书检查ServiceMonitor

yaml

ssl-service-monitor.yaml

apiVersion:  monitoring.coreos.com/v1

kind:  ServiceMonitor

metadata:

    name:  ssl-certificate-monitor

    namespace:  monitoring

    labels:

        team:  platform

spec:

    selector:

        matchLabels:

            app:  ssl-endpoints

    endpoints:

    -  port:  https

        interval:  5m

        path:  /probe

        params:

            module:  [ssl_certificate]

            target:  

                -  example.com:443

                -  api.example.com:443

                -  www.example.com:443

        relabelings:

        -  sourceLabels:  [__param_target]

            targetLabel:  instance

        -  sourceLabels:  [__param_module]

            targetLabel:  module

        -  targetLabel:  __address__

            replacement:  blackbox-exporter.monitoring.svc.cluster.com:9115

五、Alertmanager告警路由

1.  Alertmanager配置

yaml

alertmanager-certificate.yaml

apiVersion:  v1

kind:  ConfigMap

metadata:

    name:  alertmanager-config

    namespace:  monitoring

data:

    alertmanager.yml:  |

        global:

            smtp_smarthost:  'smtp.example.com:587'

            smtp_from:  'alerts@example.com'

            smtp_auth_username:  'alert@example.com'

            smtp_auth_password:  'password'

            slack_api_url:  'https://hooks.slack.com/services/XXX/XXX/XXX'

        route:

            group_by:  ['alertname',  'cluster',  'service']

            group_wait:  30s

            group_interval:  5m

            repeat_interval:  12h

            receiver:  'default-receiver'

            routes:

            -  match:

                    severity:  critical

                    category:  certificate

                receiver:  'certificate-critical'

                group_wait:  10s

                repeat_interval:  5m

                continue:  true

            -  match:

                    severity:  warning

                    category:  certificate

                receiver:  'certificate-warning'

                group_wait:  30s

                repeat_interval:  1h

                continue:  true

              receivers:

        -  name:  'default-receiver'

            email_configs:

            -  to:  'alerts@example.com'

            -  name:  'certificate-critical'

            email_configs:

            -  to:  'cert-admin@example.com'

                headers:

                    subject:  '[CRITICAL]  证书告警:  {{  .GroupLabels.alertname  }}'

            slack_configs:

            -  channel:  'certificate-alerts'

                title:  '证书紧急告警'

                text:  |

                    {{  range  .Alerts  }}

                    告警:  {{  .Annotations.summary  }}

                    描述:  {{  .Annotations.description  }}

                    证书:*{{  .Labels.subject  }}

                    过期时间:{{  .Labels.expiry_date  }}

                    剩余天数:  {{  .Labels.days_remaining  }}

                    {{  end  }}

            pagerduty_configs:

            -  service_key:  'your-pagerduty-key'

                description:  '证书过期告警'

            -  name:  'certificate-warning'

            email_configs:

            -  to:  'cert-notify@example.com'

                headers:

                    subject:  '[WARNING]  证书提醒:  {{  .GroupLabels.alertname  }}'

            slack_configs:

            -  channel:  'certificate-notifications'

                title:  '证书提醒'

                text:  |

                    {{  range  .Alerts  }}

                    提醒:  {{  .Annotations.summary  }}

                    证书:  {{  .Labels.subject  }}

                    剩余天数:  {{  .Labels.days_remaining  }}

                    {{  end  }}

            webhook_configs:

            -  url:  'https://api.example.com/certificate-alerts'

                send_resolved:  true

2.  告警模板

yaml

alert-templates.yaml

apiVersion:  v1

kind:  ConfigMap

metadata:

    name:  alertmanager-templates

    namespace:  monitoring

data:

    certificate.tmpl:  |

        {{  define  "certificate.email.subject"  }}

        {{-  if  eq  .Status  "firing"  -}}

        [{{  .Status  |  toUpper  }}]  {{  .CommonLabels.severity  |  toUpper  }}  证书告警:  {{  .CommonLabels.alertname  }}

        {{-  else  -}}

        [RESOLVED]  证书告警恢复:  {{  .CommonLabels.alertname  }}

        {{-  end  }}

        {{  end  }}

        {{  define  "certificate.email.html"  }}

        <!DOCTYPE  html>

        <html>

        <head>

            <style>

                body  {  font-family:  Arial,  sans-serif;  }

                .critical  {  color:  #d9534f;  }

                .warning  {  color:  #f0ad4e;  }

                .resolved  {  color:  #5cb85c;  }

                table  {  border-collapse:  collapse;  width:  100%;  }

                th,  td  {  border:  1px  solid  #ddd;  padding:  8px;  }

                th  {  background-color:  #f2f2f2;  }

            </style>

        </head>

        <body>

            <h2  class="{{  .CommonLabels.severity  }}">

                {{  if  eq  .Status  "firing"  }}

                    证书告警

                {{  else  }}

                    证书告警恢复

                {{  end  }}

            </h2>

            <h3>告警详情</h3>

            <table>

                <tr><th>告警名称</th><td>{{  .CommonLabels.alertname  }}</td></tr>

                <tr><th>严重程度</th><td>{{  .CommonLabels.severity  }}</td></tr>

                <tr><th>证书主题</th><td>{{  .CommonLabels.subject  }}</td></tr>

                <tr><th>颁发者</th><td>{{  .CommonLabels.issuer  }}</td></tr>

                <tr><th>过期时间</th><td>{{  .CommonLabels.expiry_date  }}</td></tr>

                <tr><th>剩余天数</th><td>{{  .CommonLabels.days_remaining  }}</td></tr>

                <tr><th>发生时间</th><td>{{  .StartsAt  }}</td></tr>

                {{  if  eq  .Status  "resolved"  }}

                <tr><th>恢复时间</th><td>{{  .EndsAt  }}</td></tr>

                {{  end  }}

            </table>

              {{  if  .Annotations.runbook_url  }}

            <h3>处理指南</h3>

            <p>请参考:  <a  href="{{  .Annotations.runbook_url  }}">{{  .Annotations.runbook_url  }}</a></p>

            {{  end  }}

            <h3>受影响域名</h3>

            <ul>

                {{  range  split  .CommonLabels.dns_names  ","  }}

                <li>{{  .  }}</li>

                {{  end  }}

            </ul>

        </body>

        </html>

        {{  end  }}

六、Grafana监控面板

1.  Grafana仪表板JSON

json

{

    "dashboard":  {

        "title":  "SSL/TLS证书监控",

        "description":  "实时监控所有证书状态和过期时间",

        "panels":  [

            {

                "title":  "证书过期时间分布",

                "type":  "stat",

                "targets":  [{

                    "expr":  "count  by  (days_remaining_range)  (label_replace(label_replace(ssl_certificate_days_remaining  >  0,  \"days_remaining_range\",  \"90+\",  \"\",  \".*\")  or  label_replace(ssl_certificate_days_remaining  <=  90,  \"days_remaining_range\",  \"60-90\",  \"\",  \".*\")  or  label_replace(ssl_certificate_days_remaining  <=  60,  \"days_remaining_range\",  \"30-60\",  \"\",  \".*\")  or  label_replace(ssl_certificate_days_remaining  <=  30,  \"days_remaining_range\",  \"7-30\",  \"\",  \".*\")  or  label_replace(ssl_certificate_days_remaining  <=  7,  \"days_remaining_range\",  \"<7\",  \"\",  \".*\")  or  label_replace(ssl_certificate_days_remaining  <=  0,  \"days_remaining_range\",  \"已过期\",  \"\",  \".*\"))",

                    "legendFormat":  "{{days_remaining_range}}"

                }],

                "fieldConfig":  {

                    "defaults":  {

                        "color":  {

                            "mode":  "thresholds"

                        },

                        "thresholds":  {

                            "steps":  [

                                {"color":  "green",  "value":  null},

                                {"color":  "#EAB839",  "value":  30},

                                {"color":  "red",  "value":  7}

                            ]

                        }

                    }

                }

            },

            {

                "title":  "按来源统计证书数量",

                "type":  "piechart",

                "targets":  [{

                    "expr":  "count  by  (source)  (ssl_certificate_days_remaining)",

                    "legendFormat":  "{{source}}"

                }]

            },

            {

                "title":  "证书过期时间线",

                "type":  "table",

                "targets":  [{

                    "expr":  "ssl_certificate_expiry_timestamp",

                    "instant":  true,

                    "format":  "table",

                    "legendFormat":  "{{subject}}"

                }],

                "transformations":  [{

                    "id":  "calculateField",

                    "options":  {

                        "mode":  "reduceRow",

                        "reduce":  {

                            "reducer":  "lastNotNull"

                        }

                    }

                }]

            }

        ]

    }

}

2.  Grafana告警规则

yaml

#  grafana-alert-rules.yaml

apiVersion:  v1

kind:  ConfigMap

metadata:

    name:  grafana-alert-rules

    namespace:  monitoring

data:

    certificate-alerts.yml:  |

        groups:

            -  name:  certificate.alerts

                interval:  1m

                rules:

                    -  alert:  CertificateExpiryWarning

                        expr:  ssl_certificate_days_remaining  <  30  and  ssl_certificate_days_remaining  >=  7

                        for:  5m

                        annotations:

                            summary:  "证书即将过期警告"

                            description:  "证书  {{  $labels.subject  }}  将在  {{  $value  }}  天后过期"

                        labels:

                            severity:  warning

                            

                    -  alert:  CertificateExpiryCritical

                        expr:  ssl_certificate_days_remaining  <  7  and  ssl_certificate_days_remaining  >  0

                        for:  2m

                        annotations:

                            summary:  "证书即将过期严重警告"

                            description:  "证书  {{  $labels.subject  }}  将在  {{  $value  }}  天后过期,请立即处理!"

                        labels:

                            severity:  critical

                            

                    -  alert:  CertificateExpired

                        expr:  ssl_certificate_days_remaining  <=  0

                        for:  1m

                        annotations:

                            summary:  "证书已过期"

                            description:  "证书  {{  $labels.subject  }}  已过期!服务可能中断。"

                        labels:

                            severity:  emergency

七、自动化响应与修复

1.  自动化续期机器人

python

!/usr/bin/env  python3

"""

证书自动续期机器人

"""

import  asyncio

import  aiohttp

from  datetime  import  datetime,  timedelta

import  subprocess

import  json

import  logging

from  typing  import  List,  Dict

class  CertificateAutoRenewer:

        def  __init__(self):

                self.config  =  self.load_config()

                self.session  =  None

        async  def  check_and_renew(self):

                """检查并续期证书"""

                logger.info("开始证书续期检查")

                获取需要续期的证书列表

                expiring_certs  =  await  self.get_expiring_certificates()

                renew_tasks  =  []

                for  cert  in  expiring_certs:

                        if  cert['days_remaining']  <=  self.config['auto_renew_threshold']:

                                task  =  asyncio.create_task(self.renew_certificate(cert))

                                renew_tasks.append(task)

                  并发续期

                if  renew_tasks:

                        results  =  await  asyncio.gather(*renew_tasks,  return_exceptions=True)

                        self.process_renewal_results(results)

              async  def  renew_certificate(self,  cert:  Dict):

                续期单个证书

                try:

                        domain  =  cert['domain']

                        logger.info(f"开始续期证书:  {domain}")

                        使用certbot续期

                        cmd  =  [

                                'certbot',  'renew',

                                '--cert-name',  domain,

                                '--non-interactive',

                                '--agree-tos',

                                '--force-renewal',

                                '--preferred-challenges',  'dns'

                        ]

                        执行续期

                        process  =  await  asyncio.create_subprocess_exec(

                                *cmd,

                                stdout=asyncio.subprocess.PIPE,

                                stderr=asyncio.subprocess.PIPE

                        )

                        stdout,  stderr  =  await  process.communicate()

                        if  process.returncode  ==  0:

                                logger.info(f"证书续期成功:  {domain}")

                                部署新证书

                                await  self.deploy_certificate(domain)

                                

                                发送成功通知

                                await  self.send_renewal_notification(cert,  success=True)

                                return  {'domain':  domain,  'success':  True}

                        else:

                                logger.error(f"证书续期失败:  {domain}  -  {stderr.decode()}")

                                发送失败通知

                                await  self.send_renewal_notification(

                                        cert,  

                                        success=False,  

                                        error=stderr.decode()

                                )

                                return  {'domain':  domain,  'success':  False,  'error':  stderr.decode()}

                        except  Exception  as  e:

                        logger.error(f"续期证书异常:  {domain}  -  {e}")

                        return  {'domain':  domain,  'success':  False,  'error':  str(e)}

            async  def  deploy_certificate(self,  domain:  str):

                部署证书到各个服务

                deployment_tasks  =  []

                部署到Nginx

                deployment_tasks.append(self.deploy_to_nginx(domain))

                部署到HAProxy

                deployment_tasks.append(self.deploy_to_haproxy(domain))

                部署到Kubernetes

                if  self.config.get('kubernetes',  {}).get('enabled',  False):

                        deployment_tasks.append(self.deploy_to_kubernetes(domain))

                并发部署

                await  asyncio.gather(*deployment_tasks)

              async  def  deploy_to_nginx(self,  domain:  str):

                部署证书到Nginx

                try:

                        复制证书文件

                        cert_src  =  f"/etc/letsencrypt/live/{domain}/fullchain.pem"

                        cert_dest  =  f"/etc/nginx/ssl/{domain}/fullchain.pem"

                        subprocess.run(['cp',  cert_src,  cert_dest],  check=True)

                        重载Nginx

                        subprocess.run(['nginx',  '-s',  'reload'],  check=True)

                        logger.info(f"证书已部署到Nginx:  {domain}")

                except  subprocess.CalledProcessError  as  e:

                        logger.error(f"部署到Nginx失败:  {domain}  -  {e}")

if  __name__  ==  "__main__":

        配置日志

        logging.basicConfig(

                level=logging.INFO,

                format='%(asctime)s  -  %(levelname)s  -  %(message)s'

        )

        logger  =  logging.getLogger(__name__)

        运行续期机器人

        renewer  =  CertificateAutoRenewer()

        asyncio.run(renewer.check_and_renew())

八、集成第三方监控平台

1.  Datadog集成

yaml

datadog-cert-check.yaml

apiVersion:  apps/v1

kind:  DaemonSet

metadata:

    name:  datadog-cert-check

spec:

    template:

        spec:

            containers:

            -  name:  datadog-cert-check

                image:  datadog/agent:latest

                env:

                -  name:  DD_API_KEY

                    value:  "your-api-key"

                -  name:  DD_SITE

                    value:  "datadoghq.com"

                command:

                -  /bin/bash

                -  -c

                -  |

                    创建自定义检查

                    cat  >  /etc/datadog-agent/conf.d/cert_check.d/conf.yaml  <<  EOF

                    instances:

                        -  name:  ssl_certificates

                            cert_paths:

                                -  /etc/ssl/certs

                                -  /etc/letsencrypt/live

                            days_warning:  30

                            days_critical:  7

                            check_certificate_chain:  true

                    EOF

                    

                    启动Agent

                    /init

2.  New  Relic集成

python

newrelic-cert-monitor.py

import  newrelic.agent

from  datetime  import  datetime

import  ssl

newrelic.agent.initialize('/etc/newrelic/newrelic.ini')

@newrelic.agent.background_task()

def  check_certificates():

        """New  Relic自定义指标"""

        certificates  =  discover_certificates()

        

        for  cert  in  certificates:

                发送自定义指标

                newrelic.agent.record_custom_metric(

                        f"Certificate/ExpiryDays/{cert.domain}",

                        cert.days_remaining

                )

                

                发送事件

                if  cert.days_remaining  <  30:

                        newrelic.agent.record_custom_event("CertificateWarning",  {

                                "domain":  cert.domain,

                                "days_remaining":  cert.days_remaining,

                                "expiry_date":  cert.not_after.isoformat(),

                                "issuer":  cert.issuer

                        })

最佳实践总结

1.  监控策略分层

text

第1层:  实时监控  (Prometheus  +  Alertmanager)

第2层:  定期扫描  (Python监控脚本)

第3层:  黑盒检查  (Blackbox  Exporter)

第4层:  人工审核  (每月报告)

2.  告警升级策略

yaml

升级策略:

    -  7天前:  邮件通知  +  Slack警告

    -  3天前:  Slack紧急通知  +  短信

    -  1天前:  电话通知  +  自动创建工单

    -  已过期:  自动切换备用证书  +  紧急会议

3.  容灾预案

bash

紧急证书切换脚本

!/bin/bash

emergency-cert-switch.sh

DOMAIN=$1

BACKUP_CERT="/backup/certs/$DOMAIN/fullchain.pem"

if  [[  !  -f  "$BACKUP_CERT"  ]];  then

        生成自签名紧急证书

        openssl  req  -x509  -newkey  rsa:4096  \

                -keyout  /tmp/emergency.key  \

                -out  /tmp/emergency.crt  \

                -days  7  -nodes  \

                -subj  "/CN=$DOMAIN"

          BACKUP_CERT="/tmp/emergency.crt"

fi

部署紧急证书

cp  "$BACKUP_CERT"  "/etc/nginx/ssl/$DOMAIN/fullchain.pem"

nginx  -s  reload

发送紧急通知

send_emergency_alert  "$DOMAIN"

通过上述完整的监控预警系统,可以实现:

实时监控:分钟级证书状态检查

智能预警:分级告警,避免告警疲劳

自动修复:自动化续期和部署

可视化:清晰的仪表板和报告

合规审计:完整的证书生命周期记录

通过以上步骤和架构确保在SSL证书过期前及时发现并处理,避免服务中断。