// @ts-check
const EventEmitter = require('events');
const fs = require('fs');
const path = require('path');
const os = require('os');
const Util = require('../Util/Util.js');
const Queue = require('../Structures/Queue.js');
const Cluster = require('./Cluster.js');
const PromiseHandler = require('../Structures/PromiseHandler.js');
/**
 * Splits a list into consecutive slices of at most `chunkSize` elements.
 * Local helper so the manager does not depend on a patched `Array.prototype`.
 * @param {Array<*>} list List to split
 * @param {number} chunkSize Maximum length of each slice (expected to be >= 1)
 * @returns {Array<Array<*>>}
 * @private
 */
function chunkList(list, chunkSize) {
    const chunks = [];
    for (let start = 0; start < list.length; start += chunkSize) {
        chunks.push(list.slice(start, start + chunkSize));
    }
    return chunks;
}

/**
 * Tells whether eval options address clusters through a shard id or a guild id.
 * @param {Object} options Options as accepted by `broadcastEval`
 * @returns {boolean}
 * @private
 */
function hasShardTarget(options) {
    return Boolean(options.guildId) || (options.shard !== undefined && options.shard !== null);
}

/**
 * Checks whether a cluster is responsible for the given internal shard id.
 * @param {Object} cluster Cluster instance stored in the manager
 * @param {number} shardId Internal shard id
 * @returns {boolean}
 * @private
 */
function clusterOwnsShard(cluster, shardId) {
    // NOTE(review): assumes Cluster keeps the shard ids it was constructed with in `shardList`
    // — confirm against Cluster.js. The `shardId` comparison is the check this file used before.
    if (Array.isArray(cluster.shardList)) return cluster.shardList.includes(shardId);
    return cluster.shardId === shardId;
}

/**
 * Spawns and supervises clusters, each of which runs a slice of the internal shards.
 * The manager is an EventEmitter and emits `clusterCreate` and `debug`.
 */
