From fbfceb9c77b7163806ae82d588c7b2c1077b15c6 Mon Sep 17 00:00:00 2001
From: Emelia Smith <ThisIsMissEm@users.noreply.github.com>
Date: Tue, 19 Sep 2023 12:25:30 +0200
Subject: [PATCH] Add additional metrics for streaming (#26945)

---
 streaming/index.js | 76 +++++++++++++++++++++++++++++++++++-----------
 1 file changed, 59 insertions(+), 17 deletions(-)

diff --git a/streaming/index.js b/streaming/index.js
index c9fac063d..8015c6815 100644
--- a/streaming/index.js
+++ b/streaming/index.js
@@ -152,6 +152,28 @@ const redisConfigFromEnv = (env) => {
   };
 };
 
+const PUBLIC_CHANNELS = [
+  'public',
+  'public:media',
+  'public:local',
+  'public:local:media',
+  'public:remote',
+  'public:remote:media',
+  'hashtag',
+  'hashtag:local',
+];
+
+// Used for priming the counters/gauges for the various metrics that are
+// per-channel
+const CHANNEL_NAMES = [
+  'system',
+  'user',
+  'user:notification',
+  'list',
+  'direct',
+  ...PUBLIC_CHANNELS
+];
+
 const startServer = async () => {
   const app = express();
 
@@ -203,9 +225,6 @@ const startServer = async () => {
     labelNames: ['type'],
   });
 
-  connectedClients.set({ type: 'websocket' }, 0);
-  connectedClients.set({ type: 'eventsource' }, 0);
-
   const connectedChannels = new metrics.Gauge({
     name: 'connected_channels',
     help: 'The number of channels the streaming server is streaming to',
@@ -217,6 +236,35 @@ const startServer = async () => {
     help: 'The number of Redis channels the streaming server is subscribed to',
   });
 
+  const redisMessagesReceived = new metrics.Counter({
+    name: 'redis_messages_received_total',
+    help: 'The total number of messages the streaming server has received from redis subscriptions'
+  });
+
+  const messagesSent = new metrics.Counter({
+    name: 'messages_sent_total',
+    help: 'The total number of messages the streaming server sent to clients per connection type',
+    labelNames: [ 'type' ]
+  });
+
+  // Prime the gauges so we don't loose metrics between restarts:
+  redisSubscriptions.set(0);
+  connectedClients.set({ type: 'websocket' }, 0);
+  connectedClients.set({ type: 'eventsource' }, 0);
+
+  // For each channel, initialize the gauges at zero; There's only a finite set of channels available
+  CHANNEL_NAMES.forEach(( channel ) => {
+    connectedChannels.set({ type: 'websocket', channel }, 0);
+    connectedChannels.set({ type: 'eventsource', channel }, 0);
+  })
+
+  // Prime the counters so that we don't loose metrics between restarts.
+  // Unfortunately counters don't support the set() API, so instead I'm using
+  // inc(0) to achieve the same result.
+  redisMessagesReceived.inc(0);
+  messagesSent.inc({ type: 'websocket' }, 0);
+  messagesSent.inc({ type: 'eventsource' }, 0);
+
   // When checking metrics in the browser, the favicon is requested this
   // prevents the request from falling through to the API Router, which would
   // error for this endpoint:
@@ -262,6 +310,8 @@ const startServer = async () => {
    * @param {string} message
    */
   const onRedisMessage = (channel, message) => {
+    redisMessagesReceived.inc();
+
     const callbacks = subs[channel];
 
     log.silly(`New message on channel ${redisPrefix}${channel}`);
@@ -490,17 +540,6 @@ const startServer = async () => {
     }
   };
 
-  const PUBLIC_CHANNELS = [
-    'public',
-    'public:media',
-    'public:local',
-    'public:local:media',
-    'public:remote',
-    'public:remote:media',
-    'hashtag',
-    'hashtag:local',
-  ];
-
   /**
    * @param {any} req
    * @param {string|undefined} channelName
@@ -705,10 +744,11 @@ const startServer = async () => {
    * @param {any} req
    * @param {function(string, string): void} output
    * @param {undefined | function(string[], SubscriptionListener): void} attachCloseHandler
+   * @param {'websocket' | 'eventsource'} destinationType
    * @param {boolean=} needsFiltering
    * @returns {SubscriptionListener}
    */
-  const streamFrom = (ids, req, output, attachCloseHandler, needsFiltering = false) => {
+  const streamFrom = (ids, req, output, attachCloseHandler, destinationType, needsFiltering = false) => {
     const accountId = req.accountId || req.remoteAddress;
 
     log.verbose(req.requestId, `Starting stream from ${ids.join(', ')} for ${accountId}`);
@@ -717,6 +757,8 @@ const startServer = async () => {
       // TODO: Replace "string"-based delete payloads with object payloads:
       const encodedPayload = typeof payload === 'object' ? JSON.stringify(payload) : payload;
 
+      messagesSent.labels({ type: destinationType }).inc(1);
+
       log.silly(req.requestId, `Transmitting for ${accountId}: ${event} ${encodedPayload}`);
       output(event, encodedPayload);
     };
@@ -1031,7 +1073,7 @@ const startServer = async () => {
       const onSend = streamToHttp(req, res);
       const onEnd = streamHttpEnd(req, subscriptionHeartbeat(channelIds));
 
-      streamFrom(channelIds, req, onSend, onEnd, options.needsFiltering);
+      streamFrom(channelIds, req, onSend, onEnd, 'eventsource', options.needsFiltering);
     }).catch(err => {
       log.verbose(req.requestId, 'Subscription error:', err.toString());
       httpNotFound(res);
@@ -1241,7 +1283,7 @@ const startServer = async () => {
 
       const onSend = streamToWs(request, socket, streamNameFromChannelName(channelName, params));
       const stopHeartbeat = subscriptionHeartbeat(channelIds);
-      const listener = streamFrom(channelIds, request, onSend, undefined, options.needsFiltering);
+      const listener = streamFrom(channelIds, request, onSend, undefined, 'websocket', options.needsFiltering);
 
       connectedChannels.labels({ type: 'websocket', channel: channelName }).inc();