Core_Cluster.js

// @ts-check
const EventEmitter = require('events');
const path = require('path');
const Util = require('../Util/Util.js');

const { messageType } = require('../Util/Constants.js');
const { IPCMessage, BaseMessage } = require('../Structures/IPCMessage.js');
const { ClusterHandler } = require('../Structures/IPCHandler.js');

const { Worker } = require('../Structures/Worker.js');
const { Child } = require('../Structures/Child.js');

let Thread = null;

/**
 * A self-contained cluster created by the {@link ClusterManager}. Each one has a {@link Child} that contains
 * an instance of the bot and its {@link Client}. When its child process/worker exits for any reason, the cluster will
 * spawn a new one to replace it as necessary.
 * @augments EventEmitter
 */
class Cluster extends EventEmitter {
    /**
     * @param {ClusterManager} manager Manager that is creating this cluster
     * @param {number} id ID of this cluster
     * @param shardList
     * @param totalShards
     */
    constructor(manager, id, shardList, totalShards) {
        super();
        if (manager.mode === 'process') Thread = Child;
        else if (manager.mode === 'worker') Thread = Worker;
        /**
         * Manager that created the cluster
         * @type {ClusterManager}
         */
        this.manager = manager;
        /**
         * ID of the cluster in the manager
         * @type {number}
         */
        this.id = id;

        /**
         * Arguments for the shard's process (only when {@link ShardingManager#mode} is `process`)
         * @type {string[]}
         */
        this.args = manager.shardArgs || [];

        /**
         * Arguments for the shard's process executable (only when {@link ShardingManager#mode} is `process`)
         * @type {string[]}
         */
        this.execArgv = manager.execArgv;
        /**
         * Internal Shards which will get spawned in the cluster
         * @type {number}
         */
        this.shardList = shardList;
        /**
         * the amount of real shards
         * @type {number}
         */
        this.totalShards = totalShards;
        /**
         * Environment variables for the cluster's process, or workerData for the cluster's worker
         * @type {object}
         */
        this.env = Object.assign({}, process.env, {
            SHARD_LIST: this.shardList,
            TOTAL_SHARDS: this.totalShards,
            CLUSTER_MANAGER: true,
            CLUSTER: this.id,
            CLUSTER_COUNT: this.manager.totalClusters,
            DISCORD_TOKEN: this.manager.token,
        });
        /**
         * Whether the cluster's {@link Client} is ready
         * @type {boolean}
         */

        /**
         * Process of the cluster (if {@link ClusterManager#mode} is `process`)
         * @type {?Child|?Worker}
         */
        this.thread = null;


        this.restarts = {
            current: this.manager.restarts.current,
            max: this.manager.restarts.max,
            interval: this.manager.restarts.interval,
            resetRestarts: () =>{
               this.restarts.reset = setInterval(() => {
                    this.restarts.current = 0;
                }, this.manager.restarts.interval)
            },
            cleanup: () => {
                clearInterval(this.restarts.reset);
            },
            append: () => {
                this.restarts.current++;
            }
        }
    }
    /**
     * Forks a child process or creates a worker thread for the cluster.
     * <warn>You should not need to call this manually.</warn>
     * @param {number} [spawnTimeout=30000] The amount in milliseconds to wait until the {@link Client} has become ready
     * before resolving. (-1 or Infinity for no wait)
     * @returns {Promise<Child>}
     */
    async spawn(spawnTimeout = 30000) {
        if (this.thread) throw new Error('CLUSTER ALREADY SPAWNED | ClusterId: ' + this.id);
        this.thread = new Thread(path.resolve(this.manager.file), {
            ...this.manager.clusterOptions,
            execArgv: this.execArgv,
            env: this.env,
            args: this.args,
            clusterData: { ...this.env, ...this.manager.clusterData },
        });
        this.messageHandler = new ClusterHandler(this.manager, this, this.thread);

        this.thread
            .spawn()
            .on('message', this._handleMessage.bind(this))
            .on('exit', this._handleExit.bind(this))
            .on('error', this._handleError.bind(this));

        /**
         * Emitted upon the creation of the cluster's child process/worker.
         * @event Cluster#spawn
         * @param {Child|Worker} process Child process/worker that was created
         */
        this.emit('spawn', this.thread.process);

        if (spawnTimeout === -1 || spawnTimeout === Infinity) return this.thread.process;

        await new Promise((resolve, reject) => {
            const cleanup = () => {
                clearTimeout(spawnTimeoutTimer);
                this.off('ready', onReady);
                this.off('death', onDeath);
            };

            const onReady = () => {
                this.manager.emit('clusterReady', this);
                this.restarts.cleanup();
                this.restarts.resetRestarts();
                cleanup();
                resolve();
            };

            const onDeath = () => {
                cleanup();
                reject(new Error('CLUSTERING_READY_DIED | ClusterId: ' + this.id));
            };

            const onTimeout = () => {
                cleanup();
                reject(new Error('CLUSTERING_READY_TIMEOUT | ClusterId: '+ this.id));
            };

            const spawnTimeoutTimer = setTimeout(onTimeout, spawnTimeout);
            this.once('ready', onReady);
            this.once('death', onDeath);
        });
        return this.thread.process;
    }
    /**
     * Immediately kills the clusters's process/worker and does not restart it.
     * @param {object} options Some Options for managing the Kill
     * @param {object} options.force Whether the Cluster should be force kill and be ever respawned...
     */
    kill(options = {}) {
        this.thread.kill(options);
        this.manager.heartbeat?.clusters.get(this.id)?.stop();
        this.restarts.cleanup();
        this._handleExit(false);
    }
    /**
     * Kills and restarts the cluster's process/worker.
     * @param {ClusterRespawnOptions} [options] Options for respawning the cluster
     * @returns {Promise<Child>}
     */
    async respawn({ delay = 500, timeout = 30000 } = {}) {
        if (this.thread) this.kill({ force: true });
        if (delay > 0) await Util.delayFor(delay);
        this.manager.heartbeat?.clusters.get(this.id)?.stop();
        return this.spawn(timeout);
    }
    /**
     * Sends a message to the cluster's process/worker.
     * @param {*|BaseMessage} message Message to send to the cluster
     * @returns {Promise<Shard>}
     */
    send(message) {
        if (typeof message === 'object') message = new BaseMessage(message).toJSON();
        return this.thread.send(message);
    }

