Spark MLlib — 机器学习
什么是 MLlib
Spark MLlib 是 Spark 的分布式机器学习库,提供常用的机器学习算法和工具,能够在大规模数据集上训练模型。
两套 API:
- spark.mllib(旧):基于 RDD,已进入维护模式
- spark.ml(新):基于 DataFrame,推荐使用,支持 Pipeline
ML Pipeline
Pipeline 是 MLlib 的核心抽象,将数据预处理和模型训练串联成流水线:
python
from pyspark.ml import Pipeline
from pyspark.ml.feature import *
from pyspark.ml.classification import *
from pyspark.ml.evaluation import *
# Example: predict whether a user will make a purchase.

# 1. Load the data and hold out 20% for testing (fixed seed keeps the split reproducible).
df = spark.read.csv("/data/users.csv", header=True, inferSchema=True)
train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)

# 2. Feature engineering
# Text features: description -> words -> hashed term frequencies -> TF-IDF weights.
tokenizer = Tokenizer(inputCol="description", outputCol="words")
hashingTF = HashingTF(inputCol="words", outputCol="raw_features", numFeatures=1000)
idf = IDF(inputCol="raw_features", outputCol="text_features")

# Categorical feature encoding.
# handleInvalid="keep" routes categories that were not seen during fit() into an
# extra bucket. With the default ("error"), a category that happens to land only
# in the random test split makes model.transform(test_df) fail.
indexer = StringIndexer(
    inputCol="category",
    outputCol="category_idx",
    handleInvalid="keep",
)
encoder = OneHotEncoder(inputCol="category_idx", outputCol="category_vec")

# Merge the numeric, text, and categorical features into a single vector column.
assembler = VectorAssembler(
    inputCols=["age", "salary", "text_features", "category_vec"],
    outputCol="features",
)

# Feature standardization.
scaler = StandardScaler(inputCol="features", outputCol="scaled_features")

# 3. Model
lr = LogisticRegression(
    featuresCol="scaled_features",
    labelCol="label",
    maxIter=100,
    regParam=0.01,
)

# 4. Build the Pipeline; stages run in the listed order.
pipeline = Pipeline(stages=[
    tokenizer, hashingTF, idf,
    indexer, encoder,
    assembler, scaler,
    lr,
])

# 5. Train
model = pipeline.fit(train_df)

# 6. Predict
predictions = model.transform(test_df)
predictions.select("label", "prediction", "probability").show()

# 7. Evaluate. The metric is spelled out so the value matches the "AUC" label printed below.
evaluator = BinaryClassificationEvaluator(labelCol="label", metricName="areaUnderROC")
auc = evaluator.evaluate(predictions)
print(f"AUC: {auc:.4f}")

特征工程
特征提取
python
# TF-IDF text features: "text" -> "words" -> "raw_features" -> "features".
tokenizer = Tokenizer(
    inputCol="text",
    outputCol="words",
)
hashingTF = HashingTF(
    inputCol="words",
    outputCol="raw_features",
)
idf = IDF(
    inputCol="raw_features",
    outputCol="features",
)

# Word2Vec embedding computed from the tokenized "words" column.
word2vec = Word2Vec(
    inputCol="words",
    outputCol="word_vectors",
    vectorSize=100,
    minCount=5,
)
# PCA 降维
pca = PCA(k=10, inputCol="features", outputCol="pca_features")

特征转换
python
# Standardization (mean 0, variance 1).
scaler = StandardScaler(
    inputCol="features",
    outputCol="scaled",
    withMean=True,
    withStd=True,
)

# Normalization to the [0, 1] range.
minmax = MinMaxScaler(
    inputCol="features",
    outputCol="normalized",
)

# Categorical encoding: string -> index -> one-hot vector.
indexer = StringIndexer(
    inputCol="color",
    outputCol="color_idx",
)
encoder = OneHotEncoder(
    inputCol="color_idx",
    outputCol="color_vec",
)

# Binning (discretizing a continuous variable) using the listed split points.
bucketizer = Bucketizer(
    inputCol="age",
    outputCol="age_bucket",
    splits=[0, 18, 35, 60, float("inf")],
)

# Merge the individual feature columns into a single vector column.
assembler = VectorAssembler(
    outputCol="features",
    inputCols=["age", "salary", "color_vec"],
)
# 多项式特征
poly = PolynomialExpansion(degree=2, inputCol="features", outputCol="poly_features")

分类算法
python
from pyspark.ml.classification import *
# Logistic regression
lr = LogisticRegression(
    maxIter=100,
    regParam=0.01,
    elasticNetParam=0.0,
)

# Decision tree
dt = DecisionTreeClassifier(
    labelCol="label",
    maxDepth=5,
)

# Random forest
rf = RandomForestClassifier(
    numTrees=100,
    maxDepth=5,
    seed=42,
)

# Gradient-boosted trees (GBT)
gbt = GBTClassifier(
    maxIter=50,
    maxDepth=5,
    stepSize=0.1,
)

# Support vector machine (linear SVM)
svm = LinearSVC(
    maxIter=100,
    regParam=0.01,
)

# Naive Bayes
nb = NaiveBayes(
    modelType="multinomial",
    smoothing=1.0,
)

# Multilayer perceptron (neural network)
mlp = MultilayerPerceptronClassifier(
layers=[4, 8, 8, 3], # 输入层4个特征,两个隐藏层,输出层3个类别
maxIter=100,
seed=42
)

