이 문서에서는 ADX(Azure Data Explorer, 이하 ADX)에서 SQL의 trigger와 유사한 기능을 제공하는 update policy에 대한 리뷰 진행.

 

이 포스트에서 사용한 Kusto query와 테스트 데이터셋이 필요할 경우 아래 repository에서 사용 가능하다.

CloudBreadPaPa/adx-update-policy: Azure Data Explorer update policy code - CSV file and event hub streaming update policy (github.com)

 

Azure Data Explorer에서 Trigger 기능 구현 - update policy

ADX에 ingest를 하면서, 자동으로 ingest되는 데이터에 대해 trigger처럼 데이터를 변환에 다른 테이블에 추가할 필요성이 있다. 예를 들어, ingest하는 데이터 컬럼 안에 CSV나 JSON 데이터가 포함되면, 이를 풀어서 다른 테이블에서는 각각 컬럼에 넣는 처리도 가능하다. ADX에서 이러한 기능을 update policy가 제공한다.

 

ADX update policy란 무엇인가?

update-policy.png

ADX에서 새로운 데이터가 추가되면 자동으로 타겟 테이블에 데이터를 추가하는 기능.

The update policy instructs Azure Data Explorer to automatically append data to a target table whenever new data is inserted into the source table, based on a transformation query that runs on the data inserted into the source table.

SQL의 trigger와 유사하고, 새롭게 ingest된 데이터에 대해 작업해, 원하는 타겟 테이블에 변환된 정보를 추가 가능하다.

이 update policy는 batch(CSV나 JSON 파일) ingest와 Event hub를 주로 이용하는 streaming ingest 모두 동작한다. 

 

ADX에 update policy를 적용 - CSV파일을 ingest

예전에 진행한 포스트의 진행 패턴과 같다. CSV 파일을 Storage에 올려 두고, SAS 토큰을 생성한다음, ingest를 수행한다.

개발자 커뮤니티 SQLER.com - Azure Data Explorer에 대량 CSV 파일 ingest

 

이때, ingest를 수행하면, update policy에 의해, 다른 테이블로 추가된 데이터가 변환되어 저장되도록 처리하는 것이 목표.

 

ADX에 테이블 생성

// 테이블 생성
.create table malware_table (MachineIdentifier:string, ProductName:string, EngineVersion:string, AppVersion:string, AvSigVersion:string, IsBeta:int, RtpStateBitfield:real, IsSxsPassiveMode:int, DefaultBrowsersIdentifier:real, AVProductStatesIdentifier:real, AVProductsInstalled:real, AVProductsEnabled:real, HasTpm:int, CountryIdentifier:int, CityIdentifier:real, OrganizationIdentifier:real, GeoNameIdentifier:real, LocaleEnglishNameIdentifier:int, Platform:string, Processor:string, OsVer:string, OsBuild:int, OsSuite:int, OsPlatformSubRelease:string, OsBuildLab:string, SkuEdition:string, IsProtected:real, AutoSampleOptIn:int, PuaMode:string, SMode:real, IeVerIdentifier:real, SmartScreen:string, Firewall:real, UacLuaenable:real, Census_MDC2FormFactor:string, Census_DeviceFamily:string, Census_OEMNameIdentifier:real, Census_OEMModelIdentifier:real, Census_ProcessorCoreCount:real, Census_ProcessorManufacturerIdentifier:real, Census_ProcessorModelIdentifier:real, Census_ProcessorClass:string, Census_PrimaryDiskTotalCapacity:real, Census_PrimaryDiskTypeName:string, Census_SystemVolumeTotalCapacity:real, Census_HasOpticalDiskDrive:int, Census_TotalPhysicalRAM:real, Census_ChassisTypeName:string, Census_InternalPrimaryDiagonalDisplaySizeInInches:real, Census_InternalPrimaryDisplayResolutionHorizontal:real, Census_InternalPrimaryDisplayResolutionVertical:real, Census_PowerPlatformRoleName:string, Census_InternalBatteryType:string, Census_InternalBatteryNumberOfCharges:real, Census_OSVersion:string, Census_OSArchitecture:string, Census_OSBranch:string, Census_OSBuildNumber:int, Census_OSBuildRevision:int, Census_OSEdition:string, Census_OSSkuName:string, Census_OSInstallTypeName:string, Census_OSInstallLanguageIdentifier:real, Census_OSUILocaleIdentifier:int, Census_OSWUAutoUpdateOptionsName:string, Census_IsPortableOperatingSystem:int, Census_GenuineStateName:string, Census_ActivationChannel:string, Census_IsFlightingInternal:real, Census_IsFlightsDisabled:real, Census_FlightRing:string, Census_ThresholdOptIn:real, Census_FirmwareManufacturerIdentifier:real, Census_FirmwareVersionIdentifier:real, Census_IsSecureBootEnabled:int, Census_IsWIMBootEnabled:real, Census_IsVirtualDevice:real, Census_IsTouchEnabled:int, Census_IsPenCapable:int, Census_IsAlwaysOnAlwaysConnectedCapable:real, Wdft_IsGamer:real, Wdft_RegionIdentifier:real, HasDetections:int)

 

