Helpers API
NoFlo - Flow-Based Programming for JavaScript
(c) 2014-2017 Flowhub UG
NoFlo may be freely distributed under the MIT license
var IP, InternalSocket, checkDeprecation, checkWirePatternPreconditions, checkWirePatternPreconditionsInput, checkWirePatternPreconditionsParams, debug, getGroupContext, getInputData, getOutputProxy, handleInputCollation, platform, populateParams, processApiWirePattern, reorderBuffer, setupBracketForwarding, setupControlPorts, setupErrorHandler, setupSendDefaults, utils,
hasProp = {}.hasOwnProperty;
InternalSocket = require('./InternalSocket');
IP = require('./IP');
platform = require('./Platform');
utils = require('./Utils');
debug = require('debug')('noflo:helpers');
NoFlo WirePattern helper
Note: WirePattern is no longer the recommended way to build NoFlo components. Please use Process API instead.
WirePattern makes your component collect data from several inports and activates a handler `proc` only when a tuple from all of these ports is complete. The signature of the handler function is:
`proc = (combinedInputData, inputGroups, outputPorts, asyncCallback) ->`
With `config.forwardGroups = true` it forwards group IPs from the inputs to the output, sending them along with the data. This option also accepts string or array values if you want to forward groups from specific port(s) only. By default group forwarding is `false`.
When streams are sent (`sendStreams`), a substream cannot be interrupted by other packets, which is important when doing asynchronous processing. In fact, `sendStreams` is enabled by default on all outports when `config.async` is `true`.
WirePattern supports async `proc` handlers. Set `config.async = true` and make sure that `proc` accepts a callback as its 4th parameter and calls it when the async operation completes or fails.
exports.WirePattern = function(component, config, proc) {
var inPorts, outPorts, ref;
In ports
inPorts = 'in' in config ? config.in : 'in';
if (!utils.isArray(inPorts)) {
inPorts = [inPorts];
}
Out ports
outPorts = 'out' in config ? config.out : 'out';
if (!utils.isArray(outPorts)) {
outPorts = [outPorts];
}
if (!('error' in config)) {
Error port
config.error = 'error';
}
if (!('async' in config)) {
For async process
config.async = false;
}
if (!('ordered' in config)) {
Keep correct output order for async mode
config.ordered = true;
}
if (!('group' in config)) {
Group requests by group ID
config.group = false;
}
if (!('field' in config)) {
Group requests by object field
config.field = null;
}
if (!('forwardGroups' in config)) {
Forward group events from specific inputs to the output:
- false: don’t forward anything
- true: forward unique groups of all inputs
- string: forward groups of a specific port only
- array: forward unique groups of inports in the list
config.forwardGroups = false;
}
if (config.forwardGroups) {
if (typeof config.forwardGroups === 'string') {
Collect groups from one and only port?
config.forwardGroups = [config.forwardGroups];
}
if (typeof config.forwardGroups === 'boolean') {
Forward groups from each port?
config.forwardGroups = inPorts;
}
}
if (!('receiveStreams' in config)) {
Receive streams feature
config.receiveStreams = false;
}
if (config.receiveStreams) {
throw new Error('WirePattern receiveStreams is deprecated');
}
if (!('sendStreams' in config)) {
if typeof config.receiveStreams is ‘string’ config.receiveStreams = [ config.receiveStreams ] Send streams feature
config.sendStreams = false;
}
if (config.sendStreams) {
throw new Error('WirePattern sendStreams is deprecated');
}
if (config.async) {
if typeof config.sendStreams is ‘string’ config.sendStreams = [ config.sendStreams ]
config.sendStreams = outPorts;
}
if (!('params' in config)) {
Parameter ports
config.params = [];
}
if (typeof config.params === 'string') {
config.params = [config.params];
}
if (!('name' in config)) {
Node name
config.name = '';
}
if (!('dropInput' in config)) {
Drop premature input before all params are received
config.dropInput = false;
}
Firing policy for addressable ports
if (!('arrayPolicy' in config)) {
config.arrayPolicy = {
in: 'any',
params: 'all'
};
}
config.inPorts = inPorts;
config.outPorts = outPorts;
Warn user of deprecated features
checkDeprecation(config, proc);
Allow users to selectively fall back to legacy WirePattern implementation
if (config.legacy || (typeof process !== "undefined" && process !== null ? (ref = process.env) != null ? ref.NOFLO_WIREPATTERN_LEGACY : void 0 : void 0)) {
platform.deprecated('noflo.helpers.WirePattern legacy mode is deprecated');
}
return processApiWirePattern(component, config, proc);
};
Takes WirePattern configuration of a component and sets up Process API to handle it.
/**
 * Installs a Process API processing function that emulates WirePattern.
 *
 * @param {Object} component - Component to configure; gets `ordered`,
 *   `params` and (via the setup helpers) further legacy members assigned.
 * @param {Object} config - Normalized WirePattern configuration.
 * @param {Function} func - WirePattern handler, invoked with `component` as
 *   `this` and the arguments
 *   (data, groups, outProxy, callback, postpone, resume, scope).
 * @returns {*} The return value of `component.process`.
 */
