【Spark】ml/recommendation/ALS を試したみた【CF】

| 0件のコメント

前回, 構築した Spark on YARN 環境で ml/recommendation/ALS を試してみます。

  • macOS: 10.12.1
  • Java: 1.8.0_111
  • Apache Hadoop: 2.7.3
  • Apache Spark: 2.0.2

データを HDFS にロードする

今回使う data/mllib/als/sample_movielens_ratings.txt は左から, ユーザID, アイテムID, レイティング, 時間 となっている。 レイティングに 0 の多い疎な行列となっている。

$ head $SPARK_HOME/data/mllib/als/sample_movielens_ratings.txt
0::2::3::1424380312
0::3::1::1424380312
0::5::2::1424380312
0::9::4::1424380312
0::11::1::1424380312
0::12::2::1424380312
0::15::1::1424380312
0::17::1::1424380312
0::19::1::1424380312
0::21::1::1424380312

Local から HDFS に load する。

$ hdfs version
Hadoop 2.7.3
$ hdfs dfs -mkdir als
$ hdfs dfs -put $SPARK_HOME/data/mllib/als/sample_movielens_ratings.txt als/
$ hdfs dfs -ls -R
drwxr-xr-x   - user supergroup          0 2016-11-25 02:13 .sparkStaging
drwxr-xr-x   - user supergroup          0 2016-11-27 22:59 als
-rw-r--r--   1 user supergroup      32363 2016-11-27 22:59 als/sample_movielens_ratings.txt

ALS

推薦システムのアルゴリズム [1] の中で, 推薦の個人化の度合い [Ben Schafer 01] を以下の3段階に分けて紹介している。

  • 非個人化レコメンド(non personalization)
  • 顧客中心レコメンド(ephemeral personalization)
  • 商品中心レコメンド(persistent personalization)

協調フィルタリングは個人化されたレコメンドで, 内容ベースフィルタリングと比較して, 多様性・セレンディピティに優れておりドメイン知識を必要としない。

ALSの説明を “Sparkによる実践データ解析” の内容を参考にして要約してみる。

NMF (NonNegative Matrix Factorization) はユーザ – アイテム行列を隠れ空間に射影させ, その空間であるユーザに類似しているアイテムをレコメンドする方法。ユーザ – アイテムのデータを行列Aとしたとき NMF で因子分解した結果を X と Y とする。
A は疎であるが XYT は密な行列となる。これらは潜在的な特徴 (好みやジャンルに対応) を含む。
ただし, この推定で問題なのは A = XYT の解が一般には存在しないこと。これはランクが低すぎるためである。さらに, 行列分解で最善な X と最善の Y を同時には直接求められない。でも何とか XYT を A に近似させたい。
そこで ALS (Alternating Least Squares) という手法を用いる。
Y をランダムに選択された行ベクトルで初期化し, この Y と A から最善の X を求める。 ここで2乗の差の合計を最小 (Least Squares) にして X を推定する。逆行列の計算では解けないが, QR分解のような方法で解いていく。 また, X の各行は並列計算可能なので大規模分散処理に向いている。
Y の計算も推定された X を用いて同様にでき, これを交互に行う。
ランダムな値を使うことが問題となりそうだが, 繰り返すうちに妥当な値に収束するようである。

より詳しい ALS のアルゴリズムについては [2] を参考に。

ml/recommendation/ALS を試してみる

コードは examples/src/main/python/ml/als_example.py とほぼ同様で, 今回は HDFS から読み込むので read.text() に hdfs://… を指定している。

HDFS ではなく Local から読み込む場合 は $SPARK_HOME/data/mllib/als/sample_movielens_ratings.txt を指定する。

読み込んだ RDD から DataFrame を生成して, 訓練:評価 = 8:2 に分割している。
レコメンデーションモデルを構築する ALS() に指定するパラメータは [3] を参考に。

#coding:utf-8

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from __future__ import print_function

import sys
if sys.version >= '3':
    long = int

from pyspark.sql import SparkSession

# $example on$
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row
# $example off$

if __name__ == "__main__":
    spark = SparkSession\
        .builder\
        .appName("FirstApp")\
        .getOrCreate()

    # $example on$
    lines = spark.read.text("hdfs://localhost:9000/user/you/als/sample_movielens_ratings.txt").rdd
    print("lines = " + str(lines.count()))
    
    parts = lines.map(lambda row: row.value.split("::"))
    ratingsRDD = parts.map(lambda p: Row(userId=int(p[0]), movieId=int(p[1]),
                                         rating=float(p[2]), timestamp=long(p[3])))
    ratings = spark.createDataFrame(ratingsRDD)
    (training, test) = ratings.randomSplit([0.8, 0.2])

    # Build the recommendation model using ALS on the training data
    als = ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating")
    model = als.fit(training)

    # Evaluate the model by computing the RMSE on the test data
    predictions = model.transform(test)
    evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                    predictionCol="prediction")
    rmse = evaluator.evaluate(predictions)
    print("Root-mean-square error = " + str(rmse))

    # $example off$
    spark.stop()

Sparkアプリケーションをデプロイするために spark-submit コマンドを使う。
その前にINFOレベルのログ出力が多いのでログ出力のレベルを変更しておいた。

$ cp $SPARK_HOME/conf/log4j.properties.template $SPARK_HOME/conf/log4j.properties
$ vim $SPARK_HOME/conf/log4j.properties

log4j.properties 中の `log4j.rootCategory=INFO, console` を `log4j.rootCategory=WARN, console` に変更。

spark-submit を実行。RMSEは 1.84 となった。

$ spark-submit --master yarn spark-cf-als.py
16/11/28 00:15:53 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/11/28 00:16:06 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
lines = 1501
Root-mean-square error = 1.83956717404

ちなみに, PySpark で Jupyter Notebook を使いたい場合は $SPARK_HOME/conf/spark-env.sh に以下を設定する。

PYSPARK_PYTHON=$PYENV_ROOT/shims/python
PYSPARK_DRIVER_PYTHON=$PYENV_ROOT/shims/jupyter
PYSPARK_DRIVER_PYTHON_OPTS="notebook"

“Sparkによる実践データ解析” の第3章では ALS を用いた音楽レコメンドを紹介しており CV によるチューニング例も解説している。


[1] 推薦システムのアルゴリズム
[2] Matrix Factorization Techniques for Recommender Systems
[3] ml-collaborative-filtering
[4] mllib-collaborative-filtering
[5] Matrix Factorizationとは
[6] Spark API チートシート
[7] How to turn off INFO logging in PySpark?

コメントを残す

必須欄は * がついています