테이블 mapping 생성

// mapping 생성
.create table malware_table ingestion csv mapping "malware_table_mapping"
'['
'{"column":"MachineIdentifier","DataType":"string","Properties":{"Ordinal":"0"}},'
'{"column":"ProductName","DataType":"string","Properties":{"Ordinal":"1"}},'
'{"column":"EngineVersion","DataType":"string","Properties":{"Ordinal":"2"}},'
'{"column":"AppVersion","DataType":"string","Properties":{"Ordinal":"3"}},'
'{"column":"AvSigVersion","DataType":"string","Properties":{"Ordinal":"4"}},'
'{"column":"IsBeta","DataType":"int","Properties":{"Ordinal":"5"}},'
'{"column":"RtpStateBitfield","DataType":"real","Properties":{"Ordinal":"6"}},'
'{"column":"IsSxsPassiveMode","DataType":"int","Properties":{"Ordinal":"7"}},'
'{"column":"DefaultBrowsersIdentifier","DataType":"real","Properties":{"Ordinal":"8"}},'
'{"column":"AVProductStatesIdentifier","DataType":"real","Properties":{"Ordinal":"9"}},'
'{"column":"AVProductsInstalled","DataType":"real","Properties":{"Ordinal":"10"}},'
'{"column":"AVProductsEnabled","DataType":"real","Properties":{"Ordinal":"11"}},'
'{"column":"HasTpm","DataType":"int","Properties":{"Ordinal":"12"}},'
'{"column":"CountryIdentifier","DataType":"int","Properties":{"Ordinal":"13"}},'
'{"column":"CityIdentifier","DataType":"real","Properties":{"Ordinal":"14"}},'
'{"column":"OrganizationIdentifier","DataType":"real","Properties":{"Ordinal":"15"}},'
'{"column":"GeoNameIdentifier","DataType":"real","Properties":{"Ordinal":"16"}},'
'{"column":"LocaleEnglishNameIdentifier","DataType":"int","Properties":{"Ordinal":"17"}},'
'{"column":"Platform","DataType":"string","Properties":{"Ordinal":"18"}},'
'{"column":"Processor","DataType":"string","Properties":{"Ordinal":"19"}},'
'{"column":"OsVer","DataType":"string","Properties":{"Ordinal":"20"}},'
'{"column":"OsBuild","DataType":"int","Properties":{"Ordinal":"21"}},'
'{"column":"OsSuite","DataType":"int","Properties":{"Ordinal":"22"}},'
'{"column":"OsPlatformSubRelease","DataType":"string","Properties":{"Ordinal":"23"}},'
'{"column":"OsBuildLab","DataType":"string","Properties":{"Ordinal":"24"}},'
'{"column":"SkuEdition","DataType":"string","Properties":{"Ordinal":"25"}},'
'{"column":"IsProtected","DataType":"real","Properties":{"Ordinal":"26"}},'
'{"column":"AutoSampleOptIn","DataType":"int","Properties":{"Ordinal":"27"}},'
'{"column":"PuaMode","DataType":"string","Properties":{"Ordinal":"28"}},'
'{"column":"SMode","DataType":"real","Properties":{"Ordinal":"29"}},'
'{"column":"IeVerIdentifier","DataType":"real","Properties":{"Ordinal":"30"}},'
'{"column":"SmartScreen","DataType":"string","Properties":{"Ordinal":"31"}},'
'{"column":"Firewall","DataType":"real","Properties":{"Ordinal":"32"}},'
'{"column":"UacLuaenable","DataType":"real","Properties":{"Ordinal":"33"}},'
'{"column":"Census_MDC2FormFactor","DataType":"string","Properties":{"Ordinal":"34"}},'
'{"column":"Census_DeviceFamily","DataType":"string","Properties":{"Ordinal":"35"}},'
'{"column":"Census_OEMNameIdentifier","DataType":"real","Properties":{"Ordinal":"36"}},'
'{"column":"Census_OEMModelIdentifier","DataType":"real","Properties":{"Ordinal":"37"}},'
'{"column":"Census_ProcessorCoreCount","DataType":"real","Properties":{"Ordinal":"38"}},'
'{"column":"Census_ProcessorManufacturerIdentifier","DataType":"real","Properties":{"Ordinal":"39"}},'
'{"column":"Census_ProcessorModelIdentifier","DataType":"real","Properties":{"Ordinal":"40"}},'
'{"column":"Census_ProcessorClass","DataType":"string","Properties":{"Ordinal":"41"}},'
'{"column":"Census_PrimaryDiskTotalCapacity","DataType":"real","Properties":{"Ordinal":"42"}},'
'{"column":"Census_PrimaryDiskTypeName","DataType":"string","Properties":{"Ordinal":"43"}},'
'{"column":"Census_SystemVolumeTotalCapacity","DataType":"real","Properties":{"Ordinal":"44"}},'
'{"column":"Census_HasOpticalDiskDrive","DataType":"int","Properties":{"Ordinal":"45"}},'
'{"column":"Census_TotalPhysicalRAM","DataType":"real","Properties":{"Ordinal":"46"}},'
'{"column":"Census_ChassisTypeName","DataType":"string","Properties":{"Ordinal":"47"}},'
'{"column":"Census_InternalPrimaryDiagonalDisplaySizeInInches","DataType":"real","Properties":{"Ordinal":"48"}},'
'{"column":"Census_InternalPrimaryDisplayResolutionHorizontal","DataType":"real","Properties":{"Ordinal":"49"}},'
'{"column":"Census_InternalPrimaryDisplayResolutionVertical","DataType":"real","Properties":{"Ordinal":"50"}},'
'{"column":"Census_PowerPlatformRoleName","DataType":"string","Properties":{"Ordinal":"51"}},'
'{"column":"Census_InternalBatteryType","DataType":"string","Properties":{"Ordinal":"52"}},'
'{"column":"Census_InternalBatteryNumberOfCharges","DataType":"real","Properties":{"Ordinal":"53"}},'
'{"column":"Census_OSVersion","DataType":"string","Properties":{"Ordinal":"54"}},'
'{"column":"Census_OSArchitecture","DataType":"string","Properties":{"Ordinal":"55"}},'
'{"column":"Census_OSBranch","DataType":"string","Properties":{"Ordinal":"56"}},'
'{"column":"Census_OSBuildNumber","DataType":"int","Properties":{"Ordinal":"57"}},'
'{"column":"Census_OSBuildRevision","DataType":"int","Properties":{"Ordinal":"58"}},'
'{"column":"Census_OSEdition","DataType":"string","Properties":{"Ordinal":"59"}},'
'{"column":"Census_OSSkuName","DataType":"string","Properties":{"Ordinal":"60"}},'
'{"column":"Census_OSInstallTypeName","DataType":"string","Properties":{"Ordinal":"61"}},'
'{"column":"Census_OSInstallLanguageIdentifier","DataType":"real","Properties":{"Ordinal":"62"}},'
'{"column":"Census_OSUILocaleIdentifier","DataType":"int","Properties":{"Ordinal":"63"}},'
'{"column":"Census_OSWUAutoUpdateOptionsName","DataType":"string","Properties":{"Ordinal":"64"}},'
'{"column":"Census_IsPortableOperatingSystem","DataType":"int","Properties":{"Ordinal":"65"}},'
'{"column":"Census_GenuineStateName","DataType":"string","Properties":{"Ordinal":"66"}},'
'{"column":"Census_ActivationChannel","DataType":"string","Properties":{"Ordinal":"67"}},'
'{"column":"Census_IsFlightingInternal","DataType":"real","Properties":{"Ordinal":"68"}},'
'{"column":"Census_IsFlightsDisabled","DataType":"real","Properties":{"Ordinal":"69"}},'
'{"column":"Census_FlightRing","DataType":"string","Properties":{"Ordinal":"70"}},'
'{"column":"Census_ThresholdOptIn","DataType":"real","Properties":{"Ordinal":"71"}},'
'{"column":"Census_FirmwareManufacturerIdentifier","DataType":"real","Properties":{"Ordinal":"72"}},'
'{"column":"Census_FirmwareVersionIdentifier","DataType":"real","Properties":{"Ordinal":"73"}},'
'{"column":"Census_IsSecureBootEnabled","DataType":"int","Properties":{"Ordinal":"74"}},'
'{"column":"Census_IsWIMBootEnabled","DataType":"real","Properties":{"Ordinal":"75"}},'
'{"column":"Census_IsVirtualDevice","DataType":"real","Properties":{"Ordinal":"76"}},'
'{"column":"Census_IsTouchEnabled","DataType":"int","Properties":{"Ordinal":"77"}},'
'{"column":"Census_IsPenCapable","DataType":"int","Properties":{"Ordinal":"78"}},'
'{"column":"Census_IsAlwaysOnAlwaysConnectedCapable","DataType":"real","Properties":{"Ordinal":"79"}},'
'{"column":"Wdft_IsGamer","DataType":"real","Properties":{"Ordinal":"80"}},'
'{"column":"Wdft_RegionIdentifier","DataType":"real","Properties":{"Ordinal":"81"}},'
'{"column":"HasDetections","DataType":"int","Properties":{"Ordinal":"82"}}'
']'

 