class ClusterManager extends EventEmitter {
    /**
     * @param {string} file Path to your bot file
     * @param {object} [options] Options for the cluster manager
     * @param {string|number} [options.totalShards='auto'] Number of total internal shards or "auto"
     * @param {string|number} [options.totalClusters='auto'] Number of total clusters/processes to spawn
     * @param {number} [options.shardsPerClusters] Number of shards per cluster
     * @param {string[]} [options.shardArgs=[]] Arguments to pass to the clustered script when spawning
     * (only available when using the `process` mode)
     * @param {string[]} [options.execArgv=[]] Arguments to pass to the clustered script executable when spawning
     * @param {boolean} [options.respawn=true] Whether clusters should automatically respawn upon exiting
     * (only available when using the `process` mode)
     * @param {ClusterManagerMode} [options.mode='process'] Which mode to use for clustering
     * @param {number[]} [options.shardList] An array of internal shard ids which should get spawned
     * @param {number[]} [options.clusterList] Ids to assign to the spawned clusters, by spawn order
     * @param {string} [options.token] Token to use for automatic internal shard count and passing to bot file
     * @param {object} [options.restarts] Restart options
     * @param {number} [options.restarts.interval] Interval in milliseconds after which the restart count of a cluster is reset
     * @param {number} [options.restarts.max] Maximum amount of restarts a cluster can have in the interval
     * @param {object} [options.queue] Control the spawn queue
     * @param {boolean} [options.queue.auto=true] Whether the spawn queue is managed automatically
     * @param {object} [options.clusterData={}] Data passed to the workerData or the process env
     * @param {object} [options.clusterOptions={}] Options passed when forking a child or creating a thread
     */
    constructor(file, options = {}) {
        super();
        options = Util.mergeDefault(
            {
                totalClusters: 'auto',
                totalShards: 'auto',
                shardArgs: [],
                execArgv: [],
                respawn: true,
                mode: 'process',
                token: process.env.DISCORD_TOKEN,
                queue: {
                    auto: true,
                },
                restarts: {
                    max: 3,
                    interval: 60000 * 60,
                    current: 0,
                },
                clusterData: {},
                clusterOptions: {},
            },
            options,
        );

        if (options.keepAlive) throw new Error('keepAlive is not supported anymore on and above v1.6.0. Import it as plugin ("HeartBeatManager"), therefore check the libs readme');

        /**
         * Whether clusters should automatically respawn upon exiting
         * @type {boolean}
         */
        this.respawn = options.respawn;

        /**
         * How many times a cluster can maximally restart in the given interval
         * (`interval` in milliseconds, `max` restarts, `current` restart count)
         * @type {Object}
         */
        this.restarts = options.restarts;

        /**
         * Data which is passed to the workerData or the process env
         * @type {object}
         */
        this.clusterData = options.clusterData;

        /**
         * Options which are passed when forking a child or creating a thread
         * @type {object}
         */
        this.clusterOptions = options.clusterOptions;

        /**
         * Absolute path to the bot script file
         * @type {string}
         */
        this.file = file;
        if (!file) throw new Error('CLIENT_INVALID_OPTION | No File specified.');
        if (!path.isAbsolute(file)) this.file = path.resolve(process.cwd(), file);
        // statSync throws on its own when the path does not exist.
        const stats = fs.statSync(this.file);
        if (!stats.isFile()) throw new Error('CLIENT_INVALID_OPTION | Provided is file is not type of file');

        /**
         * Amount of internal shards in total, or "auto" until `spawn` resolved it
         * @type {number|string}
         */
        this.totalShards = options.totalShards || 'auto';
        if (this.totalShards !== 'auto') {
            if (typeof this.totalShards !== 'number' || Number.isNaN(this.totalShards)) {
                throw new TypeError('CLIENT_INVALID_OPTION | Amount of internal shards must be a number.');
            }
            if (this.totalShards < 1) {
                throw new RangeError('CLIENT_INVALID_OPTION | Amount of internal shards must be at least 1.');
            }
            if (!Number.isInteger(this.totalShards)) {
                throw new RangeError('CLIENT_INVALID_OPTION | Amount of internal shards must be an integer.');
            }
        }

        /**
         * Amount of total clusters to spawn, or "auto" until `spawn` resolved it
         * @type {number|string}
         */
        this.totalClusters = options.totalClusters || 'auto';
        if (this.totalClusters !== 'auto') {
            if (typeof this.totalClusters !== 'number' || Number.isNaN(this.totalClusters)) {
                throw new TypeError('CLIENT_INVALID_OPTION | Amount of Clusters must be a number.');
            }
            if (this.totalClusters < 1) {
                throw new RangeError('CLIENT_INVALID_OPTION | Amount of Clusters must be at least 1.');
            }
            if (!Number.isInteger(this.totalClusters)) {
                throw new RangeError('CLIENT_INVALID_OPTION | Amount of Clusters must be an integer.');
            }
        }

        /**
         * Amount of shards per cluster (optional)
         * @type {?number}
         */
        this.shardsPerClusters = options.shardsPerClusters;
        if (this.shardsPerClusters) {
            if (typeof this.shardsPerClusters !== 'number' || Number.isNaN(this.shardsPerClusters)) {
                throw new TypeError('CLIENT_INVALID_OPTION | Amount of ShardsPerClusters must be a number.');
            }
            if (this.shardsPerClusters < 1) {
                throw new RangeError('CLIENT_INVALID_OPTION | Amount of shardsPerClusters must be at least 1.');
            }
            if (!Number.isInteger(this.shardsPerClusters)) {
                throw new RangeError('CLIENT_INVALID_OPTION | Amount of Shards Per Clusters must be an integer.');
            }
        }

        /**
         * Mode for clusters to spawn with
         * @type {ClusterManagerMode}
         */
        this.mode = options.mode;
        if (this.mode !== 'worker' && this.mode !== 'process') {
            throw new RangeError('CLIENT_INVALID_OPTION | Cluster mode must be "worker" or "process"');
        }

        /**
         * An array of arguments to pass to clusters (only when {@link ClusterManager#mode} is `process`)
         * @type {string[]}
         */
        this.shardArgs = options.shardArgs;

        /**
         * An array of arguments to pass to the executable (only when {@link ClusterManager#mode} is `process`)
         * @type {string[]}
         */
        this.execArgv = options.execArgv;

        /**
         * List of internal shard ids this cluster manager spawns, or "auto"
         * @type {string|number[]}
         */
        this.shardList = options.shardList || 'auto';
        if (this.shardList !== 'auto') {
            if (!Array.isArray(this.shardList)) {
                throw new TypeError('CLIENT_INVALID_OPTION | shardList must be an array.');
            }
            // Drop duplicate ids while keeping the given order.
            this.shardList = [...new Set(this.shardList)];
            if (this.shardList.length < 1) throw new RangeError('CLIENT_INVALID_OPTION | shardList must contain at least 1 ID.');
            const hasInvalidId = this.shardList.some(
                shardID => typeof shardID !== 'number' || !Number.isInteger(shardID) || shardID < 0,
            );
            if (hasInvalidId) {
                throw new TypeError('CLIENT_INVALID_OPTION | shardList has to contain an array of positive integers.');
            }
        }

        /**
         * Token to use for obtaining the automatic internal shards count, and passing to bot script
         * @type {?string}
         */
        this.token = options.token ? options.token.replace(/^Bot\s*/i, '') : null;

        /**
         * All clusters the manager created, keyed by cluster id
         * @type {Map<number, Cluster>}
         */
        this.clusters = new Map();

        /**
         * Shard ids per cluster in spawn order; filled by `spawn`
         * @type {?Array<number[]>}
         */
        this.shardClusterList = null;

        // Node coerces every value assigned to process.env to a string, so `undefined` is stored
        // as the text "undefined" and a null token as "null".
        // NOTE(review): code outside this file may compare against those literal strings —
        // confirm before changing these assignments.
        process.env.SHARD_LIST = undefined;
        process.env.TOTAL_SHARDS = this.totalShards;
        process.env.CLUSTER = undefined;
        process.env.CLUSTER_COUNT = this.totalClusters;
        process.env.CLUSTER_MANAGER = true;
        process.env.CLUSTER_MANAGER_MODE = this.mode;
        process.env.DISCORD_TOKEN = this.token;
        process.env.MAINTENANCE = undefined;
        process.env.CLUSTER_QUEUE_MODE = options.queue.auto ? 'auto' : 'manual';

        /**
         * Ids which should be assigned to the spawned clusters, by spawn order
         * @type {number[]}
         */
        this.clusterList = options.clusterList || [];

        this.queue = new Queue(options.queue);
        this._debug(`[START] Cluster Manager has been initialized`);
        this.promise = new PromiseHandler();
    }

