Azure, AWS, GCP 클라우드 활용 Tip과 강좌 게시판

이곳은 개발자를 위한 Azure, AWS, GCP등 클라우드 활용 Tip과 강좌 게시판 입니다. 클라우드 환경을 개발하면서 알아내신 Tip이나 강좌, 새로운 소식을 적어 주시면 다른 클라우드를 공부하는 개발자 분들에게 큰 도움이 됩니다. 감사합니다. SQLER.com은 개발자와 IT전문가의 지식 나눔을 실천하기 위해 노력하고 있습니다.

이 포스팅에서는, Azure Event hubs를 capture해 storage로 저장하면 생성되는 Avro 데이터를 Python 읽고 재처리 하는 방안에 대해 정리

 

Azure Event hubs - Apache Avro 데이터 처리

 

이 문서에서 사용하는 전체 코드는 CloudBreadPaPa/apache-avro: Review Apache Avro dataset - Azure event hub captured data (github.com) github repository에서 확인 가능.

 

Azure Event hubs와 Avro 데이터

Azure event hub의 데이터를 쉽게 아카이브 하거나 백업하는 방안으로 "Capture"가 제공된다.

Capture streaming events - Azure Event Hubs - Azure Event Hubs | Microsoft Docs

 

간략히, eventhub의 메세지를 이렇게 주기적으로 storage에 저장 하는 처리이다.

azure-eventhub-capture.png

 

Hot path와 cold path 분석

Event hub를 활용하면 훌륭한 real-time hot path 구성을 처리할 수 있다. 이 hot path를 통해 메세지를 사용하고, 다시 storage로 적재해 cold path로 활용 가능하지만, 이렇게 capture를 이용하면 마찬가지로 cold path를 더 쉽게 구성 가능하다.

 

이렇게 Event hub에서 Capture된 데이터 포맷이 Apache Avro 포맷이다.

 

azure-eventhub-capture_02.png

Event hub에서 이렇게 capture 처리 구성이 가능하다.

 

Apache Avro란 무엇인가?

Apache Avro™ is a data serialization system.

 

Avro provides:

- Rich data structures.
- A compact, fast, binary data format.
- A container file, to store persistent data.
- Remote procedure call (RPC).
- Simple integration with dynamic languages. Code generation is not required to read or write data files nor to use or implement RPC protocols. Code generation as an optional optimization, only worth implementing for statically typed languages.

 

Apache Avro™ 1.10.2 Documentation

Avro를 지원하는 여러 오픈소스 기반 분석 시스템에서 사용 가능하며, 다양한 프로그래밍 언어 라이브러리를 제공한다.

 

Avro 데이터 확인 방안

여러 방안이 있다. Apache Drill을 이용하거나, Avro Tools cli를 사용할 수도 있다. Elm으로 제작한 avro-viewer 프로젝트도 있고, demo로 체크해 볼 수도 있다. 이 문서에서는 Avro를 Python으로 탐색해 본다.

 

Python으로 Avro 데이터 탐색

코드는 지난번 수행한 Azure Event Hubs로 kafka message 전송 처리 구조를 다시 수행하였다.

현재 Python 코드가 실행 중이고, JSON 메세지가 지속적으로 event hub로 전송되는 중이며, 위의 Capture streaming events - Azure Event Hubs - Azure Event Hubs | Microsoft Docs 링크를 통해 capture 작업도 완료 했다고 가정하고 코드를 수행한다.

 

예제 데이터 파일을 data/event_hub_sample.avro 경로에 올려 두었다. 이 파일을 Avro DataFileReader를 통해 open한다.

...
# Read avro sample file from data directory
sample_file = 'data/event_hub_sample.avro'
reader = DataFileReader(open(sample_file, 'rb'), DatumReader())
...

 

Avro schema 체크

Avro는 inline으로 여러 메타 정보와 schema를 포함하고 있다. 우선 meta data와 schema 체크 수행.

# Print meta data and schema
reader.meta

 

출력 결과

{'avro.codec': b'null',
 'avro.schema': b'{"type":"record","name":"EventData","namespace":"Microsoft.ServiceBus.Messaging","fields":[{"name":"SequenceNumber","type":"long"},{"name":"Offset","type":"string"},{"name":"EnqueuedTimeUtc","type":"string"},{"name":"SystemProperties","type":{"type":"map","values":["long","double","string","bytes"]}},{"name":"Properties","type":{"type":"map","values":["long","double","string","bytes","null"]}},{"name":"Body","type":["null","bytes"]}]}'}

 

Avro 데이터 조회

개별 데이터를 iteration 하고 출력한다. 

lst = []
for reading in reader:
    lst.append(json.loads(reading["Body"]))

json.dumps(lst[0:5])

 

결과

[{"product_num": 5, "product_price": 5273, "product_description": "1pq5EDoT5u", "product_production_dt": "2021-10-28 03:15:38.314706"}, {"product_num": 8, "product_price": 6028, "product_description": "zl8E2UIgHz", "product_production_dt": "2021-10-28 03:15:39.316801"}, {"product_num": 6, "product_price": 448, "product_description": "qlfqPcJkbI", "product_production_dt": "2021-10-28 03:15:40.319594"}, {"product_num": 9, "product_price": 6313, "product_description": "Ys1xvquiuV", "product_production_dt": "2021-10-28 03:15:41.322595"}, {"product_num": 8, "product_price": 7685, "product_description": "uhxCO8LFoW", "product_production_dt": "2021-10-28 03:15:42.324536"}]

 

 

