Component API
NoFlo - Flow-Based Programming for JavaScript
(c) 2013-2017 Flowhub UG
(c) 2011-2012 Henri Bergius, Nemein
NoFlo may be freely distributed under the MIT license
/* eslint-disable
class-methods-use-this,
no-underscore-dangle,
import/prefer-default-export,
*/
import { EventEmitter } from 'events';
import debug from 'debug';
import { InPorts, OutPorts, normalizePortName } from './Ports';
import { deprecated } from './Platform';
import InPort from './InPort'; // eslint-disable-line no-unused-vars
import OutPort from './OutPort'; // eslint-disable-line no-unused-vars
import ProcessContext from './ProcessContext';
import ProcessInput from './ProcessInput';
import ProcessOutput from './ProcessOutput';
import IP from './IP'; // eslint-disable-line no-unused-vars
const debugComponent = debug('noflo:component');
const debugBrackets = debug('noflo:component:brackets');
const debugSend = debug('noflo:component:send');
/**
* @callback ProcessingFunction
* @param {ProcessInput} input
* @param {ProcessOutput} output
* @param {ProcessContext} context
* @returns {Promise<any> | void}
*/
/**
* @typedef ComponentOptions
* @property {import("./Ports").InPortsOptions | InPorts} [inPorts] - Inports for the component
* @property {import("./Ports").OutPortsOptions | OutPorts} [outPorts] - Outports for the component
* @property {string} [icon]
* @property {string} [description]
* @property {ProcessingFunction} [options.process] - Component processsing function
* @property {boolean} [ordered] - Whether component should send
* packets in same order it received them
* @property {boolean} [autoOrdering]
* @property {boolean} [activateOnInput] - Whether component should
* activate when it receives packets
* @property {Object<string, Array<string>>} [forwardBrackets] - Mappings of forwarding ports
*/
/**
* @typedef BracketContext
* @property {Object<string,Object>} in
* @property {Object<string,Object>} out
*/
eslint-disable-next-line max-len
/** @typedef {{ __resolved?: boolean, __bracketClosingAfter?: BracketContext[], [key: string]: any }} ProcessResult */
NoFlo Component Base class
The noflo.Component
interface provides a way to instantiate
and extend NoFlo components.
export class Component extends EventEmitter {
/**
* @param {ComponentOptions} [options]
*/
constructor(options = { }) {
super();
const opts = options;
Prepare inports, if any were given in options.
They can also be set up imperatively after component
instantiation by using the component.inPorts.add
method.
if (!opts.inPorts) { opts.inPorts = {}; }
if (opts.inPorts instanceof InPorts) {
this.inPorts = opts.inPorts;
} else {
this.inPorts = new InPorts(opts.inPorts);
}
Prepare outports, if any were given in opts.
They can also be set up imperatively after component
instantiation by using the component.outPorts.add
method.
if (!opts.outPorts) { opts.outPorts = {}; }
if (opts.outPorts instanceof OutPorts) {
this.outPorts = opts.outPorts;
} else {
this.outPorts = new OutPorts(opts.outPorts);
}
Set the default component icon and description
this.icon = opts.icon ? opts.icon : '';
this.description = opts.description ? opts.description : '';
/** @type {string|null} */
this.componentName = null;
/** @type {string|null} */
this.baseDir = null;
Initially the component is not started
this.started = false;
this.load = 0;
Whether the component should keep send packets out in the order they were received
this.ordered = opts.ordered != null ? opts.ordered : false;
this.autoOrdering = opts.autoOrdering != null ? opts.autoOrdering : null;
Queue for handling ordered output packets
/** @type {ProcessResult[]} */
this.outputQ = [];
Context used for bracket forwarding
/** @type {BracketContext} */
this.bracketContext = {
in: {},
out: {},
};
Whether the component should activate when it receives packets
this.activateOnInput = opts.activateOnInput != null ? opts.activateOnInput : true;
Bracket forwarding rules. By default we forward
brackets from in
port to out
and error
ports.
if (!opts.forwardBrackets) {
opts.forwardBrackets = { in: ['out', 'error'] };
}
this.forwardBrackets = opts.forwardBrackets;
The component’s process function can either be
passed in opts, or given imperatively after
instantation using the component.process
method.
if (typeof opts.process === 'function') {
this.process(opts.process);
}
Placeholder for the ID of the current node, populated by NoFlo network
/** @type string | null */
this.nodeId = null;
Deprecated legacy component connection counter
this.__openConnections = 0;
}
getDescription() { return this.description; }
isReady() { return true; }
isSubgraph() { return false; }
/**
* @param {string} icon - Updated icon for the component
*/
setIcon(icon) {
this.icon = icon;
this.emit('icon', this.icon);
}
getIcon() { return this.icon; }
Error emitting helper
If component has an error
outport that is connected, errors
are sent as IP objects there. If the port is not connected,
errors are thrown.
/**
* @param {Error} e
* @param {Array<string>} [groups]
* @param {string} [errorPort]
* @param {string | null} [scope]
*/
error(e, groups = [], errorPort = 'error', scope = null) {
const outPort = /** @type {OutPort} */ (this.outPorts.ports[errorPort]);
if (outPort
&& (outPort.isAttached() || !outPort.isRequired())) {
groups.forEach((group) => {
outPort.openBracket(group, { scope });
});
outPort.data(e, { scope });
groups.forEach((group) => {
outPort.closeBracket(group, { scope });
});
return;
}
throw e;
}
/**
* @callback ErrorableCallback
* @param {Error | null} error
*/
Setup
The setUp method is for component-specific initialization. Called at network start-up.
Override in component implementation to do component-specific setup work.
/**
* @param {ErrorableCallback} callback - Callback for when teardown is ready
* @returns {Promise<void> | void}
*/
setUp(callback) {
callback(null);
}
Teardown
The tearDown method is for component-specific cleanup. Called at network shutdown
Override in component implementation to do component-specific cleanup work, like clearing any accumulated state.
/**
* @param {ErrorableCallback} callback - Callback for when teardown is ready
* @returns {Promise<void> | void}
*/
tearDown(callback) {
callback(null);
}
Start
Called when network starts. This sets calls the setUp method and sets the component to a started state.
/**
* @param {ErrorableCallback} [callback] - Callback for when shutdown is ready
* @returns {Promise<void>}
*/
start(callback) {
let promise;
if (this.isStarted()) {
promise = Promise.resolve();
} else {
promise = new Promise((resolve, reject) => {
const res = this.setUp((err) => {
if (err) {
reject(err);
return;
}
resolve();
});
if (res && res.then) {
setUp returned a Promise
res.then(resolve, reject);
}
})
.then(() => {
this.started = true;
this.emit('start');
return Promise.resolve();
});
}
if (callback) {
deprecated('Providing a callback to Component.start is deprecated, use Promises');
promise.then(() => {
callback(null);
}, callback);
}
return promise;
}
Shutdown
Called when network is shut down. This sets calls the tearDown method and sets the component back to a non-started state.
The callback is called when tearDown finishes and all active processing contexts have ended.
/**
* @param {ErrorableCallback} [callback] - Callback for when shutdown is ready
* @returns {Promise<void>}
*/
shutdown(callback) {
const promise = new Promise((resolve, reject) => {
Tell the component that it is time to shut down
const res = this.tearDown((err) => {
if (err) {
reject(err);
return;
}
resolve();
});
if (res && res.then) {
Teardown returned a Promise
res.then(resolve, reject);
}
})
.then(() => new Promise((resolve) => {
if (this.load > 0) {
Some in-flight processes, wait for them to finish
/**
* @param {number} load
*/
const checkLoad = (load) => {
if (load > 0) {
return;
}
this.removeListener('deactivate', checkLoad);
resolve();
};
this.on('deactivate', checkLoad);
return;
}
resolve();
}))
.then(() => {
Clear contents of inport buffers
const inPorts = this.inPorts.ports || this.inPorts;
Object.keys(inPorts).forEach((portName) => {
const inPort = /** @type {InPort} */ (inPorts[portName]);
if (typeof inPort.clear !== 'function') { return; }
inPort.clear();
});
Clear bracket context
this.bracketContext = {
in: {},
out: {},
};
if (!this.isStarted()) {
return Promise.resolve();
}
this.started = false;
this.emit('end');
return Promise.resolve();
});
if (callback) {
deprecated('Providing a callback to Component.shutdown is deprecated, use Promises');
promise.then(() => {
callback(null);
}, callback);
}
return promise;
}
isStarted() {
return this.started;
}
Ensures bracket forwarding map is correct for the existing ports
prepareForwarding() {
Object.keys(this.forwardBrackets).forEach((inPort) => {
const outPorts = this.forwardBrackets[inPort];
if (!(inPort in this.inPorts.ports)) {
delete this.forwardBrackets[inPort];
return;
}
/** @type {Array<string>} */
const tmp = [];
outPorts.forEach((outPort) => {
if (outPort in this.outPorts.ports) {
tmp.push(outPort);
}
});
if (tmp.length === 0) {
delete this.forwardBrackets[inPort];
} else {
this.forwardBrackets[inPort] = tmp;
}
});
}
Method for determining if a component is using the modern NoFlo Process API
isLegacy() {
Process API
if (this.handle) { return false; }
Legacy
return true;
}
Sets process handler function
/**
* @param {ProcessingFunction} handle - Processing function
* @returns {this}
*/
process(handle) {
if (typeof handle !== 'function') {
throw new Error('Process handler must be a function');
}
if (!this.inPorts) {
throw new Error('Component ports must be defined before process function');
}
this.prepareForwarding();
this.handle = handle;
Object.keys(this.inPorts.ports).forEach((name) => {
const port = /** @type {InPort} */ (this.inPorts.ports[name]);
if (!port.name) { port.name = name; }
port.on('ip', (ip) => this.handleIP(ip, port));
});
return this;
}
Method for checking if a given inport is set up for automatic bracket forwarding
/**
* @param {InPort|string} port
* @returns {boolean}
*/
isForwardingInport(port) {
let portName;
if (typeof port === 'string') {
portName = port;
} else {
portName = port.name;
}
if (portName && portName in this.forwardBrackets) {
return true;
}
return false;
}
Method for checking if a given outport is set up for automatic bracket forwarding
/**
* @param {InPort|string} inport
* @param {OutPort|string} outport
* @returns {boolean}
*/
isForwardingOutport(inport, outport) {
let inportName; let
outportName;
if (typeof inport === 'string') {
inportName = inport;
} else {
inportName = inport.name;
}
if (typeof outport === 'string') {
outportName = outport;
} else {
outportName = outport.name;
}
if (!inportName || !outportName) {
return false;
}
if (!this.forwardBrackets[inportName]) { return false; }
if (this.forwardBrackets[inportName].indexOf(outportName) !== -1) { return true; }
return false;
}
Method for checking whether the component sends packets in the same order they were received.
isOrdered() {
if (this.ordered) { return true; }
if (this.autoOrdering) { return true; }
return false;
}
Handling IP objects
The component has received an Information Packet. Call the processing function so that firing pattern preconditions can be checked and component can do processing as needed.
/**
* @param {IP} ip
* @param {InPort} port
* @returns {void}
*/
handleIP(ip, port) {
if (!port.options.triggering) {
If port is non-triggering, we can skip the process function call
return;
}
if ((ip.type === 'openBracket') && (this.autoOrdering === null) && !this.ordered) {
Switch component to ordered mode when receiving a stream unless auto-ordering is disabled
debugComponent(`${this.nodeId} port '${port.name}' entered auto-ordering mode`);
this.autoOrdering = true;
}
Initialize the result object for situations where output needs to be queued to be kept in order
/** @type {ProcessResult} */
let result = {};
if (this.isForwardingInport(port)) {
For bracket-forwarding inports we need to initialize a bracket context so that brackets can be sent as part of the output, and closed after.
if (ip.type === 'openBracket') {
For forwarding ports openBrackets don’t fire
return;
}
if (ip.type === 'closeBracket') {
For forwarding ports closeBrackets don’t fire However, we need to handle several different scenarios: A. There are closeBrackets in queue before current packet B. There are closeBrackets in queue after current packet C. We’ve queued the results from all in-flight processes and new closeBracket arrives
const buf = port.getBuffer(ip.scope, ip.index);
const dataPackets = buf.filter((p) => p.type === 'data');
if ((this.outputQ.length >= this.load) && (dataPackets.length === 0)) {
if (buf[0] !== ip) { return; }
if (!port.name) { return; }
Remove from buffer
port.get(ip.scope, ip.index);
const bracketCtx = this.getBracketContext('in', port.name, ip.scope, ip.index).pop();
bracketCtx.closeIp = ip;
debugBrackets(`${this.nodeId} closeBracket-C from '${bracketCtx.source}' to ${bracketCtx.ports}: '${ip.data}'`);
result = {
__resolved: true,
__bracketClosingAfter: [bracketCtx],
};
this.outputQ.push(result);
this.processOutputQueue();
}
Check if buffer contains data IPs. If it does, we want to allow firing
if (!dataPackets.length) { return; }
}
}
Prepare the input/output pair
const context = new ProcessContext(ip, this, port, result);
const input = new ProcessInput(this.inPorts, context);
const output = new ProcessOutput(this.outPorts, context);
try {
Call the processing function
if (!this.handle) {
throw new Error('Processing function not defined');
}
const res = this.handle(input, output, context);
if (res && res.then) {
Processing function returned a Promise
res.then(
(data) => output.sendDone(data),
(err) => output.done(err),
);
}
} catch (e) {
this.deactivate(context);
output.sendDone(e);
}
if (context.activated) { return; }
If receiving an IP object didn’t cause the component to activate, log that input conditions were not met
if (port.isAddressable()) {
debugComponent(`${this.nodeId} packet on '${port.name}[${ip.index}]' didn't match preconditions: ${ip.type}`);
return;
}
debugComponent(`${this.nodeId} packet on '${port.name}' didn't match preconditions: ${ip.type}`);
}
Get the current bracket forwarding context for an IP object
/**
* @param {string} type
* @param {string} port
* @param {string|null} scope
* @param {number|null} [idx]
*/
getBracketContext(type, port, scope, idx = null) {
let { name, index } = normalizePortName(port);
if (idx != null) { index = `${idx}`; }
const portsList = type === 'in' ? this.inPorts : this.outPorts;
if (portsList.ports[name].isAddressable()) {
name = `${name}[${index}]`;
} else {
name = port;
}
Ensure we have a bracket context for the current scope
if (!this.bracketContext[type][name]) {
this.bracketContext[type][name] = {};
}
if (!this.bracketContext[type][name][scope]) {
this.bracketContext[type][name][scope] = [];
}
return this.bracketContext[type][name][scope];
}
Add an IP object to the list of results to be sent in order
/**
* @param {ProcessResult} result
* @param {Object} port
* @param {IP} packet
* @param {boolean} [before]
*/
addToResult(result, port, packet, before = false) {
const res = result;
const ip = packet;
const { name, index } = normalizePortName(port);
const method = before ? 'unshift' : 'push';
if (this.outPorts.ports[name].isAddressable()) {
const idx = /** @type {number} */ (index ? parseInt(index, 10) : ip.index);
if (!res[name]) {
res[name] = {};
}
if (!res[name][idx]) {
res[name][idx] = [];
}
ip.index = idx;
res[name][idx][method](ip);
return;
}
if (!res[name]) {
res[name] = [];
}
res[name][method](ip);
}
Get contexts that can be forwarded with this in/outport pair.
/** @private */
getForwardableContexts(inport, outport, contexts) {
const { name, index } = normalizePortName(outport);
const forwardable = [];
contexts.forEach((ctx, idx) => {
No forwarding to this outport
if (!this.isForwardingOutport(inport, name)) { return; }
We have already forwarded this context to this outport
if (ctx.ports.indexOf(outport) !== -1) { return; }
See if we have already forwarded the same bracket from another inport
const outContext = this.getBracketContext('out', name, ctx.ip.scope, parseInt(index, 10))[idx];
if (outContext) {
if ((outContext.ip.data === ctx.ip.data) && (outContext.ports.indexOf(outport) !== -1)) {
return;
}
}
forwardable.push(ctx);
});
return forwardable;
}
Add any bracket forwards needed to the result queue
/** @private */
addBracketForwards(result) {
const res = result;
if (res.__bracketClosingBefore != null ? res.__bracketClosingBefore.length : undefined) {
res.__bracketClosingBefore.forEach((context) => {
debugBrackets(`${this.nodeId} closeBracket-A from '${context.source}' to ${context.ports}: '${context.closeIp.data}'`);
if (!context.ports.length) { return; }
context.ports.forEach((port) => {
const ipClone = context.closeIp.clone();
this.addToResult(res, port, ipClone, true);
this.getBracketContext('out', port, ipClone.scope).pop();
});
});
}
if (res.__bracketContext) {
First see if there are any brackets to forward. We need to reverse the keys so that they get added in correct order
Object.keys(res.__bracketContext).reverse().forEach((inport) => {
const context = res.__bracketContext[inport];
if (!context.length) { return; }
Object.keys(res).forEach((outport) => {
let datas; let forwardedOpens; let unforwarded;
const ips = res[outport];
if (outport.indexOf('__') === 0) { return; }
if (this.outPorts[outport].isAddressable()) {
Object.keys(ips).forEach((idx) => {
Don’t register indexes we’re only sending brackets to
const idxIps = ips[idx];
datas = idxIps.filter((ip) => ip.type === 'data');
if (!datas.length) { return; }
const portIdentifier = `${outport}[${idx}]`;
unforwarded = this.getForwardableContexts(inport, portIdentifier, context);
if (!unforwarded.length) { return; }
forwardedOpens = [];
unforwarded.forEach((ctx) => {
debugBrackets(`${this.nodeId} openBracket from '${inport}' to '${portIdentifier}': '${ctx.ip.data}'`);
const ipClone = ctx.ip.clone();
ipClone.index = parseInt(idx, 10);
forwardedOpens.push(ipClone);
ctx.ports.push(portIdentifier);
this.getBracketContext('out', outport, ctx.ip.scope, ipClone.index).push(ctx);
});
forwardedOpens.reverse();
forwardedOpens.forEach((ip) => { this.addToResult(res, outport, ip, true); });
});
return;
}
Don’t register ports we’re only sending brackets to
datas = ips.filter((ip) => ip.type === 'data');
if (!datas.length) { return; }
unforwarded = this.getForwardableContexts(inport, outport, context);
if (!unforwarded.length) { return; }
forwardedOpens = [];
unforwarded.forEach((ctx) => {
debugBrackets(`${this.nodeId} openBracket from '${inport}' to '${outport}': '${ctx.ip.data}'`);
forwardedOpens.push(ctx.ip.clone());
ctx.ports.push(outport);
this.getBracketContext('out', outport, ctx.ip.scope).push(ctx);
});
forwardedOpens.reverse();
forwardedOpens.forEach((ip) => { this.addToResult(res, outport, ip, true); });
});
});
}
if (res.__bracketClosingAfter != null ? res.__bracketClosingAfter.length : undefined) {
res.__bracketClosingAfter.forEach((context) => {
debugBrackets(`${this.nodeId} closeBracket-B from '${context.source}' to ${context.ports}: '${context.closeIp.data}'`);
if (!context.ports.length) { return; }
context.ports.forEach((port) => {
const ipClone = context.closeIp.clone();
this.addToResult(res, port, ipClone, false);
this.getBracketContext('out', port, ipClone.scope).pop();
});
});
}
delete res.__bracketClosingBefore;
delete res.__bracketContext;
delete res.__bracketClosingAfter;
}
Whenever an execution context finishes, send all resolved output from the queue in the order it is in.
/** @private */
processOutputQueue() {
while (this.outputQ.length > 0) {
if (!this.outputQ[0].__resolved) { break; }
const result = this.outputQ.shift();
this.addBracketForwards(result);
Object.keys(result).forEach((port) => {
let portIdentifier;
const ips = result[port];
if (port.indexOf('__') === 0) { return; }
if (this.outPorts.ports[port].isAddressable()) {
Object.keys(ips).forEach((index) => {
const idxIps = ips[index];
const idx = parseInt(index, 10);
if (!this.outPorts.ports[port].isAttached(idx)) { return; }
idxIps.forEach((packet) => {
const ip = packet;
portIdentifier = `${port}[${ip.index}]`;
if (ip.type === 'openBracket') {
debugSend(`${this.nodeId} sending ${portIdentifier} < '${ip.data}'`);
} else if (ip.type === 'closeBracket') {
debugSend(`${this.nodeId} sending ${portIdentifier} > '${ip.data}'`);
} else {
debugSend(`${this.nodeId} sending ${portIdentifier} DATA`);
}
if (!this.outPorts[port].options.scoped) {
ip.scope = null;
}
this.outPorts[port].sendIP(ip);
});
});
return;
}
if (!this.outPorts.ports[port].isAttached()) { return; }
ips.forEach((packet) => {
const ip = packet;
portIdentifier = port;
if (ip.type === 'openBracket') {
debugSend(`${this.nodeId} sending ${portIdentifier} < '${ip.data}'`);
} else if (ip.type === 'closeBracket') {
debugSend(`${this.nodeId} sending ${portIdentifier} > '${ip.data}'`);
} else {
debugSend(`${this.nodeId} sending ${portIdentifier} DATA`);
}
if (!this.outPorts[port].options.scoped) {
ip.scope = null;
}
this.outPorts[port].sendIP(ip);
});
});
}
}
Signal that component has activated. There may be multiple activated contexts at the same time
/**
* @param {Object} context
* @param {boolean} context.activated
* @param {boolean} context.deactivated
* @param {Object} context.result
*/
activate(context) {
if (context.activated) { return; } // prevent double activation
context.activated = true;
context.deactivated = false;
this.load += 1;
this.emit('activate', this.load);
if (this.ordered || this.autoOrdering) {
this.outputQ.push(context.result);
}
}
Signal that component has deactivated. There may be multiple activated contexts at the same time
/**
* @param {Object} context
* @param {boolean} context.activated
* @param {boolean} context.deactivated
*/
deactivate(context) {
if (context.deactivated) { return; } // prevent double deactivation
context.deactivated = true;
context.activated = false;
if (this.isOrdered()) {
this.processOutputQueue();
}
this.load -= 1;
this.emit('deactivate', this.load);
}
}
Component.description = '';
Component.icon = null;
This page contains documentation generated automatically from NoFlo's Component.js file.