From c245a2044ecf5418b8010e3e8340a5899db38cf3 Mon Sep 17 00:00:00 2001
From: Emelia Smith <ThisIsMissEm@users.noreply.github.com>
Date: Mon, 26 Aug 2024 10:08:21 +0200
Subject: [PATCH] Streaming: Refactor to use metrics.$name instead of
 destructuring (#31566)

---
 streaming/index.js   | 53 ++++++++++++++++----------------------------
 streaming/metrics.js | 17 ++++++++++++--
 2 files changed, 34 insertions(+), 36 deletions(-)

diff --git a/streaming/index.js b/streaming/index.js
index 8f6636217..2267c469c 100644
--- a/streaming/index.js
+++ b/streaming/index.js
@@ -292,6 +292,9 @@ const CHANNEL_NAMES = [
 
 const startServer = async () => {
   const pgPool = new pg.Pool(pgConfigFromEnv(process.env));
+
+  const metrics = setupMetrics(CHANNEL_NAMES, pgPool);
+
   const server = http.createServer();
   const wss = new WebSocketServer({ noServer: true });
 
@@ -388,16 +391,6 @@ const startServer = async () => {
   const redisClient = await createRedisClient(redisConfig);
   const { redisPrefix } = redisConfig;
 
-  const metrics = setupMetrics(CHANNEL_NAMES, pgPool);
-  // TODO: migrate all metrics to metrics.X.method() instead of just X.method()
-  const {
-    connectedClients,
-    connectedChannels,
-    redisSubscriptions,
-    redisMessagesReceived,
-    messagesSent,
-  } = metrics;
-
   // When checking metrics in the browser, the favicon is requested this
   // prevents the request from falling through to the API Router, which would
   // error for this endpoint:
@@ -408,15 +401,7 @@ const startServer = async () => {
     res.end('OK');
   });
 
-  app.get('/metrics', async (req, res) => {
-    try {
-      res.set('Content-Type', metrics.register.contentType);
-      res.end(await metrics.register.metrics());
-    } catch (ex) {
-      req.log.error(ex);
-      res.status(500).end();
-    }
-  });
+  app.get('/metrics', metrics.requestHandler);
 
   /**
    * @param {string[]} channels
@@ -443,7 +428,7 @@ const startServer = async () => {
    * @param {string} message
    */
   const onRedisMessage = (channel, message) => {
-    redisMessagesReceived.inc();
+    metrics.redisMessagesReceived.inc();
 
     const callbacks = subs[channel];
 
@@ -481,7 +466,7 @@ const startServer = async () => {
         if (err) {
           logger.error(`Error subscribing to ${channel}`);
         } else if (typeof count === 'number') {
-          redisSubscriptions.set(count);
+          metrics.redisSubscriptions.set(count);
         }
       });
     }
@@ -508,7 +493,7 @@ const startServer = async () => {
         if (err) {
           logger.error(`Error unsubscribing to ${channel}`);
         } else if (typeof count === 'number') {
-          redisSubscriptions.set(count);
+          metrics.redisSubscriptions.set(count);
         }
       });
       delete subs[channel];
@@ -688,13 +673,13 @@ const startServer = async () => {
       unsubscribe(`${redisPrefix}${accessTokenChannelId}`, listener);
       unsubscribe(`${redisPrefix}${systemChannelId}`, listener);
 
-      connectedChannels.labels({ type: 'eventsource', channel: 'system' }).dec(2);
+      metrics.connectedChannels.labels({ type: 'eventsource', channel: 'system' }).dec(2);
     });
 
     subscribe(`${redisPrefix}${accessTokenChannelId}`, listener);
     subscribe(`${redisPrefix}${systemChannelId}`, listener);
 
