Event hub를 이용하는 여러 방안들은 해 보았으나, kafka를 이용하는 방법은 기회가 많지 않았다.

이번에 kafka를 일부 진행하게 되어 간단히 정리.

 

Apache kafka

큐(queue)나 토픽(topic)을 이용하며, 메세지 스트리밍 기능을 제공하는 분산 시스템.

아파치 카프카 (apache.org)

 

Azure Eventhub에서 kafka message 수신 및 처리

Azure에서 제공하는 Event hub는 아무 추가 조치 없이 kafka protocol로 전송되는 메세지를 처리 가능.

Event hub가 제공하는 여러 PaaS 기능과 분산 처리 기술, azure의 여러 리소스들과 연동도 다양하게 제공한다.

Quickstart: Data streaming with Azure Event Hubs using the Kafka protocol - Azure Event Hubs | Microsoft Docs

Kafka와 event hub의 비교는 아래 문서를 참조.

Use event hub from Apache Kafka app - Azure Event Hubs - Azure Event Hubs | Microsoft Docs

 

Azure에서 event hub 생성

다른 조치가 필요하지 않다. 아래 내용을 챀조해 생성하면 된다. 기본적으로 kafka 프로토콜은 standard 이상의 scale 에서만 제공된다.

Azure Quickstart - Create an event hub using the Azure portal - Azure Event Hubs | Microsoft Docs

 

Kafka Python 코드를 이용해 event hub로 메세지 전송

Python을 이용해 간단히 kafka 메세지를 전송 가능하다. 관련 코드는 아래에 올려 두었다.

CloudBreadPaPa/azure-eventhub-kafka-python: produce messages to event hub for kafka protocol (github.com)

 

Event hub를 처음 생성하면 event name space를 생성하고, 그 이후 eventhub에서 수신할 eventhub를 생성해야 한다.

즉, 처음 접하면 namespace의 endpoint와 개별 eventhub의 endpoint들이 있어 혼란스러울 수 있으니 주의.

 

대부분의 kafka client에서 필요로 하는 정보는 다음과 같다.

 

ssl_ca_location="/usr/lib/ssl/certs/ca-certificates.crt"  # ubuntu ca certificate
bootstrap_servers="<YOUR-EVENTHUB-NS>.servicebus.windows.net:9093"
sasl_password="<YOUR-EVENTHUB-CONNECTION-STRING>"
topic_name="<YOUR-EVENTHUB-NAME>"

 

ssl_ca_location: 우분투의 경우와 MacOS의 경우 차이가 있다. 현재 Ubuntu WSL을 이용해 테스트 중이라 위와 같이 설정

Ubuntu distributions can typically use the default /usr/lib/ssl/certs/ca-certificates.crt; MacOS users that installed openssl via Homebrew can typically use /usr/local/etc/openssl/cert.pem.

azure-event-hubs-for-kafka/quickstart/python at master · Azure/azure-event-hubs-for-kafka (github.com)

bootstrap_servers: eventhub namespace 정보이다.

sasl_password: 개별 eventhub의 endpoint connection string이다.

개별 event hub를 생성하려면, 이렇게 eventhub에서 add를 수행하고

Add Event Hub - button

 

생성된 eventhub에서 "shared access policies"를 추가하고,

Create event hub shared access policy

 

생성된 ploicy를 열면 Endpoint connection string이 있다. 이것을 추가한다.

Save the shared access policy connection string

 

끝으로, topic_name은 eventhub의 이름을 넣는다.

 

Kafka clinet 설치 및 python package 설치

setup.sh를 실행해 구성한다. 이때, conda env를 이용할 경우 pip 설치시 sudo를 제거하고 수행한다.

azure-eventhub-kafka-python/setup.sh at main · CloudBreadPaPa/azure-eventhub-kafka-python (github.com)

 

Kafka 클라이언트 종류와 차이

 

참고자료 : Getting started with Apache Kafka in Python | by Adnan Siddiqi | Towards Data Science

