PySpark을 이용한 머신러닝 튜토리얼 예제
공부 중인 PySpark. Python의 기능과 notebook이 제공되어 여러 예제들을 빠르게 돌려 볼 수 있었다.
오늘은 guru99에서 제공하는 예제가 있어서 이 예제를 돌리면서 PySpark 기본 ML 기능에 대해서 확인했다.
PySpark Tutorial for Beginners: Learn with EXAMPLES (guru99.com)
위의 문서는 약간 시간이 지나서 그런지, 사용하는 dataset과 약간 다른 구조가 있었고, 몇몇 module의 function명이 다르거나, 동작하지 않는 부분이 있어서, 공부하면서 진행했다. 오류를 보완한 노트북과 전체 코드는 아래 github repo에 올려 두었다.
전체코드: CloudBreadPaPa/pyspark-basic: pyspark basic self-study repo (github.com)
원본 문서의 순서는 설치부터 진행되는데, 최근에 진행한 문서를 참조하면 편하다.
개발자 커뮤니티 SQLER.com - Apache Spark, pyspark 설치 후 jupyter notebook 실행
기본적인 SparkContext 부분이나 SQLContext, 기본 데이터 처리 부분은 넘어가고, PySpark Machine Learning 부분을 바로 진행.
Spark Context
SparkContext는 Spark cluster와의 연결을 제공하는 오브젝트이다.
import pyspark from pyspark import SparkContext sc = SparkContext()
SQL Context
예제가 오래되었지만, 기록 목적으로 남겨둔다. 더 이상 SQLContext를 사용하지 않으며, 2.0 버전부터 SparkSession 으로 변경되었다. 간략히, SQL Context는 SQL 테이블과 유사한 structured 데이터를 쉽게 다룰 수 있도록 지원하는 오브젝트이며, DataFrame등을 손쉽게 사용할 수 있다.
from pyspark.sql import SQLContext sqlContext = SQLContext(sc)
PySpark Machine Learning 실행
기본적으로, Scikit Learn과 유사한 흐름으로 진행된다. Spark으로 처리되어 scikit-learn과 다른 부분 및 전체적인 흐름에 초점을 맞춰 리뷰한다.
from pyspark import SparkFiles url = "https://raw.githubusercontent.com/guru99-edu/R-Programming/master/adult_data.csv" sc.addFile(url) sqlContext = SQLContext(sc)
codeguru의 코드와 위의 데이터셋 구조가 약간 틀리다.
예를 들어, 코드는 label feature를 "label"로 지정하고 처리하는데, 데이터에는 "income"이 label feature이다. (너무 오래전 예제를 고른 것 같다.) 여하간, 코드의 문제를 수정하면서 진행했고, CloudBreadPaPa/pyspark-basic: pyspark basic self-study repo (github.com) 리포지토리에서 수정되어 잘 동작하는 전체 코드 notebook을 올려 두었으니, 여기를 참조하면 된다.
CSV 파일을 이렇게 로드하야 spark dataframe을 생성할 수 있다.
df = sqlContext.read.csv(SparkFiles.get("adult_data.csv"), header=True, inferSchema= True)
Spark Pipelne 생성
Spark ML Pipelines는 Spark DataFrame에 대한 Machine Learning API를 제공한다.
In this section, we introduce the concept of ML Pipelines. ML Pipelines provide a uniform set of high-level APIs built on top of DataFrames that help users create and tune practical machine learning pipelines.
ML Pipelines - Spark 3.1.2 Documentation (apache.org)
기본 설명 부분부터 scikit-learn에 영향을 받았다고 명시하고 있으며, 전체적인 흐름도 scikit-learn과 비슷하게 진행된다.
String feature를 numeric으로 변환
ML 트레이닝을 수행하기 위해서는 데이터를 이렇게 vector 형태, numeric 값으로 변환해야 한다.
아래는 String을 numeric으로 변환하고, one hot encoding을 적용하는 단계이다.
... stringIndexer = StringIndexer(inputCol="workclass", outputCol="workclass_encoded") ... encoder = OneHotEncoder(dropLast=False, inputCol="workclass_encoded", outputCol="workclass_vec") ...
카테고리(Categorical) 데이터 인코딩
마찬가지로, 이렇게 카테고리 feature들을 인코딩한다.
... CATE_FEATURES = ['workclass', 'education', 'marital-status', 'occupation', 'relationship', 'race', 'gender', 'native-country'] stages = [] # stages in our Pipeline for categoricalCol in CATE_FEATURES: stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + "Index") encoder = OneHotEncoder(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "classVec"]) stages += [stringIndexer, encoder] ...
Numeric feature들을 병합
기존 카테고리 feature들을 변환했다면 다음으로 numeric feature들과 병합해 트레이닝을 위한 데이터셋을 준비한다.
... assemblerInputs = [c + "classVec" for c in CATE_FEATURES] + CONTI_FEATURES assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features") stages += [assembler] ...
Pipeline 생성
다음으로 Pipeline을 생성한다.
# Create a Pipeline. pipeline = Pipeline(stages=stages) pipelineModel = pipeline.fit(df_remove) model = pipelineModel.transform(df_remove) model.take(1)
결과 :
[Row(x=1, age=25, workclass='Private', fnlwgt=226802, education='11th', educational-num=7, marital-status='Never-married', occupation='Machine-op-inspct', relationship='Own-child', race='Black', gender='Male', capital-gain=0, capital-loss=0, hours-per-week=40, native-country='United-States', income='<=50K', age_square=625.0, workclassIndex=0.0, workclassclassVec=SparseVector(8, {0: 1.0}), educationIndex=5.0, educationclassVec=SparseVector(15, {5: 1.0}), marital-statusIndex=1.0, marital-statusclassVec=SparseVector(6, {1: 1.0}), occupationIndex=6.0, occupationclassVec=SparseVector(14, {6: 1.0}), relationshipIndex=2.0, relationshipclassVec=SparseVector(5, {2: 1.0}), raceIndex=1.0, raceclassVec=SparseVector(4, {1: 1.0}), genderIndex=0.0, genderclassVec=SparseVector(1, {0: 1.0}), native-countryIndex=0.0, native-countryclassVec=SparseVector(40, {0: 1.0}), newlabel=0.0, features=SparseVector(99, {0: 1.0, 13: 1.0, 24: 1.0, 35: 1.0, 45: 1.0, 49: 1.0, 52: 1.0, 53: 1.0, 93: 25.0, 94: 226802.0, 96: 7.0, 98: 40.0}))]
Map을 이용해 Machine Learning의 트레이닝에 사용할 label과 feature들을 선택
... input_data = model.rdd.map(lambda x: (x["newlabel"], DenseVector(x["features"]))) ... df_train = sqlContext.createDataFrame(input_data, ["income", "features"]) df_train.show(5)
결과
+------+--------------------+ |income| features| +------+--------------------+ | 0.0|[1.0,0.0,0.0,0.0,...| | 0.0|[1.0,0.0,0.0,0.0,...| | 1.0|[0.0,0.0,1.0,0.0,...| | 1.0|[1.0,0.0,0.0,0.0,...| | 0.0|[0.0,0.0,0.0,1.0,...| +------+--------------------+ only showing top 5 rows
Training 데이터셋과 Test 데이터셋 생성
scikit-learn과 마찬가지로, random split을 수행한다.
train_data, test_data = df_train.randomSplit([.8,.2],seed=1234)
로지스틱 회기(Logistics regression) 모델 생성
training 데이터가 위에서 분할 되었으니, 머신러닝 트레이닝을 수행해 model을 생성한다.
scikit-learn에서 제공하는 여러 분류/회기 모델들이 있는데, Spark MLlib과 어떻게 다른지 다른 포스트에서 체크 예정.
from pyspark.ml.classification import LogisticRegression # Initialize `lr` lr = LogisticRegression(labelCol="income", featuresCol="features", maxIter=10, regParam=0.3) # Fit the data to the model linearModel = lr.fit(train_data)
모델의 정확도 측정을 위해 test 데이터셋에 prediction 수행하고 evaluation 수행
scikit-learn과 키워드도 대부분 맞춘 듯 하다. prediction 결과를 test 결과와 비교해 accuracy를 출력한다.
def accuracy_m(model): predictions = model.transform(test_data) cm = predictions.select("income", "prediction") acc = cm.filter(cm.income == cm.prediction).count() / cm.count() print("Model accuracy: %.3f%%" % (acc * 100)) accuracy_m(model = linearModel)
결과: Model accuracy: 82.501%
Classification metric으로 사용되는 ROC 출력
ROC metric을 계산하기 위해서는 BinaryClassificationEvaluator — PySpark 3.1.2 documentation (apache.org) 을 사용한다. ROC metric에 대해서는 scikit-learn의 ROC 참조
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", labelCol="income") print(evaluator.evaluate(predictions))
결과: 0.8893620345339743
Hyper parameter 튜닝 - ParamGrid cross validation(교차 검증)
모델의 성능을 높이기 위해 현재 사용하는 로지스틱 회기모델의 하이퍼 파라미터를 튜닝한다.
Param grid를 이용하고, 마찬가지로 cross validation을 5fold 수행한다.
로지스틱 회기모델의 regularization parameter (코드의 regParam) 값을 튜닝하는 코드이다.
paramGrid = (ParamGridBuilder() .addGrid(lr.regParam, [0.01, 0.5]) .build()) ... cv = CrossValidator(estimator=lr, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5) # Run cross validations cvModel = cv.fit(train_data) accuracy_m(model = cvModel)
결과: Model accuracy: 84.911%
생성된 모델에서 best model을 가져오고 best model의 정보를 보면 regularization parameter 최적값을 볼 수 있다.
bestModel = cvModel.bestModel bestModel.extractParamMap()
결과
...
Param(parent='LogisticRegression_d38ef5edde44', name='regParam', doc='regularization parameter (>= 0).'): 0.01,
...
이렇게 Spark의 MLlib을 이용해 machine learning을 하는 과정을 살펴보았다.
scikit-learn과 유사해 접근하기 쉬웠고, 트레이닝/테스트 데이터셋 생성과 모델 생성, 모델 평가 과정까지 모두 같은 패턴으로 수행할 수 있는 것은 큰 장점이다.
분산 병렬 트레이닝 부분에서 아직 조금 더 리뷰할 부분이 남았지만, 문서들도 많고 오류가 생겨도 대부분 community를 통해 해결할 수 있었다.
참고링크
CloudBreadPaPa/pyspark-basic: pyspark basic self-study repo (github.com)
PySpark Tutorial for Beginners: Learn with EXAMPLES (guru99.com)
개발자 커뮤니티 SQLER.com - Apache Spark, pyspark 설치 후 jupyter notebook 실행
개발자 커뮤니티 SQLER.com - PySpark, koalas와 pandas dataframe
MLlib: Main Guide - Spark 3.1.2 Documentation (apache.org)
Receiver Operating Characteristic (ROC) — scikit-learn 1.0 documentation