머신러닝 & AI 개발자 Tip & 강좌 게시판

Data Scientist와 개발자를 위한 머신러닝, AI 등 개발 Tip과 강좌 게시판 입니다. 일반적인 머신러닝은 물론 딥러닝(Tensorflow, Keras, PyTorch 등), 인공지능 관련 업무를 진행하면서 얻은 Tip이나 강좌, 새로운 소식을 적어 주시면 다른 개발자 분들에게 큰 도움이 됩니다.

Spark을 이용해서 기본적인 머신러닝을 수행하는 과정을 수행했다.

실행하면서 이 PySpark 머신러닝 과정이 Spark을 잘 활용해 분산처리를 수행하고 있는지 확인하는 과정과 정확히 Spark의 어떤 기능들이 분산처리를 수행하는지 확인하기 위해 리뷰했으며, 그 과정을 정리.

 

Azure Databricks - Spark에서 머신러닝 병렬 처리

이전 포스트에서는 모두 맨땅에 헤딩하면서 Spark을 구성하고 머신러닝 작업을 실행하였다. 이 문서와 앞으로 진행할 포스팅에서는 Azure Databricks를 이용한다. 

 

- Azure Databricks Workspace 생성

- Spark 분산처리를 하지 않는 작업 수행

- Spark 분산처리 작업 수행 - MLlib

 

이 문서에서 사용한 코드는 3 Methods for Parallelization in Spark | by Ben Weber | Towards Data Science 문서에서 진행했으며, Azure Databricks를 이용하는 전체 코드는 아래에서 확인 가능.

Azure Databricks 전체 코드 : CloudBreadPaPa/pyspark-basic: pyspark basic self-study repo (github.com)

 

역시 유료 서비스가 편하다. Azure에서 Databricks를 클릭 몇 번 해서 만들면 세상 편하게 Spark Cluster가 완성된다.

 

Azure Databricks Workspace 생성

아래 링크를 통해 Databricks workspace를 생성할 수 있다. 

Quickstart - Run a Spark job on Azure Databricks Workspace using Azure portal | Microsoft Docs

포털을 이용하는 방법으로 진행했으며, cli 등을 이용해 생성 역시 가능하다.

 

create-databricks-spark-cluster.png

Worker node를 구성하는 부분을 보면 최소 옵션임에도 저렴해 보이지 않는다. 1개 worker를 이용하는 개발자 옵션도 있으니 적절하게 활용.

 

Spark 분산처리를 하지 않는 작업 수행

일반적인 Scikit-learn의 코드를 생각하면 된다. Spark workspace의 master 노드에서 작업하지만, 분산처리 되지 않는다.

 

# split dataset and run LinearRegression 
from sklearn.linear_model import LinearRegression
from scipy.stats.stats import pearsonr

# split into data and label arrays 
y = boston_pd['target']
X = boston_pd.drop(['target'], axis=1)

# create training (~80%) and test data sets
X_train = X[:400]
X_test = X[400:]
y_train = y[:400]
y_test = y[400:]

# train a classifier 
lr = LinearRegression()
model = lr.fit(X_train, y_train)

# make predictions
y_pred = model.predict(X_test)

# error metrics
r = pearsonr(y_pred, y_test)
mae = sum(abs(y_pred - y_test))/len(y_test)
print("R-sqaured: " + str(r[0]**2))
print("MAE: " + str(mae))

 

 

Spark 분산처리 작업 수행 - MLlib

Spark session과 context를 이용해 작업한다. Spark의 분산처리를 활용하기 위해서는

- Pandas dataframe을 이용하기보다, Spark dataframe으로 데이터를 로드

- Spark의 MLlib에서 제공하는 VectorAssembler를 이용해 feature들을 하나의 single feature로 변환한다.

참고링크 : Extracting, transforming and selecting features - Spark 3.1.2 Documentation (apache.org)

 

from pyspark.sql import SparkSession
from pyspark import SparkContext

spark = SparkSession.builder \
    .master("local") \
    .appName("Spark ML") \
    .getOrCreate()

sc = SparkContext.getOrCreate()

...

# convert to a Spark data frame
boston_sp = spark.createDataFrame(boston_pd)
display(boston_sp.take(5))

# split into training and test spark data frames
...

# convert to vector representation for MLlib
assembler = VectorAssembler(inputCols= boston_train.schema.names[:(boston_pd.shape[1] - 1)], outputCol="features" )
boston_train = assembler.transform(boston_train).select('features', 'target') 
boston_test = assembler.transform(boston_test).select('features', 'target') 
...

이 과정을 수행하면, Spark에서 분산 처리되며, 아래와 같이 job으로 worker node에서 실행된다.

 

spark_distributed_job.png

 

이어서 pyspark의 regression ML을 수행하여도 분산처리가 이루어진다.

# linear regresion with Spark
from pyspark.ml.regression import LinearRegression

# linear regression 
lr = LinearRegression(maxIter=100, regParam=0.1, 
                      elasticNetParam=0.5, labelCol="target")

# Fit the model
model = lr.fit(boston_train)
boston_pred = model.transform(boston_test)

# calculate results 
r = boston_pred.stat.corr("prediction", "target")
print("R-sqaured: " + str(r**2))

 

