이 포스팅에서는, Azure Event hubs를 capture해 storage로 저장하면 생성되는 Avro 데이터를 Python 읽고 재처리 하는 방안에 대해 정리

 

Azure Event hubs - Apache Avro 데이터 처리

 

이 문서에서 사용하는 전체 코드는 CloudBreadPaPa/apache-avro: Review Apache Avro dataset - Azure event hub captured data (github.com) github repository에서 확인 가능.

 

Azure Event hubs와 Avro 데이터

Azure event hub의 데이터를 쉽게 아카이브 하거나 백업하는 방안으로 "Capture"가 제공된다.

Capture streaming events - Azure Event Hubs - Azure Event Hubs | Microsoft Docs

 

간략히, eventhub의 메세지를 이렇게 주기적으로 storage에 저장 하는 처리이다.

azure-eventhub-capture.png

 

Hot path와 cold path 분석

Event hub를 활용하면 훌륭한 real-time hot path 구성을 처리할 수 있다. 이 hot path를 통해 메세지를 사용하고, 다시 storage로 적재해 cold path로 활용 가능하지만, 이렇게 capture를 이용하면 마찬가지로 cold path를 더 쉽게 구성 가능하다.

 

이렇게 Event hub에서 Capture된 데이터 포맷이 Apache Avro 포맷이다.

 

azure-eventhub-capture_02.png

Event hub에서 이렇게 capture 처리 구성이 가능하다.

 

Apache Avro란 무엇인가?

Apache Avro™ is a data serialization system.

 

Avro provides:

- Rich data structures.
- A compact, fast, binary data format.
- A container file, to store persistent data.
- Remote procedure call (RPC).
- Simple integration with dynamic languages. Code generation is not required to read or write data files nor to use or implement RPC protocols. Code generation as an optional optimization, only worth implementing for statically typed languages.

 

Apache Avro™ 1.10.2 Documentation

Avro를 지원하는 여러 오픈소스 기반 분석 시스템에서 사용 가능하며, 다양한 프로그래밍 언어 라이브러리를 제공한다.

 

Avro 데이터 확인 방안

여러 방안이 있다. Apache Drill을 이용하거나, Avro Tools cli를 사용할 수도 있다. Elm으로 제작한 avro-viewer 프로젝트도 있고, demo로 체크해 볼 수도 있다. 이 문서에서는 Avro를 Python으로 탐색해 본다.

 

Python으로 Avro 데이터 탐색

코드는 지난번 수행한 Azure Event Hubs로 kafka message 전송 처리 구조를 다시 수행하였다.

현재 Python 코드가 실행 중이고, JSON 메세지가 지속적으로 event hub로 전송되는 중이며, 위의 Capture streaming events - Azure Event Hubs - Azure Event Hubs | Microsoft Docs 링크를 통해 capture 작업도 완료 했다고 가정하고 코드를 수행한다.

 

예제 데이터 파일을 data/event_hub_sample.avro 경로에 올려 두었다. 이 파일을 Avro DataFileReader를 통해 open한다.

...
# Read avro sample file from data directory
sample_file = 'data/event_hub_sample.avro'
reader = DataFileReader(open(sample_file, 'rb'), DatumReader())
...

 

Avro schema 체크

Avro는 inline으로 여러 메타 정보와 schema를 포함하고 있다. 우선 meta data와 schema 체크 수행.

# Print meta data and schema
reader.meta

 

출력 결과

{'avro.codec': b'null',
 'avro.schema': b'{"type":"record","name":"EventData","namespace":"Microsoft.ServiceBus.Messaging","fields":[{"name":"SequenceNumber","type":"long"},{"name":"Offset","type":"string"},{"name":"EnqueuedTimeUtc","type":"string"},{"name":"SystemProperties","type":{"type":"map","values":["long","double","string","bytes"]}},{"name":"Properties","type":{"type":"map","values":["long","double","string","bytes","null"]}},{"name":"Body","type":["null","bytes"]}]}'}

 

Avro 데이터 조회

개별 데이터를 iteration 하고 출력한다. 

lst = []
for reading in reader:
    lst.append(json.loads(reading["Body"]))

json.dumps(lst[0:5])

 

결과

[{"product_num": 5, "product_price": 5273, "product_description": "1pq5EDoT5u", "product_production_dt": "2021-10-28 03:15:38.314706"}, {"product_num": 8, "product_price": 6028, "product_description": "zl8E2UIgHz", "product_production_dt": "2021-10-28 03:15:39.316801"}, {"product_num": 6, "product_price": 448, "product_description": "qlfqPcJkbI", "product_production_dt": "2021-10-28 03:15:40.319594"}, {"product_num": 9, "product_price": 6313, "product_description": "Ys1xvquiuV", "product_production_dt": "2021-10-28 03:15:41.322595"}, {"product_num": 8, "product_price": 7685, "product_description": "uhxCO8LFoW", "product_production_dt": "2021-10-28 03:15:42.324536"}]

 

 

Event hub로부터 Capture된 Avro 데이터 처리를 Azure Blob Storage에서 처리

그러면, Event hub에서 Capture된, Container에 저장된 blob 파일들을 읽고, Avro data를 처리한다. 

...
container = ContainerClient.from_connection_string(conn_str=AZURE_STORAGE_CONN_STR, container_name=AZURE_CONTAINTER_NAME)
blob_list = container.list_blobs()
...