processApiWirePattern = function(component, config, func) {
  // Make param ports control ports
  setupControlPorts(component, config);
  // Set up sendDefaults function
  setupSendDefaults(component);
  // Set up bracket forwarding rules
  setupBracketForwarding(component, config);
  component.ordered = config.ordered;

  // Create the processing function
  return component.process((input, output, context) => {
    // Abort unless the WirePattern-style preconditions are met
    if (!checkWirePatternPreconditions(config, input, output)) {
      return undefined;
    }

    // Populate component.params from control ports
    component.params = populateParams(config, input);
    // Read input data
    const payload = getInputData(config, input);
    // Read bracket context of first inport
    const groupContext = getGroupContext(component, config.inPorts[0], input);
    // Produce proxy object wrapping output in legacy-style port API
    const legacyOut = getOutputProxy(config.outPorts, output);

    debug("WirePattern Process API call with", payload, groupContext, component.params, context.scope);

    // postpone/resume are still passed to the handler but only throw
    const postpone = () => {
      throw new Error('noflo.helpers.WirePattern postpone is deprecated');
    };
    const resume = () => {
      throw new Error('noflo.helpers.WirePattern resume is deprecated');
    };

    // setupErrorHandler returns the function that flushes collected errors
    const flushErrors = setupErrorHandler(component, config, output);

    // Async WirePattern will call the output.done callback itself
    const finish = (err) => {
      flushErrors();
      return output.done(err);
    };

    return func.call(component, payload, groupContext, legacyOut, finish, postpone, resume, input.scope);
  });
};
Provide deprecation warnings on certain more esoteric WirePattern features
/**
 * Provides deprecation warnings (and hard failures) for the more esoteric
 * WirePattern features.
 *
 * @param {Object} config - Normalized WirePattern configuration.
 * @param {Function} func - WirePattern handler; its declared arity
 *   (`func.length`) is inspected.
 * @returns {undefined}
 * @throws {Error} If `config.async` is falsy or the handler declares fewer
 *   than four parameters (i.e. no callback argument).
 */
checkDeprecation = function(config, func) {
  // Collation options still work but are on their way out
  if (config.group) {
    platform.deprecated('noflo.helpers.WirePattern group option is deprecated. Please port to Process API');
  }
  if (config.field) {
    platform.deprecated('noflo.helpers.WirePattern field option is deprecated. Please port to Process API');
  }

  // More than four declared parameters means the handler expects the
  // postpone/resume arguments
  if (func.length > 4) {
    platform.deprecated('noflo.helpers.WirePattern postpone and resume are deprecated. Please port to Process API');
  }

  // Synchronous handlers are rejected outright
  if (!config.async) {
    throw new Error('noflo.helpers.WirePattern synchronous is deprecated. Please use async: true');
  }
  if (func.length < 4) {
    throw new Error('noflo.helpers.WirePattern callback doesn\'t use callback argument');
  }

  if (config.error !== 'error') {
    // The migration target is the Process API, not WirePattern itself
    platform.deprecated('noflo.helpers.WirePattern custom error port name is deprecated. Please switch to "error" or port to Process API');
  }
};
Updates component port definitions to control ports for WirePattern-style params array
/**
 * Flags every port listed in `config.params` as a control port by setting
 * `options.control = true` on the matching entry of `component.inPorts`.
 *
 * @param {Object} component - Component whose inports are updated.
 * @param {Object} config - Configuration providing the `params` list.
 * @returns {boolean[]} One `true` per processed param.
 */