위의 문서에 따르면, 세가지 정도의 방법이 있다. Confluent kafka가 성능면에서 우세한 것으로 보이나, 깊이있는 차이가 필요하면 조금 더 확인할 필요가 있다.

Accessing Kafka in Python

There are multiple Python libraries available for usage:

  • Kafka-Python — An open-source community-based library.
  • PyKafka — This library is maintained by Parsly and it’s claimed to be a Pythonic API. Unlike Kafka-Python you can’t create dynamic topics.
  • Confluent Python Kafka:- It is offered by Confluent as a thin wrapper around librdkafka, hence it’s performance is better than the two.

 

실행을 위한 환경변수 설정

github에 올려둔 producer.py 코드는 환경변수를 이용하도록 작업했다.

azure-eventhub-kafka-python/producer.py at main · CloudBreadPaPa/azure-eventhub-kafka-python (github.com)

이를 위해 sh 파일을 생성해 환경 변수를 넣거나, bash profile 등에 넣어 처리 가능하다. 이런 식으로 만들어서 실행했다.

export ssl_ca_location="/usr/lib/ssl/certs/ca-certificates.crt"  # ubuntu ca certificate
export bootstrap_servers="<YOUR-EVENTHUB-NS>.servicebus.windows.net:9093"
export sasl_password="<YOUR-EVENTHUB-CONNECTION-STRING>"
export topic_name="<YOUR-EVENTHUB-NAME>"

python producer.py

 

실행 후 발생하는 오류

ca-certificate.crt 위치가 잘못되었거나 존재하지 않는 오류 발생했다.

다시 체크해 처리했으며 Ubuntu의 경우 "/usr/lib/ssl/certs/ca-certificates.crt" 위치에 존재한다.

 

%3|1631670594.732|SSL|python-example-producer#producer-1| [thrd:app]: error:02001002:system library:fopen:No such file or directory: fopen('/path/to/ca-certificate.crt','r')
%3|1631670594.732|SSL|python-example-producer#producer-1| [thrd:app]: error:2006D080:BIO routines:BIO_new_file:no such file
Traceback (most recent call last):
  File "producer.py", line 33, in <module>
    p = Producer(**conf)
cimpl.KafkaException: KafkaError{code=_INVALID_ARG,val=-186,str="Failed to create producer: ssl.ca.location failed: error:0B084002:x509 certificate routines:X509_load_cert_crl_file:system lib"}

 

Azure Event hub의 SKU가 낮아 발생하는 오류. standard 이상으로 올리면 해결된다.

% Waiting for 1 deliveries
%3|1631671736.190|FAIL|python-example-producer#producer-1| [thrd:sasl_ssl://dw-evthub-kafka.servicebus.windows.net:9093/bootstra]: sasl_ssl://dw-evthub-kafka.servicebus.windows.net:9093/bootstrap: SASL authentication error: Kafka protocol is supported for Standard, Premium and Dedicated SKU only. Namespace: dw-evthub-kafka. (after 17ms in state AUTH_REQ)

 

Topic의 이름을 잘못 지정할 경우 발생하는 오류. Event hub의 이름으로 topic을 지정하면 해결된다.

% Waiting for 1 deliveries
% Message failed delivery: KafkaError{code=UNKNOWN_TOPIC_OR_PART,val=3,str="Broker: Unknown topic or partition"}

 

Event hub에 message가 도착했는지 확인하는 방법

여러 방법이 있으나, "Service Bus Explorer" 이용을 추천한다.

paolosalvatori/ServiceBusExplorer: The Service Bus Explorer allows users to connect to a Service Bus namespace and administer messaging entities in an easy manner. The tool provides advanced features like import/export functionality or the ability to test topic, queues, subscriptions, relay services, notification hubs and events hubs. (github.com)

 

Service Bus Explorer

설치 후 실행하면 이런 화면이고,

 

ServiceBusExplorer01.png

이렇게 "Create Consumer Group Lsitener" Listener를 eventhub에 붙여서 아래처럼 메세지를 확인 가능하다.

 

ServiceBusExplorer02.png

 

 

참고링크:

