From 4118688fba789a84956d77415c87d049d46a7bf4 Mon Sep 17 00:00:00 2001 From: Emelia Smith Date: Tue, 27 Aug 2024 10:40:04 +0200 Subject: [PATCH] Streaming: Refactor move database and redis logic into separate files (#31567) --- streaming/database.js | 128 +++++++++++++++++++++++++++ streaming/index.js | 201 ++---------------------------------------- streaming/redis.js | 65 ++++++++++++++ streaming/utils.js | 20 +++++ 4 files changed, 219 insertions(+), 195 deletions(-) create mode 100644 streaming/database.js create mode 100644 streaming/redis.js diff --git a/streaming/database.js b/streaming/database.js new file mode 100644 index 000000000..9f1d74214 --- /dev/null +++ b/streaming/database.js @@ -0,0 +1,128 @@ +import pg from 'pg'; +import pgConnectionString from 'pg-connection-string'; + +import { parseIntFromEnvValue } from './utils.js'; + +/** + * @param {NodeJS.ProcessEnv} env the `process.env` value to read configuration from + * @param {string} environment + * @returns {pg.PoolConfig} the configuration for the PostgreSQL connection + */ +export function configFromEnv(env, environment) { + /** @type {Record} */ + const pgConfigs = { + development: { + user: env.DB_USER || pg.defaults.user, + password: env.DB_PASS || pg.defaults.password, + database: env.DB_NAME || 'mastodon_development', + host: env.DB_HOST || pg.defaults.host, + port: parseIntFromEnvValue(env.DB_PORT, pg.defaults.port ?? 5432, 'DB_PORT') + }, + + production: { + user: env.DB_USER || 'mastodon', + password: env.DB_PASS || '', + database: env.DB_NAME || 'mastodon_production', + host: env.DB_HOST || 'localhost', + port: parseIntFromEnvValue(env.DB_PORT, 5432, 'DB_PORT') + }, + }; + + /** + * @type {pg.PoolConfig} + */ + let baseConfig = {}; + + if (env.DATABASE_URL) { + const parsedUrl = pgConnectionString.parse(env.DATABASE_URL); + + // The result of dbUrlToConfig from pg-connection-string is not type + // compatible with pg.PoolConfig, since parts of the connection URL may be + // `null` when pg.PoolConfig expects `undefined`, as such we have to + // manually create the baseConfig object from the properties of the + // parsedUrl. + // + // For more information see: + // https://github.com/brianc/node-postgres/issues/2280 + // + // FIXME: clean up once brianc/node-postgres#3128 lands + if (typeof parsedUrl.password === 'string') baseConfig.password = parsedUrl.password; + if (typeof parsedUrl.host === 'string') baseConfig.host = parsedUrl.host; + if (typeof parsedUrl.user === 'string') baseConfig.user = parsedUrl.user; + if (typeof parsedUrl.port === 'string') { + const parsedPort = parseInt(parsedUrl.port, 10); + if (isNaN(parsedPort)) { + throw new Error('Invalid port specified in DATABASE_URL environment variable'); + } + baseConfig.port = parsedPort; + } + if (typeof parsedUrl.database === 'string') baseConfig.database = parsedUrl.database; + if (typeof parsedUrl.options === 'string') baseConfig.options = parsedUrl.options; + + // The pg-connection-string type definition isn't correct, as parsedUrl.ssl + // can absolutely be an Object, this is to work around these incorrect + // types, including the casting of parsedUrl.ssl to Record + if (typeof parsedUrl.ssl === 'boolean') { + baseConfig.ssl = parsedUrl.ssl; + } else if (typeof parsedUrl.ssl === 'object' && !Array.isArray(parsedUrl.ssl) && parsedUrl.ssl !== null) { + /** @type {Record} */ + const sslOptions = parsedUrl.ssl; + baseConfig.ssl = {}; + + baseConfig.ssl.cert = sslOptions.cert; + baseConfig.ssl.key = sslOptions.key; + baseConfig.ssl.ca = sslOptions.ca; + baseConfig.ssl.rejectUnauthorized = sslOptions.rejectUnauthorized; + } + + // Support overriding the database password in the connection URL + if (!baseConfig.password && env.DB_PASS) { + baseConfig.password = env.DB_PASS; + } + } else if (Object.hasOwn(pgConfigs, environment)) { + baseConfig = pgConfigs[environment]; + + if (env.DB_SSLMODE) { + switch(env.DB_SSLMODE) { + case 'disable': + case '': + baseConfig.ssl = false; + break; + case 'no-verify': + baseConfig.ssl = { rejectUnauthorized: false }; + break; + default: + baseConfig.ssl = {}; + break; + } + } + } else { + throw new Error('Unable to resolve postgresql database configuration.'); + } + + return { + ...baseConfig, + max: parseIntFromEnvValue(env.DB_POOL, 10, 'DB_POOL'), + connectionTimeoutMillis: 15000, + // Deliberately set application_name to an empty string to prevent excessive + // CPU usage with PG Bouncer. See: + // - https://github.com/mastodon/mastodon/pull/23958 + // - https://github.com/pgbouncer/pgbouncer/issues/349 + application_name: '', + }; +} + +let pool; +/** + * + * @param {pg.PoolConfig} config + * @returns {pg.Pool} + */ +export function getPool(config) { + if (pool) { + return pool; + } + + pool = new pg.Pool(config); + return pool; +} diff --git a/streaming/index.js b/streaming/index.js index 2267c469c..d94649d6e 100644 --- a/streaming/index.js +++ b/streaming/index.js @@ -8,15 +8,14 @@ import url from 'node:url'; import cors from 'cors'; import dotenv from 'dotenv'; import express from 'express'; -import { Redis } from 'ioredis'; import { JSDOM } from 'jsdom'; -import pg from 'pg'; -import pgConnectionString from 'pg-connection-string'; import { WebSocketServer } from 'ws'; +import * as Database from './database.js'; import { AuthenticationError, RequestError, extractStatusAndMessage as extractErrorStatusAndMessage } from './errors.js'; import { logger, httpLogger, initializeLogLevel, attachWebsocketHttpLogger, createWebsocketLogger } from './logging.js'; import { setupMetrics } from './metrics.js'; +import * as Redis from './redis.js'; import { isTruthy, normalizeHashtag, firstParam } from './utils.js'; const environment = process.env.NODE_ENV || 'development'; @@ -48,23 +47,6 @@ initializeLogLevel(process.env, environment); * @property {string} deviceId */ -/** - * @param {RedisConfiguration} config - * @returns {Promise} - */ -const createRedisClient = async ({ redisParams, redisUrl }) => { - let client; - - if (typeof redisUrl === 'string') { - client = new Redis(redisUrl, redisParams); - } else { - client = new Redis(redisParams); - } - - client.on('error', (err) => logger.error({ err }, 'Redis Client Error!')); - - return client; -}; /** * Attempts to safely parse a string as JSON, used when both receiving a message @@ -97,177 +79,6 @@ const parseJSON = (json, req) => { } }; -/** - * Takes an environment variable that should be an integer, attempts to parse - * it falling back to a default if not set, and handles errors parsing. - * @param {string|undefined} value - * @param {number} defaultValue - * @param {string} variableName - * @returns {number} - */ -const parseIntFromEnv = (value, defaultValue, variableName) => { - if (typeof value === 'string' && value.length > 0) { - const parsedValue = parseInt(value, 10); - if (isNaN(parsedValue)) { - throw new Error(`Invalid ${variableName} environment variable: ${value}`); - } - return parsedValue; - } else { - return defaultValue; - } -}; - -/** - * @param {NodeJS.ProcessEnv} env the `process.env` value to read configuration from - * @returns {pg.PoolConfig} the configuration for the PostgreSQL connection - */ -const pgConfigFromEnv = (env) => { - /** @type {Record} */ - const pgConfigs = { - development: { - user: env.DB_USER || pg.defaults.user, - password: env.DB_PASS || pg.defaults.password, - database: env.DB_NAME || 'mastodon_development', - host: env.DB_HOST || pg.defaults.host, - port: parseIntFromEnv(env.DB_PORT, pg.defaults.port ?? 5432, 'DB_PORT') - }, - - production: { - user: env.DB_USER || 'mastodon', - password: env.DB_PASS || '', - database: env.DB_NAME || 'mastodon_production', - host: env.DB_HOST || 'localhost', - port: parseIntFromEnv(env.DB_PORT, 5432, 'DB_PORT') - }, - }; - - /** - * @type {pg.PoolConfig} - */ - let baseConfig = {}; - - if (env.DATABASE_URL) { - const parsedUrl = pgConnectionString.parse(env.DATABASE_URL); - - // The result of dbUrlToConfig from pg-connection-string is not type - // compatible with pg.PoolConfig, since parts of the connection URL may be - // `null` when pg.PoolConfig expects `undefined`, as such we have to - // manually create the baseConfig object from the properties of the - // parsedUrl. - // - // For more information see: - // https://github.com/brianc/node-postgres/issues/2280 - // - // FIXME: clean up once brianc/node-postgres#3128 lands - if (typeof parsedUrl.password === 'string') baseConfig.password = parsedUrl.password; - if (typeof parsedUrl.host === 'string') baseConfig.host = parsedUrl.host; - if (typeof parsedUrl.user === 'string') baseConfig.user = parsedUrl.user; - if (typeof parsedUrl.port === 'string') { - const parsedPort = parseInt(parsedUrl.port, 10); - if (isNaN(parsedPort)) { - throw new Error('Invalid port specified in DATABASE_URL environment variable'); - } - baseConfig.port = parsedPort; - } - if (typeof parsedUrl.database === 'string') baseConfig.database = parsedUrl.database; - if (typeof parsedUrl.options === 'string') baseConfig.options = parsedUrl.options; - - // The pg-connection-string type definition isn't correct, as parsedUrl.ssl - // can absolutely be an Object, this is to work around these incorrect - // types, including the casting of parsedUrl.ssl to Record - if (typeof parsedUrl.ssl === 'boolean') { - baseConfig.ssl = parsedUrl.ssl; - } else if (typeof parsedUrl.ssl === 'object' && !Array.isArray(parsedUrl.ssl) && parsedUrl.ssl !== null) { - /** @type {Record} */ - const sslOptions = parsedUrl.ssl; - baseConfig.ssl = {}; - - baseConfig.ssl.cert = sslOptions.cert; - baseConfig.ssl.key = sslOptions.key; - baseConfig.ssl.ca = sslOptions.ca; - baseConfig.ssl.rejectUnauthorized = sslOptions.rejectUnauthorized; - } - - // Support overriding the database password in the connection URL - if (!baseConfig.password && env.DB_PASS) { - baseConfig.password = env.DB_PASS; - } - } else if (Object.hasOwn(pgConfigs, environment)) { - baseConfig = pgConfigs[environment]; - - if (env.DB_SSLMODE) { - switch(env.DB_SSLMODE) { - case 'disable': - case '': - baseConfig.ssl = false; - break; - case 'no-verify': - baseConfig.ssl = { rejectUnauthorized: false }; - break; - default: - baseConfig.ssl = {}; - break; - } - } - } else { - throw new Error('Unable to resolve postgresql database configuration.'); - } - - return { - ...baseConfig, - max: parseIntFromEnv(env.DB_POOL, 10, 'DB_POOL'), - connectionTimeoutMillis: 15000, - // Deliberately set application_name to an empty string to prevent excessive - // CPU usage with PG Bouncer. See: - // - https://github.com/mastodon/mastodon/pull/23958 - // - https://github.com/pgbouncer/pgbouncer/issues/349 - application_name: '', - }; -}; - -/** - * @typedef RedisConfiguration - * @property {import('ioredis').RedisOptions} redisParams - * @property {string} redisPrefix - * @property {string|undefined} redisUrl - */ - -/** - * @param {NodeJS.ProcessEnv} env the `process.env` value to read configuration from - * @returns {RedisConfiguration} configuration for the Redis connection - */ -const redisConfigFromEnv = (env) => { - // ioredis *can* transparently add prefixes for us, but it doesn't *in some cases*, - // which means we can't use it. But this is something that should be looked into. - const redisPrefix = env.REDIS_NAMESPACE ? `${env.REDIS_NAMESPACE}:` : ''; - - let redisPort = parseIntFromEnv(env.REDIS_PORT, 6379, 'REDIS_PORT'); - let redisDatabase = parseIntFromEnv(env.REDIS_DB, 0, 'REDIS_DB'); - - /** @type {import('ioredis').RedisOptions} */ - const redisParams = { - host: env.REDIS_HOST || '127.0.0.1', - port: redisPort, - // Force support for both IPv6 and IPv4, by default ioredis sets this to 4, - // only allowing IPv4 connections: - // https://github.com/redis/ioredis/issues/1576 - family: 0, - db: redisDatabase, - password: env.REDIS_PASSWORD || undefined, - }; - - // redisParams.path takes precedence over host and port. - if (env.REDIS_URL && env.REDIS_URL.startsWith('unix://')) { - redisParams.path = env.REDIS_URL.slice(7); - } - - return { - redisParams, - redisPrefix, - redisUrl: typeof env.REDIS_URL === 'string' ? env.REDIS_URL : undefined, - }; -}; - const PUBLIC_CHANNELS = [ 'public', 'public:media', @@ -291,10 +102,12 @@ const CHANNEL_NAMES = [ ]; const startServer = async () => { - const pgPool = new pg.Pool(pgConfigFromEnv(process.env)); + const pgPool = Database.getPool(Database.configFromEnv(process.env, environment)); const metrics = setupMetrics(CHANNEL_NAMES, pgPool); + const redisConfig = Redis.configFromEnv(process.env); + const redisClient = Redis.createClient(redisConfig, logger); const server = http.createServer(); const wss = new WebSocketServer({ noServer: true }); @@ -386,9 +199,7 @@ const startServer = async () => { */ const subs = {}; - const redisConfig = redisConfigFromEnv(process.env); - const redisSubscribeClient = await createRedisClient(redisConfig); - const redisClient = await createRedisClient(redisConfig); + const redisSubscribeClient = Redis.createClient(redisConfig, logger); const { redisPrefix } = redisConfig; // When checking metrics in the browser, the favicon is requested this diff --git a/streaming/redis.js b/streaming/redis.js new file mode 100644 index 000000000..208d6ae07 --- /dev/null +++ b/streaming/redis.js @@ -0,0 +1,65 @@ +import { Redis } from 'ioredis'; + +import { parseIntFromEnvValue } from './utils.js'; + +/** + * @typedef RedisConfiguration + * @property {import('ioredis').RedisOptions} redisParams + * @property {string} redisPrefix + * @property {string|undefined} redisUrl + */ + +/** + * @param {NodeJS.ProcessEnv} env the `process.env` value to read configuration from + * @returns {RedisConfiguration} configuration for the Redis connection + */ +export function configFromEnv(env) { + // ioredis *can* transparently add prefixes for us, but it doesn't *in some cases*, + // which means we can't use it. But this is something that should be looked into. + const redisPrefix = env.REDIS_NAMESPACE ? `${env.REDIS_NAMESPACE}:` : ''; + + let redisPort = parseIntFromEnvValue(env.REDIS_PORT, 6379, 'REDIS_PORT'); + let redisDatabase = parseIntFromEnvValue(env.REDIS_DB, 0, 'REDIS_DB'); + + /** @type {import('ioredis').RedisOptions} */ + const redisParams = { + host: env.REDIS_HOST || '127.0.0.1', + port: redisPort, + // Force support for both IPv6 and IPv4, by default ioredis sets this to 4, + // only allowing IPv4 connections: + // https://github.com/redis/ioredis/issues/1576 + family: 0, + db: redisDatabase, + password: env.REDIS_PASSWORD || undefined, + }; + + // redisParams.path takes precedence over host and port. + if (env.REDIS_URL && env.REDIS_URL.startsWith('unix://')) { + redisParams.path = env.REDIS_URL.slice(7); + } + + return { + redisParams, + redisPrefix, + redisUrl: typeof env.REDIS_URL === 'string' ? env.REDIS_URL : undefined, + }; +} + +/** + * @param {RedisConfiguration} config + * @param {import('pino').Logger} logger + * @returns {Redis} + */ +export function createClient({ redisParams, redisUrl }, logger) { + let client; + + if (typeof redisUrl === 'string') { + client = new Redis(redisUrl, redisParams); + } else { + client = new Redis(redisParams); + } + + client.on('error', (err) => logger.error({ err }, 'Redis Client Error!')); + + return client; +} diff --git a/streaming/utils.js b/streaming/utils.js index 4610bf660..47c63dd4c 100644 --- a/streaming/utils.js +++ b/streaming/utils.js @@ -59,3 +59,23 @@ export function firstParam(arrayOrString) { return arrayOrString; } } + +/** + * Takes an environment variable that should be an integer, attempts to parse + * it falling back to a default if not set, and handles errors parsing. + * @param {string|undefined} value + * @param {number} defaultValue + * @param {string} variableName + * @returns {number} + */ +export function parseIntFromEnvValue(value, defaultValue, variableName) { + if (typeof value === 'string' && value.length > 0) { + const parsedValue = parseInt(value, 10); + if (isNaN(parsedValue)) { + throw new Error(`Invalid ${variableName} environment variable: ${value}`); + } + return parsedValue; + } else { + return defaultValue; + } +}