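Model Deployment and Engineering
Training a model is only the beginning; the real challenge is getting it to serve production requests reliably. Starting from sklearn Pipeline best practices, this post covers model serialization, wrapping the model in an API, and an approach to production-grade monitoring.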
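Why you need a Pipeline
A common beginner mistake: fit a scaler on the training set, then transform the training and test sets in separate places. The steps themselves are correct, but with the code scattered around, one of them is easily forgotten at deployment time.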
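To make the failure mode concrete, here is a minimal sketch of the scattered workflow. It assumes synthetic one-feature data and a LogisticRegression model, neither of which comes from the example in this post; the point is only that forgetting the scaler at serving time silently changes the predictions rather than raising an error.

```python
import numpy as np
from sklearn.linear_model import LogisticRegression
from sklearn.preprocessing import StandardScaler

# Synthetic data (an assumption for illustration): one income-like feature
rng = np.random.default_rng(0)
X_train = rng.normal(loc=50_000, scale=15_000, size=(200, 1))
y_train = (X_train[:, 0] > 50_000).astype(int)

# Training script: scaling and fitting are two separate manual steps
scaler = StandardScaler().fit(X_train)
clf = LogisticRegression().fit(scaler.transform(X_train), y_train)

# Serving code written elsewhere: both steps must be repeated by hand
X_new = np.array([[30_000.0], [70_000.0]])
p_correct = clf.predict_proba(scaler.transform(X_new))[:, 1]

# The easy mistake: the scaler is forgotten and raw values reach the model
p_forgotten = clf.predict_proba(X_new)[:, 1]

print("with scaler:   ", p_correct.round(3))
print("without scaler:", p_forgotten.round(3))
```

Nothing fails in the second call; the model simply receives inputs on a scale it never saw during training, which is why this kind of bug tends to surface late.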
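An sklearn Pipeline bundles preprocessing and the model into a single object. One fit call trains everything, and predict automatically runs the full sequence, so no step gets skipped: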

from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.model_selection import train_test_split
import pandas as pd
import numpy as np

# Example: toy credit-scoring data (six rows, for illustration only)
df = pd.DataFrame({
    "age": [25, 35, np.nan, 45, 28, 52],
    "income": [30000, 80000, 60000, np.nan, 45000, 120000],
    # Category values are kept verbatim: 高中 = high school, 本科 = bachelor's, 研究生 = graduate
    "edu": ["高中", "本科", "研究生", "本科", "高中", "研究生"],
    # 北京 = Beijing, 上海 = Shanghai, 广州 = Guangzhou
    "city": ["北京", "上海", "北京", "广州", "上海", "北京"],
    "default": [1, 0, 0, 1, 0, 0],  # label: whether the customer defaulted
})
X = df.drop("default", axis=1)
y = df["default"]

# Processing steps for numeric features
numeric_features = ["age", "income"]
numeric_transformer = Pipeline([
    ("imputer", SimpleImputer(strategy="median")),  # fill missing values with the median
    ("scaler", StandardScaler()),                   # standardize
])

# Processing steps for categorical features
categorical_features = ["edu", "city"]
categorical_transformer = Pipeline([
    ("imputer", SimpleImputer(strategy="most_frequent")),  # fill missing values with the mode
    ("onehot", OneHotEncoder(handle_unknown="ignore")),    # one-hot encode; unseen categories become all zeros
])

# ColumnTransformer applies each sub-pipeline to its own columns
preprocessor = ColumnTransformer([
    ("num", numeric_transformer, numeric_features),
    ("cat", categorical_transformer, categorical_features),
])

# Final Pipeline: preprocessing + model in one object
pipeline = Pipeline([
    ("preprocessor", preprocessor),
    ("classifier", GradientBoostingClassifier(n_estimators=100, random_state=42)),
])

# stratify=y keeps both classes in the tiny training split
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.3, random_state=42, stratify=y
)
pipeline.fit(X_train, y_train)

# Predict on raw data; the Pipeline runs all preprocessing automatically
print(pipeline.predict(X_test))
print(pipeline.predict_proba(X_test)[:, 1])  # probability of default

Model serialization
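Once trained, the Pipeline needs to be persisted so the serving side can load it directly instead of retraining:

import joblib

# Save: joblib is typically more efficient than plain pickle for objects holding large NumPy arrays
joblib.dump(pipeline, "credit_model_v1.joblib")

# Load (on the serving side). Only load files from a trusted source: unpickling can execute arbitrary code
loaded_pipeline = joblib.load("credit_model_v1.joblib")

# Verify the loaded model
test_sample = pd.DataFrame([{
    "age": 30, "income": 55000, "edu": "本科", "city": "上海"
}])
prob = loaded_pipeline.predict_proba(test_sample)[0, 1]
print(f"Default probability: {prob:.3f}")

Version alignment: record the sklearn version (sklearn.__version__) when you save the model. Model files may be incompatible across versions, so pin dependency versions in production.

Wrapping the model in a FastAPI prediction service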
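Because the service loads the model file at startup, that is a natural place to enforce the version advice above. A minimal sketch, assuming a hypothetical sidecar JSON file stored next to the model; the helper names `save_with_metadata` and `load_checked` and the `.meta.json` suffix are illustrative and not part of joblib or sklearn:

```python
import json
import tempfile
from pathlib import Path

import joblib
import sklearn
from sklearn.dummy import DummyClassifier


def save_with_metadata(model, path):
    """Dump the model and record the sklearn version in a sidecar JSON file."""
    joblib.dump(model, path)
    meta = {"sklearn_version": sklearn.__version__}
    Path(str(path) + ".meta.json").write_text(json.dumps(meta))


def load_checked(path):
    """Load the model only if the runtime sklearn version matches the recorded one."""
    meta = json.loads(Path(str(path) + ".meta.json").read_text())
    if meta["sklearn_version"] != sklearn.__version__:
        raise RuntimeError(
            f"model saved with sklearn {meta['sklearn_version']}, "
            f"but the runtime has {sklearn.__version__}"
        )
    return joblib.load(path)


# Round trip with a stand-in model in a temporary directory
with tempfile.TemporaryDirectory() as tmp:
    model_path = Path(tmp) / "credit_model_v1.joblib"
    stand_in = DummyClassifier(strategy="most_frequent").fit([[0], [1]], [0, 0])
    save_with_metadata(stand_in, model_path)
    restored = load_checked(model_path)
    print(restored.predict([[5]]))
```

Failing fast at startup is a design choice: a strict equality check is simple but may be stricter than needed, and a team could reasonably relax it to compare only major and minor versions.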

# app.py
import logging

import joblib
import pandas as pd
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, Field

logging.basicConfig(level=logging.INFO)
app = FastAPI(title="Credit Scoring API", version="1.0.0")

# Load the model once at startup
model = joblib.load("credit_model_v1.joblib")


class PredictRequest(BaseModel):
    age: float = Field(..., gt=18, lt=100, description="Age")
    income: float = Field(..., gt=0, description="Annual income (CNY)")
    edu: str = Field(..., description="Education level: 高中 / 本科 / 研究生")
    city: str = Field(..., description="City")


class PredictResponse(BaseModel):
    default_probability: float
    risk_level: str
    model_version: str = "v1"


# Plain def (not async def): FastAPI runs it in a thread pool, so the
# blocking, CPU-bound predict call does not stall the event loop
@app.post("/predict", response_model=PredictResponse)
def predict(req: PredictRequest):
    try:
        # model_dump() is the Pydantic v2 API
        X = pd.DataFrame([req.model_dump()])
        prob = float(model.predict_proba(X)[0, 1])
        if prob < 0.3:
            risk = "low risk"
        elif prob < 0.6:
            risk = "medium risk"
        else:
            risk = "high risk"
        logging.info("Prediction done: prob=%.3f, risk=%s", prob, risk)
        return PredictResponse(default_probability=round(prob, 4), risk_level=risk)
    except Exception as e:
        # logging.exception also records the traceback
        logging.exception("Prediction failed")
        raise HTTPException(status_code=500, detail=str(e))


@app.get("/health")
def health():
    return {"status": "ok", "model": "credit_model_v1"}

Start the service:

pip install fastapi uvicorn
# --reload is a development convenience; drop it in production
uvicorn app:app --host 0.0.0.0 --port 8000 --reload

Test it:

curl -X POST http://localhost:8000/predict \
  -H "Content-Type: application/json" \
  -d '{"age": 30, "income": 55000, "edu": "本科", "city": "上海"}'

Batch prediction and async processing
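The batch job in this section applies the same probability-to-risk thresholds as the /predict endpoint (0.3 and 0.6). One way to keep the two code paths from diverging is a single shared helper. A minimal sketch; the name `risk_level`, the two module-level constants, and the English labels are assumptions made for illustration:

```python
from bisect import bisect_right

# Cut points between bands; a probability equal to a cut point falls in the higher band
RISK_THRESHOLDS = [0.3, 0.6]
RISK_LABELS = ["low risk", "medium risk", "high risk"]


def risk_level(prob: float) -> str:
    """Map a default probability in [0, 1] to a risk label.

    Mirrors the endpoint logic: prob < 0.3 is low, prob < 0.6 is medium, else high.
    """
    if not 0.0 <= prob <= 1.0:
        raise ValueError(f"probability out of range: {prob}")
    # bisect_right counts how many thresholds are <= prob, which is the band index
    return RISK_LABELS[bisect_right(RISK_THRESHOLDS, prob)]


for p in (0.0, 0.29, 0.3, 0.59, 0.6, 1.0):
    print(p, risk_level(p))
```

With a helper like this, the API handler and the batch job would both call the same function, so changing a threshold is a one-line edit.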
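For offline batch prediction (for example, a nightly job that scores a batch of users), no API is needed; run the Pipeline over the data directly:

import joblib
import pandas as pd

model = joblib.load("credit_model_v1.joblib")

def batch_predict(input_path: str, output_path: str, batch_size: int = 10000):
    """Score a large CSV in chunks so the whole file never sits in memory."""
    first_chunk = True
    for chunk in pd.read_csv(input_path, chunksize=batch_size):
        probs = model.predict_proba(chunk)[:, 1]
        chunk["default_prob"] = probs
        # right=False gives [0, 0.3), [0.3, 0.6), [0.6, inf), matching the API thresholds
        chunk["risk_level"] = pd.cut(
            probs, bins=[0, 0.3, 0.6, float("inf")], right=False,
            labels=["low risk", "medium risk", "high risk"],
        )
        # Append each scored chunk instead of accumulating all results in memory
        chunk.to_csv(output_path, mode="w" if first_chunk else "a",
                     header=first_chunk, index=False)
        first_chunk = False
    print(f"Batch prediction finished; results written to {output_path}")

Production monitoring: data drift detection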
After deployment, model performance degrades as the data distribution shifts (data drift), so continuous monitoring is essential:

import numpy as np
import pandas as pd
from scipy.stats import ks_2samp

def detect_feature_drift(train_data: pd.Series, current_data: pd.Series,
                         threshold: float = 0.05) -> dict:
    """Detect feature distribution drift with a two-sample KS test."""
    stat, p_value = ks_2samp(train_data.dropna(), current_data.dropna())
    # With large samples even tiny shifts give small p-values,
    # so it is worth looking at ks_stat as well
    return {
        "feature": train_data.name,
        "ks_stat": round(float(stat), 4),
        "p_value": round(float(p_value), 4),
        "drifted": bool(p_value < threshold),
    }

# Example: check whether the age feature has drifted
train_age = pd.Series(np.random.normal(35, 10, 1000), name="age")
current_age = pd.Series(np.random.normal(40, 12, 500), name="age")  # the distribution has changed
result = detect_feature_drift(train_age, current_age)
print(result)
# {'feature': 'age', 'ks_stat': 0.xxx, 'p_value': 0.00x, 'drifted': True}

Recommended monitoring setup:
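| Monitored item | Frequency | Alert threshold |
|---|---|---|
| Input feature distributions (KS test) | Daily | p < 0.05 |
| Prediction score distribution | Daily | Mean shift > 10% |
| Business metrics once labels arrive (AUC/F1) | When labels become available | Drop > 3% |
| Abnormal null rate | Every request | More than 2x the historical rate |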
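The score-distribution and null-rate rows of the table can be checked with a few lines of pandas. A minimal sketch under stated assumptions: the function names are hypothetical, "mean shift > 10%" is read as a relative change against the baseline mean, and "more than 2x" is applied to the baseline null rate. The table itself does not pin these definitions down.

```python
import numpy as np
import pandas as pd


def score_mean_shift(baseline: pd.Series, current: pd.Series,
                     max_rel_shift: float = 0.10) -> dict:
    """Alert when the mean prediction score moves more than max_rel_shift (relative)."""
    base_mean = float(baseline.mean())
    cur_mean = float(current.mean())
    if base_mean == 0:
        raise ValueError("baseline mean is zero; relative shift is undefined")
    rel_shift = abs(cur_mean - base_mean) / abs(base_mean)
    return {"baseline_mean": base_mean, "current_mean": cur_mean,
            "rel_shift": rel_shift, "alert": rel_shift > max_rel_shift}


def null_rate_alert(baseline: pd.Series, current: pd.Series,
                    max_ratio: float = 2.0) -> dict:
    """Alert when the current null rate exceeds max_ratio times the baseline rate."""
    base_rate = float(baseline.isna().mean())
    cur_rate = float(current.isna().mean())
    # Assumption: if the baseline had no nulls at all, any null counts as anomalous
    alert = cur_rate > max_ratio * base_rate if base_rate > 0 else cur_rate > 0
    return {"baseline_rate": base_rate, "current_rate": cur_rate, "alert": alert}


# Hand-made example values (not real data)
base_scores = pd.Series([0.10, 0.20, 0.30, 0.40])
cur_scores = pd.Series([0.20, 0.30, 0.40, 0.50])
print(score_mean_shift(base_scores, cur_scores))

base_income = pd.Series([1.0, np.nan, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0])
cur_income = pd.Series([1.0, np.nan, np.nan, np.nan, 5.0])
print(null_rate_alert(base_income, cur_income))
```

A null-rate check over a window of recent requests is likely more stable than one evaluated on each single request, since a single record has a null rate of either 0 or 1 per feature.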
MLflow experiment tracking

import mlflow
import mlflow.sklearn
from sklearn.metrics import roc_auc_score, f1_score

mlflow.set_experiment("credit_scoring")
with mlflow.start_run(run_name="gbdt_v1"):
    pipeline.fit(X_train, y_train)

    # Log hyperparameters, read from the estimator rather than hard-coded
    clf = pipeline.named_steps["classifier"]
    mlflow.log_param("n_estimators", clf.n_estimators)
    mlflow.log_param("learning_rate", clf.learning_rate)

    # Log metrics (AUC requires both classes to be present in y_test)
    y_pred = pipeline.predict(X_test)
    y_prob = pipeline.predict_proba(X_test)[:, 1]
    mlflow.log_metric("test_auc", roc_auc_score(y_test, y_prob))
    mlflow.log_metric("test_f1", f1_score(y_test, y_pred))

    # Save the model to MLflow
    mlflow.sklearn.log_model(pipeline, "model")

print("Run logged; to browse it, run: mlflow ui")

One-sentence summary
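The core of engineering a model comes down to three things: building on a Pipeline (no skipped steps), versioning (model, code, and data stay traceable), and monitoring (drift triggers timely alerts). From joblib.dump to FastAPI to MLflow, this is the full path a model takes from the lab to production.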