setupControlPorts = function(component, config) {
  return config.params.map((paramName) => {
    component.inPorts[paramName].options.control = true;
    return true;
  });
};
Sets up Process API bracket forwarding rules for WirePattern configuration
/**
 * Sets up Process API bracket forwarding rules for a WirePattern
 * configuration.
 *
 * `component.forwardBrackets` is always reset to an empty object. When
 * `config.forwardGroups` is truthy, each source inport is mapped to all
 * declared outports, plus `'error'` when the component has an error outport.
 *
 * @param {Object} component - Component receiving `forwardBrackets`.
 * @param {Object} config - Normalized configuration (`forwardGroups`,
 *   `inPorts`, `outPorts`).
 * @returns {undefined}
 */
setupBracketForwarding = function(component, config) {
  // Start with empty bracket forwarding config
  component.forwardBrackets = {};
  if (!config.forwardGroups) {
    return;
  }

  // Selective forwarding when a list is given, otherwise all inports
  const sourcePorts = utils.isArray(config.forwardGroups) ? config.forwardGroups : config.inPorts;

  for (const inPort of sourcePorts) {
    // Forward to all declared outports
    const targets = config.outPorts.slice(0);
    // If the component has an error outport, forward there too - unless it
    // is already a declared outport, which would list it twice
    if (component.outPorts.error && targets.indexOf('error') === -1) {
      targets.push('error');
    }
    component.forwardBrackets[inPort] = targets;
  }
};
/**
 * Installs the legacy `error` / `fail` methods and the `hasErrors` flag on
 * the component and returns the function that flushes collected errors to
 * the 'error' outport.
 *
 * Each flushed error is sent as a data IP wrapped in its own groups; the
 * whole batch is additionally wrapped in `config.name` when that is set.
 *
 * @param {Object} component - Receives `hasErrors`, `error` and `fail`.
 * @param {Object} config - Configuration; only `name` is read here.
 * @param {Object} output - Object providing `sendIP(port, ip)` and `done()`.
 * @returns {Function} `sendErrors`: sends and clears the collected errors;
 *   does nothing when none were collected.
 */
setupErrorHandler = function(component, config, output) {
  let errors = [];

  const sendErrors = function() {
    if (!errors.length) {
      return undefined;
    }
    if (config.name) {
      output.sendIP('error', new IP('openBracket', config.name));
    }
    errors.forEach(({ err, groups }) => {
      for (const grp of groups) {
        output.sendIP('error', new IP('openBracket', grp));
      }
      output.sendIP('error', new IP('data', err));
      // Close in reverse opening order so the brackets are properly nested
      for (let pos = groups.length - 1; pos >= 0; pos -= 1) {
        output.sendIP('error', new IP('closeBracket', groups[pos]));
      }
    });
    if (config.name) {
      output.sendIP('error', new IP('closeBracket', config.name));
    }
    component.hasErrors = false;
    errors = [];
    return errors;
  };

  // Collects an error for a later flush
  const errorHandler = function(e, groups = []) {
    platform.deprecated('noflo.helpers.WirePattern error method is deprecated. Please send error to callback instead');
    errors.push({
      err: e,
      groups: groups
    });
    component.hasErrors = true;
    return true;
  };

  // Collects the optional error, flushes everything and finishes processing
  const failHandler = function(e = null, groups = []) {
    platform.deprecated('noflo.helpers.WirePattern fail method is deprecated. Please send error to callback instead');
    if (e) {
      errorHandler(e, groups);
    }
    sendErrors();
    return output.done();
  };

  component.hasErrors = false;
  component.error = errorHandler;
  component.fail = failHandler;
  return sendErrors;
};
/**
 * Installs the deprecated `sendDefaults` method on the component.
 *
 * The list of affected ports is computed once, at setup time: every key of
 * `component.inPorts.ports` whose port is a control port and reports
 * `hasDefault()`. Calling `sendDefaults` attaches a temporary socket to each
 * of them, sends an empty packet, disconnects and detaches the socket.
 *
 * @param {Object} component - Component receiving `sendDefaults`.
 * @returns {Function} The installed `sendDefaults` function.
 */
