지난 시간에 진행했던 Bulk Insert 관련 작업과 이어지는 내용.
개발자 커뮤니티 SQLER.com - Azure SQL Database로 CSV 파일 BULK INSERT - Python
최초 목표는 ADX(Azure Data Explorer)의 테이블 데이터를 ASDB(Azure SQL Database)로 옮기는 작업.
이를 위해 ADF(Azure Data Factory)를 이용한 방법 등을 리뷰했으며, Bulk Insert 과정도 진행.
즉, 이 문서에서 만들어진 CSV 파일을 ASDB로 Bulk Insert 하는 것이 목표.
ADX에서 ASDB로 프로그래밍적인 방안을 이용해 데이터를 트랜스퍼 하려면,
- ADX에서 쿼리
- 결과를 CSV로 전환
- CSV를 Bulk Insert로 ASDB에 추가
과정을 진행해야 한다.
ADX는 여러 클라이언트 SDK를 지원한다. .NET, Java, Node, Python, Go 등을 선택 가능하며, 최종 목표가 CSV로 말아 올리는 것이기 때문에, Python과 Pandas를 선택했다.
Azure Data Explorer의 데이터를 Python Pandas Dataframe과 CSV로 변환
이 과정의 코드는 모두 아래 github 리포지토리에서 확인.
다음 과정들을 진행.
- Python에서 ADX 연결을 위해 Kusto 쿼리 준비
- Python "azure-kusto-data" 패키지 설치
- ADX 인증 방식들을 통한 인증
- Python으로 Kusto 쿼리 수행
- Kusto 쿼리 결과를 Pandas Dataframe으로 변환
- Dataframe을 CSV로 저장
- 성능 및 속도 리뷰
전체적인 과정은 아래 공식 문서에서 확인
Query data using the Azure Data Explorer Python library | Microsoft Docs
Python에서 ADX 연결을 위해 Kusto 쿼리 준비
우선 ADX를 배포했고, 기존 malware 900만건 데이터 중에, split한 1만건과 100만건을 사용해 테스트.
예제로 사용 가능하도록 1만건 데이터는 github 리포에도 올려 두었다.
전체 CSV 파일 - kaggle - Microsoft Malware Prediction | Kaggle
ADX의 구성과 전반적인 흐름은 아래 문서를 참조.
참고링크: 개발자 커뮤니티 SQLER.com - Azure Data Explorer에 대량 CSV 파일 ingest
ADX 테이블 생성 쿼리
이미 위의 문서에서 대량 CSV ingest할때 했던 과정이다. 테이블 이름을 "malware_table_10000"으로 생성하고, 이어서 mapping도 생성한다.
.create table malware_table_10000 (MachineIdentifier:string, ProductName:string, EngineVersion:string, AppVersion:string, AvSigVersion:string, IsBeta:int, RtpStateBitfield:real, IsSxsPassiveMode:int, DefaultBrowsersIdentifier:real, AVProductStatesIdentifier:real, AVProductsInstalled:real, AVProductsEnabled:real, HasTpm:int, CountryIdentifier:int, CityIdentifier:real, OrganizationIdentifier:real, GeoNameIdentifier:real, LocaleEnglishNameIdentifier:int, Platform:string, Processor:string, OsVer:string, OsBuild:int, OsSuite:int, OsPlatformSubRelease:string, OsBuildLab:string, SkuEdition:string, IsProtected:real, AutoSampleOptIn:int, PuaMode:string, SMode:real, IeVerIdentifier:real, SmartScreen:string, Firewall:real, UacLuaenable:real, Census_MDC2FormFactor:string, Census_DeviceFamily:string, Census_OEMNameIdentifier:real, Census_OEMModelIdentifier:real, Census_ProcessorCoreCount:real, Census_ProcessorManufacturerIdentifier:real, Census_ProcessorModelIdentifier:real, Census_ProcessorClass:string, Census_PrimaryDiskTotalCapacity:real, Census_PrimaryDiskTypeName:string, Census_SystemVolumeTotalCapacity:real, Census_HasOpticalDiskDrive:int, Census_TotalPhysicalRAM:real, Census_ChassisTypeName:string, Census_InternalPrimaryDiagonalDisplaySizeInInches:real, Census_InternalPrimaryDisplayResolutionHorizontal:real, Census_InternalPrimaryDisplayResolutionVertical:real, Census_PowerPlatformRoleName:string, Census_InternalBatteryType:string, Census_InternalBatteryNumberOfCharges:real, Census_OSVersion:string, Census_OSArchitecture:string, Census_OSBranch:string, Census_OSBuildNumber:int, Census_OSBuildRevision:int, Census_OSEdition:string, Census_OSSkuName:string, Census_OSInstallTypeName:string, Census_OSInstallLanguageIdentifier:real, Census_OSUILocaleIdentifier:int, Census_OSWUAutoUpdateOptionsName:string, Census_IsPortableOperatingSystem:int, Census_GenuineStateName:string, Census_ActivationChannel:string, Census_IsFlightingInternal:real, Census_IsFlightsDisabled:real, Census_FlightRing:string, Census_ThresholdOptIn:real, Census_FirmwareManufacturerIdentifier:real, Census_FirmwareVersionIdentifier:real, Census_IsSecureBootEnabled:int, Census_IsWIMBootEnabled:real, Census_IsVirtualDevice:real, Census_IsTouchEnabled:int, Census_IsPenCapable:int, Census_IsAlwaysOnAlwaysConnectedCapable:real, Wdft_IsGamer:real, Wdft_RegionIdentifier:real, HasDetections:int)
mapping 생성
.create table malware_table_10000 ingestion csv mapping "malware_table_mapping" '[' '{"column":"MachineIdentifier","DataType":"string","Properties":{"Ordinal":"0"}},' '{"column":"ProductName","DataType":"string","Properties":{"Ordinal":"1"}},' '{"column":"EngineVersion","DataType":"string","Properties":{"Ordinal":"2"}},' '{"column":"AppVersion","DataType":"string","Properties":{"Ordinal":"3"}},' '{"column":"AvSigVersion","DataType":"string","Properties":{"Ordinal":"4"}},' '{"column":"IsBeta","DataType":"int","Properties":{"Ordinal":"5"}},' '{"column":"RtpStateBitfield","DataType":"real","Properties":{"Ordinal":"6"}},' '{"column":"IsSxsPassiveMode","DataType":"int","Properties":{"Ordinal":"7"}},' '{"column":"DefaultBrowsersIdentifier","DataType":"real","Properties":{"Ordinal":"8"}},' '{"column":"AVProductStatesIdentifier","DataType":"real","Properties":{"Ordinal":"9"}},' '{"column":"AVProductsInstalled","DataType":"real","Properties":{"Ordinal":"10"}},' '{"column":"AVProductsEnabled","DataType":"real","Properties":{"Ordinal":"11"}},' '{"column":"HasTpm","DataType":"int","Properties":{"Ordinal":"12"}},' '{"column":"CountryIdentifier","DataType":"int","Properties":{"Ordinal":"13"}},' '{"column":"CityIdentifier","DataType":"real","Properties":{"Ordinal":"14"}},' '{"column":"OrganizationIdentifier","DataType":"real","Properties":{"Ordinal":"15"}},' '{"column":"GeoNameIdentifier","DataType":"real","Properties":{"Ordinal":"16"}},' '{"column":"LocaleEnglishNameIdentifier","DataType":"int","Properties":{"Ordinal":"17"}},' '{"column":"Platform","DataType":"string","Properties":{"Ordinal":"18"}},' '{"column":"Processor","DataType":"string","Properties":{"Ordinal":"19"}},' '{"column":"OsVer","DataType":"string","Properties":{"Ordinal":"20"}},' '{"column":"OsBuild","DataType":"int","Properties":{"Ordinal":"21"}},' '{"column":"OsSuite","DataType":"int","Properties":{"Ordinal":"22"}},' '{"column":"OsPlatformSubRelease","DataType":"string","Properties":{"Ordinal":"23"}},' '{"column":"OsBuildLab","DataType":"string","Properties":{"Ordinal":"24"}},' '{"column":"SkuEdition","DataType":"string","Properties":{"Ordinal":"25"}},' '{"column":"IsProtected","DataType":"real","Properties":{"Ordinal":"26"}},' '{"column":"AutoSampleOptIn","DataType":"int","Properties":{"Ordinal":"27"}},' '{"column":"PuaMode","DataType":"string","Properties":{"Ordinal":"28"}},' '{"column":"SMode","DataType":"real","Properties":{"Ordinal":"29"}},' '{"column":"IeVerIdentifier","DataType":"real","Properties":{"Ordinal":"30"}},' '{"column":"SmartScreen","DataType":"string","Properties":{"Ordinal":"31"}},' '{"column":"Firewall","DataType":"real","Properties":{"Ordinal":"32"}},' '{"column":"UacLuaenable","DataType":"real","Properties":{"Ordinal":"33"}},' '{"column":"Census_MDC2FormFactor","DataType":"string","Properties":{"Ordinal":"34"}},' '{"column":"Census_DeviceFamily","DataType":"string","Properties":{"Ordinal":"35"}},' '{"column":"Census_OEMNameIdentifier","DataType":"real","Properties":{"Ordinal":"36"}},' '{"column":"Census_OEMModelIdentifier","DataType":"real","Properties":{"Ordinal":"37"}},' '{"column":"Census_ProcessorCoreCount","DataType":"real","Properties":{"Ordinal":"38"}},' '{"column":"Census_ProcessorManufacturerIdentifier","DataType":"real","Properties":{"Ordinal":"39"}},' '{"column":"Census_ProcessorModelIdentifier","DataType":"real","Properties":{"Ordinal":"40"}},' '{"column":"Census_ProcessorClass","DataType":"string","Properties":{"Ordinal":"41"}},' '{"column":"Census_PrimaryDiskTotalCapacity","DataType":"real","Properties":{"Ordinal":"42"}},' '{"column":"Census_PrimaryDiskTypeName","DataType":"string","Properties":{"Ordinal":"43"}},' '{"column":"Census_SystemVolumeTotalCapacity","DataType":"real","Properties":{"Ordinal":"44"}},' '{"column":"Census_HasOpticalDiskDrive","DataType":"int","Properties":{"Ordinal":"45"}},' '{"column":"Census_TotalPhysicalRAM","DataType":"real","Properties":{"Ordinal":"46"}},' '{"column":"Census_ChassisTypeName","DataType":"string","Properties":{"Ordinal":"47"}},' '{"column":"Census_InternalPrimaryDiagonalDisplaySizeInInches","DataType":"real","Properties":{"Ordinal":"48"}},' '{"column":"Census_InternalPrimaryDisplayResolutionHorizontal","DataType":"real","Properties":{"Ordinal":"49"}},' '{"column":"Census_InternalPrimaryDisplayResolutionVertical","DataType":"real","Properties":{"Ordinal":"50"}},' '{"column":"Census_PowerPlatformRoleName","DataType":"string","Properties":{"Ordinal":"51"}},' '{"column":"Census_InternalBatteryType","DataType":"string","Properties":{"Ordinal":"52"}},' '{"column":"Census_InternalBatteryNumberOfCharges","DataType":"real","Properties":{"Ordinal":"53"}},' '{"column":"Census_OSVersion","DataType":"string","Properties":{"Ordinal":"54"}},' '{"column":"Census_OSArchitecture","DataType":"string","Properties":{"Ordinal":"55"}},' '{"column":"Census_OSBranch","DataType":"string","Properties":{"Ordinal":"56"}},' '{"column":"Census_OSBuildNumber","DataType":"int","Properties":{"Ordinal":"57"}},' '{"column":"Census_OSBuildRevision","DataType":"int","Properties":{"Ordinal":"58"}},' '{"column":"Census_OSEdition","DataType":"string","Properties":{"Ordinal":"59"}},' '{"column":"Census_OSSkuName","DataType":"string","Properties":{"Ordinal":"60"}},' '{"column":"Census_OSInstallTypeName","DataType":"string","Properties":{"Ordinal":"61"}},' '{"column":"Census_OSInstallLanguageIdentifier","DataType":"real","Properties":{"Ordinal":"62"}},' '{"column":"Census_OSUILocaleIdentifier","DataType":"int","Properties":{"Ordinal":"63"}},' '{"column":"Census_OSWUAutoUpdateOptionsName","DataType":"string","Properties":{"Ordinal":"64"}},' '{"column":"Census_IsPortableOperatingSystem","DataType":"int","Properties":{"Ordinal":"65"}},' '{"column":"Census_GenuineStateName","DataType":"string","Properties":{"Ordinal":"66"}},' '{"column":"Census_ActivationChannel","DataType":"string","Properties":{"Ordinal":"67"}},' '{"column":"Census_IsFlightingInternal","DataType":"real","Properties":{"Ordinal":"68"}},' '{"column":"Census_IsFlightsDisabled","DataType":"real","Properties":{"Ordinal":"69"}},' '{"column":"Census_FlightRing","DataType":"string","Properties":{"Ordinal":"70"}},' '{"column":"Census_ThresholdOptIn","DataType":"real","Properties":{"Ordinal":"71"}},' '{"column":"Census_FirmwareManufacturerIdentifier","DataType":"real","Properties":{"Ordinal":"72"}},' '{"column":"Census_FirmwareVersionIdentifier","DataType":"real","Properties":{"Ordinal":"73"}},' '{"column":"Census_IsSecureBootEnabled","DataType":"int","Properties":{"Ordinal":"74"}},' '{"column":"Census_IsWIMBootEnabled","DataType":"real","Properties":{"Ordinal":"75"}},' '{"column":"Census_IsVirtualDevice","DataType":"real","Properties":{"Ordinal":"76"}},' '{"column":"Census_IsTouchEnabled","DataType":"int","Properties":{"Ordinal":"77"}},' '{"column":"Census_IsPenCapable","DataType":"int","Properties":{"Ordinal":"78"}},' '{"column":"Census_IsAlwaysOnAlwaysConnectedCapable","DataType":"real","Properties":{"Ordinal":"79"}},' '{"column":"Wdft_IsGamer","DataType":"real","Properties":{"Ordinal":"80"}},' '{"column":"Wdft_RegionIdentifier","DataType":"real","Properties":{"Ordinal":"81"}},' '{"column":"HasDetections","DataType":"int","Properties":{"Ordinal":"82"}}' ']'
다음으로 ingest를 수행한다.
.ingest into table malware_table_10000 'https://<STORAGE-ACCOUNT>.blob.core.windows.net/<CONTAINER>/train_10000.csv?sv=...SAS코드...' with(ignoreFirstRecord=True) malware_table_10000 | take 10 // test query
"ignoreFirstRecord=True"는 첫 라인(헤더)을 skip하는 설정이다.
테스트 쿼리까지 ADX에서 수행하면 Python으로 진행할 모든 준비가 완료된다.
Python "azure-kusto-data" 패키지 설치
아래 구문으로 패키지를 설치한다. 필요한 pandas 등은 적절히 확인 후 설치한다.
pip install azure-kusto-data
ADX 인증 방식들을 통한 인증
Client SDK를 이용해 ADX로 쿼리하기 위해서는 몇 가지 방안이 있다.
인터렉티브 로그인(Interactive login)
클라우드 벤더들이 제공하는 interactive device login이며, 말그대로, device login 링크와 키가 나오고, URL에서 키를 넣으면 인증된다.
# Interactive login KCSB = KustoConnectionStringBuilder.with_aad_device_authentication(KUSTO_CLUSTER) KCSB.authority_id = AAD_TENANT_ID KUSTO_CLIENT = KustoClient(KCSB)
Service Principal을 이용한 로그인
Interactive login이 불가한 자동화된 환경이나 프로그래밍적인 처리에서는 SP를 이용하는 방식도 사용 가능하다.
SP를 해당 리소스그룹에 접근 가능하게 구성하고, 이 SP를 ADX의 permission에 추가한다.
ADX Permission 추가 : 개발자 커뮤니티 SQLER.com - Azure Data Factory를 이용해 ADX에서 SQL로 900만건의 데이터 전송 이 문서의 중간 부분에서 볼 수 있다.
Service Principal 생성 : 개발자 커뮤니티 SQLER.com - Service Princial과 Azure 리소스 접근/사용을 위한 인증 방법 3+1가지
# Service principal login # https://github.com/Azure/azure-kusto-python/tree/master/azure-kusto-data client_id = os.environ['CLIENT_ID'] client_secret = os.environ['CLIENT_SECRET'] authority_id = os.environ['AAD_TENANT_ID'] KCSB = KustoConnectionStringBuilder.with_aad_application_key_authentication(KUSTO_CLUSTER, client_id, client_secret, authority_id) KUSTO_CLIENT = KustoClient(KCSB)
이런 형태의 코드로 Service Principal을 이용해 자동화된 접근이 가능하다.
ADX가 제공하는 여러 인증 방식에 대해서는 아래 문서 참조.
Kusto authenticate with AAD for access - Azure Data Explorer | Microsoft Docs
azure-kusto-python/sample.py at master · Azure/azure-kusto-python (github.com)
이렇게 kusto client를 생성하면 ADX의 DB에 쿼리를 수행할 수 있다.
Python으로 Kusto 쿼리 수행
KUSTO_QUERY = "malware_table_10000" RESPONSE = KUSTO_CLIENT.execute(KUSTO_DATABASE, KUSTO_QUERY)
위와 같이 쿼리를 수행해 결과를 response object로 담을 수 있다.
오류 발생
KustoServiceError: Query execution has exceeded the allowed limits (80DA0003):
Query result set has exceeded the internal data size limit 67108864 (E_QUERY_RESULT_SET_TOO_LARGE;
see https://aka.ms/kustoquerylimits).
친절하게 링크도 준다. 익숙한 오류로, 1만건을 조회하면 문제 없지만, 데이터를 100만건 정도 조회하면 만나게 된다. 기존 ADF에서 ETL을 수행할 때에도 나왔던 오류다. Query limits - Azure Data Explorer | Microsoft Docs 문서에서 관련 내용을 확인 가능하며, 코드를 수행할 때, 이렇게 "set notruncation"을 추가해 수행하면 해결된다.
KUSTO_QUERY = "set notruncation; malware_table_1000000"
Kusto 쿼리 결과를 Pandas Dataframe으로 변환
response object를 dataframe으로 변환하는 옵션을 기본적으로 제공한다.
# convert to data frame or native test df = dataframe_from_result_table(RESPONSE.primary_results[0]) df.head()
이렇게 kusto 쿼리 결과를 dataframe으로 변환 가능하다.
dataframe으로 변환했으니, pandas의 function들을 모두 사용 가능하며, 당연히 CSV로 출력도 가능하다.
Dataframe을 CSV로 저장
# Save dataframe to CSV df.to_csv(r'data/adx-df.csv', index = False, header=True)
이렇게 dataframe을 CSV로 저장할 수 있다.
성능 및 속도 리뷰
100만건의 데이터를 가지고 테스트를 진행했다. 개인 노트북에서 WSL을 이용해 테스트했기 때문에 약간의 추가 시간이 소요된 것으로 예상되며, 데이터센터 안에서 처리한다면 network 속도 이슈는 없을 것이다.
ADX의 100만건 데이터 쿼리 후 클라이언트로 가져와 메모리로 로드하는 시간
RESPONSE = KUSTO_CLIENT.execute(KUSTO_DATABASE, KUSTO_QUERY)
# 약 2분 50초 소요
100만건 kusto query result를 dataframe으로 변환하는 시간
df = dataframe_from_result_table(RESPONSE.primary_results[0])
# 약 26초 소요
df.memory_usage(index=True).sum()
# 663,999,464
dataframe의 메모리 사용량은 약 663M 정도.
100만건의 Dataframe을 CSV로 저장하는 시간
df.to_csv(r'data/adx-df-1000000.csv', index = False, header=True)
# 약 68초 정도 소요
개인 노트북에서 진행한 작업이차 900만건 전체는 하지 못했으나, 100만건을 진행했으니 대략 *9 정도를 하면 총 소요시간 계산이 가능하리라 생각됨.
이렇게 ADX의 DB에 Python slient SDK를 이용해 쿼리하고 CSV로 저장하는 과정까지 진행해 보았다.
참고링크
개발자 커뮤니티 SQLER.com - Azure Data Factory를 이용해 ADX에서 SQL로 900만건의 데이터 전송
개발자 커뮤니티 SQLER.com - Azure Data Explorer에서 SQL서버 데이터베이스 테이블 조회/삽입 - sql_request plugin
개발자 커뮤니티 SQLER.com - Azure Data Explorer에 대량 CSV 파일 ingest
개발자 커뮤니티 SQLER.com - Azure Event Hubs의 데이터를 Azure Data Explorer로 전송
개발자 커뮤니티 SQLER.com - Service Princial과 Azure 리소스 접근/사용을 위한 인증 방법 3+1가지
Query data using the Azure Data Explorer Python library | Microsoft Docs