Kafka Connect is an extensible framework for moving data from external systems into Kafka topics, or the reverse. Source and sink connectors are the components that let the framework connect to those external systems. You can use the many existing source and sink connectors for common systems or implement connectors for custom use cases.

Running these connectors requires two dependencies: a Kafka broker and the Kafka Connect runtime. To reduce the burden of managing these dependencies, you can rely on the Oracle Cloud Infrastructure (OCI) Streaming service for the Kafka broker. OCI Streaming supports the Kafka Connect runtime by providing a Kafka Connect configuration. Each Connect configuration provides the configuration, status, and offset topics that the Kafka Connect runtime connects to. The Kafka Connect runtime itself can be hosted on Oracle Container Engine for Kubernetes (OKE).

This blog post focuses on how to set up your Kafka Connect runtime with OCI Streaming on an OKE cluster.

Dependencies

Creating the Kafka Connect configuration

Log in to your cloud account and select Streaming under the Analytics section. Under the Kafka Connect Configurations menu, select the option to create a configuration.

A screenshot of the Kafka Connect Configurations page in the example compartment.

After creation, note the names of the configuration, offset, and status topics. Save them because you use them while configuring the Kafka Connect Docker image.

A screenshot of the Kafka Connect Configuration Information tab on the details page.

Setting up the OKE cluster

You can skip this section if you already have a Kubernetes cluster running and want to use it for the Kafka Connect runtime.

If you’re setting up a Kubernetes cluster for the first time, you can use the Quick Create workflow provided in the Oracle Cloud Console under the “Kubernetes Clusters (OKE)” section. For more information, see Using the Console to create a Cluster with Default Settings in the ‘Quick Create’ workflow.

You can also use the Terraform scripts to quickly launch the cluster. A quick start for this method is available on GitHub.

Setting up Kafka Connect runtime on OKE

Start by creating a namespace in Kubernetes:


kubectl create ns kafka-connect

Create a properties file named connect-distributed.env with the following content:


LOG_LEVEL=INFO

BOOTSTRAP_SERVERS=cell-1.streaming.ap-tokyo-1.oci.oraclecloud.com:9092
GROUP_ID=grp-1

CONFIG_STORAGE_TOPIC=<connect_configuration_ocid>-config
OFFSET_STORAGE_TOPIC=<connect_configuration_ocid>-offset
STATUS_STORAGE_TOPIC=<connect_configuration_ocid>-status

CONNECT_OFFSET_FLUSH_INTERVAL_MS=10000
CONNECT_SECURITY_PROTOCOL=SASL_SSL
CONNECT_SASL_MECHANISM=PLAIN
CONNECT_SASL_JAAS_CONFIG=org.apache.kafka.common.security.plain.PlainLoginModule required username="<tenancy>/<user>/<stream_pool_ocid>" password="<auth_code>";

CONNECT_PRODUCER_BUFFER_MEMORY=10240
CONNECT_PRODUCER_BATCH_SIZE=2048
CONNECT_PRODUCER_SASL_MECHANISM=PLAIN
CONNECT_PRODUCER_SECURITY_PROTOCOL=SASL_SSL
CONNECT_PRODUCER_SASL_JAAS_CONFIG=org.apache.kafka.common.security.plain.PlainLoginModule required username="<tenancy>/<user>/<stream_pool_ocid>" password="<auth_code>";

CONNECT_CONSUMER_SASL_MECHANISM=PLAIN
CONNECT_CONSUMER_SECURITY_PROTOCOL=SASL_SSL
CONNECT_CONSUMER_SASL_JAAS_CONFIG=org.apache.kafka.common.security.plain.PlainLoginModule required username="<tenancy>/<user>/<stream_pool_ocid>" password="<auth_code>";

Run the following command to create a secret named “kafka-connect-config” in the “kafka-connect” namespace. These environment variables become available to the containers that run the Kafka Connect runtime.


kubectl create secret generic kafka-connect-config --from-env-file=connect-distributed.env -n kafka-connect
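To confirm that the secret was created and contains one key per line of the environment file, you can inspect it (this shows key names and value sizes only, not the secret values):

```shell
# List the keys stored in the secret and their sizes in bytes
kubectl describe secret kafka-connect-config -n kafka-connect
```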

Create a deployment file (kafka-connect.yaml) with the following contents:


apiVersion: apps/v1
kind: Deployment
metadata:
  name: kafka-connect-deployment
spec:
  selector:
    matchLabels:
      app: kafka-connect
  replicas: 1 # tells deployment to run 1 pod matching the template
  template: # create pods using pod definition in this template
    metadata:
      labels:
        app: kafka-connect
    spec:
      containers:
      - image: debezium/connect:1.2.4.Final
        name: kafka-connect
        ports:
        - containerPort: 8083
        envFrom:
          - secretRef:
              name: kafka-connect-config

---

apiVersion: v1
kind: Service
metadata:
  name: kafka-connect-lb
  labels:
    app: kafka-connect-lb
  annotations:
    oci.oraclecloud.com/load-balancer-type: "lb"
spec:
  type: LoadBalancer
  ports:
  - port: 80
    protocol: TCP
    targetPort: 8083
  selector:
    app: kafka-connect

Note the following details of this deployment:

  • The Kafka Connect runtime uses the debezium/connect:1.2.4.Final Docker image.

  • Containers in the deployment expose port 8083 for the Kafka Connect runtime. This port serves the REST API. For the API reference, see the Kafka documentation.

  • Environment variables in the container are set using the secret “kafka-connect-config.”

  • We create a load balancer, which distributes traffic across the deployed pods. We bind port 80 of the load balancer to port 8083 of the pods. For more information on load balancers in OKE, see Defining Kubernetes Services of Type LoadBalancer.

Create the deployment in the kafka-connect namespace of the Kubernetes cluster:


kubectl apply -f kafka-connect.yaml -n kafka-connect

Check the container logs to verify that the pod started successfully.


kubectl logs --follow deployment/kafka-connect-deployment -n kafka-connect

You can also check the deployment in the Kubernetes dashboard. For more information on configuring the Kubernetes dashboard on OKE, see Accessing a Cluster Using the Kubernetes Dashboard.
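Before calling the REST API, you need the public address that OKE assigned to the load balancer service. You can retrieve it with kubectl, then hit the REST API root, which returns the Connect worker's version information (the EXTERNAL-IP column may show "pending" for a few minutes while OCI provisions the load balancer):

```shell
# Show the external IP assigned to the kafka-connect-lb service
kubectl get service kafka-connect-lb -n kafka-connect

# Replace <external_ip> with the EXTERNAL-IP value from the previous command;
# a healthy worker responds with its version and commit info as JSON
curl -s http://<external_ip>/
```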

A screenshot of the deployments of K8 dashboard.

Now you’re set to interact with your Kafka Connect runtime and start connectors by using the Kafka Connect REST API. The following example runs the file source connector:


curl -i -X POST -H "Content-Type: application/json" -d "@file_connector.json" http://<ip and port of lb>/connectors

Contents of file_connector.json:


{
    "name": "file-connector",
    "config": {
        "connector.class": "FileStreamSource",
        "tasks.max": "1",
        "topic": "from_log_file",
        "batch.size": "100",
        "file": "/kafka/logs/connect-service.log"
    }
}

This connector, which comes bundled with the Kafka Connect runtime, moves the contents of a file to a topic. Here, we start the connector to move the contents of the log file to the topic named “from_log_file.”
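You can confirm that the connector was registered and that its task is running through the same REST API, using the standard connectors endpoints:

```shell
# List all registered connectors; expect ["file-connector"] in the response
curl -s http://<ip and port of lb>/connectors

# Show the state of the connector and its tasks; a healthy connector
# reports "state": "RUNNING" for both the connector and task 0
curl -s http://<ip and port of lb>/connectors/file-connector/status
```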

After running this connector, you can verify the contents of the topic from the Oracle Cloud Console by clicking Load Messages.

A screenshot of the Stream Details page showing the Stream Information tab in the Oracle Cloud Console.
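As an alternative to the console, you can read the topic with the standard Kafka console consumer, reusing the same bootstrap server and SASL settings as in connect-distributed.env. The client.properties file here is one you create just for this check; the placeholders match those used earlier:

```shell
# Client-side SASL settings, mirroring connect-distributed.env
cat > client.properties <<'EOF'
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<tenancy>/<user>/<stream_pool_ocid>" password="<auth_code>";
EOF

# Read everything written to the topic by the file source connector
kafka-console-consumer.sh \
  --bootstrap-server cell-1.streaming.ap-tokyo-1.oci.oraclecloud.com:9092 \
  --topic from_log_file \
  --from-beginning \
  --consumer.config client.properties
```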

Conclusion

The Kafka Connect ecosystem has many connectors that you can use to build data pipelines. OCI Streaming and OKE together provide the platform to run your Kafka Connect runtime and take advantage of them.

Try out the solution yourself! Sign up for the Oracle Cloud Free Tier or sign in to your account.