Skip to content

Spark MLlib — 机器学习

什么是 MLlib

Spark MLlib 是 Spark 的分布式机器学习库,提供常用的机器学习算法和工具,能够在大规模数据集上训练模型。

两套 API

  • spark.mllib(旧):基于 RDD,自 Spark 2.0 起已进入维护模式(只修复 bug,不再添加新功能)
  • spark.ml(新):基于 DataFrame,推荐使用,支持 Pipeline

ML Pipeline

Pipeline 是 MLlib 的核心抽象,将数据预处理和模型训练串联成流水线。调用 pipeline.fit() 会按顺序拟合各个阶段并返回一个 PipelineModel,之后可用它的 transform() 对新数据做预测,也可以将其整体保存和加载:

python
from pyspark.ml import Pipeline
from pyspark.ml.feature import *
from pyspark.ml.classification import *
from pyspark.ml.evaluation import *

# Example: predict whether a user will make a purchase.
# NOTE(review): `spark` is assumed to be an existing SparkSession (e.g. the one
# the pyspark shell provides); it is not created in this snippet.

# 1. Load the data and hold out 20% for testing (fixed seed => reproducible split).
df = spark.read.csv("/data/users.csv", header=True, inferSchema=True)
train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)

# 2. Feature engineering
# Text column -> tokens -> hashed term frequencies -> TF-IDF weights.
tokenizer = Tokenizer(inputCol="description", outputCol="words")
hashingTF = HashingTF(inputCol="words", outputCol="raw_features", numFeatures=1000)
idf = IDF(inputCol="raw_features", outputCol="text_features")

# Categorical column -> numeric index -> one-hot vector.
# handleInvalid="keep": the indexer is fitted on train_df only, so a category
# that shows up only in test_df would otherwise make transform() raise;
# "keep" maps such unseen values to an extra index instead.
indexer = StringIndexer(inputCol="category", outputCol="category_idx",
                        handleInvalid="keep")
encoder = OneHotEncoder(inputCol="category_idx", outputCol="category_vec")

# Combine numeric, text and categorical features into one vector column.
assembler = VectorAssembler(
    inputCols=["age", "salary", "text_features", "category_vec"],
    outputCol="features"
)

# Scale the assembled feature vector.
scaler = StandardScaler(inputCol="features", outputCol="scaled_features")

# 3. Model
lr = LogisticRegression(
    featuresCol="scaled_features",
    labelCol="label",
    maxIter=100,
    regParam=0.01
)

# 4. Chain every step into a single Pipeline (stages run in this order).
pipeline = Pipeline(stages=[
    tokenizer, hashingTF, idf,
    indexer, encoder,
    assembler, scaler,
    lr
])

# 5. Train: fits every stage on the training split only.
model = pipeline.fit(train_df)

# 6. Predict on the held-out split.
predictions = model.transform(test_df)
predictions.select("label", "prediction", "probability").show()

# 7. Evaluate on the held-out predictions.
evaluator = BinaryClassificationEvaluator(labelCol="label")
auc = evaluator.evaluate(predictions)
print(f"AUC: {auc:.4f}")

特征工程

特征提取

python
# Text features: tokens -> hashed term frequencies -> TF-IDF weights.
# NOTE(review): numFeatures is left at HashingTF's default here, whereas the
# pipeline example sets it to 1000; confirm the PCA stage below is tractable
# for a feature vector of the default width on your Spark version.
tokenizer = Tokenizer(outputCol="words", inputCol="text")
hashingTF = HashingTF(outputCol="raw_features", inputCol="words")
idf = IDF(outputCol="features", inputCol="raw_features")

# Word2Vec embeddings learned from the tokenized "words" column.
word2vec = Word2Vec(
    inputCol="words",
    outputCol="word_vectors",
    vectorSize=100,
    minCount=5,
)

# PCA: reduce the "features" vector to 10 components.
pca = PCA(inputCol="features", outputCol="pca_features", k=10)

特征转换

python
# Standardize to zero mean and unit variance.
# NOTE: withMean=True centers the data and therefore produces dense output;
# take care when applying it to large sparse vectors (e.g. TF-IDF output).
scaler = StandardScaler(inputCol="features", outputCol="scaled",
                        withMean=True, withStd=True)

# Rescale each feature to the [0, 1] range.
minmax = MinMaxScaler(inputCol="features", outputCol="normalized")

