이번 포스팅에서는 Azure Event hub를 통해 streaming ingest 중인 Azure Data Explorer 테이블에 컬럼을 추가하는 방안에 대해서 정리.
이 문서는 아래 두개의 포스팅과 이어지는 내용을 포함.
개발자 커뮤니티 SQLER.com - Azure Event Hubs로 kafka message 전송 처리
개발자 커뮤니티 SQLER.com - Azure Event Hubs의 데이터를 Azure Data Explorer로 전송
Azure Data Explorer - Event Hub 스트리밍 ingest 중 컬럼추가
간략히, ADX(Azure Data Explorer, 이하 ADX) 테이블에 컬럼을 추가하고, table mapping에 컬럼 정의를 추가하면 해결된다.
ADX 테이블에 컬럼 추가
지난 포스팅에서 ADX 테이블을 생성하였다. 생성한 테이블에 컬럼을 아래 .alter table 구문으로 수행.
// add column .alter-merge table dw_evthub_ingest (ColumnX:string) // .show table dw_evthub_ingest
이렇게 수행하고 이어서 현재 streaming ingest 중이기 때문에 mapping을 추가
ADX 테이블에 table mapping 추가
// alter mapping .alter table dw_evthub_ingest ingestion json mapping 'evthub_ingest_mapping' '[{"column":"product_num", "Properties": {"Path": "$.product_num", "datatype":"int"}}, {"column":"product_price", "Properties": {"Path":"$.product_price", "datatype":"int"}} ,{"column":"product_description", "Properties": {"Path":"$.product_description", "datatype":"string"}}, {"column":"product_production_dt", "Properties": {"Path":"$.product_production_dt", "datatype":"datetime"}}, {"column":"ColumnX", "Properties": {"Path":"$.ColumnX", "datatype":"string"}}]' // .show table dw_evthub_ingest ingestion mappings
ColumnX 라는 컬럼의 mapping을 이렇게 수정한다.
ADX 테이블에 추가된 컬럼 확인
컬럼을 추가하고 쿼리해 보면, 이렇게 테이블에 컬럼이 추가되었고, 값은 null 로 지정된다.
어플리케이션 코드에서 json 데이터에 ColumnX 데이터 추가
지난 포스팅에서 json 데이터를 이용해 event hub로 sink 했다. 이 json 데이터에 컬럼을 추가한다.
def build_message(): msg = {} msg["product_num"] = round(random.uniform(1, 10)) msg["product_price"] = round(random.uniform(100, 10000)) msg["product_description"] = ''.join(random.SystemRandom().choice(string.ascii_letters+string.digits) for _ in range(10)) msg["product_production_dt"] = str(datetime.utcnow()) msg["ColumnX"] = ''.join(random.SystemRandom().choice(string.ascii_letters+string.digits) for _ in range(5)) return json.dumps(msg)
Azure Event hub로 kafka message를 json으로 전송하는 Github 코드 링크
이렇게 ColumnX를 추가하고, 잠시 기다렸다가 다시 쿼리해 보면, 아래처럼 추가된 column에 데이터가 들어온 것을 확인 가능하다.
============================================================================
2021년 10월 19일 추가
만약, 자동적으로 새로 추가된 ADX 테이블 컬럼으로 event hub의 message가 sink 되지 않는다면, cache 삭제를 해본다.
Clearing cached schema for streaming ingestion - Azure Data Explorer | Microsoft Docs
.clear database cache streamingingestion schema .show table 테이블명 cache streamingingestion schema
참고링크
.alter-merge table - Azure Data Explorer | Microsoft Docs
개발자 커뮤니티 SQLER.com - Azure Event Hubs로 kafka message 전송 처리
개발자 커뮤니티 SQLER.com - Azure Event Hubs의 데이터를 Azure Data Explorer로 전송