PubNub Kafka Sink Connector
This guide describes how to deploy PubNub Kafka Sink Connector, which integrates Apache Kafka with the PubNub real-time messaging service and forwards data from Apache Kafka topics directly to PubNub channels.
You can choose between two paths: testing with a preconfigured Docker Compose setup or deploying the connector to a production Kafka environment.
Stream events from PubNub to Kafka
Besides receiving Kafka events in PubNub, you can also stream PubNub events to Kafka through the Kafka Action.
Prerequisites
I'm just testing
You need these to get started:
- Docker
- Access to PubNub's Admin Portal
Use for production
You need these to get started:
- Apache Kafka and Kafka Connect setup
- Maven 3.8.6+
- Java 11+
- Access to PubNub's Admin Portal
Steps
Follow the installation steps for test or production environments.
I'm just testing
- Clone the pubnub-kafka-sink-connector repository to your local machine.
  git clone git@github.com:pubnub/pubnub-kafka-sink-connector.git
- Change your current directory to the one that contains the cloned source code.
  cd pubnub-kafka-sink-connector
- Log in to the Admin Portal and get your development publish and subscribe keys from your app's keyset.
- Modify the default configuration options of PubNub Kafka Sink Connector in the examples/pubnub-sink-connector.json file and place your keys from the Admin Portal under pubnub.publish_key and pubnub.subscribe_key.
- Use Docker Compose to build and start the preconfigured Kafka and Kafka Connect images.
  docker compose up
- Deploy the connector to the Kafka Connect cluster.
  curl -X POST \
    -d @examples/pubnub-sink-connector.json \
    -H "Content-Type:application/json" \
    http://localhost:8083/connectors
- Verify that the connector works.
  The sample producer included in PubNub Kafka Sink Connector generates test messages (like {"timestamp":1705689453}) every few seconds.
  You can use the Debug Console to verify that these messages are published on one of the predefined PubNub channels. Make sure to provide the publish and subscribe keys specified during configuration, pubnub as the channel, and a user ID.
- Once you're done testing, undeploy the connector from Kafka Connect.
  curl -X DELETE \
    http://localhost:8083/connectors/pubnub-sink-connector
- Stop the Kafka and Kafka Connect containers using Docker Compose.
  docker compose down
Use for production
- Clone the pubnub-kafka-sink-connector repository to your local machine.
  git clone git@github.com:pubnub/pubnub-kafka-sink-connector.git
- Change your current directory to the one that contains the cloned source code.
  cd pubnub-kafka-sink-connector
- Log in to the Admin Portal and get your production publish and subscribe keys from your app's keyset.
- Edit the default configuration options in examples/pubnub-sink-connector-test.json and enter your production details, such as the Kafka topics and PubNub API keys.
- Compile PubNub Kafka Sink Connector locally.
  Run the following command in the root directory of your connector source code to compile it into a JAR file:
  mvn clean package
  After running the command, a file named pubnub-kafka-connector-1.x.jar will be created in the target directory.
- Add the packaged connector as a Kafka Connect plugin.
  Use the created JAR file to deploy the connector to a Kafka Connect cluster by copying it to the appropriate directory and configuring it accordingly. The exact steps depend on your specific Kafka Connect cluster setup and requirements.
- Fill in your Kafka Connect host address (your_kafka_connect_host) and run the command to deploy the connector to your production Kafka Connect cluster.
  curl -X POST \
    -d @examples/pubnub-sink-connector.json \
    -H "Content-Type:application/json" \
    http://your_kafka_connect_host:8083/connectors
  Any new data sent to the Kafka topics will be copied to the target devices listening on the PubNub channels defined in the configuration file.
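If you script deployments, the same REST calls shown in the curl commands can be built with the HTTP client that ships with Java 11+. The following is a minimal sketch rather than part of the connector: the class name ConnectorDeployRequest and its method names are hypothetical, while the port 8083, the /connectors path, and the Content-Type header are taken from the curl commands in this guide.

```java
import java.net.URI;
import java.net.http.HttpRequest;

final class ConnectorDeployRequest {

    /** Builds the POST request equivalent to the curl deploy command. */
    static HttpRequest deploy(String connectHost, String connectorJson) {
        return HttpRequest.newBuilder()
                .uri(URI.create("http://" + connectHost + ":8083/connectors"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(connectorJson))
                .build();
    }

    /** Builds the DELETE request equivalent to the curl undeploy command. */
    static HttpRequest undeploy(String connectHost, String connectorName) {
        return HttpRequest.newBuilder()
                .uri(URI.create("http://" + connectHost + ":8083/connectors/" + connectorName))
                .DELETE()
                .build();
    }

    public static void main(String[] args) {
        // Placeholder JSON; in practice, read the edited file from the examples directory.
        String json = "{\"name\":\"pubnub-sink-connector\",\"config\":{}}";
        HttpRequest request = deploy("localhost", json);
        System.out.println(request.method() + " " + request.uri());
        // Nothing is sent here. To send the request, pass it to
        // java.net.http.HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()).
    }
}
```

The sketch only builds the requests, so it can be run without a Kafka Connect cluster. Reading the configuration with java.nio.file.Files.readString and sending the request are left to the caller.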
Configuration options
The configuration from the examples/pubnub-sink-connector-test.json file defines how PubNub Kafka Sink Connector will be set up.
Here's a breakdown of each parameter, detailing its purpose and whether it's required or optional.
{
  "name": "pubnub-sink-connector",
  "config": {
    "topics": "pubnub,pubnub1,pubnub2",
    "topics.regex": "",
    "pubnub.user_id": "myUserId123",
    "pubnub.publish_key": "demo",
    "pubnub.subscribe_key": "demo",
    "pubnub.secret_key": "demo",
    "connector.class": "com.pubnub.kafka.connect.PubNubKafkaSinkConnector",
    "tasks.max": "3",
    "value.deserializer": "custom.class.serialization.JsonDeserializer",
    "value.serializer": "custom.class.serialization.JsonSerializer",
    "errors.tolerance": "all",
    "errors.deadletterqueue.topic.name": "my_dlq_topic",
    ...
  }
}
Parameter | Required | Description |
---|---|---|
name | Yes | Unique name for the connector instance used to identify it. |
topics | Yes | Comma-separated list of Kafka topics from which the connector will consume messages. Each Kafka topic name is used as the PubNub channel name. At least one of topics or topics.regex is required. |
topics.regex | Yes | Java regular expression for matching Kafka topics from which the connector will consume messages, like "topic.*". Each Kafka topic name is used as the PubNub channel name. At least one of topics or topics.regex is required. |
pubnub.user_id | Yes | UTF-8 encoded, unique string of up to 92 characters used to identify a single client (end user, device, or server) that connects to PubNub. |
pubnub.publish_key | Yes | PubNub publish key used to authenticate requests for publishing messages. |
pubnub.subscribe_key | Yes | PubNub subscribe key, part of standard key configuration. It is necessary even if subscribing isn't the primary function of this sink connector. |
pubnub.secret_key | No | PubNub secret key used for secure server-to-server communication. Not mandatory for testing purposes. |
pubnub.router | No | A router class that processes messages flowing through the connector, allowing for custom channel ID and message payload overrides. For more information, refer to pubnub.router. |
connector.class | Yes | Java class of the connector to specify which connector implementation to use. Use the default value. |
tasks.max | Yes | Maximum number of tasks the connector is allowed to create, controlling the parallelism of the connector. |
value.deserializer | Yes | Class used to deserialize the values of messages consumed from Kafka. It is necessary if data format in Kafka requires specific deserialization. Use the default value. |
value.serializer | Yes | Class used to serialize messages before publishing to PubNub. It is necessary if the PubNub API expects a specific payload format. Use the default value. |
errors.tolerance | No | Property that determines how the connector handles errors when operating. Must be set to "all", in which case the connector skips problematic records and continues processing. Requires dead letter queue configuration on your Kafka Connect cluster. Read Confluent docs for details. |
errors.deadletterqueue.topic.name | No | Name of the topic used for routing messages that fail to process properly. Requires dead letter queue configuration on your Kafka Connect cluster. Read Confluent docs for details. |
errors.deadletterqueue.topic.replication.factor | No | Property that defines how many copies of the dead letter records should be replicated across the Kafka cluster. Requires dead letter queue configuration on your Kafka Connect cluster. Read Confluent docs for details. |
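The requirements in the table can be checked before you submit the configuration to Kafka Connect. The following is a minimal sketch of such a pre-flight check, not something the connector provides: the class name SinkConfigPreflight is hypothetical, the list of required parameters mirrors the "Yes" rows above, an empty string is treated as unset (as in the sample topics.regex value), and counting the 92-character limit in Unicode code points is an assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class SinkConfigPreflight {

    // Parameters marked "Yes" in the table, except topics and topics.regex (checked separately).
    private static final List<String> REQUIRED = List.of(
            "pubnub.user_id", "pubnub.publish_key", "pubnub.subscribe_key",
            "connector.class", "tasks.max", "value.deserializer", "value.serializer");

    /** Returns human-readable problems; an empty list means all checks passed. */
    static List<String> check(Map<String, String> config) {
        List<String> problems = new ArrayList<>();
        for (String key : REQUIRED) {
            if (isBlank(config.get(key))) {
                problems.add("missing required parameter: " + key);
            }
        }
        if (isBlank(config.get("topics")) && isBlank(config.get("topics.regex"))) {
            problems.add("set at least one of topics or topics.regex");
        }
        String userId = config.get("pubnub.user_id");
        if (userId != null && userId.codePointCount(0, userId.length()) > 92) {
            problems.add("pubnub.user_id is longer than 92 characters");
        }
        return problems;
    }

    private static boolean isBlank(String value) {
        return value == null || value.trim().isEmpty();
    }

    public static void main(String[] args) {
        // Values copied from the sample configuration, with topics left out on purpose.
        Map<String, String> config = Map.of(
                "topics.regex", "",
                "pubnub.user_id", "myUserId123",
                "pubnub.publish_key", "demo",
                "pubnub.subscribe_key", "demo",
                "connector.class", "com.pubnub.kafka.connect.PubNubKafkaSinkConnector",
                "tasks.max", "3",
                "value.deserializer", "custom.class.serialization.JsonDeserializer",
                "value.serializer", "custom.class.serialization.JsonSerializer");
        check(config).forEach(System.out::println);
    }
}
```

The check takes the contents of the "config" object as a map of strings, so parsing the JSON file is left to whichever JSON library you already use.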
Topic configuration options
You must specify at least one of topics or topics.regex for the connector to know from which Kafka topics to consume messages.
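Because topics.regex is a Java regular expression, you can try a candidate pattern against your topic names with java.util.regex before putting it into the configuration. The sketch below assumes that a topic is selected only when the pattern matches its entire name; verify this against your Kafka Connect version. The class and method names are hypothetical, and the topic names are borrowed from the sample configuration.

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

final class TopicRegexCheck {

    /** Returns the topics whose full name matches the given Java regular expression. */
    static List<String> matchingTopics(String topicsRegex, List<String> topics) {
        Pattern pattern = Pattern.compile(topicsRegex);
        return topics.stream()
                .filter(topic -> pattern.matcher(topic).matches())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> topics = List.of("pubnub", "pubnub1", "pubnub2", "orders");
        // prints [pubnub, pubnub1, pubnub2]
        System.out.println(matchingTopics("pubnub.*", topics));
    }
}
```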
pubnub.router
The class you provide as the value of the pubnub.router parameter must satisfy the following conditions:
- Implement the com.pubnub.kafka.connect.PubNubKafkaRouter interface and have a public, no-argument constructor.
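import org.apache.kafka.connect.sink.SinkRecord;

public interface PubNubKafkaRouter {

    // Maps a Kafka record to the PubNub channel and message payload to publish.
    ChannelAndMessage route(SinkRecord record);

    // Holder for the target channel and the message to publish on it.
    class ChannelAndMessage {
        String channel;
        Object message;

        public ChannelAndMessage(String channel, Object message) {
            this.channel = channel;
            this.message = message;
        }
    }
}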
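To illustrate the kind of channel and payload overrides a router can apply, the sketch below keeps the routing decisions in a small helper that has no Kafka or PubNub dependencies, so it compiles and runs on its own. Everything in it is hypothetical: the class name PrefixedChannelRouting, the "kafka." prefix, and the source_topic and payload field names are illustrative choices, and whether a Map is an acceptable message payload depends on the serializer you use.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical routing logic that a PubNubKafkaRouter implementation could delegate to. */
final class PrefixedChannelRouting {

    private final String channelPrefix;

    PrefixedChannelRouting(String channelPrefix) {
        this.channelPrefix = channelPrefix;
    }

    /** Channel override: publish to "<prefix><Kafka topic>" instead of the bare topic name. */
    String channelFor(String kafkaTopic) {
        return channelPrefix + kafkaTopic;
    }

    /** Payload override: wrap the record value together with the topic it came from. */
    Map<String, Object> messageFor(String kafkaTopic, Object recordValue) {
        Map<String, Object> message = new LinkedHashMap<>();
        message.put("source_topic", kafkaTopic);
        message.put("payload", recordValue);
        return message;
    }

    // In a class that implements PubNubKafkaRouter, route(SinkRecord record) could then return:
    //   new ChannelAndMessage(
    //       routing.channelFor(record.topic()),
    //       routing.messageFor(record.topic(), record.value()));

    public static void main(String[] args) {
        PrefixedChannelRouting routing = new PrefixedChannelRouting("kafka.");
        System.out.println(routing.channelFor("pubnub"));
        System.out.println(routing.messageFor("pubnub", 1705689453L));
    }
}
```

Since the router class itself needs a public, no-argument constructor, a router built on this helper would create it with a fixed prefix inside that constructor rather than receive it as an argument.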