    /**
     * Options used to spawn the clusters.
     * @typedef {Object} ClusterSpawnOptions
     * @property {number|string} [amount=this.totalShards] Number of internal shards to spawn, or "auto"
     * @property {number} [delay=7000] How long to wait in between spawning each shard of a cluster (in milliseconds)
     * @property {number} [timeout=-1] The amount in milliseconds to wait until the {@link Client} has become ready
     * before resolving. (-1 for no wait)
     */

    /**
     * Distributes the internal shards over the clusters and queues every cluster for spawning.
     * @param {ClusterSpawnOptions} [options] Options for spawning
     * @returns {Promise<*>} Whatever the spawn queue's `start` resolves with
     */
    async spawn({ amount = this.totalShards, delay = 7000, timeout = -1 } = {}) {
        if (delay < 7000) {
            process.emitWarning(
                `Spawn Delay (delay: ${delay}) is smaller than 7s, this can cause global rate limits on /gateway/bot`,
                {
                    code: 'CLUSTER_MANAGER',
                },
            );
        }

        if (amount === 'auto') {
            amount = await Util.fetchRecommendedShards(this.token, 1000);
            this.totalShards = amount;
            this._debug(`Discord recommended a total shard count of ${amount}`);
        } else {
            if (typeof amount !== 'number' || Number.isNaN(amount)) {
                throw new TypeError('CLIENT_INVALID_OPTION | Amount of Internal Shards must be a number.');
            }
            if (amount < 1) throw new RangeError('CLIENT_INVALID_OPTION | Amount of Internal Shards must be at least 1.');
            if (!Number.isInteger(amount)) {
                throw new RangeError('CLIENT_INVALID_OPTION | Amount of Internal Shards must be an integer.');
            }
        }

        let clusterAmount = this.totalClusters;
        if (clusterAmount === 'auto') {
            // One cluster per CPU core unless told otherwise.
            clusterAmount = os.cpus().length;
            this.totalClusters = clusterAmount;
        } else {
            if (typeof clusterAmount !== 'number' || Number.isNaN(clusterAmount)) {
                throw new TypeError('CLIENT_INVALID_OPTION | Amount of Clusters must be a number.');
            }
            if (clusterAmount < 1) throw new RangeError('CLIENT_INVALID_OPTION | Amount of Clusters must be at least 1.');
            if (!Number.isInteger(clusterAmount)) {
                throw new RangeError('CLIENT_INVALID_OPTION | Amount of Clusters must be an integer.');
            }
        }

        if (this.shardList === 'auto') this.shardList = [...Array(amount).keys()];

        // Validate before touching any further state, so a bad list leaves the manager untouched.
        if (this.shardList.some(shardID => shardID >= amount)) {
            throw new RangeError('CLIENT_INVALID_OPTION | Shard IDs must be smaller than the amount of shards.');
        }

        // An explicit shards-per-cluster setting overrides the cluster count.
        if (this.shardsPerClusters) this.totalClusters = Math.ceil(this.shardList.length / this.shardsPerClusters);
        this.shardClusterList = chunkList(this.shardList, Math.ceil(this.shardList.length / this.totalClusters));
        // Rounding up the chunk size can produce fewer chunks than clusters requested.
        if (this.shardClusterList.length !== this.totalClusters) {
            this.totalClusters = this.shardClusterList.length;
        }

        this._debug(
            [
                '[Spawning Clusters]',
                `ClusterCount: ${this.totalClusters}`,
                `ShardCount: ${amount}`,
                `ShardList: ${this.shardClusterList.join(', ')}`,
            ].join('\n'),
        );

        for (let i = 0; i < this.totalClusters; i++) {
            // Explicit null check: a configured cluster id of 0 is valid and must not fall back to `i`.
            const configuredId = this.clusterList[i];
            const clusterId = configuredId === undefined || configuredId === null ? i : configuredId;
            const shards = this.shardClusterList[i];
            const spawnDelay = delay * shards.length;
            const readyTimeout = timeout !== -1 ? timeout + spawnDelay : timeout;
            this.queue.add({
                run: (...a) => {
                    const cluster = this.createCluster(clusterId, shards, this.totalShards);
                    return cluster.spawn(...a);
                },
                args: [readyTimeout],
                timeout: spawnDelay,
            });
        }
        return this.queue.start();
    }

