3
0
Fork 0
mirror of https://github.com/ZeppelinBot/Zeppelin.git synced 2025-03-16 14:11:50 +00:00
zeppelin/backend/src/Queue.ts

68 lines
1.5 KiB
TypeScript
Raw Normal View History

2019-08-18 16:40:15 +03:00
import { SECONDS } from "./utils";
2021-05-22 13:34:25 +03:00
type InternalQueueFn = () => Promise<void>;
type AnyFn = (...args: any[]) => any;
2019-08-18 16:40:15 +03:00
const DEFAULT_TIMEOUT = 10 * SECONDS;
2019-02-23 21:19:46 +02:00
2021-05-22 13:34:25 +03:00
export class Queue<TQueueFunction extends AnyFn = AnyFn> {
protected running = false;
protected queue: InternalQueueFn[] = [];
2021-05-22 13:57:54 +03:00
protected _timeout: number;
2019-02-23 21:19:46 +02:00
constructor(timeout = DEFAULT_TIMEOUT) {
2021-05-22 13:57:54 +03:00
this._timeout = timeout;
}
get timeout(): number {
return this._timeout;
2019-02-23 21:19:46 +02:00
}
2021-05-22 13:46:55 +03:00
/**
* The number of operations that are currently queued up or running.
* I.e. backlog (queue) + current running process, if any.
*
* If this is 0, queueing a function will run it as soon as possible.
*/
get length(): number {
return this.queue.length + (this.running ? 1 : 0);
}
public add(fn: TQueueFunction): Promise<any> {
const promise = new Promise<any>((resolve, reject) => {
this.queue.push(async () => {
try {
const result = await fn();
resolve(result);
} catch (err) {
reject(err);
}
});
if (!this.running) this.next();
});
return promise;
}
2021-05-22 13:34:25 +03:00
public next(): void {
this.running = true;
if (this.queue.length === 0) {
this.running = false;
return;
}
const fn = this.queue.shift()!;
2021-09-11 19:06:51 +03:00
new Promise((resolve) => {
// Either fn() completes or the timeout is reached
2021-05-22 13:34:25 +03:00
void fn().then(resolve);
2021-05-22 13:57:54 +03:00
setTimeout(resolve, this._timeout);
}).then(() => this.next());
}
2020-07-27 20:42:10 +03:00
public clear() {
this.queue.splice(0, this.queue.length);
}
}