跳至内容
模型部署与工程化

模型部署与工程化

模型训练好只是开始,让它稳定服务于线上请求才是真正的挑战。这篇从 sklearn Pipeline 的最佳实践出发,讲清楚模型序列化、API 封装、以及生产级的监控思路。

为什么需要 Pipeline

新手常见错误:在训练集上 fit Scaler,再在训练/测试集上分别 transform,代码散落各处,上线时忘记某一步。

sklearn Pipeline 把预处理 + 模型打包成一个对象,fit 一次全部搞定,predict 时自动执行完整流程,不可能漏步骤:

from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.model_selection import train_test_split, cross_val_score
import pandas as pd
import numpy as np

# 示例:信用评分数据
df = pd.DataFrame({
    "age":     [25, 35, np.nan, 45, 28, 52],
    "income":  [30000, 80000, 60000, np.nan, 45000, 120000],
    "edu":     ["高中", "本科", "研究生", "本科", "高中", "研究生"],
    "city":    ["北京", "上海", "北京", "广州", "上海", "北京"],
    "default": [1, 0, 0, 1, 0, 0],  # 标签:是否违约
})
X = df.drop("default", axis=1)
y = df["default"]

# 数值特征的处理流程
numeric_features   = ["age", "income"]
numeric_transformer = Pipeline([
    ("imputer", SimpleImputer(strategy="median")),  # 缺失值用中位数填充
    ("scaler",  StandardScaler()),                  # 标准化
])

# 类别特征的处理流程
categorical_features    = ["edu", "city"]
categorical_transformer  = Pipeline([
    ("imputer", SimpleImputer(strategy="most_frequent")),  # 缺失值用众数
    ("onehot",  OneHotEncoder(handle_unknown="ignore")),   # 独热编码
])

# 用 ColumnTransformer 分别处理不同类型的列
preprocessor = ColumnTransformer([
    ("num", numeric_transformer,  numeric_features),
    ("cat", categorical_transformer, categorical_features),
])

# 最终 Pipeline:预处理 + 模型,一体化
pipeline = Pipeline([
    ("preprocessor", preprocessor),
    ("classifier",   GradientBoostingClassifier(n_estimators=100, random_state=42)),
])

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)
pipeline.fit(X_train, y_train)

# 直接用原始数据预测,Pipeline 自动完成全部预处理
print(pipeline.predict(X_test))
print(pipeline.predict_proba(X_test)[:, 1])  # 违约概率

模型序列化

训练好的 Pipeline 需要持久化,上线时直接加载,不用重新训练:

import joblib

# 保存:比 pickle 更高效,支持 numpy 数组
joblib.dump(pipeline, "credit_model_v1.joblib")

# 加载(服务端)
loaded_pipeline = joblib.load("credit_model_v1.joblib")

# 验证加载结果
test_sample = pd.DataFrame([{
    "age": 30, "income": 55000, "edu": "本科", "city": "上海"
}])
prob = loaded_pipeline.predict_proba(test_sample)[0, 1]
print(f"违约概率: {prob:.3f}")
版本对齐:保存模型时记录 sklearn 版本(sklearn.__version__)。不同版本间的模型文件可能不兼容,生产环境要固定依赖版本。

用 FastAPI 封装成预测服务

# app.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, Field
import joblib
import pandas as pd
import logging

logging.basicConfig(level=logging.INFO)
app = FastAPI(title="信用评分 API", version="1.0.0")

# 启动时加载模型(只加载一次)
model = joblib.load("credit_model_v1.joblib")


class PredictRequest(BaseModel):
    age:    float = Field(..., gt=18, lt=100, description="年龄")
    income: float = Field(..., gt=0, description="年收入(元)")
    edu:    str   = Field(..., description="学历:高中/本科/研究生")
    city:   str   = Field(..., description="城市")


class PredictResponse(BaseModel):
    default_probability: float
    risk_level:          str
    model_version:       str = "v1"