回归算法
python
from pyspark.ml.regression import *
from pyspark.ml.evaluation import RegressionEvaluator

# Linear regression
lr = LinearRegression(
    maxIter=100,
    regParam=0.01,
    elasticNetParam=0.0,
)

# Decision tree regression
dt = DecisionTreeRegressor(maxDepth=5)

# Random forest regression
rf = RandomForestRegressor(
    numTrees=100,
    maxDepth=5,
)

# GBT regression
gbt = GBTRegressor(
    maxIter=50,
    maxDepth=5,
)

# Train the linear model and score the held-out data.
model = lr.fit(train_df)
predictions = model.transform(test_df)

# Evaluate with root-mean-squared error.
evaluator = RegressionEvaluator(metricName="rmse")
rmse = evaluator.evaluate(predictions)
print(f"RMSE: {rmse:.4f}")

# Inspect the fitted model coefficients.
print(f"Coefficients: {model.coefficients}")
print(f"Intercept: {model.intercept}")

聚类算法
python
from pyspark.ml.clustering import *
from pyspark.ml.evaluation import ClusteringEvaluator

# K-Means: fit on df, then assign every row of df to a cluster.
kmeans = KMeans(
    k=5,
    maxIter=20,
    seed=42,
)
model = kmeans.fit(df)
predictions = model.transform(df)

# Evaluation (silhouette score)
evaluator = ClusteringEvaluator()
silhouette = evaluator.evaluate(predictions)
print(f"Silhouette: {silhouette:.4f}")

# Inspect the cluster centers, one per line.
centers = model.clusterCenters()
for i, center in enumerate(centers):
    print(f"Cluster {i}: {center}")

# Gaussian mixture model (GMM)
gmm = GaussianMixture(
    k=5,
    seed=42,
)
model = gmm.fit(df)

# LDA topic model
lda = LDA(
    k=10,
    maxIter=10,
    optimizer="em",
)
model = lda.fit(df)
topics = model.describeTopics(maxTermsPerTopic=5)

超参数调优
python
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder, TrainValidationSplit
# Build the parameter grid: 3 regParam x 2 maxIter x 3 elasticNetParam = 18 combinations.
# NOTE: `lr` must be the estimator stage that is actually inside `pipeline`;
# build the grid from that same object.
paramGrid = (
    ParamGridBuilder()
    .addGrid(lr.regParam, [0.01, 0.1, 1.0])
    .addGrid(lr.maxIter, [50, 100])
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
    .build()
)

# Cross-validation (5 folds). The seed fixes the fold assignment so runs are
# reproducible, matching the seed=42 convention used elsewhere in this file.
cv = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=BinaryClassificationEvaluator(),
    numFolds=5,
    parallelism=4,  # evaluate models in parallel
    seed=42,
)
cv_model = cv.fit(train_df)
best_model = cv_model.bestModel

# Inspect the results. avgMetrics[i] is the metric averaged over the folds for
# paramGrid[i], so pairing the two lists identifies the winning combination.
print(cv_model.avgMetrics)
best_params, best_metric = max(
    zip(paramGrid, cv_model.avgMetrics),
    key=lambda pair: pair[1],  # the evaluator's default metric is larger-is-better
)
print({param.name: value for param, value in best_params.items()})
print(f"Best average metric: {best_metric:.4f}")

# Train/validation split (faster; suited to large datasets).
tvs = TrainValidationSplit(
    estimator=pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=BinaryClassificationEvaluator(),
    trainRatio=0.8,
    seed=42,
)
tvs_model = tvs.fit(train_df)

模型保存与加载
python
from pyspark.ml import PipelineModel

# Define the model location once so save and load cannot drift apart.
model_path = "hdfs://namenode:9000/models/user_purchase_model"

# Save the fitted Pipeline model. write().overwrite() replaces a model already
# stored at the path; a plain save() raises when the path exists, which makes
# re-running this step fail.
model.write().overwrite().save(model_path)

# Load the model back.
loaded_model = PipelineModel.load(model_path)
# 批量预测
predictions = loaded_model.transform(new_data)

协同过滤推荐
python
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS

# Load the rating data.
ratings = spark.read.csv("/data/ratings.csv", header=True, inferSchema=True)
# Columns: userId, movieId, rating

# Hold out 20% of the ratings so the RMSE below is measured on data the model
# has not seen; scoring the training data gives an overly optimistic error.
train_ratings, test_ratings = ratings.randomSplit([0.8, 0.2], seed=42)

# Train the ALS model.
als = ALS(
    maxIter=10,
    regParam=0.01,
    rank=10,
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating",
    # Handle cold start: drop rows whose prediction is NaN (users/items absent
    # from the training split) so the evaluation metric stays a valid number.
    coldStartStrategy="drop",
)
model = als.fit(train_ratings)

# Recommend the top 10 movies for every user.
user_recs = model.recommendForAllUsers(10)
user_recs.show()

# Recommend for a specific subset of users.
users = ratings.select("userId").distinct().limit(5)
user_subset_recs = model.recommendForUserSubset(users, 10)

# Evaluate on the held-out ratings.
predictions = model.transform(test_ratings)
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="rating",
    predictionCol="prediction",
)
rmse = evaluator.evaluate(predictions)
print(f"RMSE: {rmse:.4f}")