Azure, AWS, GCP 클라우드 활용 Tip과 강좌 게시판

이곳은 개발자를 위한 Azure, AWS, GCP등 클라우드 활용 Tip과 강좌 게시판 입니다. 클라우드 환경을 개발하면서 알아내신 Tip이나 강좌, 새로운 소식을 적어 주시면 다른 클라우드를 공부하는 개발자 분들에게 큰 도움이 됩니다. 감사합니다. SQLER.com은 개발자와 IT전문가의 지식 나눔을 실천하기 위해 노력하고 있습니다.

이 포스팅에서는, Azure Event hubs를 capture해 storage로 저장하면 생성되는 Avro 데이터를 Python 읽고 재처리 하는 방안에 대해 정리

 

Azure Event hubs - Apache Avro 데이터 처리

 

이 문서에서 사용하는 전체 코드는 CloudBreadPaPa/apache-avro: Review Apache Avro dataset - Azure event hub captured data (github.com) github repository에서 확인 가능.

 

Azure Event hubs와 Avro 데이터

Azure event hub의 데이터를 쉽게 아카이브 하거나 백업하는 방안으로 "Capture"가 제공된다.

Capture streaming events - Azure Event Hubs - Azure Event Hubs | Microsoft Docs

 

간략히, eventhub의 메세지를 이렇게 주기적으로 storage에 저장 하는 처리이다.

azure-eventhub-capture.png

 

Hot path와 cold path 분석

Event hub를 활용하면 훌륭한 real-time hot path 구성을 처리할 수 있다. 이 hot path를 통해 메세지를 사용하고, 다시 storage로 적재해 cold path로 활용 가능하지만, 이렇게 capture를 이용하면 마찬가지로 cold path를 더 쉽게 구성 가능하다.

 

이렇게 Event hub에서 Capture된 데이터 포맷이 Apache Avro 포맷이다.

 

azure-eventhub-capture_02.png

Event hub에서 이렇게 capture 처리 구성이 가능하다.

 

Apache Avro란 무엇인가?

Apache Avro™ is a data serialization system.

 

Avro provides:

- Rich data structures.
- A compact, fast, binary data format.
- A container file, to store persistent data.
- Remote procedure call (RPC).
- Simple integration with dynamic languages. Code generation is not required to read or write data files nor to use or implement RPC protocols. Code generation as an optional optimization, only worth implementing for statically typed languages.

 

Apache Avro™ 1.10.2 Documentation

Avro를 지원하는 여러 오픈소스 기반 분석 시스템에서 사용 가능하며, 다양한 프로그래밍 언어 라이브러리를 제공한다.

 

Avro 데이터 확인 방안

여러 방안이 있다. Apache Drill을 이용하거나, Avro Tools cli를 사용할 수도 있다. Elm으로 제작한 avro-viewer 프로젝트도 있고, demo로 체크해 볼 수도 있다. 이 문서에서는 Avro를 Python으로 탐색해 본다.

 

Python으로 Avro 데이터 탐색

코드는 지난번 수행한 Azure Event Hubs로 kafka message 전송 처리 구조를 다시 수행하였다.

현재 Python 코드가 실행 중이고, JSON 메세지가 지속적으로 event hub로 전송되는 중이며, 위의 Capture streaming events - Azure Event Hubs - Azure Event Hubs | Microsoft Docs 링크를 통해 capture 작업도 완료 했다고 가정하고 코드를 수행한다.

 

예제 데이터 파일을 data/event_hub_sample.avro 경로에 올려 두었다. 이 파일을 Avro DataFileReader를 통해 open한다.

...
# Read avro sample file from data directory
sample_file = 'data/event_hub_sample.avro'
reader = DataFileReader(open(sample_file, 'rb'), DatumReader())
...

 

Avro schema 체크

Avro는 inline으로 여러 메타 정보와 schema를 포함하고 있다. 우선 meta data와 schema 체크 수행.

# Print meta data and schema
reader.meta

 

출력 결과

{'avro.codec': b'null',
 'avro.schema': b'{"type":"record","name":"EventData","namespace":"Microsoft.ServiceBus.Messaging","fields":[{"name":"SequenceNumber","type":"long"},{"name":"Offset","type":"string"},{"name":"EnqueuedTimeUtc","type":"string"},{"name":"SystemProperties","type":{"type":"map","values":["long","double","string","bytes"]}},{"name":"Properties","type":{"type":"map","values":["long","double","string","bytes","null"]}},{"name":"Body","type":["null","bytes"]}]}'}

 

Avro 데이터 조회

개별 데이터를 iteration 하고 출력한다. 

lst = []
for reading in reader:
    lst.append(json.loads(reading["Body"]))

json.dumps(lst[0:5])

 

결과