테이블에 ingest를 수행

.ingest into table malware_table 'https://<YOUR-STORAGE-ACCOUNT-NAME>.blob.core.windows.net/bulk-insert/train_10000.csv?sv=2020-04-08&st=2021-10-22T04%3A46%3A56Z&se=2021-10-23T04%3A46%3A56Z&sr=b&sp=r&sig=XXXXXXXXXXXXXXXXXXXXXXXXXXXX' 
with(ignoreFirstRecord=True)

 

수행이 완료되면 ingest가 잘 되는지 테스트 한다.

// 테스트 쿼리
malware_table | take 10 // test query

malware_table 
| summarize Count=count()

 

Update policy 추가

오늘의 목표인 update policy를 추가한다. "malware_table"에 데이터가 ingest되면, ingest된 데이터(추가된 데이터)에 대해 일부 version 컬럼만 가져와 "malware_version_table" 이라는 테이블에 추가하는 패턴이다.

 

// update policy function 생성
// .drop function malware_table_fetch_versions
.create function malware_table_fetch_versions()
{
    malware_table
    | project MachineIdentifier, ProductName, EngineVersion, AppVersion, ctime=now()
}

 

// update policy function 생성
// malware_table에 ingest된 데이터(새로 추가된)가 들어오면 trigger되어 수행될 function을 생성
// .drop function malware_table_fetch_versions
.create function malware_table_fetch_versions()
{
    malware_table
    | project MachineIdentifier, ProductName, EngineVersion, AppVersion, ctime=now()
}