setupSendDefaults = function(component) {
  const defaultedControlPorts = Object.keys(component.inPorts.ports).filter((name) => {
    const candidate = component.inPorts[name];
    return candidate.options.control && candidate.hasDefault();
  });

  component.sendDefaults = function() {
    platform.deprecated('noflo.helpers.WirePattern sendDefaults method is deprecated. Please start with a Network');
    for (const name of defaultedControlPorts) {
      const socket = InternalSocket.createSocket();
      component.inPorts[name].attach(socket);
      socket.send();
      socket.disconnect();
      component.inPorts[name].detach(socket);
    }
  };
  return component.sendDefaults;
};
/**
 * Reads the current value of every param port into a plain object.
 *
 * Addressable ports yield an object keyed by attached index, containing
 * only the indexes that currently have data; other ports yield whatever
 * `input.getData(port)` returns.
 *
 * @param {Object} config - Configuration providing the `params` list.
 * @param {Object} input - Process API input object.
 * @returns {Object} Map of param port name to value(s).
 */
populateParams = function(config, input) {
  const collected = {};
  if (!config.params.length) {
    return collected;
  }
  for (const name of config.params) {
    if (!input.ports[name].isAddressable()) {
      collected[name] = input.getData(name);
      continue;
    }
    collected[name] = {};
    for (const idx of input.attached(name)) {
      if (input.hasData([name, idx])) {
        collected[name][idx] = input.getData([name, idx]);
      }
    }
  }
  return collected;
};
/**
 * Moves the first matching data packet, together with the brackets that
 * directly enclose it, to the very front of `buffer` (in place).
 *
 * Note: this collation mechanism is not a very nice way to deal with inputs
 * as it messes with input buffer order. It is much better to handle
 * collation in a specialized component or to separate flows by scope.
 *
 * Example, looking for packet B in
 *
 *     < 1   < 2   A   > 2   < 3   B   > 3   > 1
 *
 * the buffer becomes
 *
 *     < 3   B   > 3   < 1   < 2   A   > 2   > 1
 *
 * NOTE(review): the historical description of this function showed the outer
 * bracket `< 1` staying in front of the moved substream; the code has always
 * unshifted the substream to index 0 instead, as shown above.
 *
 * NOTE(review): the scan assumes at most one data packet per bracketed
 * substream; substreams with several data packets are not handled.
 *
 * @param {Array<Object>} buffer - IP objects (`type`, `data`); mutated.
 * @param {Function} matcher - Called as `matcher(ip, brackets)` for every
 *   data packet, where `brackets` is the list of currently open bracket
 *   names; returns truthy for the packet to move.
 * @returns {number[]|undefined} The buffer lengths returned by the
 *   individual `unshift` calls, or `undefined` when nothing was moved.
 */
reorderBuffer = function(buffer, matcher) {
  let substream = null;
  const openBrackets = [];
  // Open brackets seen since the last non-matching data packet
  let pendingBrackets = [];

  for (const ip of buffer) {
    if (ip.type === 'openBracket') {
      openBrackets.push(ip.data);
      pendingBrackets.push(ip);
      continue;
    }
    if (ip.type === 'closeBracket') {
      openBrackets.pop();
      if (substream) {
        substream.push(ip);
      }
      if (pendingBrackets.length) {
        pendingBrackets.pop();
      }
      if (substream && !pendingBrackets.length) {
        // Every bracket of the matched substream has been closed
        break;
      }
      continue;
    }
    if (!matcher(ip, openBrackets)) {
      // Reset substream bracket tracking when we hit data
      pendingBrackets = [];
      continue;
    }
    // Match found, start tracking the actual substream
    substream = pendingBrackets.slice(0);
    substream.push(ip);
    if (!pendingBrackets.length) {
      // A bare packet is a complete substream on its own. Scanning further
      // would let a later match replace it, or append an unrelated close
      // bracket to it.
      break;
    }
  }

  // Nothing matched: leave the buffer untouched instead of dereferencing null
  if (!substream) {
    return undefined;
  }

  // See where in the buffer the matching substream begins
  const substreamIdx = buffer.indexOf(substream[0]);
  // No need to reorder if matching packet is already first
  if (substreamIdx === 0) {
    return undefined;
  }

  // Remove substream from its natural position
  buffer.splice(substreamIdx, substream.length);
  // Place the substream in the beginning; unshifting in reverse keeps its
  // internal order
  substream.reverse();
  return substream.map((ip) => buffer.unshift(ip));
};
/**
 * Reorders a port's input buffer so that the packet selected by the
 * collation rules (`config.group` and/or `config.field`) is read next.
 *
 * @param {Object} data - Input data collected so far; in field mode the
 *   collation value is stored under `data[config.field]`.
 * @param {Object} config - Normalized WirePattern configuration.
 * @param {Object} input - Process API input carrying `collatedBy`.
 * @param {string} port - Name of the inport.
 * @param {*} [idx] - Index for addressable ports, passed to `getBuffer`.
 * @returns {*} In field mode the result of `reorderBuffer`, otherwise
 *   `undefined`.
 */
