이전 포스트에서 kafka protocol을 이용해 message를 Azure Event Hubs로 전달하는 과정을 코드로 진행.

개발자 커뮤니티 SQLER.com - Azure Event Hubs로 kafka message 전송 처리

 

대량의 메세지를 이렇게 event hub로 받고, 이어서 메세지를 처리/분석해야 한다.

 

이때 Azure Data Explorer는 좋은 옵션이 될 수 있다.

What is Azure Data Explorer? | Microsoft Docs

 

Azure Data Explorer의 장점

특히, 대량의 로그 데이터나 telemetry 데이터에 처리에 유용하며, 여러 종류의 스트림 데이터를 collect, store, 분석에 사용할 수 있다. 구글의 big query와 유사하며, kusto query를 제공해 SQL 쿼리와 비슷한 대부분의 기능을 이 kusto query로 처리할 수 있다. 기본 분산 처리를 제공하기 때문에 성능 확장이 쉽고(scale) 개발자의 입장에서 이 kusto query만 하면 되기 떼문에 대규모 데이터 분석에 특히 유용하다.

 

Azure Event Hub의 데이터를 Azure Data Explorer로 Ingest

진행 방향은 일반 데이터베이스 처리 방식과 같다.

- ADX(Azure Data Explorer, 이하 ADX) 클러스터 생성

- 데이터베이스 생성

- 테이블 생성

- JSON(또는 CSV) 등의 mapping 생성