// 일부 컬럼 데이터만 저장할 "malware_version_table" 생성. 빈 테이블을 이용해 테이블 구조만 생성.
// .drop table malware_version_table
.set-or-append malware_version_table <| malware_table_fetch_versions() | limit 0


// malware_version_table에 update policy 추가
.alter table malware_version_table policy update
@'[{"IsEnabled": true, "Source": "malware_table", "Query": "malware_table_fetch_versions()", "IsTransactional": false, "PropagateIngestionProperties": false}]'

 

update policy를 사용하려면, 이렇게 function을 생성하고, target 테이블을 생성 후, 이어서 이 function을 target 테이블에 추가해야 한다.

ADX function에 대한 추가정보는 이 문서를 참조 - .create function - Azure Data Explorer | Microsoft Docs

 

"malware_table"에 추가 ingest를 수행해 "malware_version_table" 테이블 데이터를 조회.

// 테스트 조회
malware_version_table
| summarize cnt=count() 
// 현재 0건

// 추가 ingest를 malware_table로 수행
.ingest into table malware_table 'https://<YOUR-STORAGE-ACCOUNT-NAME>.blob.core.windows.net/bulk-insert/train_10000.csv?sv=2020-04-08&st=2021-10-22T04%3A46%3A56Z&se=2021-10-23T04%3A46%3A56Z&sr=b&sp=r&sig=XXXXXXXXXXXXXXXXXXXXXXXXXXXX' 
with(ignoreFirstRecord=True)

