Apache Sparkにて機械学習

⊥∀KA です。

今回は、Apache Sparkにて簡単に機械学習を行ってみたいと思います。

Sparkは簡単に言うと大規模データをオンメモリで分散処理を可能とするフレームワークです。
Hadoopも同じく分散処理を可能としますが、hdfsと呼ばれるファイルシステムにアクセスし処理するのに対し、
Sparkはメモリ上で処理が可能です。よって高速に分散処理が可能になるため、
ビックデータの分析や機械学習などに向いてます。
Spark単体で動作させることも可能ですし、Hadoopと連携することも可能です。

今回は、Sparkで簡単な機械学習を行ってみたいと思います。
Sparkのインストールは簡単です。モジュールをダウンロードして tarファイルを展開し、
環境変数などを設定するするだけですので、説明は省略します。
ちなみに、Sparkは、Scala、Java、Python、Rの言語に対応していますが、
今回はPythonでいきたいと思います。事前にPython3.6をインストールしています。
データは、機械学習でおなじみのIrisのCSVデータを事前に配置しています。

PySparkを起動します。

$ pyspark

// 省略 //

Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 2.1.1
/_/
Using Python version 3.6.3 (default, Oct 13 2017 12:02:49)
SparkSession available as 'spark'.
>>>

読み込むCSV形式に合わせてスキーマ定義をします。

>>> from pyspark.sql.types import *

>>> fields = StructType([
StructField('SepalLength', DoubleType(), False),
StructField('SepalWidth', DoubleType(), False),
StructField('PetalLength', DoubleType(), False),
StructField('PetalWidth', DoubleType(), False),
StructField('Name', StringType(), False)
])

CSVを読み込みます。

>>> df = spark.read.csv("/home/spark/iris.csv",schema=fields,header="True",sep=",")

データの中身を確認します。

>>> df.show(5)
+-----------+----------+-----------+----------+-----------+
|SepalLength|SepalWidth|PetalLength|PetalWidth| Name|
+-----------+----------+-----------+----------+-----------+
| 5.1| 3.5| 1.4| 0.2|Iris-setosa|
| 4.9| 3.0| 1.4| 0.2|Iris-setosa|
| 4.7| 3.2| 1.3| 0.2|Iris-setosa|
| 4.6| 3.1| 1.5| 0.2|Iris-setosa|
| 5.0| 3.6| 1.4| 0.2|Iris-setosa|
+-----------+----------+-----------+----------+-----------+
only showing top 5 rows

目的変数となるNameは文字列なので数値ラベル化します。

>>> from pyspark.ml.feature import StringIndexer
>>> label_encode = StringIndexer(inputCol = 'Name', outputCol = 'label')

説明変数をベクトル化します。

>>> from pyspark.ml.feature import VectorAssembler
>>> assembler = VectorAssembler(inputCols=df.columns[0:4],outputCol = 'features')

説明変数を標準化します。

>>> from pyspark.ml.feature import StandardScaler
>>> scaler = StandardScaler(inputCol='features', outputCol= 'scaledFeatures')

ランダムフォレスト分類器を作成します。

>>> from pyspark.ml.classification import RandomForestClassifier
>>> lr = RandomForestClassifier(featuresCol = 'scaledFeatures', labelCol = 'label')

パイプラインを作成します。

>>> from pyspark.ml.pipeline import Pipeline
>>> pipeline = pyspark.ml.Pipeline(stages=[label_encode,assembler,scaler,lr])

学習データとテストデータを8:2で分けます。

>>> (train, test) = df.randomSplit([0.8, 0.2])

学習します。

>>> model = pipeline.fit(train)

予測します。

>>> from pyspark.ml.evaluation import MulticlassClassificationEvaluator
>>> predict = model.transform(test)
>>> evaluator = MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction', metricName='accuracy')
>>> accuracy = evaluator.evaluate(predict)
>>> print("Accuracy = %g" % (accuracy))
accuracy 94.59%

簡単に予測までできました。パイプラインでつないでいくのがコツです。
scikit-learnと少し使い勝手が違いますが、馴れれば難しくないと思います。
今度、大規模データかつ分散環境でどれくらいパフォーマンスが出るか試してみたいと思います。

Standby Expressに関するお問合わせ

  • TEL 042-333-6217
  • FAX 042-352-6101
  • LINE
  • Mail