Core_ClusterClient.js

// @ts-check
const { IPCMessage, BaseMessage } = require('../Structures/IPCMessage.js');
const Util = require('../Util/Util.js');
const { Events, messageType } = require('../Util/Constants.js');

const { WorkerClient } = require('../Structures/Worker.js');
const { ChildClient } = require('../Structures/Child.js');
const { ClusterClientHandler } = require('../Structures/IPCHandler.js');
const PromiseHandler = require('../Structures/PromiseHandler.js');

const EventEmitter = require('events');
///communicates between the master workers and the process
class ClusterClient extends EventEmitter {
    /**
     * @param {Client} client Client of the current cluster
     */
    constructor(client) {
        super();
        /**
         * Client for the Cluster
         * @type {Client}
         */
        this.client = client;

        /**
         * Mode the Cluster was spawned with
         * @type {ClusterManagerMode}
         */
        this.mode = this.info.CLUSTER_MANAGER_MODE;
        let mode = this.mode;

        /**
         * If the Cluster is spawned automatically or with a own controller
         * @type {Object}
         */
        this.queue = {
            mode: this.info.CLUSTER_QUEUE_MODE,
        };

        /**
         * If the Cluster is under maintenance
         * @type {String}
         */
        this.maintenance = this.info.MAINTENANCE;
        if(this.maintenance === 'undefined') this.maintenance = false;
        if(!this.maintenance) {
            // Wait 100ms so listener can be added
           setTimeout(() => this.triggerClusterReady() , 100);
        }

        this.ready = false;

        this.process = null;

        if (mode === 'process') this.process = new ChildClient(this);
        else if (mode === 'worker') this.process = new WorkerClient(this);

        this.messageHandler = new ClusterClientHandler(this, this.process);

        this.promise = new PromiseHandler();

        this.process.ipc.on('message', this._handleMessage.bind(this));
        client.on?.('ready', () => {
            this.triggerReady();
        });
    }
    /**
     * cluster's id
     * @type {number}
     * @readonly
     */
    get id() {
        return this.info.CLUSTER;
    }
    /**
     * Array of shard IDs of this client
     * @type {Map<number, any>}
     * @readonly
     */
    get ids() {
        if (!this.client.ws) return this.info.SHARD_LIST;
        return this.client.ws.shards;
    }
    /**
     * Total number of clusters
     * @type {number}
     * @readonly
     */
    get count() {
        return this.info.CLUSTER_COUNT;
    }
    /**
     * Gets several Info like Cluster_Count, Number, Total shards...
     * @type {object}
     * @readonly
     */
    get info() {
        return ClusterClient.getInfo();
    }
    /**
     * Sends a message to the master process.
     * @param {*} message Message to send
     * @returns {Promise<void>}
     * @fires Cluster#message
     */
    send(message) {
        if (typeof message === 'object') message = new BaseMessage(message).toJSON();
        return this.process.send(message);
    }
    /**
     * Fetches a client property value of each cluster, or a given cluster.
     * @param {string} prop Name of the client property to get, using periods for nesting
     * @param {number} [cluster] Cluster to fetch property from, all if undefined
     * @returns {Promise<*>|Promise<Array<*>>}
     * @example
     * client.cluster.fetchClientValues('guilds.cache.size')
     *   .then(results => console.log(`${results.reduce((prev, val) => prev + val, 0)} total guilds`))
     *   .catch(console.error);
     * @see {@link ClusterManager#fetchClientValues}
     */
    fetchClientValues(prop, cluster) {
        return this.broadcastEval(`this.${prop}`, {cluster});
    }

    /**
     * Evaluates a script or function on the Cluster Manager
     * @param {string|Function} script JavaScript to run on the Manager
     * @param {object} options Some options such as the Eval timeout or the Context
     * @param {number} [options.timeout=10000] The time in ms to wait, until the eval will be rejected without any response
     * @param {any} [options.context] The context to pass to the script, when providing functions
     * @returns {Promise<*>|Promise<Array<*>>} Result of the script execution
     * @example
     * client.cluster.evalOnManager('process.uptime')
     *   .then(result => console.log(result))
     *   .catch(console.error);
     * @see {@link ClusterManager#evalOnManager}
     */
    async evalOnManager(script, options = {}) {
        options._type = messageType.CLIENT_MANAGER_EVAL_REQUEST
        return await this.broadcastEval(script, options);
    }

    async evalOnCluster(script, options = {}) {
        return await this.broadcastEval(script, options);
    }