// 테스트 조회
malware_version_table
| summarize cnt=count() 
// 현재 9999건 

// 테스트 조회
malware_version_table | limit 5

function에서 생성한대로, 일부 컬럼의 정보와 추가된 현재 시각 정보 컬럼이 잘 표시된다.

 

kusto-result.png

 

이렇게 update policy를 이용해 batch(CSV 파일) ingest에 대한 처리를 수행했다. 다음은 streaming ingest에 대한 처리.

 

ADX에 update policy를 적용 - Event hub streaming ingest

위에서 batch ingest를 수행하였다. 이번에는 ADX의 주요 기능인 streaming ingest에 대해 마찬가지로, update policy를 수행한다.

 

Event hub - ADX streaming ingest 환경 구성

먼저, 전에 작성한 아래 kafka - event hub - ADX sink를 수행하는 과정을 수행해 지속적으로 ingest되는 상태를 수행한다.

개발자 커뮤니티 SQLER.com - Azure Event Hubs로 kafka message 전송 처리

개발자 커뮤니티 SQLER.com - Azure Event Hubs의 데이터를 Azure Data Explorer로 전송

 

위의 작업을 완료하면, dw_evthub_ingest 테이블로 streaming ingest가 수행되는 환경이 구성되었을 것이다.

streaming ingest 구성이 완료되었으면, 아래 쿼리를 수행해 update policy를 수행한다.

 

streaming ingest에 대해 update policy 수행

// update policy를 위한 function을 추가한다.
// .drop function ehub_table_get_product
.create function ehub_table_get_product()
{
    dw_evthub_ingest
    | project product_num, product_price, ctime=now()
}

// 일부 컬럼 데이터만 저장할 "ehub_table_only_product_price" 테이블을 생성한다.
// .drop table malware_version_table
.set-or-append ehub_table_only_product_price <| ehub_table_get_product() | limit 0


// 테이블에 update policy를 추가한다.
.alter table ehub_table_only_product_price policy update
@'[{"IsEnabled": true, "Source": "dw_evthub_ingest", "Query": "ehub_table_get_product()", "IsTransactional": false, "PropagateIngestionProperties": false}]'


// 완료되면 추가된 정보만 저장하는 ehub_table_only_product_price 테이블을 조회한다.(streaming ingest에 동작에 최대 5~10분 소요될 수 있다.)
ehub_table_only_product_price

// 약 5분 간격, 연속적으로 streaming ingest가 수행되고 수행될 때마다 데이터가 추가되는 것을 확인.
ehub_table_only_product_price
| summarize cnt=count() by ctime

 

이렇게 streaming ingest 역시 update policy를 적용하였다.

 

수행한 과정은 원본 테이블의 일부 컬럼과 now() 로 UTC 정보만 저장하도록 구성했으나, Kusto query를 사용 가능해 쿼리로 원하는 데이터를 조합해 타겟 테이블에 저장 가능하다.

 

ADX update policy 제한사항

- 새로 추가된 데이터에 대해서만 동작한다. 이미 기존 테이블에 존재하는 데이터는 update policy에서 처리 못한다.

- 타겟 테이블은 반드시 같은 데이터베이스 scope에 존재해야 한다. cross-database나 cross-cluster 처리는 불가하다. 트랜젝션 환경 하에 동작할 수 있기 때문이다.

참조링크: ADX update policy query limitation

 

Streaming ingest에서 데이터 누락 없이 update policy를 수행하려면

새로 추가되는 데이터에만 동작하기 때문에, streaming ingest에서 데이터 누락 없이 update policy를 수행하려면

- 현재 proxy에서 event hub로 보내는 message 전송을 중지

