Apache Kafka is an open-source publish/subscribe messaging system. Kafka Connect is a component of Apache Kafka that solves the problem of connecting Kafka to external datastores such as MongoDB. It does so by providing the following resources:
- A fault-tolerant runtime for transferring data to and from datastores.
- A framework for the Apache Kafka community to share solutions for connecting Apache Kafka to different datastores.
We will focus on using MongoDB as a data lake for our use case. The MongoDB Kafka sink connector is a Kafka Connect connector that reads data from Apache Kafka and writes it to MongoDB. The official MongoDB Kafka Connector is at https://github.com/mongodb/mongo-kafka
Start Kafka Environment
Download the latest Kafka: https://www.apache.org/dyn/closer.cgi?path=/kafka/3.2.0/kafka_2.13-3.2.0.tgz
$ curl https://dlcdn.apache.org/kafka/3.2.0/kafka_2.13-3.2.0.tgz -o kafka_2.13-3.2.0.tgz
$ tar -xzf kafka_2.13-3.2.0.tgz
$ cd kafka_2.13-3.2.0
Run the following commands to start all services in the correct order. First, start the ZooKeeper service:
$ bin/zookeeper-server-start.sh config/zookeeper.properties
Then open another terminal session and start the Kafka broker service:
$ bin/kafka-server-start.sh config/server.properties
Once all services have successfully launched, you will have a basic Kafka environment running and ready to use.
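Optionally, you can sanity-check that the broker is reachable by listing topics (this assumes the broker is listening on the default localhost:9092):

```shell
bin/kafka-topics.sh --list --bootstrap-server localhost:9092
```

An empty result is fine at this point; an error indicates the broker is not up yet.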
Install the MongoDB Kafka Connector
Download the connector uber JAR from https://search.maven.org/artifact/org.mongodb.kafka/mongo-kafka-connect and place it in a plugin folder inside the Kafka installation:
mkdir -p plugin
curl -L "https://search.maven.org/remotecontent?filepath=org/mongodb/kafka/mongo-kafka-connect/1.7.0/mongo-kafka-connect-1.7.0-all.jar" -o plugin/mongo-kafka-connect-1.7.0-all.jar
Then, in config/connect-standalone.properties, change the plugin.path configuration property to match the path to the jar:
# These are defaults. This file just demonstrates how to override some settings.
bootstrap.servers=localhost:9092
# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka
# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
# it to
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000
# Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
# (connectors, converters, transformations). The list should consist of top level directories that include
# any combination of:
# a) directories immediately containing jars with plugins and their dependencies
# b) uber-jars with plugins and their dependencies
# c) directories immediately containing the package directory structure of classes of plugins and their dependencies
# Note: symlinks will be followed to discover dependencies or plugins.
# Adjust this to the folder that contains mongo-kafka-connect-1.7.0-all.jar
plugin.path=/path/to/kafka_2.13-3.2.0/plugin
Create config properties
In the /config folder, create the file MongoSinkConnector.properties:
connector.class=com.mongodb.kafka.connect.MongoSinkConnector
# Message types
value.converter.schemas.enable=false
# Specific global MongoDB Sink Connector configuration
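The excerpt above is abridged; a fuller sketch of MongoSinkConnector.properties might look like the following. The topic name connect-test comes from the producer step later in this guide, and the database quickstart and collection topicData match the verification steps; the converter choices and local connection URI are assumptions:

```properties
name=mongo-sink
connector.class=com.mongodb.kafka.connect.MongoSinkConnector
tasks.max=1
topics=connect-test
# Message types
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
# Specific global MongoDB Sink Connector configuration
connection.uri=mongodb://localhost:27017
database=quickstart
collection=topicData
```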
In the /config folder, create the file MongoSourceConnector.properties:
connector.class=com.mongodb.kafka.connect.MongoSourceConnector
# Connection and source configuration
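Again the excerpt is abridged; a sketch of MongoSourceConnector.properties might look like the following. The database and collection names (quickstart.sampleData) follow the insertion steps later in the guide; the connection URI assumes a local mongod on the default port:

```properties
name=mongo-source
connector.class=com.mongodb.kafka.connect.MongoSourceConnector
tasks.max=1
# Connection and source configuration
connection.uri=mongodb://localhost:27017
database=quickstart
collection=sampleData
```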
Install MongoDB
1. Import the public key used by the package management system. From a terminal, issue the following command to import the MongoDB public GPG key from https://www.mongodb.org/static/pgp/server-5.0.asc:
wget -qO - https://www.mongodb.org/static/pgp/server-5.0.asc | sudo apt-key add -
2. Create the /etc/apt/sources.list.d/mongodb-org-5.0.list file for Ubuntu 20.04 (Focal):
echo "deb [ arch=amd64,arm64 ] https://repo.mongodb.org/apt/ubuntu focal/mongodb-org/5.0 multiverse" | sudo tee /etc/apt/sources.list.d/mongodb-org-5.0.list
3. Reload local package database.
sudo apt-get update
4. Install the MongoDB packages. Issue the following command:
sudo apt-get install -y mongodb-org
If you hit an error with:
The following packages have unmet dependencies:
 mongodb-org-mongos : Depends: libssl1.1 (>= 1.1.1) but it is not installable
 mongodb-org-server : Depends: libssl1.1 (>= 1.1.1) but it is not installable
 mongodb-org-shell : Depends: libssl1.1 (>= 1.1.1) but it is not installable
E: Unable to correct problems, you have held broken packages.
Fix it with the commands below:
echo "deb http://security.ubuntu.com/ubuntu impish-security main" | sudo tee /etc/apt/sources.list.d/impish-security.list
sudo apt-get update
sudo apt-get install libssl1.1
5. Verify that MongoDB has started successfully.
sudo systemctl status mongod
If it is inactive and needs a restart, run:
sudo systemctl restart mongod
Start Kafka Connect
Run the command:
bin/connect-standalone.sh config/connect-standalone.properties config/MongoSourceConnector.properties config/MongoSinkConnector.properties
Write some data on the topic
A Kafka client communicates with the Kafka brokers over the network to write events. Once received, the brokers store the events in a durable and fault-tolerant manner for as long as you need, even forever.
Run the console producer client to write a few events into your topic. By default, each line you enter will result in a separate event being written to the topic.
$ bin/kafka-console-producer.sh --topic connect-test --bootstrap-server localhost:9092
This is my first event
This is my second event
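To confirm the events reached the broker, you can read them back with the console consumer (same topic and broker address as above; requires the broker from the earlier steps to be running):

```shell
bin/kafka-console-consumer.sh --topic connect-test --from-beginning --bootstrap-server localhost:9092
```

Press Ctrl-C to stop the consumer when you are done.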
Send the Contents of a Document through Your Connectors
To send the contents of a document through your connectors, insert a document into the MongoDB collection from which your source connector reads data.
To insert a new document into your collection, start the MongoDB shell (mongosh) from a terminal.
From the MongoDB shell, insert a document into the sampleData collection of the quickstart database using the following commands:
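The commands were omitted above; assuming the quickstart database and sampleData collection from the source connector setup, they might look like this in mongosh (the travel field matches the sample output shown afterwards):

```javascript
use quickstart
db.sampleData.insertOne({ travel: "MongoDB Kafka Connector" })
```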
After you insert a document into the sampleData collection, confirm that your connectors processed the change. Check the contents of the topicData collection using the following command:
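For example, still inside mongosh (the topicData collection is the one the sink connector writes to):

```javascript
use quickstart
db.topicData.find()
```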
You should see output that resembles the following:
travel: 'MongoDB Kafka Connector'
Reference: MongoDB Kafka Connector: https://www.mongodb.com/docs/kafka-connector/current/