    /**
     * Evaluates a script or function on all clusters, or a given cluster, in the context of the {@link Client}s.
     * @param {string|Function} script JavaScript to run on each cluster
     * @param {object} options Some options such as the TargetCluster or the Eval timeout
     * @param {number} [options.context] The Context to pass to the eval script
     * @param {number} [options.cluster] The Id od the target Cluster
     * @param {number} [options.shard] The Id od the target Shard, when the Cluster has not been provided.
     * @param {number} [options.guildId] The Id od the guild the cluster is in, when the Cluster has not been provided.
     * @param {number} [options.timeout=10000] The time in ms to wait, until the eval will be rejected without any response
     * @returns {Promise<*>|Promise<Array<*>>} Results of the script execution
     * @example
     * client.cluster.broadcastEval('this.guilds.cache.size')
     *   .then(results => console.log(`${results.reduce((prev, val) => prev + val, 0)} total guilds`))
     *   .catch(console.error);
     * @see {@link ClusterManager#broadcastEval}
     */
    async broadcastEval(script, options = {}) {
        if (!script || (typeof script !== 'string' && typeof script !== 'function'))
          throw new TypeError(
            'Script for BroadcastEvaling has not been provided or must be a valid String/Function!',
          );
        script = typeof script === 'function' ? `(${script})(this, ${JSON.stringify(options.context)})` : script;
        const nonce = Util.generateNonce();
        const message = {nonce, _eval: script, options, _type: options._type || messageType.CLIENT_BROADCAST_REQUEST};
        await this.send(message);

        return await this.promise.create(message);
    }
    /**
     * Sends a Request to the ParentCluster and returns the reply
     * @param {Object} message Message, which should be sent as request
     * @returns {Promise<*>} Reply of the Message
     * @example
     * client.cluster.request({content: 'hello'})
     *   .then(result => console.log(result)) //hi
     *   .catch(console.error);
     * @see {@link IPCMessage#reply}
     */
    request(message = {}) {
        message._sRequest = true;
        message._sReply = false;
        message._type = messageType.CUSTOM_REQUEST;
        this.send(message);
        return this.promise.create(message);
    }

    /**
     * Requests a respawn of all clusters.
     * @param {ClusterRespawnOptions} [options] Options for respawning shards
     * @returns {Promise<void>} Resolves upon the message being sent
     * @see {@link ClusterManager#respawnAll}
     */
    respawnAll({ clusterDelay = 5000, respawnDelay = 7000, timeout = 30000 } = {}) {
        return this.send({ _type: messageType.CLIENT_RESPAWN_ALL , options: { clusterDelay, respawnDelay, timeout } });
    }

    /**
     * Handles an IPC message.
     * @param {*} message Message received
     * @private
     */
    async _handleMessage(message) {
        if (!message) return;
        const emit = await this.messageHandler.handleMessage(message);
        if(!emit) return;
        let emitMessage;
        if (typeof message === 'object') emitMessage = new IPCMessage(this, message);
        else emitMessage = message;
        /**
         * Emitted upon receiving a message from the parent process/worker.
         * @event ClusterClient#message
         * @param {*} message Message that was received
         */
        this.emit('message', emitMessage);
    }

    async _eval(script) {
        if (this.client._eval) {
            return await this.client._eval(script);
        }
        this.client._eval = function (_) {
            return eval(_);
        }.bind(this.client);
        return await this.client._eval(script);
    }

    /**
     * Sends a message to the master process, emitting an error from the client upon failure.
     * @param {string} type Type of response to send
     * @param {*} message Message to send, which can be a Object or a String
     * @private
     */
    _respond(type, message) {
        this.send(message).catch(err => {
            let error = { err };

            error.message = `Error when sending ${type} response to master process: ${err.message}`;
            /**
             * Emitted when the client encounters an error.
             * @event Client#error
             * @param {Error} error The error encountered
             */
            this.client.emit?.(Events.ERROR, error);
        });
    }

    // Hooks
    triggerReady() {
        this.process.send({ _type: messageType.CLIENT_READY });
        this.ready = true;
        return this.ready;
    }

    triggerClusterReady() {
        return this.emit('ready', this);
    }

    /**
     * 
     * @param {String} maintenance Whether the cluster should opt in maintenance when a reason was provided or opt-out when no reason was provided.
     * @param {Boolean} all Whether to target it on all clusters or just the current one.
     * @returns {String} The maintenance status of the cluster.
     */
    triggerMaintenance(maintenance, all = false) {
        let _type = messageType.CLIENT_MAINTENANCE;
        if(all) _type = messageType.CLIENT_MAINTENANCE_ALL;
        this.process.send({ _type, maintenance });
        this.maintenance = maintenance;
        return this.maintenance;
    }

    /**
     * Manually spawn the next cluster, when queue mode is on 'manual'
     * @returns {Promise<*>}
     */
    spawnNextCluster() {
        if (this.queue.mode === 'auto')
            throw new Error('Next Cluster can just be spawned when the queue is not on auto mode.');
        return this.process.send({ _type: messageType.CLIENT_SPAWN_NEXT_CLUSTER});
    }

    /**
     * gets the total Internal shard count and shard list.
     * @returns {Object}
     */
    static getInfo() {
        let clusterMode = process.env.CLUSTER_MANAGER_MODE;
        if (!clusterMode) return;
        if (clusterMode !== 'worker' && clusterMode !== 'process')
            throw new Error('NO CHILD/MASTER EXISTS OR SUPPLIED CLUSTER_MANAGER_MODE IS INCORRECT');
        let data;
        if (clusterMode === 'process') {
            const shardList = [];
            let parseShardList = process.env.SHARD_LIST.split(',');
            parseShardList.forEach(c => shardList.push(Number(c)));
            data = {
                SHARD_LIST: shardList,
                TOTAL_SHARDS: Number(process.env.TOTAL_SHARDS),
                CLUSTER_COUNT: Number(process.env.CLUSTER_COUNT),
                CLUSTER: Number(process.env.CLUSTER),
                CLUSTER_MANAGER_MODE: clusterMode,
                MAINTENANCE: process.env.MAINTENANCE,
                CLUSTER_QUEUE_MODE: process.env.CLUSTER_QUEUE_MODE,
            };
        } else {
            data = require('worker_threads').workerData;
        }

        data.FIRST_SHARD_ID = data.SHARD_LIST[0];
        data.LAST_SHARD_ID = data.SHARD_LIST[data.SHARD_LIST.length - 1];

        return data;
    }
}
module.exports = ClusterClient;