handleInputCollation = function(data, config, input, port, idx) {
  if (!config.group && !config.field) {
    return undefined;
  }
  if (config.group) {
    const groupBuffer = input.ports[port].getBuffer(input.scope, idx);
    reorderBuffer(groupBuffer, (ip, brackets) => {
      // The collated brackets must be a prefix of the packet's brackets.
      // The position counter is local on purpose: it must not overwrite the
      // enclosing `idx`, which is still needed for the field lookup below.
      const wanted = input.collatedBy.brackets;
      for (let pos = 0; pos < wanted.length; pos += 1) {
        if (brackets[pos] !== wanted[pos]) {
          return false;
        }
      }
      return true;
    });
  }
  if (config.field) {
    data[config.field] = input.collatedBy.field;
    const fieldBuffer = input.ports[port].getBuffer(input.scope, idx);
    return reorderBuffer(fieldBuffer, (ip) => ip.data[config.field] === data[config.field]);
  }
  return undefined;
};
/**
 * Reads one data packet from every inport that currently has data.
 *
 * Before each read `handleInputCollation` is given the chance to reorder
 * the port buffer. Addressable ports produce an object keyed by attached
 * index.
 *
 * @param {Object} config - Normalized configuration (`inPorts`).
 * @param {Object} input - Process API input object.
 * @returns {*} The value of the single inport when only one is configured,
 *   otherwise an object keyed by port name.
 */
getInputData = function(config, input) {
  const collected = {};
  for (const port of config.inPorts) {
    if (input.ports[port].isAddressable()) {
      collected[port] = {};
      for (const idx of input.attached(port)) {
        if (input.hasData([port, idx])) {
          handleInputCollation(collected, config, input, port, idx);
          collected[port][idx] = input.getData([port, idx]);
        }
      }
    } else if (input.hasData(port)) {
      handleInputCollation(collected, config, input, port);
      collected[port] = input.getData(port);
    }
  }
  return config.inPorts.length === 1 ? collected[config.inPorts[0]] : collected;
};
/**
 * Determines the list of group names to hand to the WirePattern handler.
 *
 * @param {Object} component - Not used by this function.
 * @param {string} port - Inport whose bracket context is inspected.
 * @param {Object} input - Process API input; `input.result.__bracketContext`
 *   and `input.collatedBy` are read.
 * @returns {Array} Empty when no bracket context exists for the port; the
 *   collated brackets when collation selected some; otherwise the data of
 *   all context entries whose `source` is this port.
 */
getGroupContext = function(component, port, input) {
  const contexts = input.result.__bracketContext;
  if (contexts == null || contexts[port] == null) {
    return [];
  }
  const collation = input.collatedBy;
  if (collation != null && collation.brackets) {
    return collation.brackets;
  }
  return contexts[port]
    .filter((entry) => entry.source === port)
    .map((entry) => entry.ip.data);
};
/**
 * Builds legacy-style outport objects on top of a Process API output.
 *
 * Every proxy offers `connect`, `beginGroup`, `send`, `endGroup` and
 * `disconnect`. `connect` / `disconnect` do nothing; the other three wrap
 * their first argument in an IP of the matching type, set `ip.index` to the
 * second argument and send it with `output.sendIP`.
 *
 * @param {string[]} ports - Outport names.
 * @param {Object} output - Object providing `sendIP(port, ip)`.
 * @returns {Object} The proxy of the only port when exactly one is given,
 *   otherwise an object mapping port names to proxies.
 */
