Spark을 이용해서 기본적인 머신러닝을 수행하는 과정을 수행했다.

실행하면서 이 PySpark 머신러닝 과정이 Spark을 잘 활용해 분산처리를 수행하고 있는지 확인하는 과정과 정확히 Spark의 어떤 기능들이 분산처리를 수행하는지 확인하기 위해 리뷰했으며, 그 과정을 정리.

 

Azure Databricks - Spark에서 머신러닝 병렬 처리

이전 포스트에서는 모두 맨땅에 헤딩하면서 Spark을 구성하고 머신러닝 작업을 실행하였다. 이 문서와 앞으로 진행할 포스팅에서는 Azure Databricks를 이용한다. 

 

- Azure Databricks Workspace 생성

- Spark 분산처리를 하지 않는 작업 수행

- Spark 분산처리 작업 수행 - MLlib

 

이 문서에서 사용한 코드는 3 Methods for Parallelization in Spark | by Ben Weber | Towards Data Science 문서에서 진행했으며, Azure Databricks를 이용하는 전체 코드는 아래에서 확인 가능.

Azure Databricks 전체 코드 : CloudBreadPaPa/pyspark-basic: pyspark basic self-study repo (github.com)

 

역시 유료 서비스가 편하다. Azure에서 Databricks를 클릭 몇 번 해서 만들면 세상 편하게 Spark Cluster가 완성된다.

 

Azure Databricks Workspace 생성

아래 링크를 통해 Databricks workspace를 생성할 수 있다. 

Quickstart - Run a Spark job on Azure Databricks Workspace using Azure portal | Microsoft Docs

포털을 이용하는 방법으로 진행했으며, cli 등을 이용해 생성 역시 가능하다.

 

create-databricks-spark-cluster.png

Worker node를 구성하는 부분을 보면 최소 옵션임에도 저렴해 보이지 않는다. 1개 worker를 이용하는 개발자 옵션도 있으니 적절하게 활용.

 

Spark 분산처리를 하지 않는 작업 수행

일반적인 Scikit-learn의 코드를 생각하면 된다. Spark workspace의 master 노드에서 작업하지만, 분산처리 되지 않는다.

 

# split dataset and run LinearRegression 
from sklearn.linear_model import LinearRegression
from scipy.stats.stats import pearsonr

# split into data and label arrays 
y = boston_pd['target']
X = boston_pd.drop(['target'], axis=1)

# create training (~80%) and test data sets
X_train = X[:400]
X_test = X[400:]
y_train = y[:400]
y_test = y[400:]

# train a classifier 
lr = LinearRegression()
model = lr.fit(X_train, y_train)

# make predictions
y_pred = model.predict(X_test)

# error metrics
r = pearsonr(y_pred, y_test)
mae = sum(abs(y_pred - y_test))/len(y_test)
print("R-sqaured: " + str(r[0]**2))
print("MAE: " + str(mae))

 

 

Spark 분산처리 작업 수행 - MLlib

Spark session과 context를 이용해 작업한다. Spark의 분산처리를 활용하기 위해서는

- Pandas dataframe을 이용하기보다, Spark dataframe으로 데이터를 로드

- Spark의 MLlib에서 제공하는 VectorAssembler를 이용해 feature들을 하나의 single feature로 변환한다.

참고링크 : Extracting, transforming and selecting features - Spark 3.1.2 Documentation (apache.org)

 

from pyspark.sql import SparkSession
from pyspark import SparkContext

spark = SparkSession.builder \
    .master("local") \
    .appName("Spark ML") \
    .getOrCreate()

sc = SparkContext.getOrCreate()

...

# convert to a Spark data frame
boston_sp = spark.createDataFrame(boston_pd)
display(boston_sp.take(5))

# split into training and test spark data frames
...

# convert to vector representation for MLlib
assembler = VectorAssembler(inputCols= boston_train.schema.names[:(boston_pd.shape[1] - 1)], outputCol="features" )
boston_train = assembler.transform(boston_train).select('features', 'target') 
boston_test = assembler.transform(boston_test).select('features', 'target') 
...

이 과정을 수행하면, Spark에서 분산 처리되며, 아래와 같이 job으로 worker node에서 실행된다.

 

spark_distributed_job.png

 

이어서 pyspark의 regression ML을 수행하여도 분산처리가 이루어진다.

# linear regresion with Spark
from pyspark.ml.regression import LinearRegression

# linear regression 
lr = LinearRegression(maxIter=100, regParam=0.1, 
                      elasticNetParam=0.5, labelCol="target")

# Fit the model
model = lr.fit(boston_train)
boston_pred = model.transform(boston_test)

# calculate results 
r = boston_pred.stat.corr("prediction", "target")
print("R-sqaured: " + str(r**2))

 

Spark의 분산처리를 활용하는 부분은 hyper parameter tuning 부분과 Cross validation 부분이다. 

아래의 코드처럼, pyspark에세 제공하는 CrossValidator를 이용해 분산처리를 수행하며, CV fold는 10으로 수행한다.

 

from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator

crossval = CrossValidator(estimator=LinearRegression(labelCol = "target"),  
                         estimatorParamMaps=ParamGridBuilder().addGrid(
                           LinearRegression.elasticNetParam, [0, 0.5, 1.0]).build(),
                         evaluator=RegressionEvaluator(
                           labelCol = "target", metricName = "r2"),
                         numFolds=10)