[{"product_num": 5, "product_price": 5273, "product_description": "1pq5EDoT5u", "product_production_dt": "2021-10-28 03:15:38.314706"}, {"product_num": 8, "product_price": 6028, "product_description": "zl8E2UIgHz", "product_production_dt": "2021-10-28 03:15:39.316801"}, {"product_num": 6, "product_price": 448, "product_description": "qlfqPcJkbI", "product_production_dt": "2021-10-28 03:15:40.319594"}, {"product_num": 9, "product_price": 6313, "product_description": "Ys1xvquiuV", "product_production_dt": "2021-10-28 03:15:41.322595"}, {"product_num": 8, "product_price": 7685, "product_description": "uhxCO8LFoW", "product_production_dt": "2021-10-28 03:15:42.324536"}]

 

 

Event hub로부터 Capture된 Avro 데이터 처리를 Azure Blob Storage에서 처리

그러면, Event hub에서 Capture된, Container에 저장된 blob 파일들을 읽고, Avro data를 처리한다. 

...
container = ContainerClient.from_connection_string(conn_str=AZURE_STORAGE_CONN_STR, container_name=AZURE_CONTAINTER_NAME)
blob_list = container.list_blobs()
...

Blob storage container에서 blob들을 읽는다.

 

읽은 blob 파일을 위에서 처리했던 대로, Avro DataFileReader로 로드하고, 데이터를 list에 담아 json으로 출력한다.

for blob in blob_list:
  ..
  blob_client = ContainerClient.get_blob_client(container, blob=blob.name)
  ..
  reader = DataFileReader(open(cleanName, 'rb'), DatumReader())
  ..
  for reading in reader:
    lst.append(json.loads(reading["Body"]))
...

 

전체 코드는 CloudBreadPaPa/apache-avro: Review Apache Avro dataset - Azure event hub captured data (github.com) github repository를 참고.

 

이렇게 간략히, Apache Avro에 대해서 처리하였다. 이후에 Spark에서 Avro 데이터를 읽고 처리하는 방안도 진행 예정.

 

참고링크

개발자 커뮤니티 SQLER.com - Azure Event Hubs로 kafka message 전송 처리

CloudBreadPaPa/apache-avro: Review Apache Avro dataset - Azure event hub captured data (github.com)

Capture streaming events - Azure Event Hubs - Azure Event Hubs | Microsoft Docs

Apache Avro™ 1.10.2 Getting Started (Python)

apache spark - PySpark: Deserializing an Avro serialized message contained in an eventhub capture avro file - Stack Overflow

 

No. Subject Author Date Views
» Azure Event hubs - Apache Avro 데이터 처리 file 코난(김대우) 2021.10.28 203
338 Azure Data Explorer - SELECT INTO(CTAS) 또는 INSERT SELECT 쿼리 수행 코난(김대우) 2021.10.26 189
337 Azure Data Explorer에서 Trigger 기능 구현 - update policy file 코난(김대우) 2021.10.22 176
336 vscode에서 일관된 팀 단위 개발 환경 구성 - devcontainer file 코난(김대우) 2021.10.19 221
335 Bicep - Azure 클라우드 리소스 배포를 위한 언어 file 코난(김대우) 2021.10.19 55
334 Azure Data Explorer - Event Hub 스트리밍 ingest 중 컬럼추가 file 코난(김대우) 2021.10.18 51
333 SonarQube 리뷰 및 Azure DevOps 연결 file 코난(김대우) 2021.10.01 80
332 PySpark, koalas와 pandas dataframe file 코난(김대우) 2021.09.29 153
331 Apache Spark, pyspark 설치 후 jupyter notebook 실행 file 코난(김대우) 2021.09.29 121
330 Azure Data Explorer의 데이터를 Python Pandas Dataframe과 CSV로 변환 코난(김대우) 2021.09.28 80
329 Azure Blob Storage SAS token 생성 코난(김대우) 2021.09.17 79
328 Azure Data Factory를 이용해 ADX에서 SQL로 900만건의 데이터 전송 file 코난(김대우) 2021.09.16 129
327 Azure Data Explorer에서 SQL서버 데이터베이스 테이블 조회/삽입 - sql_request plugin file 코난(김대우) 2021.09.16 70
326 Azure Data Explorer에 대량 CSV 파일 ingest 코난(김대우) 2021.09.15 67
325 Azure Event Hubs의 데이터를 Azure Data Explorer로 전송 file 코난(김대우) 2021.09.15 92
324 Azure Event Hubs로 kafka message 전송 처리 file 코난(김대우) 2021.09.15 98
323 Service Principal과 Azure 리소스 접근/사용을 위한 인증 방법 3+1가지 file 코난(김대우) 2020.12.26 324
322 Azure storage 관리 도구 - storage explorer 설치와 사용 방법 코난(김대우) 2020.12.25 231
321 Azure cli - command line interface 명령줄 인터페이스 도구를 쓰는 이유와 방법 코난(김대우) 2020.12.25 235
320 클라우드 오픈소스 개발환경 - WSL [1] file 코난(김대우) 2020.12.20 924





XE Login