getOutputProxy = function(ports, output) {
  // Creates a sender for one port and one IP type
  const makeSender = (port, type) => (payload, idx) => {
    const ip = new IP(type, payload);
    ip.index = idx;
    return output.sendIP(port, ip);
  };

  const proxies = {};
  for (const port of ports) {
    proxies[port] = {
      connect: () => {},
      beginGroup: makeSender(port, 'openBracket'),
      send: makeSender(port, 'data'),
      endGroup: makeSender(port, 'closeBracket'),
      disconnect: () => {}
    };
  }
  return ports.length === 1 ? proxies[ports[0]] : proxies;
};
/**
 * Decides whether the WirePattern handler may run for the current input.
 *
 * When `config.dropInput` is set and the param requirements are not met,
 * every packet waiting on the inports is read and dropped; if anything was
 * dropped, `output.done()` is called to deactivate.
 *
 * @param {Object} config - Normalized WirePattern configuration.
 * @param {Object} input - Process API input object.
 * @param {Object} output - Process API output object.
 * @returns {boolean} Truthy only if both the param check and the input
 *   check passed.
 */
checkWirePatternPreconditions = function(config, input, output) {
  // First check for required params
  const paramsOk = checkWirePatternPreconditionsParams(config, input);
  // Then check actual input ports
  const inputsOk = checkWirePatternPreconditionsInput(config, input);

  // If input port has data but param requirements are not met, and we're in
  // dropInput mode, read the data and call done
  if (config.dropInput && !paramsOk) {
    let packetsDropped = false;
    // Reads and drops everything queued on one port (or port index)
    const drain = (portRef) => {
      while (input.has(portRef)) {
        packetsDropped = true;
        input.get(portRef).drop();
      }
    };

    for (const port of config.inPorts) {
      if (input.ports[port].isAddressable()) {
        for (const idx of input.attached(port)) {
          drain([port, idx]);
        }
      } else {
        drain(port);
      }
    }

    if (packetsDropped) {
      // If we ended up dropping inputs because of missing params, we need
      // to deactivate here
      output.done();
    }
  }

  // Pass precondition check only if both params and inputs are OK
  return inputsOk && paramsOk;
};
/**
 * Checks that every required param port has data available.
 *
 * Ports that are not required are ignored. For addressable ports at least
 * one socket must be attached; with `config.arrayPolicy.params === 'all'`
 * every attached index needs data, otherwise one index with data suffices.
 *
 * @param {Object} config - Configuration (`params`, `arrayPolicy`).
 * @param {Object} input - Process API input object.
 * @returns {boolean} `true` when all required params are satisfied.
 */
checkWirePatternPreconditionsParams = function(config, input) {
  const isSatisfied = (param) => {
    if (!input.ports[param].isRequired()) {
      return true;
    }
    if (!input.ports[param].isAddressable()) {
      return input.hasData(param);
    }
    const attached = input.attached(param);
    if (!attached.length) {
      return false;
    }
    const withData = attached.filter((idx) => input.hasData([param, idx]));
    if (config.arrayPolicy.params === 'all') {
      return withData.length === attached.length;
    }
    return withData.length > 0;
  };
  return config.params.every(isSatisfied);
};
/**
 * Checks whether every inport has input the WirePattern handler can use.
 *
 * Without collation rules a port passes when it has a data packet. With
 * `config.group` a port passes when its buffer holds a complete bracketed
 * stream whose data packet matches the collated brackets; with
 * `config.field` the decision is delegated to `input.hasStream`. The
 * selected collation key is stored on `input.collatedBy`.
 *
 * For addressable ports at least one socket must be attached; with
 * `config.arrayPolicy.in === 'all'` every attached index has to pass,
 * otherwise one passing index is enough.
 *
 * @param {Object} config - Normalized WirePattern configuration.
 * @param {Object} input - Process API input; `collatedBy` is (re)assigned
 *   when a collation rule is configured.
 * @returns {boolean} `true` when all inports pass.
 */
