이전 포스트에서 kafka protocol을 이용해 message를 Azure Event Hubs로 전달하는 과정을 코드로 진행.
개발자 커뮤니티 SQLER.com - Azure Event Hubs로 kafka message 전송 처리
대량의 메세지를 이렇게 event hub로 받고, 이어서 메세지를 처리/분석해야 한다.
이때 Azure Data Explorer는 좋은 옵션이 될 수 있다.
What is Azure Data Explorer? | Microsoft Docs
Azure Data Explorer의 장점
특히, 대량의 로그 데이터나 telemetry 데이터에 처리에 유용하며, 여러 종류의 스트림 데이터를 collect, store, 분석에 사용할 수 있다. 구글의 big query와 유사하며, kusto query를 제공해 SQL 쿼리와 비슷한 대부분의 기능을 이 kusto query로 처리할 수 있다. 기본 분산 처리를 제공하기 때문에 성능 확장이 쉽고(scale) 개발자의 입장에서 이 kusto query만 하면 되기 떼문에 대규모 데이터 분석에 특히 유용하다.
Azure Event Hub의 데이터를 Azure Data Explorer로 Ingest
진행 방향은 일반 데이터베이스 처리 방식과 같다.
- ADX(Azure Data Explorer, 이하 ADX) 클러스터 생성
- 데이터베이스 생성
- 테이블 생성
- JSON(또는 CSV) 등의 mapping 생성
- Ingest 설정(Azure Portal UI 또는 C#이나 Python에서 API 활용)
의 절차로 처리할 수 있다.
ADX 클러스터와 데이터베이스 생성
ADX를 생성하는 절차는 Azure 문서에서 제공한다. 아래 문서를 참조해 ADX클러스터와 데이터베이스를 생성한다.
Quickstart: Create an Azure Data Explorer cluster & DB | Microsoft Docs
테이블 생성
ADX에서 테이블은 여러 방식으로 생성이 가능하다. 이 문서에서는 Kusto query를 이용해 생성한다.
Kusto query는 Portal에서 ADX DB를 선택후 쿼리를 수행하거나,
Kusto.Explorer를 이용해 쿼리 가능하다.
Kusto.Explorer installation and user interface | Microsoft Docs
개인적으로는 SSMS와 유사한 kusto.explorer를 설치해 사용한다.
이전 문서에서 kafka client로 다음 구조의 json 메세지를 event hub로 전송하고 있었다.
{ "product_num": 8, "product_price": 1011, "product_description": "dummystring", "product_production_dt": "2021-09-15 03:01:01.100000" }
JSON 데이터를 ADX로 ingest하기 위해 다음 쿼리로 table과 mapping을 생성한다.
// .drop table dw_evthub_ingest ifexists .create table dw_evthub_ingest (product_num: int, product_price: int, product_description: string, product_production_dt:datetime) // .show table dw_evthub_ingest
table을 생성 참조 : .create table - Azure Data Explorer | Microsoft Docs
NoSQL과 SQL의 중간적인 구조이며, Ingestion에서 동적으로 column을 생성하기도 한다.
JSON(또는 CSV) 등의 mapping 생성
테이블을 생성했으면, event hub의 메세지를 table로 ingest하기 위한 "mapping" 구조를 생성해야 한다.
mapping 생성 참조 : .create ingestion mapping - Azure Data Explorer | Microsoft Docs
Ingest maiing은 CSV, JSON, acro, parquet, orc 등을 제공하며, 나의 경우 JSON 포맷이기 때문에 JSON으로 진행한다.
// .drop table dw_evthub_ingest ingestion json mapping "evthub_ingest_mapping" .create table dw_evthub_ingest ingestion json mapping 'evthub_ingest_mapping' '[{"column":"product_num", "Properties": {"Path": "$.product_num", "datatype":"int"}}, {"column":"product_price", "Properties": {"Path":"$.product_price", "datatype":"int"}} ,{"column":"product_description", "Properties": {"Path":"$.product_description", "datatype":"string"}}, {"column":"product_production_dt", "Properties": {"Path":"$.product_production_dt", "datatype":"datetime"}}]' // .show table dw_evthub_ingest ingestion mappings
경험적으로, mapping에서 "datatype"을 선언했으나, show 해보면 지워져있다. 아마도 table의 구조를 기본으로 따라가거나 transform 하는 경우에만 사용되는 듯 하다. 이렇게 mapping을 완료하면, 마지막으로 ingest를 구성한다.
Ingest 설정(Azure Portal UI 또는 C#이나 Python에서 API 활용)
ADX는 이렇게 여러 SDK과 다양한 ingest 방법들을 제공한다. 특히, storage와 같은 곳의 데이터를 batch로 가져오거나, 스트리밍 데이터 소스에서 가져오는 양쪽 방안을 모두 제공한다.
이 문서에서는 event hub에 저장된 메세지를 streaming으로 가져오는 처리를 진행한다.
새로운 데이터 ingest
ADX 포털에 접속하면 이렇게 ingest 메뉴를 볼 수 있고, 바로 선택해 진행한다.
Ingest할 데이터 소스 확인
Ingest할 데이터 schema 확인
위에서 생성한 mapping이 있기 때문에 이 mapping을 그대로 이용한다.
만약 event hub와 같은 데이터 소스에 schema와 맞지 않는 데이터가 있을 경우 오류가 난다.
최종 리뷰 및 ingest 설정 완료
설정이 완료되면 이렇게 완료 화면이 나온다.
완료되면 kusto 쿼리를 이용해 데이터를 확인 가능하다.(event hub와 같은 스트리밍 소스는 최초 ingest에 약간의 시간이 소요될 수 있다.)
이렇게, 데이터가 올라온 것을 확인 가능하며, refresh하면 데이터가 주기적으로 ingest되는 것을 확인 가능하다.
참조링크 : Azure Data Explorer data ingestion overview | Microsoft Docs
SDK를 이용한 event hub 데이터 ingest 처리
Portal에서 UI로 진행한 ingest 과정을 SDK나 one-click등을 이용해 처리 가능하다.
예를 들어, Python으로 처리할 경우 다음 방식으로 진행 가능하다.
from azure.mgmt.kusto import KustoManagementClient from azure.mgmt.kusto.models import EventHubDataConnection from azure.common.credentials import ServicePrincipalCredentials #Directory (tenant) ID tenant_id = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx" #Application ID client_id = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx" #Client Secret client_secret = "xxxxxxxxxxxxxx" subscription_id = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx" credentials = ServicePrincipalCredentials( client_id=client_id, secret=client_secret, tenant=tenant_id ) kusto_management_client = KustoManagementClient(credentials, subscription_id) resource_group_name = "testrg" #The cluster and database that are created as part of the Prerequisites cluster_name = "mykustocluster" database_name = "mykustodatabase" data_connection_name = "myeventhubconnect" #The event hub that is created as part of the Prerequisites event_hub_resource_id = "/subscriptions/xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx/resourceGroups/xxxxxx/providers/Microsoft.EventHub/namespaces/xxxxxx/eventhubs/xxxxxx"; consumer_group = "$Default" location = "Central US" #The table and column mapping that are created as part of the Prerequisites table_name = "StormEvents" mapping_rule_name = "StormEvents_CSV_Mapping" data_format = "csv" #Returns an instance of LROPoller, check https://docs.microsoft.com/python/api/msrest/msrest.polling.lropoller?view=azure-python poller = kusto_management_client.data_connections.create_or_update(resource_group_name=resource_group_name, cluster_name=cluster_name, database_name=database_name, data_connection_name=data_connection_name, parameters=EventHubDataConnection(event_hub_resource_id=event_hub_resource_id, consumer_group=consumer_group, location=location, table_name=table_name, mapping_rule_name=mapping_rule_name, data_format=data_format))
물론, SDK들을 이용해 kusto query를 실행 가능하며, 위에서 생성한 table 생성 구문이나 mapping구문을 실행하고 이 ingest 구문을 수행하면 된다.
Create an Event Hub data connection for Azure Data Explorer by using Python | Microsoft Docs
Ingest data using the Azure Data Explorer Python library | Microsoft Docs
전에 진행한 프로젝트에서 ADX로 테라데이터 단위의 데이터를 처리한 경험이 있다, 빠른 성능과 KQL을 이용해 Python과 연동도 좋다 대규모 로그 데이터에서 머신러닝에 활용 가능한 데이터를 추출하는데 이용한 경험이 있다.
이후 기회가 되면 조금 더 ADX에 대한 내용을 풀어볼 수 있길.
참고링크:
What is Azure Data Explorer? | Microsoft Docs
Quickstart: Create an Azure Data Explorer cluster & DB | Microsoft Docs
Ingest data using the Azure Data Explorer Python library | Microsoft Docs
Create an Event Hub data connection for Azure Data Explorer by using Python | Microsoft Docs