PubNub Kafka Sink Connector

This guide provides steps for deploying PubNub Kafka Sink Connector to integrate Apache Kafka with the PubNub real-time messaging service and let you forward data from Apache Kafka topics directly to PubNub channels.

You can choose between two paths: testing with a preconfigured Docker Compose setup or deploying the connector to a production Kafka environment.

Stream events from PubNub to Kafka

Apart from receiving Kafka events in PubNub, PubNub lets you stream PubNub events to Kafka through Kafka Action.

Prerequisites

You need these to get started:

Steps

Follow the installation steps for test or production environments.

  1. Clone the pubnub-kafka-sink-connector repository to your local machine.

    git clone git@github.com:pubnub/pubnub-kafka-sink-connector.git
  2. Change your current directory to the one that contains the cloned source code.

    cd pubnub-kafka-sink-connector
  3. Log in to the Admin Portal and get your development publish and subscribe keys from your app's keyset on the Admin Portal.

  4. Modify the default configuration options of PubNub Kafka Sink Connector in the examples/pubnub-sink-connector.json file and place your keys from the Admin Portal under publish_key and subscribe_key.

  5. Use Docker Compose to build the ready Kafka and Kafka Connect images.

    docker compose up
  6. Deploy the connector to the Kafka Connect cluster.

    curl -X POST \
    -d @examples/pubnub-sink-connector.json \
    -H "Content-Type:application/json" \
    http://localhost:8083/connectors
  7. Verify if the connector works.

    Sample producer included in PubNub Kafka Sink Connector will generate test messages (like {"timestamp":1705689453}) every few seconds.

    You can use the Debug Console to verify that these messages are published on one of the predefined PubNub channels. Ensure to provide your publish key, subscribe key specified during configuration, pubnub as the channel, and a user ID.

  8. Once you're done testing, undeploy the connector from Kafka Connect.

    curl -X DELETE \
    http://localhost:8083/connectors/pubnub-sink-connector
  9. Stop the Kafka and Kafka Connect containers using Docker Compose.

    docker compose down

Configuration options

The configuration from the examples/pubnub-sink-connector-test.json file defines how PubNub Kafka Sink Connector will be set up.

Here's a breakdown of each parameter, detailing its purpose and whether it's required or optional.

{
"name": "pubnub-sink-connector",
"config": {
"topics": "pubnub,pubnub1,pubnub2",
"topics.regex": "",
"pubnub.user_id": "myUserId123",
"pubnub.publish_key": "demo",
"pubnub.subscribe_key": "demo",
"pubnub.secret_key": "demo",
"connector.class": "com.pubnub.kafka.connect.PubNubKafkaSinkConnector",
"tasks.max": "3",
"value.deserializer": "custom.class.serialization.JsonDeserializer",
"value.serializer": "custom.class.serialization.JsonSerializer",
"errors.tolerance": "all",
"errors.deadletterqueue.topic.name": "my_dlq_topic",
show all 18 lines
ParameterRequiredDescription
nameYesUnique name for the connector instance used to identify it.
topicsYesComma-separated list of Kafka topics from which the connector will consume messages. The Kafka topic name will match PubNub channel name. At least one of topics or topics.regex is required.
topics.regexYesJava regular expression for matching Kafka topics from which the connector will consume messages, like "topic.*". The Kafka topic name will match PubNub channel name. At least one of topics or topics.regex is required.
pubnub.user_idYesUTF-8 encoded, unique string of up to 92 characters used to identify a single client (end user, device, or server) that connects to PubNub.
pubnub.publish_keyYesPubNub publish key used to authenticate requests for publishing messages.
pubnub.subscribe_keyYesPubNub subscribe key, part of standard key configuration. It is necessary even if subscribing isn't the primary function of this sink connector.
pubnub.secret_keyNoPubNub secret key used for secure server-to-server communication. Not mandatory for testing purposes.
pubnub.routerNoA router class that processes messages flowing through the KafkaSinkConnector allowing for custom channel ID and message payload overrides. For more information, refer to pubnub.router.
connector.classYesJava class of the connector to specify which connector implementation to use. Use the default value.
tasks.maxYesMaximum number of tasks the connector is allowed to create, controlling the parallelism of the connector.
value.deserializerYesClass used to deserialize the values of messages consumed from Kafka. It is necessary if data format in Kafka requires specific deserialization. Use the default value.
value.serializerYesClass used to serialize messages before publishing to PubNub. It is necessary if the PubNub API expects a specific payload format. Use the default value.
errors.toleranceNoProperty that determines how the connector handles errors when operating. Must be set to "all" (then it skips over problematic records and continuous processing). Requires dead letter queue configuration on your Kafka Connect cluster. Read Confluent docs for details.
errors.deadletterqueue.topic.nameNoName of the topic used for routing messages that fail to process properly. Requires dead letter queue configuration on your Kafka Connect cluster. Read Confluent docs for details.
errors.deadletterqueue.topic.replication.factorNoProperty that defines how many copies of the dead letter records should be replicated across the Kafka cluster. Requires dead letter queue configuration on your Kafka Connect cluster. Read Confluent docs for details.
Topic configuration options

You must specify at least one of topics or topics.regex for the connector to know from which Kafka topics to consume messages.

pubnub.router

The class you provide as the value of the pubnub.router parameter must satisfy the following conditions:

  • Implement the com.pubnub.kafka.connect.PubNubKafkaRouter interface and have a public, no arguments constructor.
public interface PubNubKafkaRouter {
ChannelAndMessage route(SinkRecord record);

class ChannelAndMessage {
String channel;
Object message;

public ChannelAndMessage(String channel, Object message) {
this.channel = channel;
this.message = message;
}
}
}
Last updated on