Spark의 분산처리를 활용하는 부분은 hyper parameter tuning 부분과 Cross validation 부분이다. 

아래의 코드처럼, pyspark에세 제공하는 CrossValidator를 이용해 분산처리를 수행하며, CV fold는 10으로 수행한다.

 

from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator

crossval = CrossValidator(estimator=LinearRegression(labelCol = "target"),  
                         estimatorParamMaps=ParamGridBuilder().addGrid(
                           LinearRegression.elasticNetParam, [0, 0.5, 1.0]).build(),
                         evaluator=RegressionEvaluator(
                           labelCol = "target", metricName = "r2"),
                         numFolds=10)

# cross validate the model and select the best fit
cvModel = crossval.fit(boston_train) 
model = cvModel.bestModel

# calculate results 
boston_pred = model.transform(boston_test)
r = boston_pred.stat.corr("prediction", "target")
print("R-sqaured: " + str(r**2))

 

실행하면 이렇게 job으로 분산처리 되는 것을 Azure Databricks 노트북에서 확인 가능하다.

spark_distributed_job_05.png

 

개별 job에 대한 정보를 확인하면 이렇게 정보를 확인 가능하고,

spark_distributed_job_04.png

 

DAG 시각화 정보는 아래처럼 표현된다.

spark_distributed_job_03.png

 

Spark에서 기본 제공하는 화면도 Databricks에서 확인 가능.

spark_distributed_job_02.png

 

 

이렇게, PySpark에서 기본 MLlib을 이용하여 spark의 분산처리를 수행하는 과정을 Azure Databricks에서 진행하였다.

 

MLlib을 이용하는 방법 외에, Databricks에서 제공하는 scikit-learn을 이용하는 MLflow를 이용해 트레이닝하고, Hyperopt를 이용해 hyperparameter tuning을 수행하는 방법도 있다. 또한, Deep learning을 이용하는 방법 역시 다음 포스트에서 시리즈로 리뷰할 예정.

 

참고링크

CloudBreadPaPa/pyspark-basic: pyspark basic self-study repo (github.com)

3 Methods for Parallelization in Spark | by Ben Weber | Towards Data Science

개발자 커뮤니티 SQLER.com - Apache Spark, pyspark 설치 후 jupyter notebook 실행

개발자 커뮤니티 SQLER.com - PySpark, koalas와 pandas dataframe

개발자 커뮤니티 SQLER.com - PySpark을 이용한 머신러닝 튜토리얼 예제

Quickstart - Run a Spark job on Azure Databricks Workspace using Azure portal | Microsoft Docs

 

No. Subject Author Date Views
39 (2) AzureML - Azure Machine Learning 이란 무엇인가? file 코난(김대우) 2021.10.25 242
38 kakaobrain pororo - Automated Essay Scorer 리뷰 코난(김대우) 2021.10.21 216
37 kakobrain에서 발표한 pororo 리뷰 file 코난(김대우) 2021.10.20 601
36 Azure Databricks MLflow를 이용한 MLOps - CI/CD 및 deployment 포함 file 코난(김대우) 2021.10.15 142
35 Azure Databricks MLflow를 이용한 MLOps file 코난(김대우) 2021.10.14 153
34 Azure Databricks - MLflow를 이용한 머신러닝(5) - Model 배포 file 코난(김대우) 2021.10.14 259
33 Azure Databricks - MLflow를 이용한 머신러닝(4) - Model Registry file 코난(김대우) 2021.10.12 159
32 Azure Databricks - MLflow를 이용한 머신러닝(3) - Project file 코난(김대우) 2021.10.08 129
31 Azure Databricks - MLflow를 이용한 머신러닝(2) - Tracking file 코난(김대우) 2021.10.08 158
30 Azure Databricks - MLflow를 이용한 머신러닝(1) file 코난(김대우) 2021.10.08 184
» Azure Databricks - Spark에서 머신러닝 분산 처리 file 코난(김대우) 2021.10.07 93
28 PySpark cheat sheet 자료 - RDD, 데이터 처리 file 코난(김대우) 2021.10.01 80
27 PySpark을 이용한 머신러닝 튜토리얼 예제 코난(김대우) 2021.10.01 528
26 Form Recognizer로 문서에서 표 데이터 추출 file 코난(김대우) 2021.01.21 290
25 MLaaS - (1) 12가지의 머신러닝을 먼저 도입한 기업들의 고민 file 코난(김대우) 2021.01.15 559
24 Python 머신러닝 강좌 - 15. Matplotlib으로 데이터 시각화(visualization) file 코난(김대우) 2021.01.09 475
23 Python 머신러닝 강좌 - 14. NumPy와 Pandas 코난(김대우) 2021.01.09 416
22 Python 머신러닝 강좌 - 13. 모델의 정확도 평가(accuracy evaluating) 코난(김대우) 2021.01.09 738
21 Python 머신러닝 강좌 - 12. 머신러닝 모델 테스트 코난(김대우) 2021.01.09 541
20 Python 머신러닝 강좌 - 11. scikit-learn으로 선형회기(linear regression) 모델 머신러닝 트레이닝 수행 코난(김대우) 2021.01.08 229





XE Login