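To automate early warnings for certificate expiry, you need a complete SSL certificate monitoring and alerting system, one that detects expiring certificates and gets them handled before they lapse. The concrete steps and architecture follow.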
I. Core Monitoring Architecture
1. Layered monitoring stack
text
┌─────────────────────────────────────────┐
│           Visualization layer           │
│   ┌───────────────┐ ┌───────────────┐   │
│   │    Grafana    │ │    Kibana     │   │
│   └───────────────┘ └───────────────┘   │
├─────────────────────────────────────────┤
│             Analysis layer              │
│   ┌───────────────┐ ┌───────────────┐   │
│   │  Prometheus   │ │   ELK Stack   │   │
│   └───────────────┘ └───────────────┘   │
├─────────────────────────────────────────┤
│          Data collection layer          │
│   ┌───────────────┐ ┌───────────────┐   │
│   │ cert-exporter │ │ Blackbox probe│   │
│   └───────────────┘ └───────────────┘   │
├─────────────────────────────────────────┤
│        Certificate source layer         │
│   ┌───────────────┐ ┌───────────────┐   │
│   │ Let's Encrypt │ │ Self-signed CA│   │
│   └───────────────┘ └───────────────┘   │
└─────────────────────────────────────────┘
II. Prometheus + cert-exporter
1. Deploying cert-exporter
yaml
# cert-exporter.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: cert-exporter
  namespace: monitoring
  labels:
    app: cert-exporter
spec:
  replicas: 2
  selector:
    matchLabels:
      app: cert-exporter
  template:
    metadata:
      labels:
        app: cert-exporter
      annotations:
        prometheus.io/scrape: "true"
        prometheus.io/port: "9117"
    spec:
      containers:
        - name: cert-exporter
          image: enix/cert-exporter:latest
          ports:
            - containerPort: 9117
              name: metrics
          env:
            - name: CERT_FILES
              value: "/certs/*.crt,/certs/*.pem"
            - name: CERT_DIRECTORIES
              value: "/etc/ssl/certs"
            - name: WATCH_NAMESPACES
              value: "default,production"
          volumeMounts:
            - name: certs
              mountPath: /certs
              readOnly: true
            - name: ssl-certs
              mountPath: /etc/ssl/certs
              readOnly: true
          resources:
            requests:
              memory: "64Mi"
              cpu: "100m"
            limits:
              memory: "128Mi"
              cpu: "200m"
      volumes:
        - name: certs
          hostPath:
            path: /etc/ssl/certs
            type: Directory
        - name: ssl-certs
          hostPath:
            path: /etc/ssl
            type: Directory
---
apiVersion: v1
kind: Service
metadata:
  name: cert-exporter
  namespace: monitoring
  labels:
    app: cert-exporter
spec:
  ports:
    - port: 9117
      targetPort: 9117
      name: metrics
  selector:
    app: cert-exporter
  type: ClusterIP
2. Prometheus configuration
yaml
# prometheus-cert-rules.yaml
apiVersion: monitoring.coreos.com/v1
kind: PrometheusRule
metadata:
  name: certificate-alerts
  namespace: monitoring
spec:
  groups:
    - name: certificate.rules
      interval: 5m
      rules:
        # Recording rule: days left until expiry (expiry timestamp minus now, in days)
        - record: certificate_expiry_days
          expr: |
            (ssl_certificate_not_after{job="cert-exporter"} - time()) / 86400
        # Alerting rules
        - alert: CertificateExpiresSoon
          expr: |
            ssl_certificate_not_after{job="cert-exporter"} - time() < 86400 * 30
          for: 5m
          labels:
            severity: warning
            category: certificate
          annotations:
            summary: "Certificate expiring soon ({{ $labels.instance }})"
            description: |
              Certificate {{ $labels.subject }} expires in {{ $value | humanizeDuration }}.
              Issuer: {{ $labels.issuer }}
              Domains: {{ $labels.dns_names }}
            runbook_url: "https://wiki.example.com/certificate-renewal"
        - alert: CertificateExpiresCritical
          expr: |
            ssl_certificate_not_after{job="cert-exporter"} - time() < 86400 * 7
          for: 2m
          labels:
            severity: critical
            category: certificate
          annotations:
            summary: "Certificate about to expire! ({{ $labels.instance }})"
            description: |
              Urgent! Certificate {{ $labels.subject }} expires in {{ $value | humanizeDuration }}.
            slack_channel: "#alerts-critical"
        - alert: CertificateExpired
          expr: |
            ssl_certificate_not_after{job="cert-exporter"} - time() <= 0
          for: 1m
          labels:
            severity: critical
            category: certificate
          annotations:
            summary: "Certificate has expired! ({{ $labels.instance }})"
            description: |
              Certificate {{ $labels.subject }} has expired!
              Immediate action is required!
            pagerduty_key: "certificate-expired"
        - alert: CertificateChainIncomplete
          expr: |
            ssl_certificate_chain_info{job="cert-exporter"} == 0
          for: 10m
          labels:
            severity: warning
            category: certificate
          annotations:
            summary: "Incomplete certificate chain ({{ $labels.instance }})"
        - alert: WeakCertificateAlgorithm
          expr: |
            ssl_certificate_info{job="cert-exporter",signature_algorithm=~"sha1.*|md5.*"}
          for: 5m
          labels:
            severity: warning
          annotations:
            summary: "Certificate uses a weak signature algorithm ({{ $labels.instance }})"
            description: |
              Certificate {{ $labels.subject }} uses the insecure signature algorithm {{ $labels.signature_algorithm }}
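The three expiry rules above overlap: a certificate with three days left satisfies both the 30-day and the 7-day expression. As a rough illustration of the tiering, the following Python sketch maps a certificate's expiry timestamp to the most severe rule that would match. The function name `classify_expiry` and the sample timestamps are assumptions made for this example; only the thresholds and alert names come from the rules.

```python
from typing import Optional

WARNING_SECONDS = 30 * 86400   # threshold used by CertificateExpiresSoon
CRITICAL_SECONDS = 7 * 86400   # threshold used by CertificateExpiresCritical


def classify_expiry(not_after: float, now: float) -> Optional[str]:
    """Return the most severe expiry alert matching a certificate, or None."""
    remaining = not_after - now
    if remaining <= 0:
        return "CertificateExpired"
    if remaining < CRITICAL_SECONDS:
        return "CertificateExpiresCritical"
    if remaining < WARNING_SECONDS:
        return "CertificateExpiresSoon"
    return None


now = 1_700_000_000  # arbitrary reference time (Unix seconds) for the example
for days in (45, 20, 3, -1):
    print(days, classify_expiry(now + days * 86400, now))
```

The `for:` durations in the rules are not modelled here; in Prometheus an alert only fires after its expression has held for that long.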
III. Multi-Source Monitoring Script
1. Comprehensive monitoring script
python
#!/usr/bin/env python3
"""
Comprehensive certificate monitoring script.
Designed for multiple certificate sources (filesystem, Kubernetes Secrets,
Let's Encrypt, Vault, and others); this listing covers the first three plus
live TLS endpoints.

Certificates are parsed with the third-party `cryptography` package (>= 42)
instead of the private ssl._ssl._test_decode_cert helper, which does not
expose the key size or the signature algorithm.
"""
import base64
import html
import logging
import smtplib
import socket
import ssl
import sys
from dataclasses import dataclass
from datetime import datetime, timezone
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from pathlib import Path
from typing import Dict, List

import requests
import yaml
from cryptography import x509
from cryptography.hazmat.primitives.asymmetric import rsa
from kubernetes import client, config

# Logging setup
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('/var/log/cert-monitor.log'),
        logging.StreamHandler(sys.stdout)
    ]
)
logger = logging.getLogger(__name__)


@dataclass
class CertificateInfo:
    """Parsed certificate details."""
    subject: str
    issuer: str
    sans: List[str]
    not_before: datetime
    not_after: datetime
    key_type: str
    key_size: int
    signature_algorithm: str
    source: str
    location: str
    days_remaining: int


def parse_certificate(cert: x509.Certificate, source: str, location: str) -> CertificateInfo:
    """Build a CertificateInfo from a parsed X.509 certificate."""
    try:
        san_ext = cert.extensions.get_extension_for_class(x509.SubjectAlternativeName)
        sans = san_ext.value.get_values_for_type(x509.DNSName)
    except x509.ExtensionNotFound:
        sans = []
    public_key = cert.public_key()
    hash_algorithm = cert.signature_hash_algorithm  # None for e.g. Ed25519
    not_after = cert.not_valid_after_utc
    return CertificateInfo(
        subject=cert.subject.rfc4514_string(),
        issuer=cert.issuer.rfc4514_string(),
        sans=list(sans),
        not_before=cert.not_valid_before_utc,
        not_after=not_after,
        key_type='RSA' if isinstance(public_key, rsa.RSAPublicKey) else 'other',
        key_size=getattr(public_key, 'key_size', 0),
        signature_algorithm=hash_algorithm.name if hash_algorithm else '',
        source=source,
        location=location,
        days_remaining=(not_after - datetime.now(timezone.utc)).days,
    )


class CertificateMonitor:
    def __init__(self, config_path: str = '/etc/cert-monitor/config.yaml'):
        self.config = self.load_config(config_path)
        self.certificates: List[CertificateInfo] = []
        self.alerts: List[Dict] = []

    def load_config(self, config_path: str) -> Dict:
        """Load the YAML configuration file."""
        with open(config_path, 'r') as f:
            return yaml.safe_load(f)

    def discover_certificates(self):
        """Discover every certificate that should be monitored."""
        logger.info("Starting certificate discovery...")
        # Discovery settings live under the `discovery` key of the config file
        discovery = self.config.get('discovery', {})

        # 1. Certificates on the filesystem
        for path in discovery.get('filesystem_paths', []):
            self.scan_filesystem(Path(path))

        # 2. Kubernetes Secrets
        if discovery.get('kubernetes', {}).get('enabled', False):
            self.scan_kubernetes_secrets(discovery['kubernetes'])

        # 3. Let's Encrypt certificates
        if discovery.get('letsencrypt', {}).get('enabled', False):
            self.scan_letsencrypt(discovery['letsencrypt'])

        # 4. HTTP/HTTPS endpoints
        for endpoint in discovery.get('endpoints', []):
            self.check_endpoint_certificate(endpoint)

        logger.info(f"Discovered {len(self.certificates)} certificates")

    def scan_filesystem(self, base_path: Path):
        """Scan a directory tree for certificate files."""
        try:
            for pattern in ('*.pem', '*.crt'):
                for cert_file in base_path.rglob(pattern):
                    self.process_cert_file(cert_file, 'filesystem')
        except Exception as e:
            logger.error(f"Filesystem scan failed for {base_path}: {e}")

    def scan_letsencrypt(self, le_config: Dict):
        """Scan Let's Encrypt certificates.

        Assumes certbot's default layout: <path>/live/<name>/cert.pem.
        """
        live_dir = Path(le_config.get('path', '/etc/letsencrypt')) / 'live'
        for cert_file in live_dir.glob('*/cert.pem'):
            self.process_cert_file(cert_file, 'letsencrypt')

    def scan_kubernetes_secrets(self, k8s_config: Dict):
        """Scan certificates stored in Kubernetes Secrets."""
        try:
            # Load in-cluster credentials (the script runs inside a pod)
            config.load_incluster_config()
            v1 = client.CoreV1Api()
            # Collect the TLS Secrets in each configured namespace
            for namespace in k8s_config.get('namespaces', ['default']):
                secrets = v1.list_namespaced_secret(
                    namespace=namespace,
                    label_selector=k8s_config.get('label_selector', '')
                )
                for secret in secrets.items:
                    if secret.type == 'kubernetes.io/tls':
                        self.process_kubernetes_secret(secret, namespace)
        except Exception as e:
            logger.error(f"Kubernetes certificate scan failed: {e}")

    def process_kubernetes_secret(self, secret, namespace: str):
        """Parse the tls.crt entry of a kubernetes.io/tls Secret."""
        location = f"{namespace}/{secret.metadata.name}"
        try:
            pem = base64.b64decode(secret.data['tls.crt'])
            cert = x509.load_pem_x509_certificate(pem)
            self.certificates.append(parse_certificate(cert, 'kubernetes', location))
        except Exception as e:
            logger.warning(f"Could not parse Secret {location}: {e}")

    def process_cert_file(self, cert_path: Path, source: str):
        """Parse a single PEM certificate file (the first certificate in it)."""
        try:
            cert = x509.load_pem_x509_certificate(cert_path.read_bytes())
            self.certificates.append(parse_certificate(cert, source, str(cert_path)))
        except Exception as e:
            logger.warning(f"Could not parse certificate file {cert_path}: {e}")

    def check_endpoint_certificate(self, endpoint: Dict):
        """Fetch and parse the certificate served by an HTTPS/TLS endpoint."""
        hostname = endpoint.get('hostname')
        port = endpoint.get('port', 443)
        try:
            # Verification is disabled on purpose: an expired or untrusted
            # certificate must still be fetched so that it can be reported.
            context = ssl.create_default_context()
            context.check_hostname = False
            context.verify_mode = ssl.CERT_NONE
            with socket.create_connection((hostname, port), timeout=10) as sock:
                with context.wrap_socket(sock, server_hostname=hostname) as ssock:
                    der = ssock.getpeercert(binary_form=True)
            cert = x509.load_der_x509_certificate(der)
            self.certificates.append(
                parse_certificate(cert, 'endpoint', f"{hostname}:{port}")
            )
        except Exception as e:
            logger.error(f"Endpoint certificate check failed for {hostname}: {e}")

    def evaluate_certificates(self):
        """Evaluate the state of every discovered certificate."""
        logger.info("Evaluating certificates...")
        thresholds = self.config['alerts']
        for cert in self.certificates:
            # Expiry
            if cert.days_remaining < 0:
                self.create_alert(cert, 'critical', "Certificate has expired")
            elif cert.days_remaining < thresholds['critical_days']:
                self.create_alert(
                    cert, 'critical',
                    f"Certificate expires in {cert.days_remaining} days"
                )
            elif cert.days_remaining < thresholds['warning_days']:
                self.create_alert(
                    cert, 'warning',
                    f"Certificate expires in {cert.days_remaining} days"
                )
            # Key length (the 2048-bit floor only applies to RSA keys)
            if cert.key_type == 'RSA' and cert.key_size < 2048:
                self.create_alert(
                    cert, 'warning',
                    f"Key length too short ({cert.key_size} < 2048)"
                )
            # Signature algorithm
            if cert.signature_algorithm.lower() in ('sha1', 'md5'):
                self.create_alert(
                    cert, 'critical',
                    f"Insecure {cert.signature_algorithm.upper()} signature algorithm"
                )

    def create_alert(self, cert: CertificateInfo, severity: str, message: str):
        """Record an alert for later notification."""
        alert = {
            'timestamp': datetime.now().isoformat(),
            'severity': severity,
            'certificate': cert.subject,
            'location': cert.location,
            'source': cert.source,
            'expires_in_days': cert.days_remaining,
            'expiry_date': cert.not_after.isoformat(),
            'message': message,
            'sans': cert.sans,
            'issuer': cert.issuer
        }
        self.alerts.append(alert)
        logger.warning(f"{severity.upper()}: {cert.subject} - {message}")

    def send_alerts(self):
        """Send the collected alerts through every enabled channel."""
        if not self.alerts:
            logger.info("No alerts to send")
            return
        # Group alerts by severity
        critical_alerts = [a for a in self.alerts if a['severity'] == 'critical']
        warning_alerts = [a for a in self.alerts if a['severity'] == 'warning']
        notifications = self.config['notifications']
        # Email
        if notifications.get('email', {}).get('enabled', False):
            self.send_email_alerts(critical_alerts, warning_alerts)
        # Slack
        if notifications.get('slack', {}).get('enabled', False):
            self.send_slack_alerts(critical_alerts, warning_alerts)
        # Generic webhook
        if notifications.get('webhook', {}).get('enabled', False):
            self.send_webhook_alerts()

    def send_email_alerts(self, critical_alerts: List, warning_alerts: List):
        """Send the alert report by email."""
        try:
            smtp_config = self.config['notifications']['email']
            msg = MIMEMultipart('alternative')
            msg['Subject'] = (
                f"Certificate monitoring alert - {len(critical_alerts)} critical, "
                f"{len(warning_alerts)} warning"
            )
            msg['From'] = smtp_config['from']
            msg['To'] = ', '.join(smtp_config['to'])
            # HTML body
            report = self.create_html_report(critical_alerts, warning_alerts)
            msg.attach(MIMEText(report, 'html'))
            # Send
            with smtplib.SMTP(smtp_config['smtp_server'], smtp_config['smtp_port']) as server:
                if smtp_config.get('tls', False):
                    server.starttls()
                if smtp_config.get('username'):
                    server.login(smtp_config['username'], smtp_config['password'])
                server.send_message(msg)
            logger.info("Email alert sent")
        except Exception as e:
            logger.error(f"Failed to send email alert: {e}")

    def send_slack_alerts(self, critical_alerts: List, warning_alerts: List):
        """Post a plain-text summary to a Slack incoming webhook."""
        slack = self.config['notifications']['slack']
        lines = [
            f"Certificate monitor: {len(critical_alerts)} critical, "
            f"{len(warning_alerts)} warning"
        ]
        lines += [
            f"[{a['severity']}] {a['certificate']} ({a['location']}): {a['message']}"
            for a in critical_alerts + warning_alerts
        ]
        try:
            response = requests.post(
                slack['webhook_url'], json={'text': '\n'.join(lines)}, timeout=10
            )
            response.raise_for_status()
            logger.info("Slack alert sent")
        except Exception as e:
            logger.error(f"Failed to send Slack alert: {e}")

    def send_webhook_alerts(self):
        """POST all alerts as JSON to the configured webhook URL."""
        webhook = self.config['notifications']['webhook']
        try:
            response = requests.post(
                webhook['url'], json=self.alerts, timeout=webhook.get('timeout', 10)
            )
            response.raise_for_status()
            logger.info("Webhook alert sent")
        except Exception as e:
            logger.error(f"Failed to send webhook alert: {e}")

    def create_html_report(self, critical_alerts: List, warning_alerts: List) -> str:
        """Build the HTML report used as the email body."""
        generated = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        return f"""
<html>
<head>
<style>
body {{ font-family: Arial, sans-serif; }}
.critical {{ color: #d9534f; }}
.warning {{ color: #f0ad4e; }}
table {{ border-collapse: collapse; width: 100%; }}
th, td {{ border: 1px solid #ddd; padding: 8px; text-align: left; }}
th {{ background-color: #f2f2f2; }}
.badge {{ padding: 3px 8px; border-radius: 3px; font-size: 12px; }}
.badge-critical {{ background-color: #d9534f; color: white; }}
.badge-warning {{ background-color: #f0ad4e; color: white; }}
</style>
</head>
<body>
<h2>Certificate Monitoring Report</h2>
<p>Generated: {generated}</p>
<h3>Summary</h3>
<p>
<span class="badge badge-critical">Critical: {len(critical_alerts)}</span>
<span class="badge badge-warning">Warning: {len(warning_alerts)}</span>
</p>
{self._alert_table('Critical alerts', 'critical', critical_alerts)}
{self._alert_table('Warning alerts', 'warning', warning_alerts)}
</body>
</html>
"""

    @staticmethod
    def _alert_table(title: str, css_class: str, alerts: List[Dict]) -> str:
        """Render one severity group as an HTML table ('' when the group is empty)."""
        if not alerts:
            return ''
        columns = ('certificate', 'location', 'expiry_date', 'expires_in_days', 'message')
        rows = ''.join(
            f'<tr class="{css_class}">'
            + ''.join(f'<td>{html.escape(str(alert[c]))}</td>' for c in columns)
            + '</tr>'
            for alert in alerts
        )
        return (
            f'<h3>{title}</h3><table><tr>'
            '<th>Certificate</th><th>Location</th><th>Expiry date</th>'
            '<th>Days remaining</th><th>Problem</th>'
            f'</tr>{rows}</table>'
        )

    def generate_metrics(self):
        """Write the results as metrics in the Prometheus text format."""
        metrics = []
        for cert in self.certificates:
            labels = self.format_labels({
                'subject': cert.subject,
                'issuer': cert.issuer,
                'source': cert.source,
                'location': cert.location
            })
            # Expiry timestamp
            metrics.append(
                f'ssl_certificate_expiry_timestamp{{{labels}}} '
                f'{int(cert.not_after.timestamp())}'
            )
            # Days remaining
            metrics.append(f'ssl_certificate_days_remaining{{{labels}}} {cert.days_remaining}')
            # Key length
            metrics.append(f'ssl_certificate_key_size{{{labels}}} {cert.key_size}')
        # Write the metrics file (the text format requires a trailing newline)
        metrics_file = self.config.get('metrics', {}).get(
            'file', '/var/lib/node-exporter/cert-metrics.prom'
        )
        with open(metrics_file, 'w') as f:
            f.write('\n'.join(metrics) + '\n')
        logger.info(f"Metrics written to {metrics_file}")

    def format_labels(self, labels: Dict) -> str:
        """Format Prometheus labels, escaping backslash, quote and newline."""
        def escape(value: str) -> str:
            return value.replace('\\', '\\\\').replace('"', '\\"').replace('\n', '\\n')
        return ','.join(f'{k}="{escape(str(v))}"' for k, v in labels.items())

    def run(self):
        """Run one monitoring pass."""
        logger.info("Starting certificate monitoring...")
        self.discover_certificates()
        self.evaluate_certificates()
        self.generate_metrics()
        if self.alerts:
            self.send_alerts()
        logger.info("Certificate monitoring finished")


if __name__ == "__main__":
    monitor = CertificateMonitor()
    monitor.run()
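The endpoint check ultimately depends on turning a certificate's `notAfter` value into a remaining-days figure and on pulling the DNS names out of the SAN list. Where only the standard library is available, the dictionary returned by `SSLSocket.getpeercert()` can be reduced to those fields roughly as follows. This is a minimal sketch: the `sample` dictionary is made-up data in the shape `getpeercert()` returns, and the three helper functions are illustrative, not part of any library.

```python
import ssl
from datetime import datetime, timezone
from typing import Dict, List


def parse_cert_time(value: str) -> datetime:
    """Convert a notBefore/notAfter string (always GMT) to an aware UTC datetime."""
    return datetime.fromtimestamp(ssl.cert_time_to_seconds(value), tz=timezone.utc)


def extract_sans(cert: Dict) -> List[str]:
    """Return the DNS entries of the subjectAltName extension."""
    return [value for kind, value in cert.get("subjectAltName", ()) if kind == "DNS"]


def days_remaining(cert: Dict, now: datetime) -> int:
    """Whole days between `now` and the certificate's expiry (negative once expired)."""
    return (parse_cert_time(cert["notAfter"]) - now).days


# Made-up certificate dictionary for illustration only
sample = {
    "notAfter": "Jan  5 09:34:43 2018 GMT",
    "subjectAltName": (
        ("DNS", "example.com"),
        ("DNS", "www.example.com"),
        ("IP Address", "192.0.2.1"),
    ),
}
reference = datetime(2018, 1, 1, tzinfo=timezone.utc)
print(parse_cert_time(sample["notAfter"]).isoformat())
print(extract_sans(sample), days_remaining(sample, reference))
```

Comparing against an aware UTC `datetime` avoids the off-by-hours error that arises when the GMT expiry time is subtracted from a naive local `datetime.now()`.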
2. Configuration file
yaml
# /etc/cert-monitor/config.yaml
# Certificate monitoring configuration

# Check interval (seconds)
check_interval: 3600

# Certificate discovery
discovery:
  filesystem_paths:
    - /etc/ssl/certs
    - /etc/letsencrypt/live
    - /docker/certs
    - /usr/local/share/ca-certificates
  kubernetes:
    enabled: true
    namespaces:
      - default
      - production
      - staging
    label_selector: ""
  letsencrypt:
    enabled: true
    path: /etc/letsencrypt
  endpoints:
    - hostname: "example.com"
      port: 443
    - hostname: "api.example.com"
      port: 443
    - hostname: "www.example.com"
      port: 443

# Alert thresholds
alerts:
  warning_days: 30    # warn 30 days ahead
  critical_days: 7    # critical alert 7 days ahead
  expired_critical: true

# Notifications
notifications:
  email:
    enabled: true
    smtp_server: "smtp.example.com"
    smtp_port: 587
    tls: true
    username: "alert@example.com"
    password: "your-password"
    from: "cert-monitor@example.com"
    to:
      - "admin@example.com"
      - "devops@example.com"
  slack:
    enabled: true
    webhook_url: "https://hooks.slack.com/services/XXX/XXX/XXX"
    channel: "#alerts"
    username: "Certificate Monitor"
    icon_emoji: ":shield:"
  webhook:
    enabled: false
    url: "https://alert-manager.example.com/api/alerts"
    timeout: 10
  pagerduty:
    enabled: false
    integration_key: "your-pagerduty-key"
  telegram:
    enabled: false
    bot_token: "your-bot-token"
    chat_id: "your-chat-id"

# Metrics export
metrics:
  enabled: true
  file: "/var/lib/node-exporter/cert-metrics.prom"
  port: 9100

# Logging
logging:
  level: "INFO"
  file: "/var/log/cert-monitor.log"
  max_size_mb: 100
  backup_count: 10
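Because the thresholds drive every alert decision, it can help to sanity-check them when the configuration is loaded. The sketch below works on an already-parsed configuration dictionary and only looks at the `alerts.warning_days` and `alerts.critical_days` keys shown above; the helper names `get_nested` and `validate_thresholds` are assumptions for this example.

```python
from typing import Any, Dict, List


def get_nested(cfg: Dict[str, Any], dotted_key: str, default: Any = None) -> Any:
    """Look up a value such as 'alerts.warning_days' in nested dictionaries."""
    node: Any = cfg
    for part in dotted_key.split("."):
        if not isinstance(node, dict) or part not in node:
            return default
        node = node[part]
    return node


def validate_thresholds(cfg: Dict[str, Any]) -> List[str]:
    """Return a list of human-readable problems (empty when the thresholds are sane)."""
    problems: List[str] = []
    warning = get_nested(cfg, "alerts.warning_days")
    critical = get_nested(cfg, "alerts.critical_days")
    for name, value in (("warning_days", warning), ("critical_days", critical)):
        if not isinstance(value, int) or value <= 0:
            problems.append(f"alerts.{name} must be a positive integer")
    if not problems and critical >= warning:
        problems.append("alerts.critical_days must be smaller than alerts.warning_days")
    return problems


good = {"alerts": {"warning_days": 30, "critical_days": 7}}
swapped = {"alerts": {"warning_days": 7, "critical_days": 30}}
print(validate_thresholds(good))
print(validate_thresholds(swapped))
```

A swapped pair would otherwise go unnoticed: every certificate inside the larger window would be reported as critical and the warning tier would never trigger.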
IV. Blackbox Monitoring
1. Blackbox Exporter configuration
yaml
# blackbox-certificate-check.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: blackbox-exporter-cert-config
  namespace: monitoring
data:
  blackbox.yaml: |
    # Blackbox Exporter modules have no expiry-threshold or chain-check options.
    # Each probe exposes the expiry as probe_ssl_earliest_cert_expiry, so the
    # 30-day warning and 7-day critical thresholds belong in alerting rules.
    modules:
      http_2xx:
        prober: http
        timeout: 10s
        http:
          valid_http_versions: ["HTTP/1.1", "HTTP/2.0"]
          valid_status_codes: [200]
          no_follow_redirects: false
          fail_if_ssl: false
          fail_if_not_ssl: true
          tls_config:
            insecure_skip_verify: false
            server_name: ""
          preferred_ip_protocol: "ip4"
          ip_protocol_fallback: false
      ssl_certificate:
        prober: http
        timeout: 15s
        http:
          fail_if_not_ssl: true
          tls_config:
            insecure_skip_verify: false
            server_name: ""
          preferred_ip_protocol: "ip4"
          ip_protocol_fallback: false
      tcp_ssl_certificate:
        prober: tcp
        timeout: 15s
        tcp:
          tls: true
          tls_config:
            insecure_skip_verify: false
            server_name: ""
          query_response:
            - expect: "^.*"
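Blackbox Exporter reports the earliest certificate expiry in the served chain as the `probe_ssl_earliest_cert_expiry` gauge (a Unix timestamp) in each probe response. As a rough illustration of how that value becomes a remaining-days figure, the sketch below parses a probe response in the Prometheus text format. The sample response text and the helper name `earliest_expiry` are made up for this example.

```python
from typing import Optional

METRIC = "probe_ssl_earliest_cert_expiry"


def earliest_expiry(metrics_text: str) -> Optional[float]:
    """Return the expiry timestamp from a probe response, or None if absent."""
    for line in metrics_text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue  # skip blank lines and HELP/TYPE comments
        name_part, _, value = line.rpartition(" ")
        if name_part.split("{", 1)[0] == METRIC:
            return float(value)
    return None


# Illustrative probe output, not captured from a real exporter
sample_response = """\
# TYPE probe_ssl_earliest_cert_expiry gauge
probe_ssl_earliest_cert_expiry 1.7026e+09
# TYPE probe_success gauge
probe_success 1
"""

now = 1_700_000_000  # arbitrary reference time for the example
expiry = earliest_expiry(sample_response)
days_left = (expiry - now) / 86400
print(f"{days_left:.1f} days until expiry")
```

In a Prometheus setup the same arithmetic is normally written as an alert expression, for example `probe_ssl_earliest_cert_expiry - time() < 86400 * 30`.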
2. ServiceMonitor for SSL certificate checks
yaml
# ssl-service-monitor.yaml
apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
  name: ssl-certificate-monitor
  namespace: monitoring
  labels:
    team: platform
spec:
  selector:
    matchLabels:
      app: ssl-endpoints
  endpoints:
    - port: https
      interval: 5m
      path: /probe
      params:
        module: [ssl_certificate]
        # Blackbox Exporter probes one target per request and reads only the
        # first `target` value, so define one endpoint entry per target.
        target:
          - example.com:443
          - api.example.com:443
          - www.example.com:443
      relabelings:
        - sourceLabels: [__param_target]
          targetLabel: instance
        - sourceLabels: [__param_module]
          targetLabel: module
        - targetLabel: __address__
          replacement: blackbox-exporter.monitoring.svc.cluster.local:9115
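After relabeling, each scrape is an HTTP request to the exporter's `/probe` endpoint with `module` and `target` as query parameters. A minimal sketch of how such a URL is put together, one per target, is shown below; the exporter address `localhost:9115` and the helper name `probe_url` are assumptions for this example.

```python
from typing import List
from urllib.parse import urlencode


def probe_url(exporter: str, module: str, target: str) -> str:
    """Build the Blackbox Exporter probe URL for a single target."""
    query = urlencode({"module": module, "target": target})
    return f"http://{exporter}/probe?{query}"


def probe_urls(exporter: str, module: str, targets: List[str]) -> List[str]:
    """One probe request per target, since each probe checks exactly one target."""
    return [probe_url(exporter, module, t) for t in targets]


# Assumed local exporter address, for illustration only
for url in probe_urls(
    "localhost:9115",
    "ssl_certificate",
    ["example.com:443", "api.example.com:443", "www.example.com:443"],
):
    print(url)
```

Requesting one of these URLs with `curl` is a quick way to check a module by hand before wiring it into Prometheus.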
V. Alertmanager Alert Routing
1. Alertmanager configuration
yaml
# alertmanager-certificate.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: alertmanager-config
  namespace: monitoring
data:
  alertmanager.yml: |
    global:
      smtp_smarthost: 'smtp.example.com:587'
      smtp_from: 'alerts@example.com'
      smtp_auth_username: 'alert@example.com'
      smtp_auth_password: 'password'
      slack_api_url: 'https://hooks.slack.com/services/XXX/XXX/XXX'
    route:
      group_by: ['alertname', 'cluster', 'service']
      group_wait: 30s
      group_interval: 5m
      repeat_interval: 12h
      receiver: 'default-receiver'
      routes:
        - match:
            severity: critical
            category: certificate
          receiver: 'certificate-critical'
          group_wait: 10s
          repeat_interval: 5m
          continue: true
        - match:
            severity: warning
            category: certificate
          receiver: 'certificate-warning'
          group_wait: 30s
          repeat_interval: 1h
          continue: true
    receivers:
      - name: 'default-receiver'
        email_configs:
          - to: 'alerts@example.com'
      - name: 'certificate-critical'
        email_configs:
          - to: 'cert-admin@example.com'
            headers:
              subject: '[CRITICAL] Certificate alert: {{ .GroupLabels.alertname }}'
        slack_configs:
          - channel: '#certificate-alerts'
            title: 'Urgent certificate alert'
            text: |
              {{ range .Alerts }}
              Alert: {{ .Annotations.summary }}
              Description: {{ .Annotations.description }}
              Certificate: {{ .Labels.subject }}
              Expiry date: {{ .Labels.expiry_date }}
              Days remaining: {{ .Labels.days_remaining }}
              {{ end }}
        pagerduty_configs:
          - service_key: 'your-pagerduty-key'
            description: 'Certificate expiry alert'
      - name: 'certificate-warning'
        email_configs:
          - to: 'cert-notify@example.com'
            headers:
              subject: '[WARNING] Certificate reminder: {{ .GroupLabels.alertname }}'
        slack_configs:
          - channel: '#certificate-notifications'
            title: 'Certificate reminder'
            text: |
              {{ range .Alerts }}
              Reminder: {{ .Annotations.summary }}
              Certificate: {{ .Labels.subject }}
              Days remaining: {{ .Labels.days_remaining }}
              {{ end }}
        webhook_configs:
          - url: 'https://api.example.com/certificate-alerts'
            send_resolved: true
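To make the routing behaviour concrete, the following Python sketch models a simplified routing tree: child routes are tried in order, a matching child without `continue` stops the search, and the parent's receiver is used only when no child matches. It is an illustration of the matching logic under those assumptions, not Alertmanager's implementation, and the `Route` class and `resolve` function are names invented here.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class Route:
    receiver: str
    match: Dict[str, str] = field(default_factory=dict)
    continue_: bool = False  # mirrors the `continue` setting
    routes: List["Route"] = field(default_factory=list)


def resolve(route: Route, labels: Dict[str, str]) -> List[str]:
    """Return the receivers an alert with these labels would reach."""
    receivers: List[str] = []
    for child in route.routes:
        if all(labels.get(k) == v for k, v in child.match.items()):
            receivers.extend(resolve(child, labels))
            if not child.continue_:
                break  # stop at the first match unless `continue` is set
    return receivers or [route.receiver]


# The routing tree from the configuration above
tree = Route(
    receiver="default-receiver",
    routes=[
        Route("certificate-critical",
              {"severity": "critical", "category": "certificate"}, continue_=True),
        Route("certificate-warning",
              {"severity": "warning", "category": "certificate"}, continue_=True),
    ],
)

print(resolve(tree, {"severity": "critical", "category": "certificate"}))
print(resolve(tree, {"severity": "warning", "category": "database"}))
```

Since the two certificate routes match disjoint severities, `continue: true` has no visible effect here; it would only matter if a later sibling route could also match the same alert.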
2. Alert templates
yaml
# alert-templates.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: alertmanager-templates
  namespace: monitoring
data:
  certificate.tmpl: |
    {{ define "certificate.email.subject" }}
    {{- if eq .Status "firing" -}}
    [{{ .Status | toUpper }}] {{ .CommonLabels.severity | toUpper }} certificate alert: {{ .CommonLabels.alertname }}
    {{- else -}}
    [RESOLVED] Certificate alert resolved: {{ .CommonLabels.alertname }}
    {{- end }}
    {{ end }}
    {{ define "certificate.email.html" }}
    <!DOCTYPE html>
    <html>
    <head>
    <style>
    body { font-family: Arial, sans-serif; }
    .critical { color: #d9534f; }
    .warning { color: #f0ad4e; }
    .resolved { color: #5cb85c; }
    table { border-collapse: collapse; width: 100%; }
    th, td { border: 1px solid #ddd; padding: 8px; }
    th { background-color: #f2f2f2; }
    </style>
    </head>
    <body>
    <h2 class="{{ .CommonLabels.severity }}">
    {{ if eq .Status "firing" }}
    Certificate alert
    {{ else }}
    Certificate alert resolved
    {{ end }}
    </h2>
    <h3>Alert details</h3>
    <table>
    <tr><th>Alert name</th><td>{{ .CommonLabels.alertname }}</td></tr>
    <tr><th>Severity</th><td>{{ .CommonLabels.severity }}</td></tr>
    <tr><th>Certificate subject</th><td>{{ .CommonLabels.subject }}</td></tr>
    <tr><th>Issuer</th><td>{{ .CommonLabels.issuer }}</td></tr>
    <tr><th>Expiry date</th><td>{{ .CommonLabels.expiry_date }}</td></tr>
    <tr><th>Days remaining</th><td>{{ .CommonLabels.days_remaining }}</td></tr>
    {{/* StartsAt and EndsAt exist per alert, not on the notification as a whole */}}
    {{ range .Alerts }}
    <tr><th>Started at</th><td>{{ .StartsAt }}</td></tr>
    {{ if eq .Status "resolved" }}
    <tr><th>Resolved at</th><td>{{ .EndsAt }}</td></tr>
    {{ end }}
    {{ end }}
    </table>
    {{ if .CommonAnnotations.runbook_url }}
    <h3>Runbook</h3>
    <p>See: <a href="{{ .CommonAnnotations.runbook_url }}">{{ .CommonAnnotations.runbook_url }}</a></p>
    {{ end }}
    <h3>Affected domains</h3>
    <ul>
    {{ range split .CommonLabels.dns_names "," }}
    <li>{{ . }}</li>
    {{ end }}
    </ul>
    </body>
    </html>
    {{ end }}
VI. Grafana Dashboards
1. Grafana dashboard JSON
json
{
  "dashboard": {
    "title": "SSL/TLS Certificate Monitoring",
    "description": "Live status and expiry dates of all certificates",
    "panels": [
      {
        "title": "Certificates by time to expiry",
        "type": "stat",
        "targets": [{
          "expr": "label_replace(count(ssl_certificate_days_remaining > 90), \"days_remaining_range\", \"90+\", \"\", \".*\") or label_replace(count(ssl_certificate_days_remaining > 60 and ssl_certificate_days_remaining <= 90), \"days_remaining_range\", \"60-90\", \"\", \".*\") or label_replace(count(ssl_certificate_days_remaining > 30 and ssl_certificate_days_remaining <= 60), \"days_remaining_range\", \"30-60\", \"\", \".*\") or label_replace(count(ssl_certificate_days_remaining > 7 and ssl_certificate_days_remaining <= 30), \"days_remaining_range\", \"7-30\", \"\", \".*\") or label_replace(count(ssl_certificate_days_remaining > 0 and ssl_certificate_days_remaining <= 7), \"days_remaining_range\", \"<7\", \"\", \".*\") or label_replace(count(ssl_certificate_days_remaining <= 0), \"days_remaining_range\", \"expired\", \"\", \".*\")",
          "legendFormat": "{{days_remaining_range}}"
        }],
        "fieldConfig": {
          "defaults": {
            "color": {
              "mode": "thresholds"
            },
            "thresholds": {
              "steps": [
                {"color": "green", "value": null},
                {"color": "#EAB839", "value": 30},
                {"color": "red", "value": 7}
              ]
            }
          }
        }
      },
      {
        "title": "Certificates by source",
        "type": "piechart",
        "targets": [{
          "expr": "count by (source) (ssl_certificate_days_remaining)",
          "legendFormat": "{{source}}"
        }]
      },
      {
        "title": "Certificate expiry timeline",
        "type": "table",
        "targets": [{
          "expr": "ssl_certificate_expiry_timestamp",
          "instant": true,
          "format": "table",
          "legendFormat": "{{subject}}"
        }],
        "transformations": [{
          "id": "calculateField",
          "options": {
            "mode": "reduceRow",
            "reduce": {
              "reducer": "lastNotNull"
            }
          }
        }]
      }
    ]
  }
}
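The first panel is meant to count certificates per time-to-expiry range, with each certificate falling into exactly one range. The intended bucketing can be sketched in Python as follows; the bucket labels mirror the panel's `days_remaining_range` values, while the function name `bucket` and the sample day counts are made up for illustration.

```python
from collections import Counter


def bucket(days_remaining: float) -> str:
    """Assign a certificate to exactly one expiry range."""
    if days_remaining <= 0:
        return "expired"
    if days_remaining <= 7:
        return "<7"
    if days_remaining <= 30:
        return "7-30"
    if days_remaining <= 60:
        return "30-60"
    if days_remaining <= 90:
        return "60-90"
    return "90+"


# Made-up remaining-days values for seven certificates
sample_days = [120, 75, 45, 10, 3, -2, 90]
distribution = Counter(bucket(d) for d in sample_days)
print(dict(distribution))
```

Checking the narrowest range first is what keeps the buckets disjoint; a query that tests `> 0`, `<= 90`, `<= 60` and so on independently would count a nearly expired certificate in several ranges at once.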
2. Grafana alert rules
yaml
# grafana-alert-rules.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: grafana-alert-rules
  namespace: monitoring
data:
  certificate-alerts.yml: |
    groups:
      - name: certificate.alerts
        interval: 1m
        rules:
          - alert: CertificateExpiryWarning
            expr: ssl_certificate_days_remaining < 30 and ssl_certificate_days_remaining >= 7
            for: 5m
            annotations:
              summary: "Certificate expiry warning"
              description: "Certificate {{ $labels.subject }} expires in {{ $value }} days"
            labels:
              severity: warning
          - alert: CertificateExpiryCritical
            expr: ssl_certificate_days_remaining < 7 and ssl_certificate_days_remaining > 0
            for: 2m
            annotations:
              summary: "Certificate expiry critical"
              description: "Certificate {{ $labels.subject }} expires in {{ $value }} days. Act now!"
            labels:
              severity: critical
          - alert: CertificateExpired
            expr: ssl_certificate_days_remaining <= 0
            for: 1m
            annotations:
              summary: "Certificate expired"
              description: "Certificate {{ $labels.subject }} has expired! Service may be interrupted."
            labels:
              severity: emergency
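VII. Automated Response and Remediation
1. Automatic renewal bot
python
#!/usr/bin/env python3
"""
Automatic certificate renewal bot.
"""
import asyncio
import logging
import subprocess
from typing import Dict, List

logger = logging.getLogger(__name__)


class CertificateAutoRenewer:
    def __init__(self):
        self.config = self.load_config()

    def load_config(self) -> Dict:
        """Placeholder: load settings from your own configuration source.

        The values below are illustrative defaults only.
        """
        return {'auto_renew_threshold': 30, 'kubernetes': {'enabled': False}}

    async def get_expiring_certificates(self) -> List[Dict]:
        """Placeholder: return dicts with 'domain' and 'days_remaining' keys,
        for example taken from the monitoring script's results."""
        return []

    async def check_and_renew(self):
        """Check certificates and renew those close to expiry."""
        logger.info("Starting certificate renewal check")
        # Certificates that may need renewal
        expiring_certs = await self.get_expiring_certificates()
        renew_tasks = []
        for cert in expiring_certs:
            if cert['days_remaining'] <= self.config['auto_renew_threshold']:
                renew_tasks.append(asyncio.create_task(self.renew_certificate(cert)))
        # Renew concurrently
        if renew_tasks:
            results = await asyncio.gather(*renew_tasks, return_exceptions=True)
            self.process_renewal_results(results)

    def process_renewal_results(self, results: List):
        """Log a summary of the renewal outcomes."""
        failed = [r for r in results if isinstance(r, Exception) or not r.get('success')]
        logger.info(f"Renewals finished: {len(results) - len(failed)} succeeded, {len(failed)} failed")

    async def send_renewal_notification(self, cert: Dict, success: bool, error: str = ''):
        """Placeholder: forward the outcome to your notification channel."""
        logger.info(f"Renewal notification for {cert.get('domain')}: success={success} {error}")

    async def renew_certificate(self, cert: Dict) -> Dict:
        """Renew a single certificate."""
        domain = cert.get('domain')
        try:
            logger.info(f"Renewing certificate: {domain}")
            # Renew with certbot; --force-renewal renews even when certbot
            # does not yet consider the certificate due
            cmd = [
                'certbot', 'renew',
                '--cert-name', domain,
                '--non-interactive',
                '--agree-tos',
                '--force-renewal',
                '--preferred-challenges', 'dns'
            ]
            # Run the renewal
            process = await asyncio.create_subprocess_exec(
                *cmd,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE
            )
            stdout, stderr = await process.communicate()
            if process.returncode == 0:
                logger.info(f"Certificate renewed: {domain}")
                # Deploy the new certificate
                await self.deploy_certificate(domain)
                # Notify success
                await self.send_renewal_notification(cert, success=True)
                return {'domain': domain, 'success': True}
            error = stderr.decode()
            logger.error(f"Certificate renewal failed: {domain} - {error}")
            # Notify failure
            await self.send_renewal_notification(cert, success=False, error=error)
            return {'domain': domain, 'success': False, 'error': error}
        except Exception as e:
            logger.error(f"Certificate renewal raised an exception: {domain} - {e}")
            return {'domain': domain, 'success': False, 'error': str(e)}

    async def deploy_certificate(self, domain: str):
        """Deploy the certificate to every service that uses it."""
        deployment_tasks = [
            self.deploy_to_nginx(domain),    # Nginx
            self.deploy_to_haproxy(domain),  # HAProxy
        ]
        # Kubernetes
        if self.config.get('kubernetes', {}).get('enabled', False):
            deployment_tasks.append(self.deploy_to_kubernetes(domain))
        # Deploy concurrently
        await asyncio.gather(*deployment_tasks)

    async def deploy_to_nginx(self, domain: str):
        """Deploy the certificate to Nginx."""
        try:
            # Copy the certificate file
            cert_src = f"/etc/letsencrypt/live/{domain}/fullchain.pem"
            cert_dest = f"/etc/nginx/ssl/{domain}/fullchain.pem"
            await asyncio.to_thread(subprocess.run, ['cp', cert_src, cert_dest], check=True)
            # Reload Nginx
            await asyncio.to_thread(subprocess.run, ['nginx', '-s', 'reload'], check=True)
            logger.info(f"Certificate deployed to Nginx: {domain}")
        except subprocess.CalledProcessError as e:
            logger.error(f"Deployment to Nginx failed: {domain} - {e}")

    async def deploy_to_haproxy(self, domain: str):
        """Placeholder: HAProxy deployment is environment-specific."""
        logger.info(f"HAProxy deployment not implemented: {domain}")

    async def deploy_to_kubernetes(self, domain: str):
        """Placeholder: Kubernetes deployment is environment-specific."""
        logger.info(f"Kubernetes deployment not implemented: {domain}")


if __name__ == "__main__":
    # Logging setup
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s'
    )
    # Run the renewal bot
    renewer = CertificateAutoRenewer()
    asyncio.run(renewer.check_and_renew())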
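The bot runs renewals concurrently with `asyncio.gather(..., return_exceptions=True)`, which means a failing renewal does not cancel the others: its exception comes back as an ordinary item in the result list. The sketch below shows one way such results might be summarised. `fake_renew` is a stand-in that simulates renewals (no certbot call is made), and `summarize` and `renew_all` are names invented for this example.

```python
import asyncio
from typing import Dict, List


async def fake_renew(domain: str) -> Dict:
    """Stand-in for a real renewal; domains starting with 'bad.' fail."""
    await asyncio.sleep(0)
    if domain.startswith("bad."):
        raise RuntimeError(f"renewal failed for {domain}")
    return {"domain": domain, "success": True}


def summarize(domains: List[str], results: List) -> Dict[str, List[str]]:
    """Split domains into renewed and failed, treating exceptions as failures."""
    summary: Dict[str, List[str]] = {"renewed": [], "failed": []}
    for domain, result in zip(domains, results):
        failed = isinstance(result, Exception) or not result.get("success")
        summary["failed" if failed else "renewed"].append(domain)
    return summary


async def renew_all(domains: List[str]) -> Dict[str, List[str]]:
    # gather() returns results in the same order as the awaitables passed in
    results = await asyncio.gather(
        *(fake_renew(d) for d in domains), return_exceptions=True
    )
    return summarize(domains, results)


summary = asyncio.run(
    renew_all(["example.com", "bad.example.com", "api.example.com"])
)
print(summary)
```

Because `gather` preserves input order, results can be paired with their domains by position even when the failing item is an exception object that carries no domain field.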
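VIII. Integrating Third-Party Monitoring Platforms
1. Datadog integration
yaml
# datadog-cert-check.yaml
apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: datadog-cert-check
spec:
  selector:
    matchLabels:
      app: datadog-cert-check
  template:
    metadata:
      labels:
        app: datadog-cert-check
    spec:
      containers:
        - name: datadog-cert-check
          image: datadog/agent:latest
          env:
            - name: DD_API_KEY
              value: "your-api-key"
            - name: DD_SITE
              value: "datadoghq.com"
          command:
            - /bin/bash
            - -c
            - |
              # Create the configuration for the custom check
              # (cert_check is a custom check; its Python code is not shown here)
              mkdir -p /etc/datadog-agent/conf.d/cert_check.d
              cat > /etc/datadog-agent/conf.d/cert_check.d/conf.yaml << EOF
              instances:
                - name: ssl_certificates
                  cert_paths:
                    - /etc/ssl/certs
                    - /etc/letsencrypt/live
                  days_warning: 30
                  days_critical: 7
                  check_certificate_chain: true
              EOF
              # Start the Agent
              /init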
2. New Relic integration
python
# newrelic-cert-monitor.py
import newrelic.agent

newrelic.agent.initialize('/etc/newrelic/newrelic.ini')


@newrelic.agent.background_task()
def check_certificates():
    """Report certificate data to New Relic as custom metrics and events."""
    # discover_certificates() is assumed to be defined elsewhere and to return
    # objects with domain, days_remaining, not_after and issuer attributes.
    certificates = discover_certificates()
    for cert in certificates:
        # Custom metric
        newrelic.agent.record_custom_metric(
            f"Certificate/ExpiryDays/{cert.domain}",
            cert.days_remaining
        )
        # Custom event
        if cert.days_remaining < 30:
            newrelic.agent.record_custom_event("CertificateWarning", {
                "domain": cert.domain,
                "days_remaining": cert.days_remaining,
                "expiry_date": cert.not_after.isoformat(),
                "issuer": cert.issuer
            })
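Best Practices Summary
1. Layered monitoring strategy
text
Layer 1: Real-time monitoring (Prometheus + Alertmanager)
Layer 2: Periodic scans (Python monitoring script)
Layer 3: Black-box checks (Blackbox Exporter)
Layer 4: Manual review (monthly report)
2. Alert escalation policy
yaml
Escalation policy:
  - 7 days before expiry: email notification + Slack warning
  - 3 days before expiry: urgent Slack notification + SMS
  - 1 day before expiry: phone call + automatically created ticket
  - Expired: automatic switch to a backup certificate + emergency meeting
3. Disaster recovery plan
bash
#!/bin/bash
# emergency-cert-switch.sh
# Emergency certificate switch script
DOMAIN=$1
BACKUP_CERT="/backup/certs/$DOMAIN/fullchain.pem"
# Assumption: the backup private key is stored next to the backup certificate
BACKUP_KEY="/backup/certs/$DOMAIN/privkey.pem"
if [[ ! -f "$BACKUP_CERT" ]]; then
    # Generate a self-signed emergency certificate
    openssl req -x509 -newkey rsa:4096 \
        -keyout /tmp/emergency.key \
        -out /tmp/emergency.crt \
        -days 7 -nodes \
        -subj "/CN=$DOMAIN"
    BACKUP_CERT="/tmp/emergency.crt"
    BACKUP_KEY="/tmp/emergency.key"
fi
# Deploy the emergency certificate together with its matching private key
# (the privkey.pem destination is an assumed path; adjust to your Nginx config)
cp "$BACKUP_CERT" "/etc/nginx/ssl/$DOMAIN/fullchain.pem"
cp "$BACKUP_KEY" "/etc/nginx/ssl/$DOMAIN/privkey.pem"
nginx -t && nginx -s reload
# Send the emergency notification (send_emergency_alert must be defined or on PATH)
send_emergency_alert "$DOMAIN"
The monitoring and alerting system described above provides:
Real-time monitoring: certificate status checks at minute-level granularity
Tiered alerting: graded severities that limit alert fatigue
Automated remediation: automatic renewal and deployment
Visualization: clear dashboards and reports
Compliance auditing: a complete record of each certificate's lifecycle
Together, these steps and this architecture ensure that expiring SSL certificates are detected and handled in time, avoiding service interruptions.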