@app.post("/predict", response_model=PredictResponse)
async def predict(req: PredictRequest):
    try:
        X = pd.DataFrame([req.model_dump()])
        prob = model.predict_proba(X)[0, 1]

        if prob < 0.3:
            risk = "低风险"
        elif prob < 0.6:
            risk = "中风险"
        else:
            risk = "高风险"

        logging.info(f"预测完成: prob={prob:.3f}, risk={risk}")
        return PredictResponse(default_probability=round(prob, 4), risk_level=risk)

    except Exception as e:
        logging.error(f"预测失败: {e}")
        raise HTTPException(status_code=500, detail=str(e))


@app.get("/health")
async def health():
    return {"status": "ok", "model": "credit_model_v1"}

启动服务:

pip install fastapi uvicorn
uvicorn app:app --host 0.0.0.0 --port 8000 --reload

测试:

curl -X POST http://localhost:8000/predict \
  -H "Content-Type: application/json" \
  -d '{"age": 30, "income": 55000, "edu": "本科", "city": "上海"}'

批量预测与异步处理

对于离线批量预测(如每晚跑一批用户风险评分),不需要 API,直接用 Pipeline 批量处理:

import pandas as pd
import joblib
from pathlib import Path

model = joblib.load("credit_model_v1.joblib")

def batch_predict(input_path: str, output_path: str, batch_size: int = 10000):
    """分批处理大文件,避免内存溢出"""
    results = []
    for chunk in pd.read_csv(input_path, chunksize=batch_size):
        probs = model.predict_proba(chunk)[: , 1]
        chunk["default_prob"] = probs
        chunk["risk_level"] = pd.cut(
            probs, bins=[0, 0.3, 0.6, 1.0],
            labels=["低风险", "中风险", "高风险"]
        )
        results.append(chunk)

    pd.concat(results).to_csv(output_path, index=False)
    print(f"批量预测完成,结果已写入 {output_path}")

生产监控:数据漂移检测

模型上线后效果会随数据分布变化而下降(data drift)。必须持续监控:

from scipy.stats import ks_2samp
import numpy as np

def detect_feature_drift(train_data: pd.Series, current_data: pd.Series,
                          threshold: float = 0.05) -> dict:
    """用 KS 检验检测特征分布漂移"""
    stat, p_value = ks_2samp(train_data.dropna(), current_data.dropna())
    drifted = p_value < threshold
    return {
        "feature":    train_data.name,
        "ks_stat":    round(stat, 4),
        "p_value":    round(p_value, 4),
        "drifted":    drifted,
    }

# 示例:监控 age 特征是否漂移
train_age   = pd.Series(np.random.normal(35, 10, 1000), name="age")
current_age = pd.Series(np.random.normal(40, 12, 500),  name="age")  # 分布变化了

result = detect_feature_drift(train_age, current_age)
print(result)
# {'feature': 'age', 'ks_stat': 0.xxx, 'p_value': 0.00x, 'drifted': True}

监控体系建议

监控项频率告警阈值
输入特征分布(KS 检验)每日p < 0.05
预测分数分布每日均值偏移 >10%
有标签时的业务指标(AUC/F1)有标签后下降 >3%
空值率异常每次请求超过历史 2 倍

MLflow 实验跟踪

import mlflow
import mlflow.sklearn

mlflow.set_experiment("credit_scoring")

with mlflow.start_run(run_name="gbdt_v1"):
    pipeline.fit(X_train, y_train)

    # 记录参数
    mlflow.log_param("n_estimators", 100)
    mlflow.log_param("learning_rate", 0.1)

    # 记录指标
    from sklearn.metrics import roc_auc_score, f1_score
    y_pred = pipeline.predict(X_test)
    y_prob = pipeline.predict_proba(X_test)[:, 1]

    mlflow.log_metric("test_auc", roc_auc_score(y_test, y_prob))
    mlflow.log_metric("test_f1",  f1_score(y_test, y_pred))

    # 保存模型到 MLflow
    mlflow.sklearn.log_model(pipeline, "model")

print(f"实验记录完成,查看: mlflow ui")

一句话小结

工程化的核心是Pipeline 化(不会漏步骤)+ 版本化(模型/代码/数据可追溯)+ 监控化(漂移及时告警)。从 joblib.dump 到 FastAPI 到 MLflow,是模型从实验室走向生产的完整路径。

最后更新于