# cross validate the model and select the best fit
cvModel = crossval.fit(boston_train) 
model = cvModel.bestModel

# calculate results 
boston_pred = model.transform(boston_test)
r = boston_pred.stat.corr("prediction", "target")
print("R-sqaured: " + str(r**2))

 

실행하면 이렇게 job으로 분산처리 되는 것을 Azure Databricks 노트북에서 확인 가능하다.

spark_distributed_job_05.png

 

개별 job에 대한 정보를 확인하면 이렇게 정보를 확인 가능하고,

spark_distributed_job_04.png

 

DAG 시각화 정보는 아래처럼 표현된다.

spark_distributed_job_03.png

 

Spark에서 기본 제공하는 화면도 Databricks에서 확인 가능.

spark_distributed_job_02.png

 

 

이렇게, PySpark에서 기본 MLlib을 이용하여 spark의 분산처리를 수행하는 과정을 Azure Databricks에서 진행하였다.

 

MLlib을 이용하는 방법 외에, Databricks에서 제공하는 scikit-learn을 이용하는 MLflow를 이용해 트레이닝하고, Hyperopt를 이용해 hyperparameter tuning을 수행하는 방법도 있다. 또한, Deep learning을 이용하는 방법 역시 다음 포스트에서 시리즈로 리뷰할 예정.

 

참고링크

CloudBreadPaPa/pyspark-basic: pyspark basic self-study repo (github.com)

3 Methods for Parallelization in Spark | by Ben Weber | Towards Data Science

개발자 커뮤니티 SQLER.com - Apache Spark, pyspark 설치 후 jupyter notebook 실행

개발자 커뮤니티 SQLER.com - PySpark, koalas와 pandas dataframe

개발자 커뮤니티 SQLER.com - PySpark을 이용한 머신러닝 튜토리얼 예제

Quickstart - Run a Spark job on Azure Databricks Workspace using Azure portal | Microsoft Docs

 

No. Subject Author Date Views
Notice SQL강좌: 챗GPT와 함께 배우는 SQL Server 무료 강좌 목차와 소개 (2023년 9월 업데이트) 코난(김대우) 2023.08.18 38045
Notice Python 무료 강좌 - 기초, 중급, 머신러닝(2023년 6월 업데이트) 코난(김대우) 2021.01.01 20666
34 Azure Databricks - MLflow를 이용한 머신러닝(5) - Model 배포 file 코난(김대우) 2021.10.14 550
33 Azure Databricks - MLflow를 이용한 머신러닝(4) - Model Registry file 코난(김대우) 2021.10.12 389
32 Azure Databricks - MLflow를 이용한 머신러닝(3) - Project file 코난(김대우) 2021.10.08 368
31 Azure Databricks - MLflow를 이용한 머신러닝(2) - Tracking file 코난(김대우) 2021.10.08 484
30 Azure Databricks - MLflow를 이용한 머신러닝(1) file 코난(김대우) 2021.10.08 510
» Azure Databricks - Spark에서 머신러닝 분산 처리 file 코난(김대우) 2021.10.07 205
28 PySpark cheat sheet 자료 - RDD, 데이터 처리 file 코난(김대우) 2021.10.01 176
27 PySpark을 이용한 머신러닝 튜토리얼 예제 코난(김대우) 2021.10.01 952
26 Form Recognizer로 문서에서 표 데이터 추출 file 코난(김대우) 2021.01.21 433
25 MLaaS - 12가지의 머신러닝을 먼저 도입한 기업들의 고민 file 코난(김대우) 2021.01.15 807
24 Python 머신러닝 강좌 - 15. Matplotlib으로 데이터 시각화(visualization) file 코난(김대우) 2021.01.09 837
23 Python 머신러닝 강좌 - 14. NumPy와 Pandas 코난(김대우) 2021.01.09 776
22 Python 머신러닝 강좌 - 13. 모델의 정확도 평가(accuracy evaluating) 코난(김대우) 2021.01.09 1753
21 Python 머신러닝 강좌 - 12. 머신러닝 모델 테스트 코난(김대우) 2021.01.09 1084
20 Python 머신러닝 강좌 - 11. scikit-learn으로 선형회귀(linear regression) 모델 머신러닝 트레이닝 수행 코난(김대우) 2021.01.08 442
19 Python 머신러닝 강좌 - 10. 머신러닝을 위해 scikit-learn으로 트레이닝 데이터와 테스트 데이터 분할 코난(김대우) 2021.01.08 635
18 Python 머신러닝 강좌 - 9. 중복데이터와 결측값(missing value) 처리 코난(김대우) 2021.01.08 318
17 Python 머신러닝 강좌 - 8. Pandas DataFrame 컬럼(column) 분할(split)과 삭제(remove) 코난(김대우) 2021.01.08 482
16 Python 머신러닝 강좌 - 7. Pandas DataFrame으로 CSV 파일 읽고 쓰기 코난(김대우) 2021.01.08 621
15 Python 머신러닝 강좌 - 6. CSV 파일과 주피터 노트북 file 코난(김대우) 2021.01.08 587





XE Login