Event hub로부터 Capture된 Avro 데이터 처리를 Azure Blob Storage에서 처리

그러면, Event hub에서 Capture된, Container에 저장된 blob 파일들을 읽고, Avro data를 처리한다. 

...
container = ContainerClient.from_connection_string(conn_str=AZURE_STORAGE_CONN_STR, container_name=AZURE_CONTAINTER_NAME)
blob_list = container.list_blobs()
...

Blob storage container에서 blob들을 읽는다.

 

읽은 blob 파일을 위에서 처리했던 대로, Avro DataFileReader로 로드하고, 데이터를 list에 담아 json으로 출력한다.

for blob in blob_list:
  ..
  blob_client = ContainerClient.get_blob_client(container, blob=blob.name)
  ..
  reader = DataFileReader(open(cleanName, 'rb'), DatumReader())
  ..
  for reading in reader:
    lst.append(json.loads(reading["Body"]))
...

 

전체 코드는 CloudBreadPaPa/apache-avro: Review Apache Avro dataset - Azure event hub captured data (github.com) github repository를 참고.

 

이렇게 간략히, Apache Avro에 대해서 처리하였다. 이후에 Spark에서 Avro 데이터를 읽고 처리하는 방안도 진행 예정.

 

참고링크

개발자 커뮤니티 SQLER.com - Azure Event Hubs로 kafka message 전송 처리

CloudBreadPaPa/apache-avro: Review Apache Avro dataset - Azure event hub captured data (github.com)

Capture streaming events - Azure Event Hubs - Azure Event Hubs | Microsoft Docs

Apache Avro™ 1.10.2 Getting Started (Python)

apache spark - PySpark: Deserializing an Avro serialized message contained in an eventhub capture avro file - Stack Overflow

 

No. Subject Author Date Views
353 애저 쿠버네티스 오픈소스로 설계하는 나만의 인프라 | ep9. 클로징 | 애저 듣고보는 잡학지식 코난(김대우) 2022.11.11 24
352 애저 쿠버네티스 오픈소스로 설계하는 나만의 인프라 | ep8. 에필로그 | 애저 듣고보는 잡학지식 코난(김대우) 2022.11.09 11
351 애저 쿠버네티스 오픈소스로 설계하는 나만의 인프라 | ep7. 쿠버네티스 클러스터를 모니터링하는 방법 | 애저 듣고보는 잡학지식 코난(김대우) 2022.11.05 9
350 애저 쿠버네티스 오픈소스로 설계하는 나만의 인프라 | ep6. 리눅스와 윈도우 컨테이너 같이 쓰기 | 애저 듣고보는 잡학지식 코난(김대우) 2022.11.02 13
349 애저 쿠버네티스 오픈소스로 설계하는 나만의 인프라 | ep5. 가성비 좋은 스팟 인스턴스로 비용 아끼기 | 애저 듣고보는 잡학지식 코난(김대우) 2022.11.01 16
348 애저 쿠버네티스 오픈소스로 설계하는 나만의 인프라 | ep4. 장애에 강한 멀티 마스터 노드 쉽게 만들기 | 애저 듣고보는 잡학지식 코난(김대우) 2022.10.31 10
347 애저 쿠버네티스 오픈소스로 설계하는 나만의 인프라 | ep3. 머신 러닝용 GPU 클러스터 쉽게 만들기 | 애저 듣고보는 잡학지식 코난(김대우) 2022.10.30 8
346 애저 쿠버네티스 오픈소스로 설계하는 나만의 인프라 | ep2. 대규모 클러스터 쉽게 만들기 | 애저듣보잡 코난(김대우) 2022.10.28 12
345 애저 쿠버네티스 오픈소스로 설계하는 나만의 인프라 | ep1. AKS 엔진 소개 | 애저 듣고보는 잡학지식 코난(김대우) 2022.10.27 12
344 애저 쿠버네티스 오픈소스로 설계하는 나만의 인프라 | ep0. 인트로 | 애저 듣고보는 잡학지식 코난(김대우) 2022.10.24 22
343 {MondayAzure}Azure를 처음 공부한다구요? 기초공부 & 자격증까지!!! 코난(김대우) 2022.10.21 13
342 인공지능이 자연어로 비즈니스 앱을 제작 - OpenAI GPT-3 file 코난(김대우) 2022.10.20 35
341 클라우드 Azure 자격증 추가 소개 - AZ-900 Azure Fundamentals 코난(김대우) 2022.10.20 21
340 Azure 자격증 시험 준비 - 애저한발짝 유승호님 코난(김대우) 2022.10.19 24
» Azure Event hubs - Apache Avro 데이터 처리 file 코난(김대우) 2021.10.28 262
338 Azure Data Explorer - SELECT INTO(CTAS) 또는 INSERT SELECT 쿼리 수행 코난(김대우) 2021.10.26 240
337 Azure Data Explorer에서 Trigger 기능 구현 - update policy file 코난(김대우) 2021.10.22 206
336 vscode에서 일관된 팀 단위 개발 환경 구성 - devcontainer file 코난(김대우) 2021.10.19 277
335 Bicep - Azure 클라우드 리소스 배포를 위한 언어 file 코난(김대우) 2021.10.19 71
334 Azure Data Explorer - Event Hub 스트리밍 ingest 중 컬럼추가 file 코난(김대우) 2021.10.18 66





XE Login