이 포스팅에서는, Azure Event hubs를 capture해 storage로 저장하면 생성되는 Avro 데이터를 Python 읽고 재처리 하는 방안에 대해 정리
Azure Event hubs - Apache Avro 데이터 처리
이 문서에서 사용하는 전체 코드는 CloudBreadPaPa/apache-avro: Review Apache Avro dataset - Azure event hub captured data (github.com) github repository에서 확인 가능.
Azure Event hubs와 Avro 데이터
Azure event hub의 데이터를 쉽게 아카이브 하거나 백업하는 방안으로 "Capture"가 제공된다.
Capture streaming events - Azure Event Hubs - Azure Event Hubs | Microsoft Docs
간략히, eventhub의 메세지를 이렇게 주기적으로 storage에 저장 하는 처리이다.
Hot path와 cold path 분석
Event hub를 활용하면 훌륭한 real-time hot path 구성을 처리할 수 있다. 이 hot path를 통해 메세지를 사용하고, 다시 storage로 적재해 cold path로 활용 가능하지만, 이렇게 capture를 이용하면 마찬가지로 cold path를 더 쉽게 구성 가능하다.
이렇게 Event hub에서 Capture된 데이터 포맷이 Apache Avro 포맷이다.
Event hub에서 이렇게 capture 처리 구성이 가능하다.
Apache Avro란 무엇인가?
Apache Avro™ is a data serialization system.
Avro provides:
- Rich data structures.
- A compact, fast, binary data format.
- A container file, to store persistent data.
- Remote procedure call (RPC).
- Simple integration with dynamic languages. Code generation is not required to read or write data files nor to use or implement RPC protocols. Code generation as an optional optimization, only worth implementing for statically typed languages.
Avro를 지원하는 여러 오픈소스 기반 분석 시스템에서 사용 가능하며, 다양한 프로그래밍 언어 라이브러리를 제공한다.
Avro 데이터 확인 방안
여러 방안이 있다. Apache Drill을 이용하거나, Avro Tools cli를 사용할 수도 있다. Elm으로 제작한 avro-viewer 프로젝트도 있고, demo로 체크해 볼 수도 있다. 이 문서에서는 Avro를 Python으로 탐색해 본다.
Python으로 Avro 데이터 탐색
코드는 지난번 수행한 Azure Event Hubs로 kafka message 전송 처리 구조를 다시 수행하였다.
현재 Python 코드가 실행 중이고, JSON 메세지가 지속적으로 event hub로 전송되는 중이며, 위의 Capture streaming events - Azure Event Hubs - Azure Event Hubs | Microsoft Docs 링크를 통해 capture 작업도 완료 했다고 가정하고 코드를 수행한다.
예제 데이터 파일을 data/event_hub_sample.avro 경로에 올려 두었다. 이 파일을 Avro DataFileReader를 통해 open한다.
... # Read avro sample file from data directory sample_file = 'data/event_hub_sample.avro' reader = DataFileReader(open(sample_file, 'rb'), DatumReader()) ...
Avro schema 체크
Avro는 inline으로 여러 메타 정보와 schema를 포함하고 있다. 우선 meta data와 schema 체크 수행.
# Print meta data and schema reader.meta
출력 결과
{'avro.codec': b'null', 'avro.schema': b'{"type":"record","name":"EventData","namespace":"Microsoft.ServiceBus.Messaging","fields":[{"name":"SequenceNumber","type":"long"},{"name":"Offset","type":"string"},{"name":"EnqueuedTimeUtc","type":"string"},{"name":"SystemProperties","type":{"type":"map","values":["long","double","string","bytes"]}},{"name":"Properties","type":{"type":"map","values":["long","double","string","bytes","null"]}},{"name":"Body","type":["null","bytes"]}]}'}
Avro 데이터 조회
개별 데이터를 iteration 하고 출력한다.
lst = [] for reading in reader: lst.append(json.loads(reading["Body"])) json.dumps(lst[0:5])
결과
[{"product_num": 5, "product_price": 5273, "product_description": "1pq5EDoT5u", "product_production_dt": "2021-10-28 03:15:38.314706"}, {"product_num": 8, "product_price": 6028, "product_description": "zl8E2UIgHz", "product_production_dt": "2021-10-28 03:15:39.316801"}, {"product_num": 6, "product_price": 448, "product_description": "qlfqPcJkbI", "product_production_dt": "2021-10-28 03:15:40.319594"}, {"product_num": 9, "product_price": 6313, "product_description": "Ys1xvquiuV", "product_production_dt": "2021-10-28 03:15:41.322595"}, {"product_num": 8, "product_price": 7685, "product_description": "uhxCO8LFoW", "product_production_dt": "2021-10-28 03:15:42.324536"}]
Event hub로부터 Capture된 Avro 데이터 처리를 Azure Blob Storage에서 처리
그러면, Event hub에서 Capture된, Container에 저장된 blob 파일들을 읽고, Avro data를 처리한다.
... container = ContainerClient.from_connection_string(conn_str=AZURE_STORAGE_CONN_STR, container_name=AZURE_CONTAINTER_NAME) blob_list = container.list_blobs() ...
Blob storage container에서 blob들을 읽는다.
읽은 blob 파일을 위에서 처리했던 대로, Avro DataFileReader로 로드하고, 데이터를 list에 담아 json으로 출력한다.
for blob in blob_list: .. blob_client = ContainerClient.get_blob_client(container, blob=blob.name) .. reader = DataFileReader(open(cleanName, 'rb'), DatumReader()) .. for reading in reader: lst.append(json.loads(reading["Body"])) ...
전체 코드는 CloudBreadPaPa/apache-avro: Review Apache Avro dataset - Azure event hub captured data (github.com) github repository를 참고.
이렇게 간략히, Apache Avro에 대해서 처리하였다. 이후에 Spark에서 Avro 데이터를 읽고 처리하는 방안도 진행 예정.
참고링크
개발자 커뮤니티 SQLER.com - Azure Event Hubs로 kafka message 전송 처리
CloudBreadPaPa/apache-avro: Review Apache Avro dataset - Azure event hub captured data (github.com)
Capture streaming events - Azure Event Hubs - Azure Event Hubs | Microsoft Docs
Apache Avro™ 1.10.2 Getting Started (Python)