refactor: clean the rate limit utility
This commit is contained in:
@@ -1,3 +1,4 @@
|
||||
import type { RateLimitOptions } from '@server/utils/rateLimit';
|
||||
import rateLimit from '@server/utils/rateLimit';
|
||||
import type NodeCache from 'node-cache';
|
||||
|
||||
@@ -10,10 +11,7 @@ const DEFAULT_ROLLING_BUFFER = 10000;
|
||||
interface ExternalAPIOptions {
|
||||
nodeCache?: NodeCache;
|
||||
headers?: Record<string, unknown>;
|
||||
rateLimit?: {
|
||||
maxRPS: number;
|
||||
maxRequests: number;
|
||||
};
|
||||
rateLimit?: RateLimitOptions;
|
||||
}
|
||||
|
||||
class ExternalAPI {
|
||||
@@ -29,10 +27,7 @@ class ExternalAPI {
|
||||
options: ExternalAPIOptions = {}
|
||||
) {
|
||||
if (options.rateLimit) {
|
||||
this.fetch = rateLimit(fetch, {
|
||||
maxRequests: options.rateLimit.maxRequests,
|
||||
maxRPS: options.rateLimit.maxRPS,
|
||||
});
|
||||
this.fetch = rateLimit(fetch, options.rateLimit);
|
||||
} else {
|
||||
this.fetch = fetch;
|
||||
}
|
||||
|
||||
@@ -112,8 +112,8 @@ class TheMovieDb extends ExternalAPI {
|
||||
{
|
||||
nodeCache: cacheManager.getCache('tmdb').data,
|
||||
rateLimit: {
|
||||
maxRequests: 20,
|
||||
maxRPS: 50,
|
||||
id: 'tmdb',
|
||||
},
|
||||
}
|
||||
);
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import logger from '@server/logger';
|
||||
import type { RateLimitOptions } from '@server/utils/rateLimit';
|
||||
import rateLimit from '@server/utils/rateLimit';
|
||||
import { createHash } from 'crypto';
|
||||
import { promises } from 'fs';
|
||||
@@ -107,10 +108,7 @@ class ImageProxy {
|
||||
baseUrl: string,
|
||||
options: {
|
||||
cacheVersion?: number;
|
||||
rateLimitOptions?: {
|
||||
maxRPS: number;
|
||||
maxRequests: number;
|
||||
};
|
||||
rateLimitOptions?: RateLimitOptions;
|
||||
} = {}
|
||||
) {
|
||||
this.cacheVersion = options.cacheVersion ?? 1;
|
||||
@@ -118,7 +116,9 @@ class ImageProxy {
|
||||
this.key = key;
|
||||
|
||||
if (options.rateLimitOptions) {
|
||||
this.fetch = rateLimit(fetch, options.rateLimitOptions);
|
||||
this.fetch = rateLimit(fetch, {
|
||||
...options.rateLimitOptions,
|
||||
});
|
||||
} else {
|
||||
this.fetch = fetch;
|
||||
}
|
||||
|
||||
@@ -5,7 +5,6 @@ import { Router } from 'express';
|
||||
const router = Router();
|
||||
const tmdbImageProxy = new ImageProxy('tmdb', 'https://image.tmdb.org', {
|
||||
rateLimitOptions: {
|
||||
maxRequests: 20,
|
||||
maxRPS: 50,
|
||||
},
|
||||
});
|
||||
|
||||
@@ -1,51 +1,63 @@
|
||||
export type RateLimitOptions = {
|
||||
maxRequests?: number;
|
||||
perMilliseconds?: number;
|
||||
maxRPS?: number;
|
||||
maxRPS: number;
|
||||
id?: string;
|
||||
};
|
||||
|
||||
type RateLimiteState<T extends (...args: Parameters<T>) => Promise<U>, U> = {
|
||||
queue: {
|
||||
args: Parameters<T>;
|
||||
resolve: (value: U) => void;
|
||||
}[];
|
||||
activeRequests: number;
|
||||
timer: NodeJS.Timeout | null;
|
||||
};
|
||||
|
||||
const rateLimitById: Record<string, unknown> = {};
|
||||
|
||||
/**
|
||||
* Add a rate limit to a function so it doesn't exceed a maximum number of requests per second. Function calls exceeding the rate will be delayed.
|
||||
* @param fn The function to rate limit
|
||||
* @param options.maxRPS Maximum number of Requests Per Second
|
||||
* @param options.id An ID to share between rate limits, so it uses the same request queue.
|
||||
* @returns The function with a rate limit
|
||||
*/
|
||||
export default function rateLimit<
|
||||
T extends (...args: Parameters<T>) => Promise<U>,
|
||||
U
|
||||
>(fn: T, options: RateLimitOptions): (...args: Parameters<T>) => Promise<U> {
|
||||
const maxRequests = options.maxRPS ?? options.maxRequests ?? 1;
|
||||
const perMilliseconds = options.maxRPS
|
||||
? 1000
|
||||
: options.perMilliseconds ?? 1000;
|
||||
|
||||
const queue: {
|
||||
args: Parameters<T>;
|
||||
resolve: (value: U) => void;
|
||||
}[] = [];
|
||||
let activeRequests = 0;
|
||||
let timer: NodeJS.Timeout | null = null;
|
||||
const state: RateLimiteState<T, U> = (rateLimitById[
|
||||
options.id || ''
|
||||
] as RateLimiteState<T, U>) || { queue: [], activeRequests: 0, timer: null };
|
||||
if (options.id) {
|
||||
rateLimitById[options.id] = state;
|
||||
}
|
||||
|
||||
const processQueue = () => {
|
||||
if (queue.length === 0) {
|
||||
if (timer) {
|
||||
clearInterval(timer);
|
||||
timer = null;
|
||||
if (state.queue.length === 0) {
|
||||
if (state.timer) {
|
||||
clearInterval(state.timer);
|
||||
state.timer = null;
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
while (activeRequests < maxRequests) {
|
||||
activeRequests++;
|
||||
const item = queue.shift();
|
||||
while (state.activeRequests < options.maxRPS) {
|
||||
state.activeRequests++;
|
||||
const item = state.queue.shift();
|
||||
if (!item) break;
|
||||
const { args, resolve } = item;
|
||||
fn(...args)
|
||||
.then(resolve)
|
||||
.finally(() => {
|
||||
activeRequests--;
|
||||
if (queue.length > 0) {
|
||||
if (!timer) {
|
||||
timer = setInterval(processQueue, perMilliseconds);
|
||||
state.activeRequests--;
|
||||
if (state.queue.length > 0) {
|
||||
if (!state.timer) {
|
||||
state.timer = setInterval(processQueue, 1000);
|
||||
}
|
||||
} else {
|
||||
if (timer) {
|
||||
clearInterval(timer);
|
||||
timer = null;
|
||||
if (state.timer) {
|
||||
clearInterval(state.timer);
|
||||
state.timer = null;
|
||||
}
|
||||
}
|
||||
});
|
||||
@@ -54,7 +66,7 @@ export default function rateLimit<
|
||||
|
||||
return (...args: Parameters<T>): Promise<U> => {
|
||||
return new Promise<U>((resolve) => {
|
||||
queue.push({ args, resolve });
|
||||
state.queue.push({ args, resolve });
|
||||
processQueue();
|
||||
});
|
||||
};
|
||||
|
||||
Reference in New Issue
Block a user