Apache Kafka Connect to Azure Event Hubs

Victor Leung

Recently I was working on an integration with Azure Event Hubs. A colleague struggled to export the messages from an existing Kafka topic and import them into Event Hubs, so I have documented the steps below, which you may find helpful.

Step 1: Download and extract Kafka

Apache Kafka is an open-source distributed event streaming platform. It helps build distributed systems and supports high throughput. Apache Kafka can be downloaded from this address: https://www.apache.org/dyn/closer.cgi?path=/kafka/3.1.0/kafka_2.13-3.1.0.tgz

$ tar -xzf kafka_2.13-3.1.0.tgz
$ cd kafka_2.13-3.1.0

Step 2: Start the Kafka environment

If your local environment already has Java 8+ installed, run the commands below to start all services. (If not, download and install Java: https://www.oracle.com/java/technologies/downloads/#jdk18-mac )
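If you are not sure which Java version is on your path, you can check it first:

$ java -version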

Run the ZooKeeper service:

$ bin/zookeeper-server-start.sh config/zookeeper.properties

Start the Kafka broker:

$ bin/kafka-server-start.sh config/server.properties
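To confirm the local broker is up before moving on, a quick sanity check is to list its topics (the broker listens on localhost:9092 by default):

$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --list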

Step 3: Create and set up the config files

Create a new file `connector.properties` with the values below.

bootstrap.servers={NAMESPACE.NAME}.servicebus.windows.net:9093
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://{NAMESPACE.NAME}.servicebus.windows.net/;SharedAccessKeyName={XXXXXX};SharedAccessKey={XXXXXX}";

Then create a new file `connect-distributed.properties` with the values below.

bootstrap.servers={NAMESPACE.NAME}.servicebus.windows.net:9093
group.id=connect-cluster-group

# connect internal topic names, automatically created by Kafka Connect with AdminClient API if not exists
config.storage.topic=connect-cluster-configs
offset.storage.topic=connect-cluster-offsets
status.storage.topic=connect-cluster-status

# internal topic replication factors - auto 3x replication in Azure Storage
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1

rest.advertised.host.name=connect
offset.flush.interval.ms=10000
connections.max.idle.ms=180000
metadata.max.age.ms=180000

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

# required EH Kafka security settings
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://{NAMESPACE.NAME}.servicebus.windows.net/;SharedAccessKeyName={XXXXXX};SharedAccessKey={XXXXXX}";

producer.security.protocol=SASL_SSL
producer.sasl.mechanism=PLAIN
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://{NAMESPACE.NAME}.servicebus.windows.net/;SharedAccessKeyName={XXXXXX};SharedAccessKey={XXXXXX}";

consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://{NAMESPACE.NAME}.servicebus.windows.net/;SharedAccessKeyName={XXXXXX};SharedAccessKey={XXXXXX}";

# path to the libs directory within the Kafka release
plugin.path={KAFKA.DIRECTORY}/libs

Replace the placeholder values with your Azure endpoint. If you haven't already, create a new namespace and deploy Event Hubs resources in the Azure portal. Note that you might need to select pricing tier `Standard` or higher to create the Kafka topics successfully in the next step.
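If you prefer the command line over the portal, the namespace can also be created with the Azure CLI. This is a minimal sketch, assuming the Azure CLI is installed and you are logged in; `{RESOURCE-GROUP}` and the location are placeholders you need to adjust:

$ az eventhubs namespace create --resource-group {RESOURCE-GROUP} --name {NAMESPACE.NAME} --location eastus --sku Standard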

The password above can be found in the Event Hubs namespace settings, under `Shared access policies`, using the SAS policy `RootManageSharedAccessKey`. Copy the `Connection string–primary key` value and use it to replace the placeholders in the config files above.
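Alternatively, the connection string can be read with the Azure CLI, again assuming the placeholder resource group and namespace above; the `primaryConnectionString` field in the output is the value to paste into the config files:

$ az eventhubs namespace authorization-rule keys list --resource-group {RESOURCE-GROUP} --namespace-name {NAMESPACE.NAME} --name RootManageSharedAccessKey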

Step 4: Create 3 Kafka topics

We will use the `kafka-topics.sh` command to create the topics ourselves:

Create configs topic:

$ bin/kafka-topics.sh --bootstrap-server {NAMESPACE.NAME}.servicebus.windows.net:9093 --command-config path/to/connector.properties --create --topic CONFIGS-TOPIC-NAME --config cleanup.policy=compact --partitions 1 --replication-factor 1

If successful, you would see the response `Created topic CONFIGS-TOPIC-NAME.`

Create offsets topic:

$ bin/kafka-topics.sh --bootstrap-server {NAMESPACE.NAME}.servicebus.windows.net:9093 --command-config path/to/connector.properties --create --topic OFFSETS-TOPIC-NAME --config cleanup.policy=compact --partitions 25 --replication-factor 1

If successful, you would see the response `Created topic OFFSETS-TOPIC-NAME.`

Create status topic:

$ bin/kafka-topics.sh --bootstrap-server {NAMESPACE.NAME}.servicebus.windows.net:9093 --command-config path/to/connector.properties --create --topic STATUS-TOPIC-NAME --config cleanup.policy=compact --partitions 5 --replication-factor 1

If successful, you would see the response `Created topic STATUS-TOPIC-NAME.`
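To double-check that all three topics exist on the Event Hubs namespace, you can list them with the same client config:

$ bin/kafka-topics.sh --bootstrap-server {NAMESPACE.NAME}.servicebus.windows.net:9093 --command-config path/to/connector.properties --list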

Step 5: Running Kafka Connect

Kafka Connect is a tool for scalably and reliably streaming data between Apache Kafka and other systems such as Azure Event Hubs. It allows you to continuously ingest data from Azure Event Hubs into Kafka and vice versa. To continuously import and export your data, start a worker locally in distributed mode.

$ bin/connect-distributed.sh path/to/connect-distributed.properties

With everything above up and running, you can test out the import and export in the next step.
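Before creating any connectors, you can confirm the worker's REST API is reachable. The root endpoint returns the worker version, and `/connectors` should return an empty list at this point:

curl -s http://localhost:8083/
curl -s http://localhost:8083/connectors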

Step 6: Create input and output files:

Create a directory and then create two files: one file with seed data from which the FileStreamSource connector will read and another to which our FileStreamSink connector will write.

$ mkdir ~/connect-demo
$ seq 1000 > ~/connect-demo/input.txt
$ touch ~/connect-demo/output.txt

Step 7: Create the FileStreamSource connector

Next, spin up the FileStreamSource connector by posting its configuration to the Connect REST API.

curl -s -X POST -H "Content-Type: application/json" --data '{"name": "file-source", "config": {"connector.class":"org.apache.kafka.connect.file.FileStreamSourceConnector", "tasks.max":"1", "topic":"connect-demo", "file": "{YOUR/HOME/PATH}/connect-demo/input.txt"}}' http://localhost:8083/connectors

And check the status:

curl -s http://localhost:8083/connectors/file-source/status

If it is successful, it will respond with:

{"name":"file-source","connector":{"state":"RUNNING","worker_id":"connect:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"connect:8083"}],"type":"source"}

Step 8: Create FileStreamSink Connector

Similar to the above, spin up the FileStreamSink connector.

curl -X POST -H "Content-Type: application/json" --data '{"name": "file-sink", "config": {"connector.class":"org.apache.kafka.connect.file.FileStreamSinkConnector", "tasks.max":"1", "topics":"connect-demo", "file": "{YOUR/HOME/PATH}/connect-demo/output.txt"}}' http://localhost:8083/connectors

And check the status:

curl -s http://localhost:8083/connectors/file-sink/status
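If the sink connector is running, you should see a response in the same format as the source connector's status, along the lines of the following (illustrative):

{"name":"file-sink","connector":{"state":"RUNNING","worker_id":"connect:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"connect:8083"}],"type":"sink"}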

Now verify that the data has been replicated from one file to the other and is identical in both.

Read the output file:

cat ~/connect-demo/output.txt

You should see that `output.txt` contains the numbers 1 to 1000, just like `input.txt`. That's it; if you append to `input.txt`, the output will be synced and updated accordingly.
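To double-check that the two files really are identical, you can also diff them; no output means they match:

diff ~/connect-demo/input.txt ~/connect-demo/output.txt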

Finally, please note that Event Hubs support for the Kafka Connect API is in public preview. The deployed FileStreamSource and FileStreamSink connectors are not meant for production use; they are used here only for demonstration purposes.
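When you are done with the demo, the connectors can be removed through the same Connect REST API while the worker is still running:

curl -X DELETE http://localhost:8083/connectors/file-source
curl -X DELETE http://localhost:8083/connectors/file-sink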

Originally published at http://victorleungtw.com on April 15, 2022.
