Spark을 이용해서 기본적인 머신러닝을 수행하는 과정을 수행했다.
실행하면서 이 PySpark 머신러닝 과정이 Spark을 잘 활용해 분산처리를 수행하고 있는지 확인하는 과정과 정확히 Spark의 어떤 기능들이 분산처리를 수행하는지 확인하기 위해 리뷰했으며, 그 과정을 정리.
Azure Databricks - Spark에서 머신러닝 병렬 처리
이전 포스트에서는 모두 맨땅에 헤딩하면서 Spark을 구성하고 머신러닝 작업을 실행하였다. 이 문서와 앞으로 진행할 포스팅에서는 Azure Databricks를 이용한다.
- Azure Databricks Workspace 생성
- Spark 분산처리를 하지 않는 작업 수행
- Spark 분산처리 작업 수행 - MLlib
이 문서에서 사용한 코드는 3 Methods for Parallelization in Spark | by Ben Weber | Towards Data Science 문서에서 진행했으며, Azure Databricks를 이용하는 전체 코드는 아래에서 확인 가능.
Azure Databricks 전체 코드 : CloudBreadPaPa/pyspark-basic: pyspark basic self-study repo (github.com)
역시 유료 서비스가 편하다. Azure에서 Databricks를 클릭 몇 번 해서 만들면 세상 편하게 Spark Cluster가 완성된다.
Azure Databricks Workspace 생성
아래 링크를 통해 Databricks workspace를 생성할 수 있다.
Quickstart - Run a Spark job on Azure Databricks Workspace using Azure portal | Microsoft Docs
포털을 이용하는 방법으로 진행했으며, cli 등을 이용해 생성 역시 가능하다.
Worker node를 구성하는 부분을 보면 최소 옵션임에도 저렴해 보이지 않는다. 1개 worker를 이용하는 개발자 옵션도 있으니 적절하게 활용.
Spark 분산처리를 하지 않는 작업 수행
일반적인 Scikit-learn의 코드를 생각하면 된다. Spark workspace의 master 노드에서 작업하지만, 분산처리 되지 않는다.
# split dataset and run LinearRegression from sklearn.linear_model import LinearRegression from scipy.stats.stats import pearsonr # split into data and label arrays y = boston_pd['target'] X = boston_pd.drop(['target'], axis=1) # create training (~80%) and test data sets X_train = X[:400] X_test = X[400:] y_train = y[:400] y_test = y[400:] # train a classifier lr = LinearRegression() model = lr.fit(X_train, y_train) # make predictions y_pred = model.predict(X_test) # error metrics r = pearsonr(y_pred, y_test) mae = sum(abs(y_pred - y_test))/len(y_test) print("R-sqaured: " + str(r[0]**2)) print("MAE: " + str(mae))
Spark 분산처리 작업 수행 - MLlib
Spark session과 context를 이용해 작업한다. Spark의 분산처리를 활용하기 위해서는
- Pandas dataframe을 이용하기보다, Spark dataframe으로 데이터를 로드
- Spark의 MLlib에서 제공하는 VectorAssembler를 이용해 feature들을 하나의 single feature로 변환한다.
참고링크 : Extracting, transforming and selecting features - Spark 3.1.2 Documentation (apache.org)
from pyspark.sql import SparkSession from pyspark import SparkContext spark = SparkSession.builder \ .master("local") \ .appName("Spark ML") \ .getOrCreate() sc = SparkContext.getOrCreate() ... # convert to a Spark data frame boston_sp = spark.createDataFrame(boston_pd) display(boston_sp.take(5)) # split into training and test spark data frames ... # convert to vector representation for MLlib assembler = VectorAssembler(inputCols= boston_train.schema.names[:(boston_pd.shape[1] - 1)], outputCol="features" ) boston_train = assembler.transform(boston_train).select('features', 'target') boston_test = assembler.transform(boston_test).select('features', 'target') ...
이 과정을 수행하면, Spark에서 분산 처리되며, 아래와 같이 job으로 worker node에서 실행된다.
이어서 pyspark의 regression ML을 수행하여도 분산처리가 이루어진다.
# linear regresion with Spark from pyspark.ml.regression import LinearRegression # linear regression lr = LinearRegression(maxIter=100, regParam=0.1, elasticNetParam=0.5, labelCol="target") # Fit the model model = lr.fit(boston_train) boston_pred = model.transform(boston_test) # calculate results r = boston_pred.stat.corr("prediction", "target") print("R-sqaured: " + str(r**2))
Spark의 분산처리를 활용하는 부분은 hyper parameter tuning 부분과 Cross validation 부분이다.
아래의 코드처럼, pyspark에세 제공하는 CrossValidator를 이용해 분산처리를 수행하며, CV fold는 10으로 수행한다.
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder from pyspark.ml.evaluation import RegressionEvaluator crossval = CrossValidator(estimator=LinearRegression(labelCol = "target"), estimatorParamMaps=ParamGridBuilder().addGrid( LinearRegression.elasticNetParam, [0, 0.5, 1.0]).build(), evaluator=RegressionEvaluator( labelCol = "target", metricName = "r2"), numFolds=10) # cross validate the model and select the best fit cvModel = crossval.fit(boston_train) model = cvModel.bestModel # calculate results boston_pred = model.transform(boston_test) r = boston_pred.stat.corr("prediction", "target") print("R-sqaured: " + str(r**2))
실행하면 이렇게 job으로 분산처리 되는 것을 Azure Databricks 노트북에서 확인 가능하다.
개별 job에 대한 정보를 확인하면 이렇게 정보를 확인 가능하고,
DAG 시각화 정보는 아래처럼 표현된다.
Spark에서 기본 제공하는 화면도 Databricks에서 확인 가능.
이렇게, PySpark에서 기본 MLlib을 이용하여 spark의 분산처리를 수행하는 과정을 Azure Databricks에서 진행하였다.
MLlib을 이용하는 방법 외에, Databricks에서 제공하는 scikit-learn을 이용하는 MLflow를 이용해 트레이닝하고, Hyperopt를 이용해 hyperparameter tuning을 수행하는 방법도 있다. 또한, Deep learning을 이용하는 방법 역시 다음 포스트에서 시리즈로 리뷰할 예정.
참고링크
CloudBreadPaPa/pyspark-basic: pyspark basic self-study repo (github.com)
3 Methods for Parallelization in Spark | by Ben Weber | Towards Data Science
개발자 커뮤니티 SQLER.com - Apache Spark, pyspark 설치 후 jupyter notebook 실행
개발자 커뮤니티 SQLER.com - PySpark, koalas와 pandas dataframe
개발자 커뮤니티 SQLER.com - PySpark을 이용한 머신러닝 튜토리얼 예제
Quickstart - Run a Spark job on Azure Databricks Workspace using Azure portal | Microsoft Docs