CloudBreadPaPa/azure-eventhub-kafka-python: produce messages to event hub for kafka protocol (github.com)

Azure Quickstart - Create an event hub using the Azure portal - Azure Event Hubs | Microsoft Docs

Use event hub from Apache Kafka app - Azure Event Hubs - Azure Event Hubs | Microsoft Docs

Getting started with Apache Kafka in Python | by Adnan Siddiqi | Towards Data Science

Send or receive events from Azure Event Hubs using Python (latest) - Azure Event Hubs | Microsoft Docs

Azure/azure-event-hubs-for-kafka: Azure Event Hubs for Apache Kafka Ecosystems (github.com)

azure-event-hubs-for-kafka/quickstart/python at master · Azure/azure-event-hubs-for-kafka (github.com)

edenhill/librdkafka: The Apache Kafka C/C++ library (github.com)

paolosalvatori/ServiceBusExplorer: The Service Bus Explorer allows users to connect to a Service Bus namespace and administer messaging entities in an easy manner. The tool provides advanced features like import/export functionality or the ability to test topic, queues, subscriptions, relay services, notification hubs and events hubs. (github.com)

 

 

No. Subject Author Date Views
Notice SQL강좌: 챗GPT와 함께 배우는 SQL Server 무료 강좌 목차와 소개 (2023년 9월 업데이트) 코난(김대우) 2023.08.18 36281
Notice Python 무료 강좌 - 기초, 중급, 머신러닝(2023년 6월 업데이트) 코난(김대우) 2021.01.01 18892
338 Azure Data Explorer - SELECT INTO(CTAS) 또는 INSERT SELECT 쿼리 수행 코난(김대우) 2021.10.26 336
337 Azure Data Explorer에서 Trigger 기능 구현 - update policy file 코난(김대우) 2021.10.22 286
336 vscode에서 일관된 팀 단위 개발 환경 구성 - devcontainer file 코난(김대우) 2021.10.19 579
335 Bicep - Azure 클라우드 리소스 배포를 위한 언어 file 코난(김대우) 2021.10.19 149
334 Azure Data Explorer - Event Hub 스트리밍 ingest 중 컬럼추가 file 코난(김대우) 2021.10.18 150
333 SonarQube 리뷰 및 Azure DevOps 연결 file 코난(김대우) 2021.10.01 235
332 PySpark, koalas와 pandas dataframe file 코난(김대우) 2021.09.29 259
331 Apache Spark, pyspark 설치 후 jupyter notebook 실행 file 코난(김대우) 2021.09.29 393
330 Azure Data Explorer의 데이터를 Python Pandas Dataframe과 CSV로 변환 코난(김대우) 2021.09.28 178
329 Azure Blob Storage SAS token 생성 코난(김대우) 2021.09.17 227
328 Azure Data Factory를 이용해 ADX에서 SQL로 900만건의 데이터 전송 file 코난(김대우) 2021.09.16 276
327 Azure Data Explorer에서 SQL서버 데이터베이스 테이블 조회/삽입 - sql_request plugin file 코난(김대우) 2021.09.16 169
326 Azure Data Explorer에 대량 CSV 파일 ingest 코난(김대우) 2021.09.15 163
325 Azure Event Hubs의 데이터를 Azure Data Explorer로 전송 file 코난(김대우) 2021.09.15 219
» Azure Event Hubs로 kafka message 전송 처리 file 코난(김대우) 2021.09.15 268
323 Service Principal과 Azure 리소스 접근/사용을 위한 인증 방법 3+1가지 file 코난(김대우) 2020.12.26 638
322 Azure storage 관리 도구 - storage explorer 설치와 사용 방법 코난(김대우) 2020.12.25 420
321 Azure cli - command line interface 명령줄 인터페이스 도구를 쓰는 이유와 방법 코난(김대우) 2020.12.25 361
320 클라우드 오픈소스 개발환경 - WSL [1] file 코난(김대우) 2020.12.20 1246
319 Cloud RoadShow 세션 발표 자료 코난(김대우) 2016.05.04 11424





XE Login