BaseNetwork API
NoFlo - Flow-Based Programming for JavaScript
(c) 2013-2018 Flowhub UG
(c) 2011-2012 Henri Bergius, Nemein
NoFlo may be freely distributed under the MIT license
/* eslint-disable
no-param-reassign,
no-underscore-dangle,
import/prefer-default-export,
*/
import { EventEmitter } from 'events';
import * as internalSocket from './InternalSocket';
import { ComponentLoader } from './ComponentLoader';
import { debounce } from './Utils';
import IP from './IP';
import { deprecated, isBrowser, makeAsync } from './Platform';
/**
* @typedef NetworkProcess
* @property {string} id
* @property {string} [componentName]
* @property {import("./Component").Component} [component]
*/
/**
* @typedef NetworkIIP
* @property {internalSocket.InternalSocket} socket
* @property {any} data
*/
/**
* @typedef NetworkEvent
* @property {string} type
* @property {Object} payload
*/
/**
* @param {internalSocket.InternalSocket} socket
* @param {NetworkProcess} process
* @param {string} port
* @param {number|void} index
* @param {boolean} inbound
* @returns {Promise<internalSocket.InternalSocket>}
*/
function connectPort(socket, process, port, index, inbound) {
if (inbound) {
socket.to = {
process,
port,
index,
};
if (!process.component
|| !process.component.inPorts
|| !process.component.inPorts.ports[port]) {
return Promise.reject(new Error(`No inport '${port}' defined in process ${process.id} (${socket.getId()})`));
}
if (process.component.inPorts.ports[port].isAddressable()) {
process.component.inPorts.ports[port].attach(socket, index);
return Promise.resolve(socket);
}
process.component.inPorts.ports[port].attach(socket);
return Promise.resolve(socket);
}
socket.from = {
process,
port,
index,
};
if (!process.component
|| !process.component.outPorts
|| !process.component.outPorts.ports[port]) {
return Promise.reject(new Error(`No outport '${port}' defined in process ${process.id} (${socket.getId()})`));
}
if (process.component.outPorts.ports[port].isAddressable()) {
process.component.outPorts.ports[port].attach(socket, index);
return Promise.resolve(socket);
}
process.component.outPorts.ports[port].attach(socket);
return Promise.resolve(socket);
}
/**
* @typedef NetworkOwnOptions
* @property {string} [baseDir] - Project base directory for component loading
* @property {ComponentLoader} [componentLoader] - Component loader instance to use, if any
* @property {Object} [flowtrace] - Flowtrace instance to use for tracing this network run
*/
/**
* @typedef { NetworkOwnOptions & import("./ComponentLoader").ComponentLoaderOptions} NetworkOptions
*/
The NoFlo network coordinator
NoFlo networks consist of processes connected to each other via sockets attached from outports to inports.
The role of the network coordinator is to take a graph and instantiate all the necessary processes from the designated components, attach sockets between them, and handle the sending of Initial Information Packets.
export class BaseNetwork extends EventEmitter {
/**
* All NoFlo networks are instantiated with a graph. Upon instantiation
* they will load all the needed components, instantiate them, and
* set up the defined connections and IIPs.
*
* @param {import("fbp-graph").Graph} graph - Graph definition to build a Network for
* @param {NetworkOptions} options - Network options
*/
constructor(graph, options = {}) {
super();
this.options = options;
Processes contains all the instantiated components for this network
/** @type {Object<string, NetworkProcess>} */
this.processes = {};
Connections contains all the socket connections in the network
/** @type {Array<internalSocket.InternalSocket>} */
this.connections = [];
Initials contains all Initial Information Packets (IIPs)
/** @type {Array<NetworkIIP>} */
this.initials = [];
/** @type {Array<NetworkIIP>} */
this.nextInitials = [];
Container to hold sockets that will be sending default data.
/** @type {Array<import("./InternalSocket").InternalSocket>} */
this.defaults = [];
The Graph this network is instantiated with
this.graph = graph;
this.started = false;
this.stopped = true;
this.debug = true;
/** @type {Array<NetworkEvent>} */
this.eventBuffer = [];
On Node.js we default the baseDir for component loading to the current working directory
if (graph.properties.baseDir && !options.baseDir) {
deprecated('Passing baseDir via Graph properties is deprecated, pass via Network options instead');
}
this.baseDir = null;
if (!isBrowser()) {
this.baseDir = options.baseDir || graph.properties.baseDir || process.cwd();
On browser we default the baseDir to the Component loading root
} else {
this.baseDir = options.baseDir || graph.properties.baseDir || '/';
}
As most NoFlo networks are long-running processes, the network coordinator marks down the start-up time. This way we can calculate the uptime of the network.
/** @type {Date | null} */
this.startupDate = null;
Initialize a Component Loader for the network
if (options.componentLoader) {
/** @type {ComponentLoader} */
this.loader = options.componentLoader;
} else if (graph.properties.componentLoader) {
deprecated('Passing componentLoader via Graph properties is deprecated, pass via Network options instead');
/** @type {ComponentLoader} */
this.loader = graph.properties.componentLoader;
} else {
/** @type {ComponentLoader} */
this.loader = new ComponentLoader(this.baseDir, this.options);
}
Enable Flowtrace for this network, when available
this.flowtraceName = null;
this.setFlowtrace(options.flowtrace || false, null);
}
The uptime of the network is the current time minus the start-up time, in seconds.
/**
* @returns {number}
*/
uptime() {
if (!this.startupDate) {
return 0;
}
return Date.now() - this.startupDate.getTime();
}
/**
* @returns {string[]}
*/
getActiveProcesses() {
/** @type {Array<string>} */
const active = [];
if (!this.started) { return active; }
Object.keys(this.processes).forEach((name) => {
const process = this.processes[name];
if (!process || !process.component) {
return;
}
if (process.component.load > 0) {
Modern component with load
active.push(name);
}
if (process.component.__openConnections > 0) {
Legacy component
active.push(name);
}
});
return active;
}
/**
* @param {string} event
* @param {any} payload
* @private
*/
traceEvent(event, payload) {
if (!this.flowtrace) {
return;
}
if (this.flowtraceName && this.flowtraceName !== this.flowtrace.mainGraph) {
Let main graph log all events from subgraphs
return;
}
switch (event) {
case 'ip': {
let type = 'data';
if (payload.type === 'openBracket') {
type = 'begingroup';
} else if (payload.type === 'closeBracket') {
type = 'endgroup';
}
const src = payload.socket.from ? {
node: payload.socket.from.process.id,
port: payload.socket.from.port,
} : null;
const tgt = payload.socket.to ? {
node: payload.socket.to.process.id,
port: payload.socket.to.port,
} : null;
this.flowtrace.addNetworkPacket(
`network:${type}`,
src,
tgt,
this.flowtraceName,
{
subgraph: payload.subgraph,
group: payload.group,
datatype: payload.datatype,
schema: payload.schema,
data: payload.data,
},
);
break;
}
case 'start': {
this.flowtrace.addNetworkStarted(this.flowtraceName);
break;
}
case 'end': {
this.flowtrace.addNetworkStopped(this.flowtraceName);
break;
}
case 'error': {
this.flowtrace.addNetworkError(this.flowtraceName, payload);
break;
}
default: {
No default handler
}
}
}
/**
* @param {string} event
* @param {any} payload
* @protected
*/
bufferedEmit(event, payload) {
Add the event to Flowtrace immediately
this.traceEvent(event, payload);
Errors get emitted immediately, like does network end
if (['icon', 'error', 'process-error', 'end'].includes(event)) {
this.emit(event, payload);
return;
}
if (!this.isStarted() && (event !== 'end')) {
this.eventBuffer.push({
type: event,
payload,
});
return;
}
this.emit(event, payload);
if (event === 'start') {
Once network has started we can send the IP-related events
this.eventBuffer.forEach((ev) => {
this.emit(ev.type, ev.payload);
});
this.eventBuffer = [];
}
if (event === 'ip') {
Emit also the legacy events from IP
switch (payload.type) {
case 'openBracket':
this.bufferedEmit('begingroup', payload);
return;
case 'closeBracket':
this.bufferedEmit('endgroup', payload);
return;
case 'data':
this.bufferedEmit('data', payload);
break;
default:
}
}
}
Loading components
Components can be passed to the NoFlo network in two ways:
- As direct, instantiated JavaScript objects
- As filenames
/**
* @callback ComponentLoadCallback
* @param {Error|null} err
* @param {import("./Component").Component} [component]
* @returns {void}
*/
/**
* @param {string} component
* @param {import("fbp-graph/lib/Types").GraphNodeMetadata} metadata
* @param {ComponentLoadCallback} [callback]
* @returns {Promise<import("./Component").Component>}
*/
load(component, metadata, callback) {
const promise = this.loader.load(component, metadata);
if (callback) {
deprecated('Providing a callback to Network.load is deprecated, use Promises');
promise.then((instance) => {
callback(null, instance);
}, callback);
}
return promise;
}
Add a process to the network
Processes can be added to a network at either start-up time or later. The processes are added with a node definition object that includes the following properties:
id
: Identifier of the process in the network. Typically a stringcomponent
: Filename or path of a NoFlo component, or a component instance object
/**
* @callback AddNodeCallback
* @param {Error|null} error
* @param {NetworkProcess} [process]
* @returns {void}
*/
/**
* @param {import("fbp-graph/lib/Types").GraphNode} node
* @param {Object} options
* @param {AddNodeCallback} [callback]
* @returns {Promise<NetworkProcess>}
*/
addNode(node, options, callback) {
if (typeof options === 'function') {
callback = /** @type {AddNodeCallback} */ (options);
options = {};
}
let promise;
Processes are treated as singletons by their identifier. If we already have a process with the given ID, return that.
if (this.processes[node.id]) {
promise = Promise.resolve(this.processes[node.id]);
} else {
/** @type {NetworkProcess} */
const process = { id: node.id };
No component defined, just register the process but don’t start.
if (!node.component) {
this.processes[process.id] = process;
promise = Promise.resolve(process);
} else {
Load the component for the process.
promise = this.load(node.component, node.metadata)
.then((instance) => {
instance.nodeId = node.id;
process.component = instance;
process.componentName = node.component;
Inform the ports of the node name
const inPorts = process.component.inPorts.ports;
const outPorts = process.component.outPorts.ports;
Object.keys(inPorts).forEach((name) => {
const port = inPorts[name];
port.node = node.id;
port.nodeInstance = instance;
port.name = name;
});
Object.keys(outPorts).forEach((name) => {
const port = outPorts[name];
port.node = node.id;
port.nodeInstance = instance;
port.name = name;
});
if (instance.isSubgraph()) {
this.subscribeSubgraph(process);
}
this.subscribeNode(process);
Store and return the process instance
this.processes[process.id] = process;
return process;
});
}
}
if (callback) {
deprecated('Providing a callback to Network.addNode is deprecated, use Promises');
promise.then((process) => {
callback(null, process);
}, callback);
}
return promise;
}
/**
* @param {import("fbp-graph/lib/Types").GraphNode} node
* @param {ErrorableCallback} [callback]
* @returns {Promise<void>}
*/
removeNode(node, callback) {
let promise;
const process = this.getNode(node.id);
if (!process) {
promise = Promise.reject(new Error(`Node ${node.id} not found`));
} else {
if (!process.component) {
delete this.processes[node.id];
return Promise.resolve();
}
promise = process.component.shutdown()
.then(() => {
delete this.processes[node.id];
return Promise.resolve();
});
}
if (callback) {
deprecated('Providing a callback to Network.removeNode is deprecated, use Promises');
promise.then(() => {
callback(null);
}, callback);
}
return promise;
}
/**
* @param {string} oldId
* @param {string} newId
* @param {ErrorableCallback} [callback]
* @returns {Promise<void>}
*/
renameNode(oldId, newId, callback) {
const process = this.getNode(oldId);
let promise;
if (!process) {
promise = Promise.reject(new Error(`Process ${oldId} not found`));
} else {
Inform the process of its ID
process.id = newId;
if (process.component) {
Inform the ports of the node name
const inPorts = process.component.inPorts.ports;
const outPorts = process.component.outPorts.ports;
Object.keys(inPorts).forEach((name) => {
const port = inPorts[name];
if (!port) { return; }
port.node = newId;
});
Object.keys(outPorts).forEach((name) => {
const port = outPorts[name];
if (!port) { return; }
port.node = newId;
});
}
this.processes[newId] = process;
delete this.processes[oldId];
promise = Promise.resolve();
}
if (callback) {
deprecated('Providing a callback to Network.renameNode is deprecated, use Promises');
promise.then(() => {
callback(null);
}, callback);
}
return promise;
}
Get process by its ID.
/**
* @param {string} id compone
* @returns {NetworkProcess|void}
*/
getNode(id) {
return this.processes[id];
}
/**
* @callback ErrorableCallback
* @param {Error|null} [err]
* @returns {void}
*/
/**
* @param {ErrorableCallback} [callback]
* @returns {Promise<this>}
*/
connect(callback) {
/**
* @param {string} key
* @param {string} method
* @returns {Promise<any>}
*/
const handleAll = (key, method) => this.graph[key]
.reduce((chain, entity) => chain
.then(() => this[method](entity, {
initial: true,
})), Promise.resolve());
const promise = Promise.resolve()
.then(() => handleAll('nodes', 'addNode'))
.then(() => handleAll('edges', 'addEdge'))
.then(() => handleAll('initializers', 'addInitial'))
.then(() => handleAll('nodes', 'addDefaults'))
.then(() => this);
if (callback) {
deprecated('Providing a callback to Network.connect is deprecated, use Promises');
promise.then(() => {
callback(null);
}, callback);
}
return promise;
}
/**
* @private
* @param {NetworkProcess} node
*/
subscribeSubgraph(node) {
if (!node.component) {
return;
}
if (!node.component.isReady()) {
node.component.once('ready', () => {
this.subscribeSubgraph(node);
});
return;
}
const instance = /** @type {import("../components/Graph").Graph} */ (node.component);
if (!instance.network) {
return;
}
instance.network.setDebug(this.debug);
if (this.flowtrace) {
instance.network.setFlowtrace(this.flowtrace, node.componentName, false);
}
/**
* @param {string} type
* @param {any} data
*/
const emitSub = (type, data) => {
if ((type === 'process-error') && (this.listeners('process-error').length === 0)) {
if (data.id && data.metadata && data.error) { throw data.error; }
throw data;
}
if (!data) { data = {}; }
if (data.subgraph) {
if (!data.subgraph.unshift) {
data.subgraph = [data.subgraph];
}
data.subgraph.unshift(node.id);
} else {
data.subgraph = [node.id];
}
this.bufferedEmit(type, data);
};
/**
* @type {IP} data
*/
instance.network.on('ip', (data) => {
emitSub('ip', data);
});
/**
* @type {Error} data
*/
instance.network.on('process-error', (data) => {
emitSub('process-error', data);
});
}
Subscribe to events from all connected sockets and re-emit them
/**
* @param {internalSocket.InternalSocket} socket
* @param {NetworkProcess} [source]
*/
subscribeSocket(socket, source) {
socket.on('ip', (ip) => {
this.bufferedEmit('ip', {
id: socket.getId(),
type: ip.type,
socket,
data: ip.data,
metadata: socket.metadata,
});
});
socket.on('error', (event) => {
if (this.listeners('process-error').length === 0) {
if (event.id && event.metadata && event.error) { throw event.error; }
throw event;
}
this.bufferedEmit('process-error', event);
});
if (!source || !source.component || !source.component.isLegacy()) {
return;
}
const comp = /** @type {import("./Component").Component} */ (source.component);
Handle activation for legacy components via connects/disconnects
socket.on('connect', () => {
if (!comp.__openConnections) {
comp.__openConnections = 0;
}
comp.__openConnections += 1;
});
socket.on('disconnect', () => {
comp.__openConnections -= 1;
if (comp.__openConnections < 0) {
comp.__openConnections = 0;
}
if (comp.__openConnections === 0) {
this.checkIfFinished();
}
});
}
/**
* @param {NetworkProcess} node
*/
subscribeNode(node) {
if (!node.component) {
return;
}
const instance = /** @type {import("./Component").Component} */ (node.component);
instance.on('activate', () => {
if (this.debouncedEnd) { this.abortDebounce = true; }
});
instance.on('deactivate', (load) => {
if (load > 0) { return; }
this.checkIfFinished();
});
if (!instance.getIcon) { return; }
instance.on('icon', () => {
this.bufferedEmit('icon', {
id: node.id,
icon: instance.getIcon(),
});
});
}
/**
* @protected
* @param {string} node
* @param {string} direction
* @returns Promise<NetworkProcess>
*/
ensureNode(node, direction) {
const instance = this.getNode(node);
if (!instance) {
return Promise.reject(new Error(`No process defined for ${direction} node ${node}`));
}
if (!instance.component) {
return Promise.reject(new Error(`No component defined for ${direction} node ${node}`));
}
const comp = /** @type {import("./Component").Component} */ (instance.component);
if (!comp.isReady()) {
return new Promise((resolve) => {
comp.once('ready', () => {
resolve(instance);
});
});
}
return Promise.resolve(instance);
}
/**
* @callback AddEdgeCallback
* @param {Error|null} error
* @param {internalSocket.InternalSocket} [socket]
* @returns {void}
*/
/**
* @param {import("fbp-graph/lib/Types").GraphEdge} edge
* @param {Object} options
* @param {AddEdgeCallback} [callback]
* @returns {Promise<internalSocket.InternalSocket>}
*/
addEdge(edge, options, callback) {
if (typeof options === 'function') {
callback = /** @type {AddEdgeCallback} */ (options);
options = {};
}
const promise = this.ensureNode(edge.from.node, 'outbound')
.then((from) => {
const socket = internalSocket.createSocket(edge.metadata);
socket.setDebug(this.debug);
return this.ensureNode(edge.to.node, 'inbound')
.then((to) => {
Subscribe to events from the socket
this.subscribeSocket(socket, from);
return connectPort(socket, to, edge.to.port, edge.to.index, true);
})
.then(() => connectPort(socket, from, edge.from.port, edge.from.index, false))
.then(() => {
this.connections.push(socket);
return socket;
});
});
if (callback) {
deprecated('Providing a callback to Network.addEdge is deprecated, use Promises');
promise.then((socket) => {
callback(null, socket);
}, callback);
}
return promise;
}
/**
* @param {import("fbp-graph/lib/Types").GraphEdge} edge
* @param {ErrorableCallback} [callback]
* @returns {Promise<void>}
*/
removeEdge(edge, callback) {
this.connections.forEach((connection) => {
if (!connection) { return; }
if ((edge.to.node !== connection.to.process.id) || (edge.to.port !== connection.to.port)) {
return;
}
connection.to.process.component.inPorts[connection.to.port].detach(connection);
if (edge.from.node) {
if (connection.from
&& (edge.from.node === connection.from.process.id)
&& (edge.from.port === connection.from.port)) {
connection.from.process.component.outPorts[connection.from.port].detach(connection);
}
}
this.connections.splice(this.connections.indexOf(connection), 1);
});
if (callback) {
deprecated('Providing a callback to Network.removeEdge is deprecated, use Promises');
callback(null);
}
return Promise.resolve();
}
/**
* @protected
* @param {import("fbp-graph/lib/Types").GraphNode} node
* @returns {Promise<void>}
*/
addDefaults(node) {
return this.ensureNode(node.id, 'inbound')
.then((process) => Promise.all(Object.keys(process.component.inPorts.ports).map((key) => {
Attach a socket to any defaulted inPorts as long as they aren’t already attached.
const port = process.component.inPorts.ports[key];
if (!port.hasDefault() || port.isAttached()) {
return Promise.resolve();
}
const socket = internalSocket.createSocket();
socket.setDebug(this.debug);
Subscribe to events from the socket
this.subscribeSocket(socket);
return connectPort(socket, process, key, undefined, true)
.then(() => {
this.connections.push(socket);
this.defaults.push(socket);
});
})))
.then(() => {});
}
/**
* @param {import("fbp-graph/lib/Types").GraphIIP} initializer
* @param {Object} options
* @param {AddEdgeCallback} [callback]
* @returns {Promise<internalSocket.InternalSocket>}
*/
addInitial(initializer, options, callback) {
if (typeof options === 'function') {
callback = /** @type {AddEdgeCallback} */ (options);
options = {};
}
const promise = this.ensureNode(initializer.to.node, 'inbound')
.then((to) => {
const socket = internalSocket.createSocket(initializer.metadata);
socket.setDebug(this.debug);
Subscribe to events from the socket
this.subscribeSocket(socket);
return connectPort(socket, to, initializer.to.port, initializer.to.index, true);
})
.then((socket) => {
this.connections.push(socket);
const init = {
socket,
data: initializer.from.data,
};
this.initials.push(init);
this.nextInitials.push(init);
if (this.isRunning()) {
Network is running now, send initials immediately
(this.sendInitials)();
} else if (!this.isStopped()) {
Network has finished but hasn’t been stopped, set started and set
this.setStarted(true);
(this.sendInitials)();
}
return socket;
});
if (callback) {
deprecated('Providing a callback to Network.addInitial is deprecated, use Promises');
promise.then((socket) => {
callback(null, socket);
}, callback);
}
return promise;
}
/**
* @param {import("fbp-graph/lib/Types").GraphIIP} initializer
* @param {ErrorableCallback} [callback]
* @returns {Promise<void>}
*/
removeInitial(initializer, callback) {
this.connections.forEach((connection) => {
if (!connection) { return; }
if ((initializer.to.node !== connection.to.process.id)
|| (initializer.to.port !== connection.to.port)) {
return;
}
connection.to.process.component.inPorts[connection.to.port].detach(connection);
this.connections.splice(this.connections.indexOf(connection), 1);
for (let i = 0; i < this.initials.length; i += 1) {
const init = this.initials[i];
if (!init) { return; }
if (init.socket !== connection) { return; }
this.initials.splice(this.initials.indexOf(init), 1);
}
for (let i = 0; i < this.nextInitials.length; i += 1) {
const init = this.nextInitials[i];
if (!init) { return; }
if (init.socket !== connection) { return; }
this.nextInitials.splice(this.nextInitials.indexOf(init), 1);
}
});
if (callback) {
deprecated('Providing a callback to Network.removeInitial is deprecated, use Promises');
callback(null);
}
return Promise.resolve();
}
/**
* @returns Promise<void>
*/
sendInitials() {
return new Promise((resolve) => {
makeAsync(resolve);
})
.then(() => this.initials.reduce((chain, initial) => chain
.then(() => {
initial.socket.post(new IP('data', initial.data, {
initial: true,
}));
return Promise.resolve();
}), Promise.resolve()))
.then(() => {
Clear the list of initials to still be sent
this.initials = [];
return Promise.resolve();
});
}
isStarted() {
return this.started;
}
isStopped() {
return this.stopped;
}
isRunning() {
return this.getActiveProcesses().length > 0;
}
/**
* @protected
* @returns {Promise<void>}
*/
startComponents() {
if (!this.processes || !Object.keys(this.processes).length) {
return Promise.resolve();
}
Perform any startup routines necessary for every component.
return Promise.all(Object.keys(this.processes).map((id) => {
const process = this.processes[id];
if (!process.component) {
return Promise.resolve();
}
return process.component.start();
}))
.then(() => {});
}
/**
* @returns Promise<void>
*/
sendDefaults() {
return Promise.all(this.defaults.map((socket) => {
Don’t send defaults if more than one socket is present on the port. This case should only happen when a subgraph is created as a component as its network is instantiated and its inputs are serialized before a socket is attached from the “parent” graph.
if (socket.to.process.component.inPorts[socket.to.port].sockets.length !== 1) {
return Promise.resolve();
}
socket.connect();
socket.send();
socket.disconnect();
return Promise.resolve();
}))
.then(() => {});
}
/**
* @param {ErrorableCallback} [callback]
* @returns {Promise<this>}
*/
start(callback) {
if (this.debouncedEnd) {
this.abortDebounce = true;
}
let promise;
if (this.started) {
promise = this.stop()
.then(() => this.start());
} else {
this.initials = this.nextInitials.slice(0);
this.eventBuffer = [];
promise = this.startComponents()
.then(() => this.sendInitials())
.then(() => this.sendDefaults())
.then(() => {
this.setStarted(true);
return Promise.resolve(this);
});
}
if (callback) {
deprecated('Providing a callback to Network.start is deprecated, use Promises');
promise.then(() => {
callback(null);
}, callback);
}
return promise;
}
/**
* @param {ErrorableCallback} [callback]
* @returns {Promise<this>}
*/
stop(callback) {
if (this.debouncedEnd) {
this.abortDebounce = true;
}
let promise;
if (!this.started) {
this.stopped = true;
promise = Promise.resolve(this);
} else {
Disconnect all connections
this.connections.forEach((connection) => {
if (!connection.isConnected()) {
return;
}
connection.disconnect();
});
if (!this.processes || !Object.keys(this.processes).length) {
No processes to stop
this.setStarted(false);
this.stopped = true;
promise = Promise.resolve(this);
} else {
Emit stop event when all processes are stopped
promise = Promise.all(Object.keys(this.processes)
.map((id) => {
if (!this.processes[id].component) {
return Promise.resolve();
}
eslint-disable-next-line max-len
const comp = /** @type {import("./Component").Component} */ (this.processes[id].component);
return comp.shutdown();
}))
.then(() => {
this.setStarted(false);
this.stopped = true;
return Promise.resolve(this);
});
}
}
if (callback) {
deprecated('Providing a callback to Network.stop is deprecated, use Promises');
promise.then(() => {
callback(null);
}, callback);
}
return promise;
}
/**
* @param {boolean} started
*/
setStarted(started) {
if (this.started === started) { return; }
if (!started) {
Ending the execution
this.started = false;
this.bufferedEmit('end', {
start: this.startupDate,
end: new Date(),
uptime: this.uptime(),
});
return;
}
Starting the execution
if (!this.startupDate) {
this.startupDate = new Date();
}
this.started = true;
this.stopped = false;
this.bufferedEmit('start',
{ start: this.startupDate });
}
checkIfFinished() {
if (this.isRunning()) { return; }
delete this.abortDebounce;
if (!this.debouncedEnd) {
this.debouncedEnd = debounce(() => {
if (this.abortDebounce) { return; }
if (this.isRunning()) { return; }
this.setStarted(false);
}, 50);
}
(this.debouncedEnd)();
}
getDebug() {
return this.debug;
}
/**
* @param {boolean} active
*/
setDebug(active) {
if (active === this.debug) { return; }
this.debug = active;
this.connections.forEach((socket) => {
socket.setDebug(active);
});
Object.keys(this.processes).forEach((processId) => {
const process = this.processes[processId];
if (!process.component) {
return;
}
const instance = process.component;
if (instance.isSubgraph()) {
const inst = /** @type {import("../components/Graph").Graph} */ (instance);
inst.network.setDebug(active);
}
});
}
/**
* @param {Object|null} flowtrace
* @param {string|null} [name]
* @param {boolean} [main]
*/
setFlowtrace(flowtrace, name = null, main = true) {
if (!flowtrace) {
this.flowtraceName = null;
this.flowtrace = null;
return;
}
if (this.flowtrace) {
We already have a tracer
return;
}
this.flowtrace = flowtrace;
this.flowtraceName = name || this.graph.name;
this.flowtrace.addGraph(this.flowtraceName, this.graph, main);
Object.keys(this.processes).forEach((nodeId) => {
Register existing subgraphs
const node = this.processes[nodeId];
const inst = /** @type {import("../components/Graph").Graph} */ (node.component);
if (!inst.isSubgraph() || !inst.network) {
return;
}
inst.network.setFlowtrace(this.flowtrace, node.componentName, false);
});
}
}
This page contains documentation generated automatically from NoFlo's BaseNetwork.js file.