How to Throttle or Aggregate Messages with PubNub
Applications that send messages at a high frequency need to consider how readable those messages are for recipients.
Consider a live event where you might have thousands or tens of thousands of participants watching a stream, all reacting and sending chat messages in a large group chat. How do you make sense of all those messages? How many messages do you expect your participants to be able to read before the chat stream just becomes overwhelming?
Some of our live event customers will group participants into cohorts or groups, so any one viewer only sees the comments from a limited number of other viewers. This works very well, but different audience members see different messages… what if you want the entire audience to have the same experience at your event?
Throttling and/or aggregating messages are the most effective ways to ensure that every chat participant has a consistent view of the global conversation. The trade-off is that only a subset of the total number of messages sent are ever seen.
- Throttling: Restrict the number of messages a participant receives in a given time interval. In general, any throttling system will keep track of the number of messages received over time, and when the rate exceeds a preset threshold, some of the messages will be discarded.
- Aggregation: Rather than present all information to all participants as it happens, summarize the data into more manageable chunks and only show the summaries to participants. This works especially well when messages are not “information dense,” for example, showing “3 users joined” rather than listing each join event separately.
A Generic Approach to Throttling
Throttling will be described in detail, but the same process can be applied to data aggregation, which will be discussed later.
Generally, two conceptual processes are required to throttle data at scale.
- A process to determine the current incoming message rate and whether that rate has exceeded a preset threshold. This threshold is commonly configurable at runtime to allow for expected spikes. Different thresholds might also be applied based on the type of data. For example, with chat, different languages have different lexical densities, so they are often given different throttling thresholds.
- A process to throttle messages if the threshold rate has been exceeded. A throttled message might be delayed (republished), discarded entirely, or saved for offline processing without being presented to the current audience.
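The first of these two processes can be sketched in plain JavaScript, independent of PubNub. This is a minimal sketch under assumptions: the names `createRateMonitor`, `record`, and `endInterval` are hypothetical, and the per-type thresholds are illustrative values rather than recommendations.

```javascript
// Hypothetical helper: counts messages per data type and reports which
// types exceeded their threshold during the interval that just ended.
function createRateMonitor(thresholds) {
  const counts = {};

  return {
    // Count every incoming message against its type.
    record(type) {
      counts[type] = (counts[type] || 0) + 1;
    },

    // At the end of an interval, compare counts to thresholds,
    // then reset the counters for the next interval.
    endInterval() {
      const exceeded = {};
      for (const [type, limit] of Object.entries(thresholds)) {
        const seen = counts[type] || 0;
        exceeded[type] = seen > limit;
        counts[type] = 0;
      }
      return exceeded;
    },
  };
}

// Illustrative thresholds (messages per interval); different languages
// may warrant different limits.
const monitor = createRateMonitor({ en: 3, it: 5 });
['en', 'en', 'en', 'en', 'it'].forEach((type) => monitor.record(type));
const exceeded = monitor.endInterval();
console.log(exceeded); // { en: true, it: false }
```

Any type flagged as exceeded would then be handed to the second process, which decides whether each message is delayed, discarded, or stored.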
Throttling PubNub messages
To throttle messages with PubNub, two different serverless functions should be used:
- Determine the throttling rate
  - Use the subscribeOnInterval function to analyze traffic on the desired channels and, if it exceeds a specified limit, calculate the throttling rate. Please note that the subscribeOnInterval function type is only available to whitelisted users; please contact us to discuss using it in your application.
- Throttle messages
  - Use the beforePublish function to intercept messages synchronously and, based on the channel’s throttling rate, only allow a subset of messages to reach their intended recipients.
Throttling PubNub Messages - Pseudocode
In pseudocode, the solution will work as follows:
Function: subscribeOnInterval executes every n seconds and listens for all messages on the desired channels. For example, you might run the function at 5-second intervals and listen for channels that match chat.*, e.g., chat.en or chat.it. Running at shorter intervals allows the algorithm to adapt more quickly to changing message rates as they spike, but results in more function invocations. For more details about subscribeOnInterval, please refer to the documentation.
On launch: First initialize an array of channels to monitor along with the desired number of messages on each channel per second
On Message Received: IF (Message received on a channel that is being tracked) THEN (Increment the number of messages received on that tracked channel)
On Interval: FOREACH (Channel being monitored) IF (Number of messages received during the last interval exceeded the desired number) THEN (Calculate the percentage of messages that should have been discarded so the desired message receive rate was not exceeded. This is the throttling rate) SEND the latest throttling rate for each channel to the function responsible for throttling messages using the kvstore module.
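The rate calculation in the On Interval step can be illustrated as a standalone function. This is a sketch only: `computeThrottlingRate` is a hypothetical name, the formula follows the one used in the real example later in this article, and it assumes the observed count only includes messages that survived the previous throttling rate.

```javascript
// Hypothetical helper illustrating the On Interval calculation.
// observedCount: messages counted during the last interval
// limit: desired maximum number of messages for that interval
// previousRate: throttling percentage (0-99) in force during the interval
function computeThrottlingRate(observedCount, limit, previousRate = 0) {
  if (!limit || !observedCount) {
    return 0; // nothing to throttle
  }
  // Estimate the unthrottled volume (assumption: observedCount was measured
  // after the previous throttling rate had already been applied).
  const estimatedTotal = (observedCount / (100 - previousRate)) * 100;
  const rate = Math.round((1 - limit / estimatedTotal) * 100);
  // Clamp so the rate is never negative and never blocks every message.
  return Math.min(Math.max(rate, 0), 99);
}

console.log(computeThrottlingRate(23, 5));     // 78
console.log(computeThrottlingRate(4, 5));      // 0
console.log(computeThrottlingRate(10, 5, 50)); // 75
```

With a limit of 5 and 23 observed messages, roughly 78% of messages would need to be dropped to stay within the limit.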
Function: beforePublish executes before any messages are received by their intended recipients.
On Message Received: IF (Channel the message was received on is subject to a throttling rate) THEN (Calculate whether or not to forward the message to the intended recipient. The likelihood of whether a message is allowed through is based on the throttling rate, so a 50% throttling rate will only allow half of the messages through.)
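The probabilistic decision described above can be sketched as follows. The name `shouldThrottle` is hypothetical, and the random source is passed in as a parameter purely so the behavior can be checked deterministically.

```javascript
// Hypothetical helper: decide whether a single message should be throttled.
// throttlingRate is a percentage (0-100); random is injectable for testing.
function shouldThrottle(throttlingRate, random = Math.random) {
  if (throttlingRate <= 0) {
    return false;
  }
  return random() < throttlingRate / 100;
}

// Simulate 10,000 messages at a 50% throttling rate.
let allowed = 0;
for (let i = 0; i < 10000; i += 1) {
  if (!shouldThrottle(50)) {
    allowed += 1;
  }
}
console.log(`Allowed ${allowed} of 10000 messages`); // roughly half
```

Because each message is judged independently, no per-message state has to be shared between function instances; only the throttling rate itself is shared.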
Throttling PubNub Messages - A Real Example
The following code can be copied into two PubNub functions to see an example of message throttling. Create a new application in the PubNub admin portal and add the following two functions to it:
Determine the throttling rate
Function name: DetermineThrottlingRate
Event type: Subscribe with On Interval
Interval: 5000
Channel name: chat.*
Code:
const kv = require('kvstore');

const TRACKED_CHANNELS = {
  'chat.it': {
    rpnLimit: 5,
  },
  'chat.en': {
    rpnLimit: 5, // Limit of 5 messages per interval (the counter resets every interval)
  },
};

let trackedChannelSettings = null;

// Build the per-channel state once; it is reused across invocations
const initTrackedChannelSettings = () => {
  if (trackedChannelSettings) {
    return;
  }
  trackedChannelSettings = Object.fromEntries(
    Object.entries(TRACKED_CHANNELS)
      .map(([channel, config]) => [channel, {
        ...config,
        throttlingRate: 0,
        currentRPN: 0,
      }])
  );
};

// ----------------------
// -- Interval Handler --
// ----------------------
const KV_KEY_THROTTLER_CONFIG = 'throttler_config';
let trackingWindowMillis = null;

const intervalEventHandler = (event) => {
  trackingWindowMillis = event.execInterval;
  // Calculate throttling rates for upcoming period for each channel
  Object.values(trackedChannelSettings)
    .forEach((config) => {
      if (!config.rpnLimit || !config.currentRPN) {
        // do nothing, we just don't throttle this channel regardless of RPN
        config.throttlingRate = 0;
      } else {
        const currentReal = (config.currentRPN / (100 - config.throttlingRate)) * 100;
        const throttleRate = Math.round((1 - (config.rpnLimit / currentReal)) * 100);
        config.throttlingRate = throttleRate < 1 ? 0 : throttleRate;
        //console.log('Setting throttling rate to ' + config.throttlingRate);
      }
    });
  const newConfig = {
    configUpdatePeriodMillis: trackingWindowMillis,
    channelSettings: trackedChannelSettings,
  };
  // Send the throttling rate through kvstore to the beforePublish function
  return kv.set(KV_KEY_THROTTLER_CONFIG, newConfig).then(() => {
    // end of an interval, so reset counters
    Object.values(trackedChannelSettings)
      .forEach((config) => config.currentRPN = 0);
    return event.ok();
  });
}

// ---------------------
// -- Publish Handler --
// ---------------------
const publishEventHandler = (request) => {
  for (const channel of request.channels) {
    const isTrackedChannel = channel in trackedChannelSettings;
    if (isTrackedChannel) {
      trackedChannelSettings[channel].currentRPN = (trackedChannelSettings[channel]?.currentRPN || 0) + 1;
      //console.log('Setting channel ' + channel + ' rpn to ' + trackedChannelSettings[channel]?.currentRPN)
    }
  }
  return request.ok();
}

// -------------------
// -- Main function --
// -------------------
export default (event) => {
  initTrackedChannelSettings();
  return event.verb === 'interval' ? intervalEventHandler(event) : publishEventHandler(event);
};
Throttle messages
Function name: ThrottleMessages
Event type: Before Publish or Fire
Channel name: chat.*
Code:
const kv = require('kvstore');

const KV_KEY_THROTTLER_CONFIG = 'throttler_config';

let config = {
  configUpdatePeriodMillis: 20 * 1000, // 20s
};
let lastKVConfigFetchTimestamp = null;

export default (request) => {
  let promise = Promise.resolve();
  // refetch config from KV periodically
  if (!lastKVConfigFetchTimestamp ||
    (Date.now() - lastKVConfigFetchTimestamp) > config.configUpdatePeriodMillis
  ) {
    promise = kv.get(KV_KEY_THROTTLER_CONFIG).then((refetchedConfig) => {
      lastKVConfigFetchTimestamp = Date.now();
      // Keep the current config if nothing has been stored yet
      config = refetchedConfig || config;
      //console.info('Local config was updated to: ', config);
    });
  }
  const channel = request.channels[0];
  return promise.then(() => {
    const throttlingPercentage = config?.channelSettings?.[channel]?.throttlingRate || 0;
    if (throttlingPercentage > 0) {
      // The throttling rate for the channel on which the message was received was non-zero.
      const throttlingDecimal = throttlingPercentage / 100;
      const rand = Math.random();
      if (rand <= throttlingDecimal) {
        // todo take some action when the message was throttled, e.g. use PubNub’s Events & Actions feature to store offline for later analysis
        console.log('Request to ' + request.channels[0] + ' was throttled (%age was ' + throttlingPercentage + ')');
      } else {
        console.log('Request to ' + request.channels[0] + ' was NOT throttled (%age was ' + throttlingPercentage + ')');
      }
    } else {
      console.log('Request to ' + request.channels[0] + ' was NOT throttled (%age was 0)');
    }
    return request.ok();
  });
}
Testing
Once you have created the two functions above, you can test that they are working correctly by sending a message to the ‘chat.en’ channel repeatedly. After a few seconds of sending repeated messages, you will see console logs indicating that some of the messages were throttled:
8:16:02js: "Request to chat.en was NOT throttled (%age was 78)"
8:16:03js: "Request to throttled-chat.en was throttled (%age was 78)"
8:16:03js: "Request to throttled-chat.en was throttled (%age was 78)"
8:16:03js: "Request to chat.en was NOT throttled (%age was 78)"
8:16:04js: "Request to throttled-chat.en was throttled (%age was 78)"
Taking action on throttled messages
The simplest action you could take for a throttled message would be to discard it, but in doing so you would lose potentially valuable analytics data.
With PubNub’s Events & Actions feature, you can redirect any throttled message to be stored in an external service such as Kinesis or S3 for later retrieval and analysis.
To set up Events & Actions for your throttled messages, first forward any throttled message to a dedicated channel, used exclusively for this purpose:
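// Replaces the "todo" in the ThrottleMessages function.
// Requires `const pubnub = require('pubnub');` at the top of the function.
return pubnub.publish({
  "channel": "throttled-messages",
  "message": request.message
}).then((publishResponse) => {
  // Return request.abort() here instead if the throttled message should not reach its recipients.
  return request.ok();
}).catch((err) => {
  console.error(err);
  // Always resolve the request, even if forwarding failed
  return request.ok();
});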
Next, configure Events & Actions to listen for a message event and perform the desired action.
Finally, create an action to be performed after the event is processed. At the time of writing, the available actions are:
- A custom webhook
- Amazon SQS
- Amazon Kinesis
- Amazon S3 with batching

This list is continuously expanding.
Aggregating PubNub messages
For high-volume messaging systems, you might want to summarize events and periodically notify participants with that summary rather than try to keep the audience informed of every small change. The example given earlier described users joining a live stream where the number of attendees becomes very large; it does not make sense to notify everyone whenever each person reacts, but it would be interesting to know that “26 users reacted with ♥️” for example.
A generic approach to message aggregation might look something like this:
- An event occurs that is subject to aggregation, for example, “User A has reacted with ♥️”
- This event should not be distributed to all attendees but instead captured in some central location.
- Each event should be categorized; for example, you might have the following events you want to aggregate: “User reacted with ♥️,” “User reacted with a smiley face,” or “User reacted with a frowny face.”
- Every time an event occurs in each category, keep a running total.
- At some periodic interval, notify all recipients with a summary of everything that happened during the interval. To continue the earlier examples, the summaries might be “26 people ♥️ this”, “17 people like this”, or “2 people are sad.”
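The steps above can be sketched as a small in-memory aggregator. This is an illustration under assumptions: the category keys (`heart`, `smile`, `frown`), the summary wording, and the names `createAggregator`, `record`, and `flush` are all hypothetical.

```javascript
// Hypothetical category labels; the keys and wording are illustrative.
const SUMMARY_TEMPLATES = {
  heart: (n) => `${n} people ♥️ this`,
  smile: (n) => `${n} people like this`,
  frown: (n) => `${n} people are sad`,
};

function createAggregator() {
  let totals = {};

  return {
    // Capture the event centrally instead of distributing it,
    // keeping a running total per known category.
    record(category) {
      if (Object.prototype.hasOwnProperty.call(SUMMARY_TEMPLATES, category)) {
        totals[category] = (totals[category] || 0) + 1;
      }
    },

    // Produce one summary per category seen, then start a new interval.
    flush() {
      const summaries = Object.entries(totals)
        .map(([category, n]) => SUMMARY_TEMPLATES[category](n));
      totals = {};
      return summaries;
    },
  };
}

const aggregator = createAggregator();
['heart', 'smile', 'heart', 'frown', 'smile', 'frown', 'heart']
  .forEach((category) => aggregator.record(category));
console.log(aggregator.flush());
// [ '3 people ♥️ this', '2 people like this', '2 people are sad' ]
```

Calling `flush` on a timer yields the periodic summaries, and categories with no events in an interval produce no summary at all.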
Aggregating PubNub messages - Pseudocode
In pseudocode, an aggregation solution would work as follows:
Function: subscribeOnInterval executes every n seconds and listens for all messages on the desired channels. For example, you might run the function at 10-second intervals and listen for channels that match ‘reaction.*’, e.g., ‘reaction.smile’ or ‘reaction.frown’.
On launch: First, initialize an array of channels to monitor (in this example, ‘reaction.smile’ and ‘reaction.frown’).
On Message Received: IF (Message received on a channel that is being tracked) THEN (Increment the number of messages received on that tracked channel)
On Interval: FOREACH (Channel being monitored) DO (Summarize the aggregated data into a consumable format, e.g. “17 people like this”) Republish the summary on a channel which all clients are listening for, e.g. ‘reactionSummary.smile’
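The pseudocode above might translate into handler logic along the following lines. This is a sketch rather than a complete PubNub Function: `createReactionSummarizer`, `onMessage`, and `onInterval` are hypothetical names, the summary wording is illustrative, and `publish` is injected as a parameter so the logic can run outside PubNub. Inside a function, it could wrap the publish call of the pubnub module.

```javascript
// Channels to monitor; the values are hypothetical summary wording.
const TRACKED_REACTIONS = {
  'reaction.smile': 'people like this',
  'reaction.frown': 'people are sad',
};

// `publish` is an assumed callback taking { channel, message } and
// returning a promise.
function createReactionSummarizer(publish) {
  const counts = {};

  return {
    // On Message Received: count messages on tracked channels only.
    onMessage(channel) {
      if (Object.prototype.hasOwnProperty.call(TRACKED_REACTIONS, channel)) {
        counts[channel] = (counts[channel] || 0) + 1;
      }
    },

    // On Interval: republish a summary per channel, then reset its counter.
    onInterval() {
      const pending = [];
      for (const [channel, label] of Object.entries(TRACKED_REACTIONS)) {
        const count = counts[channel] || 0;
        counts[channel] = 0;
        if (count > 0) {
          // 'reaction.smile' -> 'reactionSummary.smile'
          const summaryChannel = channel.replace('reaction.', 'reactionSummary.');
          pending.push(publish({
            channel: summaryChannel,
            message: { count, text: `${count} ${label}` },
          }));
        }
      }
      return Promise.all(pending);
    },
  };
}

// Usage with a stand-in publish function that only records what was sent.
const published = [];
const summarizer = createReactionSummarizer((payload) => {
  published.push(payload);
  return Promise.resolve();
});
for (let i = 0; i < 17; i += 1) summarizer.onMessage('reaction.smile');
summarizer.onMessage('reaction.frown');
summarizer.onMessage('reaction.frown');
summarizer.onInterval().then(() => console.log(published));
```

Clients would subscribe only to the summary channels, so the raw reaction traffic never reaches them.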
In Conclusion
This how-to article has described how you can throttle and aggregate PubNub messages with serverless functions, specifically by using the subscribeOnInterval function. This is the simplest architectural approach and handles all scaling concerns on your behalf, so you do not need to account for the fact that functions run in a distributed fashion.
If you would like to learn more about PubNub’s serverless processing capabilities, please check out our PubNub Functions documentation or contact our experts, who will be happy to walk through how we can help address your use cases. It is strongly recommended to contact us first if you plan on throttling or aggregating more than 1000 messages or requests a second.