Apache Kafka Connect to Azure Event Hubs
Recently I was working on an integration with Azure Event Hubs. A colleague struggled to export the messages from an existing Kafka topic and import them into Event Hubs, so I have documented the steps below, which you may find helpful.
Step 1: Get Kafka, download and extract it:
Apache Kafka is an open-source distributed event streaming platform that helps you build distributed systems with high throughput. Kafka can be downloaded from this address: https://www.apache.org/dyn/closer.cgi?path=/kafka/3.1.0/kafka_2.13-3.1.0.tgz
$ tar -xzf kafka_2.13-3.1.0.tgz
$ cd kafka_2.13-3.1.0
Step 2: Start the Kafka environment
If your local environment already has Java 8+ installed, run the commands below to start all services. (If not, download and install Java first: https://www.oracle.com/java/technologies/downloads/#jdk18-mac)
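You can quickly confirm which Java version is on your path before starting the services:

$ java -version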
Run the ZooKeeper service:
$ bin/zookeeper-server-start.sh config/zookeeper.properties
Start the Kafka broker:
$ bin/kafka-server-start.sh config/server.properties
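As an optional sanity check, you can list the topics on the local broker (assuming the default listener of `localhost:9092` from `config/server.properties`); on a fresh install the result should simply be empty:

$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --list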
Step 3: Create and set up the config files
Create a new file `connector.properties` with the values below.
bootstrap.servers={NAMESPACE.NAME}.servicebus.windows.net:9093
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://{NAMESPACE.NAME}.servicebus.windows.net/;SharedAccessKeyName={XXXXXX};SharedAccessKey={XXXXXX}";
Then create a new file `connect-distributed.properties` with the values below:
bootstrap.servers={NAMESPACE.NAME}.servicebus.windows.net:9093
group.id=connect-cluster-group

# connect internal topic names, automatically created by Kafka Connect with AdminClient API if not exists
config.storage.topic=connect-cluster-configs
offset.storage.topic=connect-cluster-offsets
status.storage.topic=connect-cluster-status

# internal topic replication factors - auto 3x replication in Azure Storage
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1

rest.advertised.host.name=connect
offset.flush.interval.ms=10000
connections.max.idle.ms=180000
metadata.max.age.ms=180000

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

# required EH Kafka security settings
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://{NAMESPACE.NAME}.servicebus.windows.net/;SharedAccessKeyName={XXXXXX};SharedAccessKey={XXXXXX}";

producer.security.protocol=SASL_SSL
producer.sasl.mechanism=PLAIN
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://{NAMESPACE.NAME}.servicebus.windows.net/;SharedAccessKeyName={XXXXXX};SharedAccessKey={XXXXXX}";

consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://{NAMESPACE.NAME}.servicebus.windows.net/;SharedAccessKeyName={XXXXXX};SharedAccessKey={XXXXXX}";

# path to the libs directory within the Kafka release
plugin.path={KAFKA.DIRECTORY}/libs
Replace the placeholder values with your Azure Event Hubs endpoint. If you haven't already, create a new namespace and deploy the Event Hubs resources in the Azure portal. Note that you might need to select the `Standard` pricing tier or higher to create Kafka topics successfully in the next step.
The password above can be found in the Event Hubs namespace settings under `Shared access policies`, using the SAS policy `RootManageSharedAccessKey`. Copy the `Connection string–primary key` value and use it to replace the placeholders in the config files above.
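Before moving on, you can optionally verify that the credentials in `connector.properties` work by listing the topics (Event Hubs) in the namespace; if the config is correct, this returns without an authentication error:

$ bin/kafka-topics.sh --bootstrap-server {NAMESPACE.NAME}.servicebus.windows.net:9093 --command-config path/to/connector.properties --list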
Step 4: Create three Kafka topics:
We will use the `kafka-topics` command to create the topics ourselves:
Create configs topic:
$ bin/kafka-topics.sh --bootstrap-server {NAMESPACE.NAME}.servicebus.windows.net:9093 --command-config path/to/connector.properties --create --topic CONFIGS-TOPIC-NAME --config cleanup.policy=compact --partitions 1 --replication-factor 1
If successful, you should see the response `Created topic CONFIGS-TOPIC-NAME.`
Create offsets topic:
$ bin/kafka-topics.sh --bootstrap-server {NAMESPACE.NAME}.servicebus.windows.net:9093 --command-config path/to/connector.properties --create --topic OFFSETS-TOPIC-NAME --config cleanup.policy=compact --partitions 25 --replication-factor 1
If successful, you should see the response `Created topic OFFSETS-TOPIC-NAME.`
Create status topic:
$ bin/kafka-topics.sh --bootstrap-server {NAMESPACE.NAME}.servicebus.windows.net:9093 --command-config path/to/connector.properties --create --topic STATUS-TOPIC-NAME --config cleanup.policy=compact --partitions 5 --replication-factor 1
If successful, you should see the response `Created topic STATUS-TOPIC-NAME.`
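To confirm a topic was created with the expected partition count and cleanup policy, you can also describe it, for example:

$ bin/kafka-topics.sh --bootstrap-server {NAMESPACE.NAME}.servicebus.windows.net:9093 --command-config path/to/connector.properties --describe --topic OFFSETS-TOPIC-NAME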
Step 5: Running Kafka Connect
Kafka Connect is a tool for scalably and reliably streaming data between Apache Kafka and other systems; here it talks to Azure Event Hubs through its Kafka-compatible endpoint. To continuously import and export data into and out of Event Hubs, start a worker locally in distributed mode:
$ bin/connect-distributed.sh path/to/connect-distributed.properties
With everything above up and running, you can test out the import and export in the next step.
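Once the worker is up, you can confirm its REST API is reachable and that the file connector plugins were picked up from `plugin.path` (these are standard Kafka Connect REST endpoints; the second one should list the FileStream source and sink connector classes):

curl -s http://localhost:8083/
curl -s http://localhost:8083/connector-plugins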
Step 6: Create input and output files:
Create a directory and then create two files: one file with seed data from which the FileStreamSource connector will read and another to which our FileStreamSink connector will write.
$ mkdir ~/connect-demo
$ seq 1000 > ~/connect-demo/input.txt
$ touch ~/connect-demo/output.txt
Step 7: Create the FileStreamSource connector:
Next, I will walk you through spinning up the FileStreamSource connector by registering it with the Connect worker's REST API:
curl -s -X POST -H "Content-Type: application/json" --data '{"name": "file-source","config": {"connector.class":"org.apache.kafka.connect.file.FileStreamSourceConnector","tasks.max":"1","topic":"connect-demo","file": "{YOUR/HOME/PATH}/connect-demo/input.txt"}}' http://localhost:8083/connectors
And check the status:
curl -s http://localhost:8083/connectors/file-source/status
If it succeeds, it will respond with something like:
{"name":"file-source","connector":{"state":"RUNNING","worker_id":"connect:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"connect:8083"}],"type":"source"}
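If you want to see the records the source connector is producing, an optional check is to read the `connect-demo` topic directly from the Event Hubs Kafka endpoint with the console consumer, reusing `connector.properties` from earlier. Because the worker uses the JsonConverter, the records may appear as JSON envelopes rather than plain numbers:

$ bin/kafka-console-consumer.sh --bootstrap-server {NAMESPACE.NAME}.servicebus.windows.net:9093 --topic connect-demo --consumer.config path/to/connector.properties --from-beginning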
Step 8: Create FileStreamSink Connector
Similar to the above, spin up the FileStreamSink connector:
curl -X POST -H "Content-Type: application/json" --data '{"name": "file-sink", "config": {"connector.class":"org.apache.kafka.connect.file.FileStreamSinkConnector", "tasks.max":"1", "topics":"connect-demo", "file": "{YOUR/HOME/PATH}/connect-demo/output.txt"}}' http://localhost:8083/connectors
And check the status:
curl -s http://localhost:8083/connectors/file-sink/status
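At this point both connectors should be registered with the worker; you can list them to double-check, which should return something like `["file-source","file-sink"]`:

curl -s http://localhost:8083/connectors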
Finally, verify that the data has been replicated between the two files and is identical in both.
Read the output file:
cat ~/connect-demo/output.txt
You should see that `output.txt` contains the numbers 1 to 1000, just like `input.txt`. That's it: if you append to `input.txt`, the changes will be synced to `output.txt` accordingly.
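To check that the two files match exactly, and to remove the demo connectors (registered above as `file-source` and `file-sink`) when you are done, you can run:

$ diff ~/connect-demo/input.txt ~/connect-demo/output.txt
$ curl -X DELETE http://localhost:8083/connectors/file-source
$ curl -X DELETE http://localhost:8083/connectors/file-sink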
Finally, please note that Event Hubs support for the Kafka Connect API is in public preview. The FileStreamSource and FileStreamSink connectors deployed here are not meant for production use; they are only for demonstration purposes.
Originally published at http://victorleungtw.com on April 15, 2022.