-    connectedChannels.labels({ type: 'eventsource', channel: 'system' }).inc(2);
+    metrics.connectedChannels.labels({ type: 'eventsource', channel: 'system' }).inc(2);
   };
 
   /**
@@ -790,7 +775,7 @@ const startServer = async () => {
       // TODO: Replace "string"-based delete payloads with object payloads:
       const encodedPayload = typeof payload === 'object' ? JSON.stringify(payload) : payload;
 
-      messagesSent.labels({ type: destinationType }).inc(1);
+      metrics.messagesSent.labels({ type: destinationType }).inc(1);
 
       log.debug({ event, payload }, `Transmitting ${event} to ${req.accountId}`);
 
@@ -1027,11 +1012,11 @@ const startServer = async () => {
   const streamToHttp = (req, res) => {
     const channelName = channelNameFromPath(req);
 
-    connectedClients.labels({ type: 'eventsource' }).inc();
+    metrics.connectedClients.labels({ type: 'eventsource' }).inc();
 
     // In theory we'll always have a channel name, but channelNameFromPath can return undefined:
     if (typeof channelName === 'string') {
-      connectedChannels.labels({ type: 'eventsource', channel: channelName }).inc();
+      metrics.connectedChannels.labels({ type: 'eventsource', channel: channelName }).inc();
     }
 
     res.setHeader('Content-Type', 'text/event-stream');
@@ -1047,10 +1032,10 @@ const startServer = async () => {
 
       // We decrement these counters here instead of in streamHttpEnd as in that
       // method we don't have knowledge of the channel names
-      connectedClients.labels({ type: 'eventsource' }).dec();
+      metrics.connectedClients.labels({ type: 'eventsource' }).dec();
       // In theory we'll always have a channel name, but channelNameFromPath can return undefined:
       if (typeof channelName === 'string') {
-        connectedChannels.labels({ type: 'eventsource', channel: channelName }).dec();
+        metrics.connectedChannels.labels({ type: 'eventsource', channel: channelName }).dec();
       }
 
       clearInterval(heartbeat);
@@ -1324,7 +1309,7 @@ const startServer = async () => {
       const stopHeartbeat = subscriptionHeartbeat(channelIds);
       const listener = streamFrom(channelIds, request, logger, onSend, undefined, 'websocket', options.needsFiltering);
 
-      connectedChannels.labels({ type: 'websocket', channel: channelName }).inc();
+      metrics.connectedChannels.labels({ type: 'websocket', channel: channelName }).inc();
 
       subscriptions[channelIds.join(';')] = {
         channelName,
@@ -1363,7 +1348,7 @@ const startServer = async () => {
       unsubscribe(`${redisPrefix}${channelId}`, subscription.listener);
     });
 
-    connectedChannels.labels({ type: 'websocket', channel: subscription.channelName }).dec();
+    metrics.connectedChannels.labels({ type: 'websocket', channel: subscription.channelName }).dec();
     subscription.stopHeartbeat();
 
     delete subscriptions[channelIds.join(';')];
@@ -1421,7 +1406,7 @@ const startServer = async () => {
       },
     };
 
-    connectedChannels.labels({ type: 'websocket', channel: 'system' }).inc(2);
+    metrics.connectedChannels.labels({ type: 'websocket', channel: 'system' }).inc(2);
   };
 
   /**
@@ -1433,7 +1418,7 @@ const startServer = async () => {
     // Note: url.parse could throw, which would terminate the connection, so we
     // increment the connected clients metric straight away when we establish
     // the connection, without waiting:
-    connectedClients.labels({ type: 'websocket' }).inc();
+    metrics.connectedClients.labels({ type: 'websocket' }).inc();
 
     // Setup connection keep-alive state:
     ws.isAlive = true;
@@ -1459,7 +1444,7 @@ const startServer = async () => {
       });
 
       // Decrement the metrics for connected clients:
-      connectedClients.labels({ type: 'websocket' }).dec();
+      metrics.connectedClients.labels({ type: 'websocket' }).dec();
 
       // We need to unassign the session object as to ensure it correctly gets
       // garbage collected, without doing this we could accidentally hold on to
diff --git a/streaming/metrics.js b/streaming/metrics.js
index a029d778f..bb6bce3f3 100644
--- a/streaming/metrics.js
+++ b/streaming/metrics.js
@@ -4,12 +4,12 @@ import metrics from 'prom-client';
 
 /**
  * @typedef StreamingMetrics
- * @property {metrics.Registry} register
  * @property {metrics.Gauge<"type">} connectedClients
  * @property {metrics.Gauge<"type" | "channel">} connectedChannels
  * @property {metrics.Gauge} redisSubscriptions
  * @property {metrics.Counter} redisMessagesReceived
  * @property {metrics.Counter<"type">} messagesSent
+ * @property {import('express').RequestHandler<{}>} requestHandler
  */
 
 /**
@@ -92,8 +92,21 @@ export function setupMetrics(channels, pgPool) {
   messagesSent.inc({ type: 'websocket' }, 0);
   messagesSent.inc({ type: 'eventsource' }, 0);
 
+  /**
+   * @type {import('express').RequestHandler<{}>}
+   */
+  const requestHandler = (req, res) => {
+    metrics.register.metrics().then((output) => {
+      res.set('Content-Type', metrics.register.contentType);
+      res.end(output);
+    }).catch((err) => {
+      req.log.error(err, "Error collecting metrics");
+      res.status(500).end();
+    });
+  };
+
   return {
-    register: metrics.register,
+    requestHandler,
     connectedClients,
     connectedChannels,
     redisSubscriptions,