# Categorical encoding: string -> index -> one-hot vector.
indexer = StringIndexer(inputCol="color", outputCol="color_idx")
encoder = OneHotEncoder(inputCol="color_idx", outputCol="color_vec")

# Bucketing (discretize a continuous variable).
# Buckets: (-inf, 18), [18, 35), [35, 60), [60, +inf].
# Bucketizer always raises on values outside the outermost splits, so both
# ends are left open (-inf / +inf) as the Spark docs recommend. A malformed
# (e.g. negative) age then no longer aborts the job, and bucket indices for
# ages >= 0 are the same as with a lower bound of 0.
bucketizer = Bucketizer(
    splits=[float("-inf"), 18, 35, 60, float("inf")],
    inputCol="age",
    outputCol="age_bucket"
)

# Combine columns into a single feature vector.
assembler = VectorAssembler(
    inputCols=["age", "salary", "color_vec"],
    outputCol="features"
)

# Degree-2 polynomial expansion of the feature vector.
poly = PolynomialExpansion(degree=2, inputCol="features", outputCol="poly_features")

分类算法

python
from pyspark.ml.classification import *

# Logistic regression (elasticNetParam=0.0 => pure L2 penalty).
lr = LogisticRegression(
    elasticNetParam=0.0,
    regParam=0.01,
    maxIter=100,
)

# Decision tree classifier.
dt = DecisionTreeClassifier(labelCol="label", maxDepth=5)

# Random forest of 100 trees; fixed seed for reproducibility.
rf = RandomForestClassifier(seed=42, maxDepth=5, numTrees=100)

# Gradient-boosted trees: 50 iterations with step size 0.1.
gbt = GBTClassifier(stepSize=0.1, maxDepth=5, maxIter=50)

# Linear support vector machine.
svm = LinearSVC(regParam=0.01, maxIter=100)

# Naive Bayes. NOTE: the multinomial model expects non-negative feature values.
nb = NaiveBayes(modelType="multinomial", smoothing=1.0)

# Multilayer perceptron. Layer sizes: input layer of 4 features, two hidden
# layers of 8 units each, and an output layer for 3 classes.
mlp = MultilayerPerceptronClassifier(
    seed=42,
    maxIter=100,
    layers=[4, 8, 8, 3],
)

回归算法

python
from pyspark.ml.regression import *

from pyspark.ml.evaluation import RegressionEvaluator

# Linear regression; elasticNetParam=0.0 means a pure L2 penalty.
lr = LinearRegression(elasticNetParam=0.0, regParam=0.01, maxIter=100)

# Tree-based regressors.
dt = DecisionTreeRegressor(maxDepth=5)
rf = RandomForestRegressor(maxDepth=5, numTrees=100)
gbt = GBTRegressor(maxDepth=5, maxIter=50)

# Fit the linear model on the training split and score the test split.
# NOTE(review): train_df / test_df must already carry an assembled "features"
# vector column (e.g. from a VectorAssembler); raw CSV columns are not enough.
model = lr.fit(train_df)
predictions = model.transform(test_df)

# Root-mean-square error of the test predictions.
evaluator = RegressionEvaluator(metricName="rmse")
rmse = evaluator.evaluate(predictions)
print(f"RMSE: {rmse:.4f}")

# Learned weights and bias of the linear model.
print(f"Coefficients: {model.coefficients}")
print(f"Intercept: {model.intercept}")

聚类算法

python
from pyspark.ml.clustering import *

from pyspark.ml.evaluation import ClusteringEvaluator

# K-Means with 5 clusters.
kmeans = KMeans(maxIter=20, seed=42, k=5)
model = kmeans.fit(df)
predictions = model.transform(df)

# Silhouette score of the K-Means assignment.
evaluator = ClusteringEvaluator()
silhouette = evaluator.evaluate(predictions)
print(f"Silhouette: {silhouette:.4f}")

# Print every cluster centre.
centers = model.clusterCenters()
for cluster_id, centre in enumerate(centers):
    print(f"Cluster {cluster_id}: {centre}")

# Gaussian mixture model with 5 components.
gmm = GaussianMixture(seed=42, k=5)
model = gmm.fit(df)

# LDA topic model (EM optimizer); list the top 5 terms of each topic.
lda = LDA(optimizer="em", maxIter=10, k=10)
model = lda.fit(df)
topics = model.describeTopics(maxTermsPerTopic=5)