    /**
     * Sends a Request to the ClusterClient and returns the reply
     * @param {Object} message Message, which should be sent as request
     * @returns {Promise<*>} Reply of the Message
     * @example
     * client.cluster.request({content: 'hello'})
     *   .then(result => console.log(result)) //hi
     *   .catch(console.error);
     * @see {@link IPCMessage#reply}
     */
    request(message = {}) {
        message._sRequest = true;
        message._sReply = false;
        message._type = messageType.CUSTOM_REQUEST;
        this.send(message);
        return this.manager.promise.create(message);
    }
    /**
     * Evaluates a script or function on the cluster, in the context of the {@link Client}.
     * @param {string|Function} script JavaScript to run on the cluster
     * @param context
     * @param timeout
     * @returns {Promise<*>} Result of the script execution
     */
    async eval(script, context, timeout) {
        // Stringify the script if it's a Function
        const _eval = typeof script === 'function' ? `(${script})(this, ${JSON.stringify(context)})` : script;

        // cluster is dead (maybe respawning), don't cache anything and error immediately
        if (!this.thread) return Promise.reject(new Error('CLUSTERING_NO_CHILD_EXISTS | ClusterId: '+  this.id));
        const nonce = Util.generateNonce();
        const message = {nonce, _eval, options: {timeout}, _type: messageType.CLIENT_EVAL_REQUEST};
        await this.send(message);
        return await this.manager.promise.create(message);
    }

    /**
    * @param {string} reason If maintenance should be enabled with a given reason or disabled when nonce provided
    */
    triggerMaintenance(reason) {
        const _type = reason ? messageType.CLIENT_MAINTENANCE_ENABLE : messageType.CLIENT_MAINTENANCE_DISABLE;
        return this.send({ _type, maintenance: reason });
    }

    /**
     * Handles a message received from the child process/worker.
     * @param {*} message Message received
     * @private
     */
    _handleMessage(message) {
        if (!message) return;
        const emit = this.messageHandler.handleMessage(message);
        if(!emit) return;

        let emitMessage;
        if (typeof message === 'object') {
            emitMessage = new IPCMessage(this, message);
            if (emitMessage._sRequest) this.manager.emit('clientRequest', emitMessage);
        } else emitMessage = message;
        /**
         * Emitted upon receiving a message from the child process/worker.
         * @event Shard#message
         * @param {*|IPCMessage} message Message that was received
         */
        this.emit('message', emitMessage);
    }

    /**
     * Handles the cluster's process/worker exiting.
     * @param {boolean} [respawn=this.manager.respawn] Whether to spawn the cluster again
     * @private
     */
    _handleExit(respawn = this.manager.respawn) {
        /**
         * Emitted upon the cluster's child process/worker exiting.
         * @event Cluster#death
         * @param {Child|Worker} process Child process/worker that exited
         */
        this.emit('death', this.thread.process);
        this.manager._debug('[DEATH] Cluster died, attempting respawn | Restarts Left: '+ (this.restarts.max - this.restarts.current), this.id);

        this.ready = false;
        this.thread = null;

        if(this.restarts.current >= this.restarts.max) this.manager._debug('[ATTEMPTED_RESPAWN] Attempted Respawn Declined | Max Restarts have been exceeded', this.id);
        if (respawn && this.restarts.current < this.restarts.max) this.spawn().catch(err => this.emit('error', err));

        this.restarts.append();
    }

    /**
     * Handles the cluster's process/worker error.
     * @param {object} [error] the error, which occurred on the worker/child process
     * @private
     */
    _handleError(error) {
        /**
         * Emitted upon the cluster's child process/worker error.
         * @event Cluster#error
         * @param {Child|Worker} process Child process/worker, where error occurred
         */
        this.manager.emit('error', error);
    }
}

module.exports = Cluster;