MongoDB Kafka Connector

  • A fault-tolerant runtime for transferring data to and from datastores.
  • A framework for the Apache Kafka community to share solutions for connecting Apache Kafka to different datastores.
$ curl https://dlcdn.apache.org/kafka/3.2.0/kafka_2.13-3.2.0.tgz -o kafka_2.13-3.2.0.tgz
$ tar -xzf kafka_2.13-3.2.0.tgz
$ cd kafka_2.13-3.2.0
$ bin/zookeeper-server-start.sh config/zookeeper.properties
$ bin/kafka-server-start.sh config/server.properties
curl -L https://search.maven.org/remotecontent?filepath=org/mongodb/kafka/mongo-kafka-connect/1.7.0/mongo-kafka-connect-1.7.0-all.jar -o plugin/mongo-kafka-connect-1.7.0-all.jar
# These are defaults. This file just demonstrates how to override some settings.
bootstrap.servers=localhost:9092
# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
# it to
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.file.filename=/tmp/connect.offsets
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000
# Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
# (connectors, converters, transformations). The list should consist of top level directories that include
# any combination of:
# a) directories immediately containing jars with plugins and their dependencies
# b) uber-jars with plugins and their dependencies
# c) directories immediately containing the package directory structure of classes of plugins and their dependencies
# Note: symlinks will be followed to discover dependencies or plugins.
# Examples:
# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
#plugin.path=
plugin.path=/home/ubuntu/kafka_2.13-3.2.0/libs/mongo-kafka-connect-1.7.0-all.jar
name=mongo-sink
topics=quickstart.sampleData
connector.class=com.mongodb.kafka.connect.MongoSinkConnector
# Message types
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
# Specific global MongoDB Sink Connector configuration
connection.url=mongodb://localhost:27017
database=quickstart
collection=topicData
change.data.capture.handler=com.mongodb.kafka.connect.sink.cdc.mongodb.ChangeStreamHandler
name=mongo-source
connector.class=com.mongodb.kafka.connect.MongoSourceConnector
# Connection and source configuration
connection.uri=mongodb://localhost:27017
database=quickstart
collection=sampleData
wget -qO - https://www.mongodb.org/static/pgp/server-5.0.asc | sudo apt-key add -
echo "deb [ arch=amd64,arm64 ] https://repo.mongodb.org/apt/ubuntu focal/mongodb-org/5.0 multiverse" | sudo tee /etc/apt/sources.list.d/mongodb-org-5.0.list
sudo apt-get update
sudo apt-get install -y mongodb-org
The following packages have unmet dependencies:mongodb-org-mongos : Depends: libssl1.1 (>= 1.1.1) but it is not installablemongodb-org-server : Depends: libssl1.1 (>= 1.1.1) but it is not installablemongodb-org-shell : Depends: libssl1.1 (>= 1.1.1) but it is not installableE: Unable to correct problems, you have held broken packages.
echo "deb http://security.ubuntu.com/ubuntu impish-security main" | sudo tee /etc/apt/sources.list.d/impish-security.list

sudo apt-get update
sudo apt-get install libssl1.1
sudo systemctl status mongod
sudo systemctl restart mongod
bin/connect-standalone.sh config/connect-standalone.properties config/MongoSourceConnector.properties config/MongoSinkConnector.properties
$ bin/kafka-console-producer.sh --topic connect-test --bootstrap-server localhost:9092
This is my first event
This is my second event
mongosh mongodb://127.0.0.1:27017/
use quickstart
db.sampleData.insertOne({"hello":"world"})
db.topicData.find())
[
{
_id: ObjectId(...),
hello: 'world',
travel: 'MongoDB Kafka Connector'
}
]

--

--

I’m a keen traveler to see every country in the world, passionate about cutting edge technologies.

Love podcasts or audiobooks? Learn on the go with our new app.

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store
Victor Leung

Victor Leung

311 Followers

I’m a keen traveler to see every country in the world, passionate about cutting edge technologies.