超参数调优

python
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder, TrainValidationSplit

# Tune the pipeline's LogisticRegression stage.
# The estimator is taken from the pipeline itself rather than from the
# module-level name `lr`: by this point the regression section has rebound
# `lr` to a LinearRegression. Params of an estimator that is not a pipeline
# stage are ignored when the pipeline is copied with a param map, so every
# grid point would silently train the identical model.
lr_stage = pipeline.getStages()[-1]  # last stage of the pipeline = LogisticRegression

# Parameter grid: 3 x 2 x 3 = 18 combinations.
paramGrid = (
    ParamGridBuilder()
    .addGrid(lr_stage.regParam, [0.01, 0.1, 1.0])
    .addGrid(lr_stage.maxIter, [50, 100])
    .addGrid(lr_stage.elasticNetParam, [0.0, 0.5, 1.0])
    .build()
)

# 5-fold cross-validation: each combination is fitted once per fold.
cv = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=BinaryClassificationEvaluator(),
    numFolds=5,
    parallelism=4  # number of models evaluated in parallel
)

cv_model = cv.fit(train_df)
best_model = cv_model.bestModel

# Average cross-validated metric per param map, in the same order as paramGrid.
print(cv_model.avgMetrics)

# Report the winning parameter combination. avgMetrics is aligned with
# paramGrid; the evaluator says whether a larger metric is better.
cv_metrics = cv_model.avgMetrics
pick = max if cv.getEvaluator().isLargerBetter() else min
best_index = pick(range(len(cv_metrics)), key=cv_metrics.__getitem__)
print({param.name: value for param, value in paramGrid[best_index].items()})

# Train/validation split: a single split instead of k folds (faster, suited
# to large datasets).
tvs = TrainValidationSplit(
    estimator=pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=BinaryClassificationEvaluator(),
    trainRatio=0.8
)
tvs_model = tvs.fit(train_df)

模型保存与加载

python
# Persist the fitted pipeline and reload it for batch scoring.
MODEL_PATH = "hdfs://namenode:9000/models/user_purchase_model"

# Save the PipelineModel selected by cross-validation. The name `model` is not
# used here because earlier sections rebind it (at this point it holds the LDA
# model), which PipelineModel.load could not read back.
# write().overwrite() replaces an existing directory; a plain save() raises
# when the path already exists, so re-running the script would fail.
best_model.write().overwrite().save(MODEL_PATH)

# Load the model back.
from pyspark.ml import PipelineModel
loaded_model = PipelineModel.load(MODEL_PATH)

# Batch prediction.
# NOTE(review): `new_data` is not defined in this snippet; it must be a
# DataFrame with the same input columns the pipeline was trained on.
predictions = loaded_model.transform(new_data)

协同过滤推荐

python
from pyspark.ml.recommendation import ALS

# Load the rating data. Expected columns: userId, movieId, rating.
ratings = spark.read.csv("/data/ratings.csv", header=True, inferSchema=True)

# Hold out 20% of the ratings for evaluation. Measuring RMSE on the rows the
# model was trained on only reports training error and overstates accuracy.
train_ratings, test_ratings = ratings.randomSplit([0.8, 0.2], seed=42)

# Train the ALS model on the training split.
als = ALS(
    maxIter=10,
    regParam=0.01,
    rank=10,
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating",
    # Users/items in test_ratings that never occur in train_ratings get NaN
    # predictions; "drop" removes those rows so the RMSE is not NaN.
    coldStartStrategy="drop"
)

model = als.fit(train_ratings)

# Top-10 movie recommendations for every user seen during training.
user_recs = model.recommendForAllUsers(10)
user_recs.show()

# Recommendations for a few specific users. They are taken from the training
# split so that each of them has learned factors.
users = train_ratings.select("userId").distinct().limit(5)
user_subset_recs = model.recommendForUserSubset(users, 10)

# Evaluate on the held-out ratings. Imported here so this section does not
# rely on the regression section having imported it first.
from pyspark.ml.evaluation import RegressionEvaluator

predictions = model.transform(test_ratings)
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="rating",
    predictionCol="prediction"
)
rmse = evaluator.evaluate(predictions)
print(f"RMSE: {rmse:.4f}")

本站内容由 褚成志 整理编写,仅供学习参考