Event hub를 이용하는 여러 방안들은 해 보았으나, kafka를 이용하는 방법은 기회가 많지 않았다.
이번에 kafka를 일부 진행하게 되어 간단히 정리.
Apache kafka
큐(queue)나 토픽(topic)을 이용하며, 메세지 스트리밍 기능을 제공하는 분산 시스템.
Azure Eventhub에서 kafka message 수신 및 처리
Azure에서 제공하는 Event hub는 아무 추가 조치 없이 kafka protocol로 전송되는 메세지를 처리 가능.
Event hub가 제공하는 여러 PaaS 기능과 분산 처리 기술, azure의 여러 리소스들과 연동도 다양하게 제공한다.
Kafka와 event hub의 비교는 아래 문서를 참조.
Use event hub from Apache Kafka app - Azure Event Hubs - Azure Event Hubs | Microsoft Docs
Azure에서 event hub 생성
다른 조치가 필요하지 않다. 아래 내용을 챀조해 생성하면 된다. 기본적으로 kafka 프로토콜은 standard 이상의 scale 에서만 제공된다.
Azure Quickstart - Create an event hub using the Azure portal - Azure Event Hubs | Microsoft Docs
Kafka Python 코드를 이용해 event hub로 메세지 전송
Python을 이용해 간단히 kafka 메세지를 전송 가능하다. 관련 코드는 아래에 올려 두었다.
Event hub를 처음 생성하면 event name space를 생성하고, 그 이후 eventhub에서 수신할 eventhub를 생성해야 한다.
즉, 처음 접하면 namespace의 endpoint와 개별 eventhub의 endpoint들이 있어 혼란스러울 수 있으니 주의.
대부분의 kafka client에서 필요로 하는 정보는 다음과 같다.
ssl_ca_location="/usr/lib/ssl/certs/ca-certificates.crt" # ubuntu ca certificate bootstrap_servers="<YOUR-EVENTHUB-NS>.servicebus.windows.net:9093" sasl_password="<YOUR-EVENTHUB-CONNECTION-STRING>" topic_name="<YOUR-EVENTHUB-NAME>"
ssl_ca_location: 우분투의 경우와 MacOS의 경우 차이가 있다. 현재 Ubuntu WSL을 이용해 테스트 중이라 위와 같이 설정
Ubuntu distributions can typically use the default
/usr/lib/ssl/certs/ca-certificates.crt
; MacOS users that installedopenssl
via Homebrew can typically use/usr/local/etc/openssl/cert.pem
.
bootstrap_servers: eventhub namespace 정보이다.
sasl_password: 개별 eventhub의 endpoint connection string이다.
개별 event hub를 생성하려면, 이렇게 eventhub에서 add를 수행하고
생성된 eventhub에서 "shared access policies"를 추가하고,
생성된 ploicy를 열면 Endpoint connection string이 있다. 이것을 추가한다.
끝으로, topic_name은 eventhub의 이름을 넣는다.
Kafka clinet 설치 및 python package 설치
setup.sh를 실행해 구성한다. 이때, conda env를 이용할 경우 pip 설치시 sudo를 제거하고 수행한다.
Kafka 클라이언트 종류와 차이
참고자료 : Getting started with Apache Kafka in Python | by Adnan Siddiqi | Towards Data Science
위의 문서에 따르면, 세가지 정도의 방법이 있다. Confluent kafka가 성능면에서 우세한 것으로 보이나, 깊이있는 차이가 필요하면 조금 더 확인할 필요가 있다.
Accessing Kafka in Python
There are multiple Python libraries available for usage:
- Kafka-Python — An open-source community-based library.
- PyKafka — This library is maintained by Parsly and it’s claimed to be a Pythonic API. Unlike Kafka-Python you can’t create dynamic topics.
- Confluent Python Kafka:- It is offered by Confluent as a thin wrapper around librdkafka, hence it’s performance is better than the two.
실행을 위한 환경변수 설정
github에 올려둔 producer.py 코드는 환경변수를 이용하도록 작업했다.
이를 위해 sh 파일을 생성해 환경 변수를 넣거나, bash profile 등에 넣어 처리 가능하다. 이런 식으로 만들어서 실행했다.
export ssl_ca_location="/usr/lib/ssl/certs/ca-certificates.crt" # ubuntu ca certificate export bootstrap_servers="<YOUR-EVENTHUB-NS>.servicebus.windows.net:9093" export sasl_password="<YOUR-EVENTHUB-CONNECTION-STRING>" export topic_name="<YOUR-EVENTHUB-NAME>" python producer.py
실행 후 발생하는 오류
ca-certificate.crt 위치가 잘못되었거나 존재하지 않는 오류 발생했다.
다시 체크해 처리했으며 Ubuntu의 경우 "/usr/lib/ssl/certs/ca-certificates.crt" 위치에 존재한다.
%3|1631670594.732|SSL|python-example-producer#producer-1| [thrd:app]: error:02001002:system library:fopen:No such file or directory: fopen('/path/to/ca-certificate.crt','r') %3|1631670594.732|SSL|python-example-producer#producer-1| [thrd:app]: error:2006D080:BIO routines:BIO_new_file:no such file Traceback (most recent call last): File "producer.py", line 33, in <module> p = Producer(**conf) cimpl.KafkaException: KafkaError{code=_INVALID_ARG,val=-186,str="Failed to create producer: ssl.ca.location failed: error:0B084002:x509 certificate routines:X509_load_cert_crl_file:system lib"}
Azure Event hub의 SKU가 낮아 발생하는 오류. standard 이상으로 올리면 해결된다.
% Waiting for 1 deliveries %3|1631671736.190|FAIL|python-example-producer#producer-1| [thrd:sasl_ssl://dw-evthub-kafka.servicebus.windows.net:9093/bootstra]: sasl_ssl://dw-evthub-kafka.servicebus.windows.net:9093/bootstrap: SASL authentication error: Kafka protocol is supported for Standard, Premium and Dedicated SKU only. Namespace: dw-evthub-kafka. (after 17ms in state AUTH_REQ)
Topic의 이름을 잘못 지정할 경우 발생하는 오류. Event hub의 이름으로 topic을 지정하면 해결된다.
% Waiting for 1 deliveries % Message failed delivery: KafkaError{code=UNKNOWN_TOPIC_OR_PART,val=3,str="Broker: Unknown topic or partition"}
Event hub에 message가 도착했는지 확인하는 방법
여러 방법이 있으나, "Service Bus Explorer" 이용을 추천한다.
설치 후 실행하면 이런 화면이고,
이렇게 "Create Consumer Group Lsitener" Listener를 eventhub에 붙여서 아래처럼 메세지를 확인 가능하다.
참고링크:
Azure Quickstart - Create an event hub using the Azure portal - Azure Event Hubs | Microsoft Docs
Use event hub from Apache Kafka app - Azure Event Hubs - Azure Event Hubs | Microsoft Docs
Getting started with Apache Kafka in Python | by Adnan Siddiqi | Towards Data Science
Azure/azure-event-hubs-for-kafka: Azure Event Hubs for Apache Kafka Ecosystems (github.com)
edenhill/librdkafka: The Apache Kafka C/C++ library (github.com)