- Ingest 설정(Azure Portal UI 또는 C#이나 Python에서 API 활용)

의 절차로 처리할 수 있다.

 

ADX 클러스터와 데이터베이스 생성

ADX를 생성하는 절차는 Azure 문서에서 제공한다. 아래 문서를 참조해 ADX클러스터와 데이터베이스를 생성한다.

Quickstart: Create an Azure Data Explorer cluster & DB | Microsoft Docs

 

테이블 생성

ADX에서 테이블은 여러 방식으로 생성이 가능하다. 이 문서에서는 Kusto query를 이용해 생성한다.

Kusto query는 Portal에서 ADX DB를 선택후 쿼리를 수행하거나,

Walkthrough of the Kusto Web Explorer experience.

 

Kusto.Explorer를 이용해 쿼리 가능하다.

Kusto Explorer basic query.

Kusto.Explorer installation and user interface | Microsoft Docs

개인적으로는 SSMS와 유사한 kusto.explorer를 설치해 사용한다.

 

이전 문서에서 kafka client로 다음 구조의 json 메세지를 event hub로 전송하고 있었다.

{
    "product_num": 8,
    "product_price": 1011,
    "product_description": "dummystring",
    "product_production_dt": "2021-09-15 03:01:01.100000"
}

JSON 데이터를 ADX로 ingest하기 위해 다음 쿼리로 table과 mapping을 생성한다.

 

// .drop table dw_evthub_ingest ifexists
.create table dw_evthub_ingest (product_num: int, product_price: int, product_description: string, product_production_dt:datetime)

// .show table dw_evthub_ingest

table을 생성 참조 : .create table - Azure Data Explorer | Microsoft Docs

NoSQL과 SQL의 중간적인 구조이며, Ingestion에서 동적으로 column을 생성하기도 한다.

 

JSON(또는 CSV) 등의 mapping 생성

테이블을 생성했으면, event hub의 메세지를 table로 ingest하기 위한 "mapping" 구조를 생성해야 한다.

mapping 생성 참조 : .create ingestion mapping - Azure Data Explorer | Microsoft Docs

 

Ingest maiing은 CSV, JSON, acro, parquet, orc 등을 제공하며, 나의 경우 JSON 포맷이기 때문에 JSON으로 진행한다.

// .drop table dw_evthub_ingest ingestion json mapping "evthub_ingest_mapping" 
.create table dw_evthub_ingest ingestion json mapping 'evthub_ingest_mapping' '[{"column":"product_num", "Properties": {"Path": "$.product_num", "datatype":"int"}}, {"column":"product_price", "Properties": {"Path":"$.product_price", "datatype":"int"}} ,{"column":"product_description", "Properties": {"Path":"$.product_description", "datatype":"string"}}, {"column":"product_production_dt", "Properties": {"Path":"$.product_production_dt", "datatype":"datetime"}}]'

// .show table dw_evthub_ingest ingestion mappings

 

경험적으로, mapping에서 "datatype"을 선언했으나, show 해보면 지워져있다. 아마도 table의 구조를 기본으로 따라가거나 transform 하는 경우에만 사용되는 듯 하다. 이렇게 mapping을 완료하면, 마지막으로 ingest를 구성한다.

 

Ingest 설정(Azure Portal UI 또는 C#이나 Python에서 API 활용)

ADX는 이렇게 여러 SDK과 다양한 ingest 방법들을 제공한다. 특히, storage와 같은 곳의 데이터를 batch로 가져오거나, 스트리밍 데이터 소스에서 가져오는 양쪽 방안을 모두 제공한다.

Overview scheme of data ingestion and management.

이 문서에서는 event hub에 저장된 메세지를 streaming으로 가져오는 처리를 진행한다.

 

새로운 데이터 ingest

ADX 포털에 접속하면 이렇게 ingest 메뉴를 볼 수 있고, 바로 선택해 진행한다.

adx_ingest01.png

 

Ingest할 데이터 소스 확인

adx_ingest02.png

 

Ingest할 데이터 schema 확인

위에서 생성한 mapping이 있기 때문에 이 mapping을 그대로 이용한다.

만약 event hub와 같은 데이터 소스에 schema와 맞지 않는 데이터가 있을 경우 오류가 난다.

adx_ingest03.png

 

최종 리뷰 및 ingest 설정 완료

설정이 완료되면 이렇게 완료 화면이 나온다.

adx_ingest04.png

 

완료되면 kusto 쿼리를 이용해 데이터를 확인 가능하다.(event hub와 같은 스트리밍 소스는 최초 ingest에 약간의 시간이 소요될 수 있다.)

 

adx_ingest05.png

이렇게, 데이터가 올라온 것을 확인 가능하며, refresh하면 데이터가 주기적으로 ingest되는 것을 확인 가능하다.

 

참조링크 : Azure Data Explorer data ingestion overview | Microsoft Docs

 

SDK를 이용한 event hub 데이터 ingest 처리

Portal에서 UI로 진행한 ingest 과정을 SDK나 one-click등을 이용해 처리 가능하다.

예를 들어, Python으로 처리할 경우 다음 방식으로 진행 가능하다.

 

from azure.mgmt.kusto import KustoManagementClient
from azure.mgmt.kusto.models import EventHubDataConnection
from azure.common.credentials import ServicePrincipalCredentials

#Directory (tenant) ID
tenant_id = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx"
#Application ID
client_id = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx"
#Client Secret
client_secret = "xxxxxxxxxxxxxx"
subscription_id = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx"
credentials = ServicePrincipalCredentials(
        client_id=client_id,
        secret=client_secret,
        tenant=tenant_id
    )
kusto_management_client = KustoManagementClient(credentials, subscription_id)

resource_group_name = "testrg"
#The cluster and database that are created as part of the Prerequisites
cluster_name = "mykustocluster"
database_name = "mykustodatabase"
data_connection_name = "myeventhubconnect"
#The event hub that is created as part of the Prerequisites
event_hub_resource_id = "/subscriptions/xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx/resourceGroups/xxxxxx/providers/Microsoft.EventHub/namespaces/xxxxxx/eventhubs/xxxxxx";
consumer_group = "$Default"
location = "Central US"
#The table and column mapping that are created as part of the Prerequisites
table_name = "StormEvents"
mapping_rule_name = "StormEvents_CSV_Mapping"
data_format = "csv"
#Returns an instance of LROPoller, check https://docs.microsoft.com/python/api/msrest/msrest.polling.lropoller?view=azure-python
poller = kusto_management_client.data_connections.create_or_update(resource_group_name=resource_group_name, cluster_name=cluster_name, database_name=database_name, data_connection_name=data_connection_name,
                                        parameters=EventHubDataConnection(event_hub_resource_id=event_hub_resource_id, consumer_group=consumer_group, location=location,
                                                                            table_name=table_name, mapping_rule_name=mapping_rule_name, data_format=data_format))

물론, SDK들을 이용해 kusto query를 실행 가능하며, 위에서 생성한 table 생성 구문이나 mapping구문을 실행하고 이 ingest 구문을 수행하면 된다.

 

Create an Event Hub data connection for Azure Data Explorer by using Python | Microsoft Docs

Ingest data using the Azure Data Explorer Python library | Microsoft Docs

 

전에 진행한 프로젝트에서 ADX로 테라데이터 단위의 데이터를 처리한 경험이 있다, 빠른 성능과 KQL을 이용해 Python과 연동도 좋다 대규모 로그 데이터에서 머신러닝에 활용 가능한 데이터를 추출하는데 이용한 경험이 있다.

 

이후 기회가 되면 조금 더 ADX에 대한 내용을 풀어볼 수 있길.

 

참고링크:

What is Azure Data Explorer? | Microsoft Docs

Quickstart: Create an Azure Data Explorer cluster & DB | Microsoft Docs

Ingest data using the Azure Data Explorer Python library | Microsoft Docs

Create an Event Hub data connection for Azure Data Explorer by using Python | Microsoft Docs

No. Subject Author Date Views
Notice SQL강좌: 챗GPT와 함께 배우는 SQL Server 무료 강좌 목차와 소개 (2023년 9월 업데이트) 코난(김대우) 2023.08.18 33688
Notice Python 무료 강좌 - 기초, 중급, 머신러닝(2023년 6월 업데이트) 코난(김대우) 2021.01.01 16961
338 Azure Data Explorer - SELECT INTO(CTAS) 또는 INSERT SELECT 쿼리 수행 코난(김대우) 2021.10.26 326
337 Azure Data Explorer에서 Trigger 기능 구현 - update policy file 코난(김대우) 2021.10.22 280
336 vscode에서 일관된 팀 단위 개발 환경 구성 - devcontainer file 코난(김대우) 2021.10.19 561
335 Bicep - Azure 클라우드 리소스 배포를 위한 언어 file 코난(김대우) 2021.10.19 139
334 Azure Data Explorer - Event Hub 스트리밍 ingest 중 컬럼추가 file 코난(김대우) 2021.10.18 125
333 SonarQube 리뷰 및 Azure DevOps 연결 file 코난(김대우) 2021.10.01 216
332 PySpark, koalas와 pandas dataframe file 코난(김대우) 2021.09.29 249
331 Apache Spark, pyspark 설치 후 jupyter notebook 실행 file 코난(김대우) 2021.09.29 357
330 Azure Data Explorer의 데이터를 Python Pandas Dataframe과 CSV로 변환 코난(김대우) 2021.09.28 161
329 Azure Blob Storage SAS token 생성 코난(김대우) 2021.09.17 180
328 Azure Data Factory를 이용해 ADX에서 SQL로 900만건의 데이터 전송 file 코난(김대우) 2021.09.16 235
327 Azure Data Explorer에서 SQL서버 데이터베이스 테이블 조회/삽입 - sql_request plugin file 코난(김대우) 2021.09.16 147
326 Azure Data Explorer에 대량 CSV 파일 ingest 코난(김대우) 2021.09.15 147
» Azure Event Hubs의 데이터를 Azure Data Explorer로 전송 file 코난(김대우) 2021.09.15 177
324 Azure Event Hubs로 kafka message 전송 처리 file 코난(김대우) 2021.09.15 222
323 Service Principal과 Azure 리소스 접근/사용을 위한 인증 방법 3+1가지 file 코난(김대우) 2020.12.26 555
322 Azure storage 관리 도구 - storage explorer 설치와 사용 방법 코난(김대우) 2020.12.25 400
321 Azure cli - command line interface 명령줄 인터페이스 도구를 쓰는 이유와 방법 코난(김대우) 2020.12.25 335
320 클라우드 오픈소스 개발환경 - WSL [1] file 코난(김대우) 2020.12.20 1225
319 Cloud RoadShow 세션 발표 자료 코난(김대우) 2016.05.04 11415





XE Login