    /**
     * Sends a message to all clusters.
     * @param {*} message Message to be sent to the clusters
     * @returns {Promise<Array<*>>} One entry per cluster, holding whatever that cluster's `send` resolved with
     */
    broadcast(message) {
        const promises = [];
        for (const cluster of this.clusters.values()) promises.push(cluster.send(message));
        return Promise.all(promises);
    }

    /**
     * Creates a single cluster.
     * <warn>Using this method is usually not necessary if you use the spawn method.</warn>
     * <info>This is usually not necessary to manually specify.</info>
     * @param {number} id Id of the cluster
     * @param {number[]} shardsToSpawn Internal shard ids the cluster should run
     * @param {number|string} totalShards Total amount of internal shards
     * @param {boolean} [recluster=false] When true, the cluster is not stored in {@link ClusterManager#clusters}
     * @returns {Cluster} Note that the created cluster needs to be explicitly spawned using its spawn method.
     */
    createCluster(id, shardsToSpawn, totalShards, recluster = false) {
        const cluster = new Cluster(this, id, shardsToSpawn, totalShards);
        if (!recluster) this.clusters.set(id, cluster);
        /**
         * Emitted upon creating a cluster.
         * @event ClusterManager#clusterCreate
         * @param {Cluster} cluster Cluster that was created
         */
        // @todo clusterReady event
        this.emit('clusterCreate', cluster);
        this._debug(`[CREATE] Created Cluster ${cluster.id}`);
        return cluster;
    }