Blob storage container에서 blob들을 읽는다.

 

읽은 blob 파일을 위에서 처리했던 대로, Avro DataFileReader로 로드하고, 데이터를 list에 담아 json으로 출력한다.

for blob in blob_list:
  ..
  blob_client = ContainerClient.get_blob_client(container, blob=blob.name)
  ..
  reader = DataFileReader(open(cleanName, 'rb'), DatumReader())
  ..
  for reading in reader:
    lst.append(json.loads(reading["Body"]))
...

 

전체 코드는 CloudBreadPaPa/apache-avro: Review Apache Avro dataset - Azure event hub captured data (github.com) github repository를 참고.

 

이렇게 간략히, Apache Avro에 대해서 처리하였다. 이후에 Spark에서 Avro 데이터를 읽고 처리하는 방안도 진행 예정.

 

참고링크

개발자 커뮤니티 SQLER.com - Azure Event Hubs로 kafka message 전송 처리

CloudBreadPaPa/apache-avro: Review Apache Avro dataset - Azure event hub captured data (github.com)

Capture streaming events - Azure Event Hubs - Azure Event Hubs | Microsoft Docs

Apache Avro™ 1.10.2 Getting Started (Python)

apache spark - PySpark: Deserializing an Avro serialized message contained in an eventhub capture avro file - Stack Overflow

 

No. Subject Author Date Views
Notice SQL강좌: 챗GPT와 함께 배우는 SQL Server 무료 강좌 목차와 소개 (2023년 9월 업데이트) 코난(김대우) 2023.08.18 28245
Notice Python 무료 강좌 - 기초, 중급, 머신러닝(2023년 6월 업데이트) 코난(김대우) 2021.01.01 15455
358 [2023 IT트렌드] 2024년까지, OO%의 기업이 IT인력 부족으로 고통 코난(김대우) 2022.12.09 144
357 AKS 실전 | ep0. 인트로 | 애저 듣고보는 잡학지식 코난(김대우) 2022.12.08 38
356 [2023 IT트렌드] 2027년까지 IT 예산 50%가 OO에 사용됩니다. file 코난(김대우) 2022.12.08 162
355 FinOps 정리 코난(김대우) 2022.12.07 50
354 디지털 주권 - 소프트웨어 정책 연구소 발표 file 코난(김대우) 2022.12.05 55
353 애저 쿠버네티스 오픈소스로 설계하는 나만의 인프라 | ep9. 클로징 | 애저 듣고보는 잡학지식 코난(김대우) 2022.11.11 51
352 애저 쿠버네티스 오픈소스로 설계하는 나만의 인프라 | ep8. 에필로그 | 애저 듣고보는 잡학지식 코난(김대우) 2022.11.09 36
351 애저 쿠버네티스 오픈소스로 설계하는 나만의 인프라 | ep7. 쿠버네티스 클러스터를 모니터링하는 방법 | 애저 듣고보는 잡학지식 코난(김대우) 2022.11.05 34
350 애저 쿠버네티스 오픈소스로 설계하는 나만의 인프라 | ep6. 리눅스와 윈도우 컨테이너 같이 쓰기 | 애저 듣고보는 잡학지식 코난(김대우) 2022.11.02 30
349 애저 쿠버네티스 오픈소스로 설계하는 나만의 인프라 | ep5. 가성비 좋은 스팟 인스턴스로 비용 아끼기 | 애저 듣고보는 잡학지식 코난(김대우) 2022.11.01 52
348 애저 쿠버네티스 오픈소스로 설계하는 나만의 인프라 | ep4. 장애에 강한 멀티 마스터 노드 쉽게 만들기 | 애저 듣고보는 잡학지식 코난(김대우) 2022.10.31 29
347 애저 쿠버네티스 오픈소스로 설계하는 나만의 인프라 | ep3. 머신 러닝용 GPU 클러스터 쉽게 만들기 | 애저 듣고보는 잡학지식 코난(김대우) 2022.10.30 36
346 애저 쿠버네티스 오픈소스로 설계하는 나만의 인프라 | ep2. 대규모 클러스터 쉽게 만들기 | 애저듣보잡 코난(김대우) 2022.10.28 38
345 애저 쿠버네티스 오픈소스로 설계하는 나만의 인프라 | ep1. AKS 엔진 소개 | 애저 듣고보는 잡학지식 코난(김대우) 2022.10.27 42
344 애저 쿠버네티스 오픈소스로 설계하는 나만의 인프라 | ep0. 인트로 | 애저 듣고보는 잡학지식 코난(김대우) 2022.10.24 62
343 {MondayAzure}Azure를 처음 공부한다구요? 기초공부 & 자격증까지!!! 코난(김대우) 2022.10.21 41
342 인공지능이 자연어로 비즈니스 앱을 제작 - OpenAI GPT-3 file 코난(김대우) 2022.10.20 67
341 클라우드 Azure 자격증 추가 소개 - AZ-900 Azure Fundamentals 코난(김대우) 2022.10.20 118
340 Azure 자격증 시험 준비 - 애저한발짝 유승호님 코난(김대우) 2022.10.19 124
» Azure Event hubs - Apache Avro 데이터 처리 file 코난(김대우) 2021.10.28 298





XE Login