Kafka Connect with Couchbase

About Kafka

Apache Kafka is a distributed, persistent message queuing system. It is used to realize publish-subscribe use cases, to process streams of data in real time, and to store streams of data safely in a distributed, replicated cluster. That said, Apache Kafka is not a database system, but it can stream data from a database system in near real time. Kafka represents data as a message stream: Producers put messages into a so-called topic and Consumers take messages out of it for further processing. A variety of connectors is available. A short introduction to Kafka can be found here: https://www.youtube.com/watch?v=fFPVwYKUTHs . The video explains the basic concepts and what Producers and Consumers look like.

Couchbase supports Kafka Connect since version 3.1 of its connector. The Kafka documentation says "Kafka Connect is a tool for scalably and reliably streaming data between Apache Kafka and other systems. It makes it simple to quickly define connectors that move large collections of data into and out of Kafka.". Kafka provides a common framework for Kafka connectors. It can run in a distributed or a standalone mode, and it is distributed and scalable by default.

Setup

Kafka uses Apache ZooKeeper, a cluster coordination service. The ZooKeeper documentation states that "ZooKeeper is a centralized service for maintaining configuration information, naming, providing distributed synchronization, and providing group services. All of these kinds of services are used in some form or another by distributed applications ... ZooKeeper aims at distilling the essence of these different services into a very simple interface to a centralized coordination service."

After downloading and extracting the standard distribution of Apache Kafka, you can start a local Zookeeper instance by using the default configuration the following way:
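For instance, using the stock configuration file that ships with Kafka, run from the root of the installation directory:

    bin/zookeeper-server-start.sh config/zookeeper.properties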


The next step is to configure 3 Kafka message broker nodes. We will run all of them on the same host for demo purposes, but they could obviously also run on separate machines. In order to do so, we need to create configurations for the broker servers. So copy the config/server.properties file to server-1.properties and server-2.properties and then edit them. The file 'server.properties' has the following settings:
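A sketch of the relevant settings, with $i standing for the broker id as explained in the next paragraph (the property names match the stock server.properties of recent Kafka releases; the /tmp log directory prefix is taken from the stock file and is illustrative):

    broker.id=$i
    listeners=PLAINTEXT://:9092     # 9092 + $i: use 9093 for broker 1, 9094 for broker 2
    log.dirs=/tmp/kafka-logs-$i     # kafka-logs-0, kafka-logs-1, kafka-logs-2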


Let's assume that $i is the id of the broker. So the first broker has id '0', listens on port 9092, and logs to 'kafka-logs-0'. The second broker has id '1', listens on port 9093, and logs to 'kafka-logs-1'. The third broker configuration is self-explanatory.
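With the three property files in place, each broker can then be started from the Kafka installation directory (a sketch; in practice you would run each command in its own terminal or send it to the background):

    bin/kafka-server-start.sh config/server.properties
    bin/kafka-server-start.sh config/server-1.properties
    bin/kafka-server-start.sh config/server-2.properties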


The next step is to download and install the Couchbase plug-in. Just copy its libraries to the libs sub-folder and its configuration files to the config sub-folder of your Kafka installation.
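A sketch, assuming the connector archive was extracted to a folder named couchbase-kafka-connector and that $KAFKA_HOME points at the Kafka installation (both names are illustrative):

    cp couchbase-kafka-connector/lib/*.jar $KAFKA_HOME/libs/
    cp couchbase-kafka-connector/config/* $KAFKA_HOME/config/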


Streaming data from Couchbase

Before we can stream data from Couchbase, we need to create the topic to which we want to stream. So let's create a topic named 'test-cb'.
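For example (the replication factor and partition count of 3 match the topic description discussed below; adjust the ZooKeeper address if yours differs):

    bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 3 --topic test-cb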


You can then describe this topic by using the following command:
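For example:

    bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test-cb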


The topic we created has 3 partitions, and each node is the leader for one partition. The leader is the node responsible for all reads and writes for the given partition. 'Replicas' is the list of nodes that replicate the log for this partition.

Now let's create a configuration file for distributed workers under 'config/couchbase-distributed.properties':
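A sketch of such a worker configuration, close to the stock connect-distributed.properties (the group id and the internal topic names are illustrative; the bootstrap servers point at our three local brokers):

    bootstrap.servers=localhost:9092,localhost:9093,localhost:9094
    group.id=couchbase-connect-cluster
    key.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter=org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable=true
    value.converter.schemas.enable=true
    offset.storage.topic=connect-offsets
    config.storage.topic=connect-configs
    status.storage.topic=connect-status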


The Connect settings are more or less the default ones. Now we also have to provide the connector settings. When using the distributed mode, these settings have to be provided by registering the connector via the Connect REST service:
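For example, against the REST endpoint of a local worker (port 8083 is the Connect default):

    curl -X POST -H "Content-Type: application/json" -d @config/couchbase-distributed.json http://localhost:8083/connectors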


The configuration file 'couchbase-distributed.json' has a 'name' attribute and an embedded 'config' object with the connector settings:
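A sketch of such a file; the connector class and property names follow the 3.x Couchbase connector's quick-start configuration, while the connector name, cluster address, and bucket are illustrative:

    {
      "name": "couchbase-source",
      "config": {
        "connector.class": "com.couchbase.connect.kafka.CouchbaseSourceConnector",
        "tasks.max": "2",
        "connection.cluster_address": "127.0.0.1",
        "connection.bucket": "travel-sample",
        "topic.name": "test-cb"
      }
    }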


The Couchbase settings refer to the Couchbase bucket and the topic to which we want to stream DCP messages out of Couchbase. In order to run the Connect workers in distributed mode, we can now execute:
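Using the worker configuration file created above:

    bin/connect-distributed.sh config/couchbase-distributed.properties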


The log output contains information about the tasks. We configured 2 tasks to run, and the output shows which task is responsible for which Couchbase shards (vBuckets):


For now, let's just consume the 'test-cb' messages by using the console consumer:
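For example, with a Kafka release of that era that still supports the ZooKeeper-based consumer (newer releases use --bootstrap-server localhost:9092 instead):

    bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test-cb --from-beginning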


One entry looks like the following:


We just used the standard value converter. The value is in reality a JSON document, but it is represented as a Base64-encoded string in this case.

Another article will explain how to use Couchbase as the sink for messages via Kafka Connect.