    /**
     * Evaluates a script on all clusters, or on the clusters selected through the options,
     * in the context of the {@link Client}s.
     * Invalid arguments are reported through the returned promise, never thrown synchronously.
     * @param {string|Function} script JavaScript to run on each cluster
     * @param {Object} [options={}] The options for the broadcastEval
     * @param {*} [options.context] JSON-serializable value handed to `script` when it is a function
     * @param {number|number[]} [options.cluster] Cluster id(s) to run on
     * @param {number|number[]} [options.shard] Shard id(s) whose clusters should run the script (overrides `cluster`)
     * @param {string} [options.guildId] Guild id whose shard should be targeted (overrides `shard`)
     * @param {number} [options.timeout] Time to wait until the promise will be rejected
     * @returns {Promise<*>|Promise<Array<*>>} A single result for a numeric `cluster`, otherwise an array of results
     */
    async broadcastEval(script, options = {}) {
        if (!script || (typeof script !== 'string' && typeof script !== 'function')) {
            throw new TypeError('ClUSTERING_INVALID_EVAL_BROADCAST');
        }
        const source =
            typeof script === 'function' ? `(${script})(this, ${JSON.stringify(options.context)})` : script;

        // Work on locals so the caller's options object is left untouched.
        let target = options.cluster;
        if (Object.prototype.hasOwnProperty.call(options, 'cluster')) {
            if (typeof target === 'number' && target < 0) throw new RangeError('CLUSTER_ID_OUT_OF_RANGE');
            if (Array.isArray(target) && target.length === 0) {
                throw new RangeError('ARRAY_MUST_CONTAIN_ONE CLUSTER_ID');
            }
        }

        let shard = options.shard;
        if (options.guildId) shard = Util.shardIdForGuildId(options.guildId, this.totalShards);

        // Shard 0 is a valid id, so test for presence rather than truthiness.
        if (shard !== undefined && shard !== null) {
            if (typeof shard === 'number' && shard < 0) throw new RangeError('SHARD_ID_OUT_OF_RANGE');
            if (Array.isArray(shard) && shard.length === 0) throw new RangeError('ARRAY_MUST_CONTAIN_ONE SHARD_ID');

            const wantedShards = Array.isArray(shard) ? shard : [shard];
            const owners = [...this.clusters.values()]
                .filter(cluster => wantedShards.some(shardId => clusterOwnsShard(cluster, shardId)))
                .map(cluster => cluster.id);
            // Falling back to "all clusters" would silently run the script in the wrong place.
            if (owners.length === 0) {
                throw new Error('CLUSTERING_CLUSTER_NOT_FOUND FOR ShardId: ' + wantedShards.join(', '));
            }
            target = owners;
        }

        return this._performOnClusters('eval', [source], target, options.timeout);
    }

    /**
     * Fetches a client property value of each cluster, or a given cluster.
     * @param {string} prop Name of the client property to get, using periods for nesting
     * @param {number} [cluster] Cluster to fetch property from, all if undefined
     * @returns {Promise<*>|Promise<Array<*>>}
     * @example
     * manager.fetchClientValues('guilds.cache.size')
     *   .then(results => console.log(`${results.reduce((prev, val) => prev + val, 0)} total guilds`))
     *   .catch(console.error);
     */
    fetchClientValues(prop, cluster) {
        return this.broadcastEval(`this.${prop}`, { cluster });
    }

    /**
     * Runs a method with given arguments on all clusters, or a given cluster.
     * @param {string} method Method name to run on each cluster
     * @param {Array<*>} args Arguments to pass through to the method call
     * @param {number|number[]} [cluster] Cluster id(s) to run on, all if undefined
     * @param {number} [timeout] The amount of time to wait until the promise will be rejected
     * @returns {Promise<*>|Promise<Array<*>>} A single result for a numeric `cluster`, otherwise an array of results
     * @private
     */
    _performOnClusters(method, args, cluster, timeout) {
        if (this.clusters.size === 0) return Promise.reject(new Error('CLUSTERING_NO_CLUSTERS'));

        if (typeof cluster === 'number') {
            const single = this.clusters.get(cluster);
            if (!this.clusters.has(cluster)) {
                return Promise.reject(new Error('CLUSTERING_CLUSTER_NOT_FOUND FOR ClusterId: ' + cluster));
            }
            return single[method](...args, undefined, timeout);
        }

        let clusters = [...this.clusters.values()];
        if (cluster) clusters = clusters.filter(c => cluster.includes(c.id));
        if (clusters.length === 0) return Promise.reject(new Error('CLUSTERING_NO_CLUSTERS_FOUND'));

        return Promise.all(clusters.map(c => c[method](...args, undefined, timeout)));
    }

    /**
     * Respawns all clusters one after another.
     * @param {ClusterRespawnOptions} [options] Options for respawning shards
     * @returns {Promise<Map<number, Cluster>>}
     */
    async respawnAll({ clusterDelay = 5500, respawnDelay = 500, timeout = -1 } = {}) {
        this.promise.nonce.clear();
        this._debug('Respawning all Clusters');

        const clusters = [...this.clusters.values()];
        for (let i = 0; i < clusters.length; i++) {
            const pending = [clusters[i].respawn({ delay: respawnDelay, timeout })];
            // No pause is needed after the last cluster.
            if (i + 1 < this.clusters.size && clusterDelay > 0) {
                // Clusters created outside of `spawn` have no entry here; treat them as one shard.
                const shards = this.shardClusterList ? this.shardClusterList[i] : undefined;
                const shardCount = shards ? shards.length : 1;
                pending.push(Util.delayFor(shardCount * clusterDelay));
            }
            await Promise.all(pending); // eslint-disable-line no-await-in-loop
        }
        return this.clusters;
    }