checkWirePatternPreconditionsInput = function(config, input) {
  let checkPacket;

  if (config.group) {
    // Bracket id -> ports seen with it. Prototype-less so that bracket names
    // such as 'constructor' cannot collide with inherited members.
    const bracketsAtPorts = Object.create(null);
    input.collatedBy = {
      brackets: [],
      ready: false
    };

    // True when `left` is a prefix of `right`
    const checkBrackets = (left, right) => {
      for (let pos = 0; pos < left.length; pos += 1) {
        if (right[pos] !== left[pos]) {
          return false;
        }
      }
      return true;
    };

    // With data packets we validate bracket matching
    checkPacket = (ip, brackets, portName) => {
      let bracketsToCheck = brackets.slice(0);
      if (config.group instanceof RegExp) {
        // Basic regexp validation: only the outermost bracket is considered
        bracketsToCheck = bracketsToCheck.slice(0, 1);
        if (!bracketsToCheck.length) {
          return false;
        }
        if (!config.group.test(bracketsToCheck[0])) {
          return false;
        }
      }
      if (input.collatedBy.ready) {
        // We already know what brackets we're looking for, match
        return checkBrackets(input.collatedBy.brackets, bracketsToCheck);
      }

      const bracketId = bracketsToCheck.join(':');
      if (!bracketsAtPorts[bracketId]) {
        bracketsAtPorts[bracketId] = [];
      }
      if (bracketsAtPorts[bracketId].indexOf(portName) === -1) {
        // Register that this port had these brackets
        bracketsAtPorts[bracketId].push(portName);
      }
      if (config.inPorts.indexOf(portName) !== config.inPorts.length - 1) {
        // To prevent deadlocks we see all bracket sets, and validate if at
        // least one of them matches. This means we return true until the
        // last inport where we actually check.
        return true;
      }
      if (bracketsAtPorts[bracketId].length !== config.inPorts.length) {
        // Brackets that are not in every port are invalid
        return false;
      }
      input.collatedBy.ready = true;
      input.collatedBy.brackets = bracketsToCheck;
      return true;
    };
  }

  if (config.field) {
    input.collatedBy = {
      field: void 0,
      ready: false
    };
  }

  // `port` is either a port name or, for addressable ports, [name, index]
  const checkPort = (port) => {
    if (!config.group && !config.field) {
      // Without collation rules any data packet is OK
      return input.hasData(port);
    }

    // With collation rules set we can only work when we have full streams
    if (config.group) {
      // Resolve the port object by name; an addressable reference must not
      // be used as a key of input.ports directly
      const indexed = Array.isArray(port);
      const portName = indexed ? port[0] : port;
      const buf = indexed
        ? input.ports[portName].getBuffer(input.scope, port[1])
        : input.ports[portName].getBuffer(input.scope);

      const portBrackets = [];
      let hasData;
      let hasMatching = false;
      for (const ip of buf) {
        if (ip.type === 'openBracket') {
          portBrackets.push(ip.data);
          continue;
        }
        if (ip.type === 'closeBracket') {
          portBrackets.pop();
          // A stream counts once its outermost bracket is closed and its
          // latest data packet was accepted
          if (!portBrackets.length && hasData) {
            hasMatching = true;
          }
          continue;
        }
        hasData = checkPacket(ip, portBrackets, portName);
      }
      return hasMatching;
    }

    return input.hasStream(port, (ip) => {
      // Use first data packet to define what to collate by
      if (!input.collatedBy.ready) {
        input.collatedBy.field = ip.data[config.field];
        input.collatedBy.ready = true;
        return true;
      }
      return ip.data[config.field] === input.collatedBy.field;
    });
  };

  for (const port of config.inPorts) {
    if (!input.ports[port].isAddressable()) {
      if (!checkPort(port)) {
        return false;
      }
      continue;
    }
    const attached = input.attached(port);
    if (!attached.length) {
      return false;
    }
    const withData = attached.filter((idx) => checkPort([port, idx]));
    if (config.arrayPolicy['in'] === 'all') {
      if (withData.length !== attached.length) {
        return false;
      }
      continue;
    }
    if (!withData.length) {
      return false;
    }
  }
  return true;
};
`CustomError` returns an `Error` object carrying additional properties.
exports.CustomError = function(message, options) {
var err;
err = new Error(message);
return exports.CustomizeError(err, options);
};
`CustomizeError` sets additional options for an `Error` object.
exports.CustomizeError = function(err, options) {
var key, val;
for (key in options) {
if (!hasProp.call(options, key)) continue;
val = options[key];
err[key] = val;
}
return err;
};
This page contains documentation generated automatically from NoFlo's Helpers.coffee file.