- 5~10분 정도 ADX가 event hub의 모든 message를 ingest하도록 대기

- kusto 쿼리로 source 테이블의 정보를 변환해 target 테이블로 전송하는 쿼리를 수행

- update policy를 target 테이블에 추가해 새로 추가되는 데이터는 자동 전송되도록 구성

- 다시 proxy에서 event hub로 보내는 message 시작

 

이런 패턴이면 누락되는 데이터 없이 모든 데이터를 안전하게 update policy로 target 테이블과 동기화 가능할 것이다.

 

참고링크

Kusto update policy - Azure Data Explorer | Microsoft Docs

Kusto update policy management - Azure Data Explorer | Microsoft Docs

CloudBreadPaPa/adx-update-policy: Azure Data Explorer update policy code - CSV file and event hub streaming update policy (github.com)

개발자 커뮤니티 SQLER.com - Azure Data Explorer에 대량 CSV 파일 ingest

개발자 커뮤니티 SQLER.com - Azure Event Hubs로 kafka message 전송 처리

개발자 커뮤니티 SQLER.com - Azure Event Hubs의 데이터를 Azure Data Explorer로 전송

Configure streaming ingestion on your Azure Data Explorer cluster | Microsoft Docs

.create function - Azure Data Explorer | Microsoft Docs

 

 

No. Subject Author Date Views
Notice SQL강좌: 챗GPT와 함께 배우는 SQL Server 무료 강좌 목차와 소개 (2023년 9월 업데이트) 코난(김대우) 2023.08.18 33945
Notice Python 무료 강좌 - 기초, 중급, 머신러닝(2023년 6월 업데이트) 코난(김대우) 2021.01.01 17161
338 Azure Data Explorer - SELECT INTO(CTAS) 또는 INSERT SELECT 쿼리 수행 코난(김대우) 2021.10.26 326
» Azure Data Explorer에서 Trigger 기능 구현 - update policy file 코난(김대우) 2021.10.22 280
336 vscode에서 일관된 팀 단위 개발 환경 구성 - devcontainer file 코난(김대우) 2021.10.19 566
335 Bicep - Azure 클라우드 리소스 배포를 위한 언어 file 코난(김대우) 2021.10.19 139
334 Azure Data Explorer - Event Hub 스트리밍 ingest 중 컬럼추가 file 코난(김대우) 2021.10.18 126
333 SonarQube 리뷰 및 Azure DevOps 연결 file 코난(김대우) 2021.10.01 216
332 PySpark, koalas와 pandas dataframe file 코난(김대우) 2021.09.29 249
331 Apache Spark, pyspark 설치 후 jupyter notebook 실행 file 코난(김대우) 2021.09.29 359
330 Azure Data Explorer의 데이터를 Python Pandas Dataframe과 CSV로 변환 코난(김대우) 2021.09.28 162
329 Azure Blob Storage SAS token 생성 코난(김대우) 2021.09.17 183
328 Azure Data Factory를 이용해 ADX에서 SQL로 900만건의 데이터 전송 file 코난(김대우) 2021.09.16 237
327 Azure Data Explorer에서 SQL서버 데이터베이스 테이블 조회/삽입 - sql_request plugin file 코난(김대우) 2021.09.16 148
326 Azure Data Explorer에 대량 CSV 파일 ingest 코난(김대우) 2021.09.15 148
325 Azure Event Hubs의 데이터를 Azure Data Explorer로 전송 file 코난(김대우) 2021.09.15 179
324 Azure Event Hubs로 kafka message 전송 처리 file 코난(김대우) 2021.09.15 225
323 Service Principal과 Azure 리소스 접근/사용을 위한 인증 방법 3+1가지 file 코난(김대우) 2020.12.26 559
322 Azure storage 관리 도구 - storage explorer 설치와 사용 방법 코난(김대우) 2020.12.25 400
321 Azure cli - command line interface 명령줄 인터페이스 도구를 쓰는 이유와 방법 코난(김대우) 2020.12.25 335
320 클라우드 오픈소스 개발환경 - WSL [1] file 코난(김대우) 2020.12.20 1225
319 Cloud RoadShow 세션 발표 자료 코난(김대우) 2016.05.04 11416





XE Login