    //Custom Functions:

    /**
     * Evaluates a script on the manager itself.
     * @param {string|Function} script JavaScript to run, `this` being the manager
     * @returns {Promise<{_result: *, _error: ?Object}>} Result and plain error of the script execution
     * @private
     */
    async evalOnManager(script) {
        script = typeof script === 'function' ? `(${script})(this)` : script;
        let result;
        let error;
        try {
            // SECURITY: runs arbitrary code inside the manager process — only pass trusted scripts.
            result = await eval(script);
        } catch (err) {
            error = err;
        }
        return { _result: result, _error: error ? Util.makePlainError(error) : null };
    }

    /**
     * Evaluates a script on the targeted cluster and resolves with that single result.
     * @param {string|Function} script JavaScript to run on the cluster
     * @param {Object} [options] Same options as for `broadcastEval`
     * @returns {Promise<*>} Result of the script execution on the (first) targeted cluster
     * @private
     */
    evalOnCluster(script, options) {
        const evalOptions = options || {};
        // A plain numeric cluster id makes broadcastEval resolve with the bare result instead of
        // an array, so indexing it would cut the result apart.
        const resolvesSingle = typeof evalOptions.cluster === 'number' && !hasShardTarget(evalOptions);
        return this.broadcastEval(script, evalOptions).then(r => (resolvesSingle ? r : r[0]));
    }

    /**
     * Adds one or more plugins to the cluster manager by calling each plugin's `build(manager)`.
     * Plugins may be passed as separate arguments or as arrays of plugins.
     * @param {...(Object|Object[])} plugins Plugins to attach
     */
    extend(...plugins) {
        // Flatten one level so both extend(a, b) and extend([a, b]) work.
        const pluginList = [].concat(...plugins);
        if (pluginList.length === 0) throw new Error('NO_PLUGINS_PROVIDED');
        for (const plugin of pluginList) {
            if (!plugin) throw new Error('PLUGIN_NOT_PROVIDED');
            if (typeof plugin !== 'object') throw new Error('PLUGIN_NOT_A_OBJECT');
            plugin.build(this);
        }
    }

    /**
     * Forwards a maintenance trigger to every cluster.
     * @param {string} [reason] Reason handed to each cluster's `triggerMaintenance`
     */
    triggerMaintenance(reason) {
        for (const cluster of [...this.clusters.values()]) cluster.triggerMaintenance(reason);
    }

    /**
     * Emits a debug message.
     * <warn>Using this method just emits the Debug Event.</warn>
     * <info>This is usually not necessary to manually specify.</info>
     * @param {string} message Message to log
     * @param {number} [cluster] Id of the cluster the message belongs to, the manager if undefined
     * @returns {string} returns the log message
     */
    _debug(message, cluster) {
        const prefix = cluster === undefined ? `[CM => Manager] ` : `[CM => Cluster ${cluster}] `;
        const log = prefix + message;
        /**
         * Emitted for every debug message of the manager
         * @event ClusterManager#debug
         * @param {string} message The prefixed debug message
         */
        this.emit('debug', log);
        return log;
    }
}
module.exports = ClusterManager;
// Adds a non-enumerable `chunk(chunkSize)` method to every array.
// The property is non-configurable and non-writable, so defining it a second time (e.g. when a
// second copy of this module is loaded) would throw "Cannot redefine property" — hence the guard.
if (!Object.prototype.hasOwnProperty.call(Array.prototype, 'chunk')) {
    Object.defineProperty(Array.prototype, 'chunk', {
        /**
         * Splits the array into consecutive slices of at most `chunkSize` elements.
         * @param {number} chunkSize Maximum length of each slice, must be greater than 0
         * @returns {Array<Array<*>>} The slices in original order; the last one may be shorter
         * @throws {RangeError} When `chunkSize` is not greater than 0
         */
        value: function (chunkSize) {
            // A size of 0 or less would never advance the loop below.
            if (chunkSize <= 0) throw new RangeError('CHUNK_SIZE_MUST_BE_POSITIVE');
            const chunks = [];
            for (let start = 0; start < this.length; start += chunkSize) {
                chunks.push(this.slice(start, start + chunkSize));
            }
            return chunks;
        },
    });
}