API Documentation

On GitHub

Helpers API

NoFlo - Flow-Based Programming for JavaScript
(c) 2014-2017 Flowhub UG
NoFlo may be freely distributed under the MIT license
var IP, InternalSocket, checkDeprecation, checkWirePatternPreconditions, checkWirePatternPreconditionsInput, checkWirePatternPreconditionsParams, debug, getGroupContext, getInputData, getOutputProxy, handleInputCollation, platform, populateParams, processApiWirePattern, reorderBuffer, setupBracketForwarding, setupControlPorts, setupErrorHandler, setupSendDefaults, utils,
  hasProp = {}.hasOwnProperty;

InternalSocket = require('./InternalSocket');

IP = require('./IP');

platform = require('./Platform');

utils = require('./Utils');

debug = require('debug')('noflo:helpers');

NoFlo WirePattern helper

Note: WirePattern is no longer the recommended way to build NoFlo components. Please use Process API instead.

WirePattern makes your component collect data from several inports and activates a handler proc only when a tuple from all of these ports is complete. The signature of handler function is:

proc = (combinedInputData, inputGroups, outputPorts, asyncCallback) ->

With config.forwardGroups = true it would forward group IPs from inputs to the output sending them along with the data. This option also accepts string or array values, if you want to forward groups from specific port(s) only. By default group forwarding is false.

substream cannot be interrupted by other packets, which is important when doing asynchronous processing. In fact, sendStreams is enabled by default on all outports when config.async is true.

WirePattern supports async proc handlers. Set config.async = true and make sure that proc accepts callback as 4th parameter and calls it when async operation completes or fails.

exports.WirePattern = function(component, config, proc) {
  var inPorts, outPorts, ref;

In ports

  inPorts = 'in' in config ? config.in : 'in';
  if (!utils.isArray(inPorts)) {
    inPorts = [inPorts];
  }

Out ports

  outPorts = 'out' in config ? config.out : 'out';
  if (!utils.isArray(outPorts)) {
    outPorts = [outPorts];
  }
  if (!('error' in config)) {

Error port

    config.error = 'error';
  }
  if (!('async' in config)) {

For async process

    config.async = false;
  }
  if (!('ordered' in config)) {

Keep correct output order for async mode

    config.ordered = true;
  }
  if (!('group' in config)) {

Group requests by group ID

    config.group = false;
  }
  if (!('field' in config)) {

Group requests by object field

    config.field = null;
  }
  if (!('forwardGroups' in config)) {

Forward group events from specific inputs to the output:

  • false: don’t forward anything
  • true: forward unique groups of all inputs
  • string: forward groups of a specific port only
  • array: forward unique groups of inports in the list
    config.forwardGroups = false;
  }
  if (config.forwardGroups) {
    if (typeof config.forwardGroups === 'string') {

Collect groups from one and only port?

      config.forwardGroups = [config.forwardGroups];
    }
    if (typeof config.forwardGroups === 'boolean') {

Forward groups from each port?

      config.forwardGroups = inPorts;
    }
  }
  if (!('receiveStreams' in config)) {

Receive streams feature

    config.receiveStreams = false;
  }
  if (config.receiveStreams) {
    throw new Error('WirePattern receiveStreams is deprecated');
  }
  if (!('sendStreams' in config)) {

if typeof config.receiveStreams is ‘string’ config.receiveStreams = [ config.receiveStreams ] Send streams feature

    config.sendStreams = false;
  }
  if (config.sendStreams) {
    throw new Error('WirePattern sendStreams is deprecated');
  }
  if (config.async) {

if typeof config.sendStreams is ‘string’ config.sendStreams = [ config.sendStreams ]

    config.sendStreams = outPorts;
  }
  if (!('params' in config)) {

Parameter ports

    config.params = [];
  }
  if (typeof config.params === 'string') {
    config.params = [config.params];
  }
  if (!('name' in config)) {

Node name

    config.name = '';
  }
  if (!('dropInput' in config)) {

Drop premature input before all params are received

    config.dropInput = false;
  }

Firing policy for addressable ports

  if (!('arrayPolicy' in config)) {
    config.arrayPolicy = {
      in: 'any',
      params: 'all'
    };
  }
  config.inPorts = inPorts;
  config.outPorts = outPorts;

Warn user of deprecated features

  checkDeprecation(config, proc);

Allow users to selectively fall back to legacy WirePattern implementation

  if (config.legacy || (typeof process !== "undefined" && process !== null ? (ref = process.env) != null ? ref.NOFLO_WIREPATTERN_LEGACY : void 0 : void 0)) {
    platform.deprecated('noflo.helpers.WirePattern legacy mode is deprecated');
  }
  return processApiWirePattern(component, config, proc);
};

Takes WirePattern configuration of a component and sets up Process API to handle it.

processApiWirePattern = function(component, config, func) {

Make param ports control ports

  setupControlPorts(component, config);

Set up sendDefaults function

  setupSendDefaults(component);

Set up bracket forwarding rules

  setupBracketForwarding(component, config);
  component.ordered = config.ordered;

Create the processing function

  return component.process(function(input, output, context) {
    var data, errorHandler, groups, outProxy, postpone, resume;

Abort unless WirePattern-style preconditions don’t match

    if (!checkWirePatternPreconditions(config, input, output)) {
      return;
    }

Populate component.params from control ports

    component.params = populateParams(config, input);

Read input data

    data = getInputData(config, input);

Read bracket context of first inport

    groups = getGroupContext(component, config.inPorts[0], input);

Produce proxy object wrapping output in legacy-style port API

    outProxy = getOutputProxy(config.outPorts, output);
    debug("WirePattern Process API call with", data, groups, component.params, context.scope);
    postpone = function() {
      throw new Error('noflo.helpers.WirePattern postpone is deprecated');
    };
    resume = function() {
      throw new Error('noflo.helpers.WirePattern resume is deprecated');
    };

Async WirePattern will call the output.done callback itself

    errorHandler = setupErrorHandler(component, config, output);
    return func.call(component, data, groups, outProxy, function(err) {
      errorHandler();
      return output.done(err);
    }, postpone, resume, input.scope);
  });
};

Provide deprecation warnings on certain more esoteric WirePattern features

checkDeprecation = function(config, func) {

First check the conditions that force us to fall back on legacy WirePattern

  if (config.group) {
    platform.deprecated('noflo.helpers.WirePattern group option is deprecated. Please port to Process API');
  }
  if (config.field) {
    platform.deprecated('noflo.helpers.WirePattern field option is deprecated. Please port to Process API');
  }

Then add deprecation warnings for other unwanted behaviors

  if (func.length > 4) {
    platform.deprecated('noflo.helpers.WirePattern postpone and resume are deprecated. Please port to Process API');
  }
  if (!config.async) {
    throw new Error('noflo.helpers.WirePattern synchronous is deprecated. Please use async: true');
  }
  if (func.length < 4) {
    throw new Error('noflo.helpers.WirePattern callback doesn\'t use callback argument');
  }
  if (config.error !== 'error') {
    platform.deprecated('noflo.helpers.WirePattern custom error port name is deprecated. Please switch to "error" or port to WirePattern');
  }
};

Updates component port definitions to control prots for WirePattern -style params array

setupControlPorts = function(component, config) {
  var i, len, param, ref, results;
  ref = config.params;
  results = [];
  for (i = 0, len = ref.length; i < len; i++) {
    param = ref[i];
    results.push(component.inPorts[param].options.control = true);
  }
  return results;
};

Sets up Process API bracket forwarding rules for WirePattern configuration

setupBracketForwarding = function(component, config) {
  var i, inPort, inPorts, j, len, len1, outPort, ref;

Start with empty bracket forwarding config

  component.forwardBrackets = {};
  if (!config.forwardGroups) {
    return;
  }

By default we forward from all inports

  inPorts = config.inPorts;
  if (utils.isArray(config.forwardGroups)) {

Selective forwarding enabled

    inPorts = config.forwardGroups;
  }
  for (i = 0, len = inPorts.length; i < len; i++) {
    inPort = inPorts[i];
    component.forwardBrackets[inPort] = [];
    ref = config.outPorts;

Forward to all declared outports

    for (j = 0, len1 = ref.length; j < len1; j++) {
      outPort = ref[j];
      component.forwardBrackets[inPort].push(outPort);
    }

If component has an error outport, forward there too

    if (component.outPorts.error) {
      component.forwardBrackets[inPort].push('error');
    }
  }
};

setupErrorHandler = function(component, config, output) {
  var errorHandler, errors, failHandler, sendErrors;
  errors = [];
  errorHandler = function(e, groups = []) {
    platform.deprecated('noflo.helpers.WirePattern error method is deprecated. Please send error to callback instead');
    errors.push({
      err: e,
      groups: groups
    });
    return component.hasErrors = true;
  };
  failHandler = function(e = null, groups = []) {
    platform.deprecated('noflo.helpers.WirePattern fail method is deprecated. Please send error to callback instead');
    if (e) {
      errorHandler(e, groups);
    }
    sendErrors();
    return output.done();
  };
  sendErrors = function() {
    if (!errors.length) {
      return;
    }
    if (config.name) {
      output.sendIP('error', new IP('openBracket', config.name));
    }
    errors.forEach(function(e) {
      var grp, i, j, len, len1, ref, ref1, results;
      ref = e.groups;
      for (i = 0, len = ref.length; i < len; i++) {
        grp = ref[i];
        output.sendIP('error', new IP('openBracket', grp));
      }
      output.sendIP('error', new IP('data', e.err));
      ref1 = e.groups;
      results = [];
      for (j = 0, len1 = ref1.length; j < len1; j++) {
        grp = ref1[j];
        results.push(output.sendIP('error', new IP('closeBracket', grp)));
      }
      return results;
    });
    if (config.name) {
      output.sendIP('error', new IP('closeBracket', config.name));
    }
    component.hasErrors = false;
    return errors = [];
  };
  component.hasErrors = false;
  component.error = errorHandler;
  component.fail = failHandler;
  return sendErrors;
};

setupSendDefaults = function(component) {
  var portsWithDefaults;
  portsWithDefaults = Object.keys(component.inPorts.ports).filter(function(p) {
    if (!component.inPorts[p].options.control) {
      return false;
    }
    if (!component.inPorts[p].hasDefault()) {
      return false;
    }
    return true;
  });
  return component.sendDefaults = function() {
    platform.deprecated('noflo.helpers.WirePattern sendDefaults method is deprecated. Please start with a Network');
    return portsWithDefaults.forEach(function(port) {
      var tempSocket;
      tempSocket = InternalSocket.createSocket();
      component.inPorts[port].attach(tempSocket);
      tempSocket.send();
      tempSocket.disconnect();
      return component.inPorts[port].detach(tempSocket);
    });
  };
};

populateParams = function(config, input) {
  var i, idx, j, len, len1, paramPort, params, ref, ref1;
  if (!config.params.length) {
    return {};
  }
  params = {};
  ref = config.params;
  for (i = 0, len = ref.length; i < len; i++) {
    paramPort = ref[i];
    if (input.ports[paramPort].isAddressable()) {
      params[paramPort] = {};
      ref1 = input.attached(paramPort);
      for (j = 0, len1 = ref1.length; j < len1; j++) {
        idx = ref1[j];
        if (!input.hasData([paramPort, idx])) {
          continue;
        }
        params[paramPort][idx] = input.getData([paramPort, idx]);
      }
      continue;
    }
    params[paramPort] = input.getData(paramPort);
  }
  return params;
};

reorderBuffer = function(buffer, matcher) {
  var brackets, i, idx, ip, j, len, len1, results, substream, substreamBrackets, substreamIdx;

Move matching IP packet to be first in buffer

Note: the collation mechanism as shown below is not a very nice way to deal with inputs as it messes with input buffer order. Much better to handle collation in a specialized component or to separate flows by scope.

The trick here is to order the input in a way that still allows bracket forwarding to work. So if we want to first process packet B in stream like:

< 1
< 2
A
> 2
< 3
B
> 3
> 1

We need to change the stream to be like:

< 1
< 3
B
> 3
< 2
A
> 2
> 1
  substream = null;
  brackets = [];
  substreamBrackets = [];
  for (idx = i = 0, len = buffer.length; i < len; idx = ++i) {
    ip = buffer[idx];
    if (ip.type === 'openBracket') {
      brackets.push(ip.data);
      substreamBrackets.push(ip);
      continue;
    }
    if (ip.type === 'closeBracket') {
      brackets.pop();
      if (substream) {
        substream.push(ip);
      }
      if (substreamBrackets.length) {
        substreamBrackets.pop();
      }
      if (substream && !substreamBrackets.length) {
        break;
      }
      continue;
    }
    if (!matcher(ip, brackets)) {

Reset substream bracket tracking when we hit data

      substreamBrackets = [];
      continue;
    }

Match found, start tracking the actual substream

    substream = substreamBrackets.slice(0);
    substream.push(ip);
  }

See where in the buffer the matching substream begins

  substreamIdx = buffer.indexOf(substream[0]);

No need to reorder if matching packet is already first

  if (substreamIdx === 0) {
    return;
  }

Remove substream from its natural position

  buffer.splice(substreamIdx, substream.length);

Place the substream in the beginning

  substream.reverse();
  results = [];
  for (j = 0, len1 = substream.length; j < len1; j++) {
    ip = substream[j];
    results.push(buffer.unshift(ip));
  }
  return results;
};

handleInputCollation = function(data, config, input, port, idx) {
  var buf;
  if (!config.group && !config.field) {
    return;
  }
  if (config.group) {
    buf = input.ports[port].getBuffer(input.scope, idx);
    reorderBuffer(buf, function(ip, brackets) {
      var grp, i, len, ref;
      ref = input.collatedBy.brackets;
      for (idx = i = 0, len = ref.length; i < len; idx = ++i) {
        grp = ref[idx];
        if (brackets[idx] !== grp) {
          return false;
        }
      }
      return true;
    });
  }
  if (config.field) {
    data[config.field] = input.collatedBy.field;
    buf = input.ports[port].getBuffer(input.scope, idx);
    return reorderBuffer(buf, function(ip) {
      return ip.data[config.field] === data[config.field];
    });
  }
};

getInputData = function(config, input) {
  var data, i, idx, j, len, len1, port, ref, ref1;
  data = {};
  ref = config.inPorts;
  for (i = 0, len = ref.length; i < len; i++) {
    port = ref[i];
    if (input.ports[port].isAddressable()) {
      data[port] = {};
      ref1 = input.attached(port);
      for (j = 0, len1 = ref1.length; j < len1; j++) {
        idx = ref1[j];
        if (!input.hasData([port, idx])) {
          continue;
        }
        handleInputCollation(data, config, input, port, idx);
        data[port][idx] = input.getData([port, idx]);
      }
      continue;
    }
    if (!input.hasData(port)) {
      continue;
    }
    handleInputCollation(data, config, input, port);
    data[port] = input.getData(port);
  }
  if (config.inPorts.length === 1) {
    return data[config.inPorts[0]];
  }
  return data;
};

getGroupContext = function(component, port, input) {
  var ref, ref1;
  if (((ref = input.result.__bracketContext) != null ? ref[port] : void 0) == null) {
    return [];
  }
  if ((ref1 = input.collatedBy) != null ? ref1.brackets : void 0) {
    return input.collatedBy.brackets;
  }
  return input.result.__bracketContext[port].filter(function(c) {
    return c.source === port;
  }).map(function(c) {
    return c.ip.data;
  });
};

getOutputProxy = function(ports, output) {
  var outProxy;
  outProxy = {};
  ports.forEach(function(port) {
    return outProxy[port] = {
      connect: function() {},
      beginGroup: function(group, idx) {
        var ip;
        ip = new IP('openBracket', group);
        ip.index = idx;
        return output.sendIP(port, ip);
      },
      send: function(data, idx) {
        var ip;
        ip = new IP('data', data);
        ip.index = idx;
        return output.sendIP(port, ip);
      },
      endGroup: function(group, idx) {
        var ip;
        ip = new IP('closeBracket', group);
        ip.index = idx;
        return output.sendIP(port, ip);
      },
      disconnect: function() {}
    };
  });
  if (ports.length === 1) {
    return outProxy[ports[0]];
  }
  return outProxy;
};

checkWirePatternPreconditions = function(config, input, output) {
  var attached, i, idx, inputsOk, j, len, len1, packetsDropped, paramsOk, port, ref;

First check for required params

  paramsOk = checkWirePatternPreconditionsParams(config, input);

Then check actual input ports

  inputsOk = checkWirePatternPreconditionsInput(config, input);

If input port has data but param requirements are not met, and we’re in dropInput mode, read the data and call done

  if (config.dropInput && !paramsOk) {

Drop all received input packets since params are not available

    packetsDropped = false;
    ref = config.inPorts;
    for (i = 0, len = ref.length; i < len; i++) {
      port = ref[i];
      if (input.ports[port].isAddressable()) {
        attached = input.attached(port);
        if (!attached.length) {
          continue;
        }
        for (j = 0, len1 = attached.length; j < len1; j++) {
          idx = attached[j];
          while (input.has([port, idx])) {
            packetsDropped = true;
            input.get([port, idx]).drop();
          }
        }
        continue;
      }
      while (input.has(port)) {
        packetsDropped = true;
        input.get(port).drop();
      }
    }
    if (packetsDropped) {

If we ended up dropping inputs because of missing params, we need to deactivate here

      output.done();
    }
  }

Pass precondition check only if both params and inputs are OK

  return inputsOk && paramsOk;
};

checkWirePatternPreconditionsParams = function(config, input) {
  var attached, i, len, param, ref, withData;
  ref = config.params;
  for (i = 0, len = ref.length; i < len; i++) {
    param = ref[i];
    if (!input.ports[param].isRequired()) {
      continue;
    }
    if (input.ports[param].isAddressable()) {
      attached = input.attached(param);
      if (!attached.length) {
        return false;
      }
      withData = attached.filter(function(idx) {
        return input.hasData([param, idx]);
      });
      if (config.arrayPolicy.params === 'all') {
        if (withData.length !== attached.length) {
          return false;
        }
        continue;
      }
      if (!withData.length) {
        return false;
      }
      continue;
    }
    if (!input.hasData(param)) {
      return false;
    }
  }
  return true;
};

checkWirePatternPreconditionsInput = function(config, input) {
  var attached, bracketsAtPorts, checkBrackets, checkPacket, checkPort, i, len, port, ref, withData;
  if (config.group) {
    bracketsAtPorts = {};
    input.collatedBy = {
      brackets: [],
      ready: false
    };
    checkBrackets = function(left, right) {
      var bracket, i, idx, len;
      for (idx = i = 0, len = left.length; i < len; idx = ++i) {
        bracket = left[idx];
        if (right[idx] !== bracket) {
          return false;
        }
      }
      return true;
    };
    checkPacket = function(ip, brackets) {
      var bracketId, bracketsToCheck;

With data packets we validate bracket matching

      bracketsToCheck = brackets.slice(0);
      if (config.group instanceof RegExp) {

Basic regexp validation for the brackets

        bracketsToCheck = bracketsToCheck.slice(0, 1);
        if (!bracketsToCheck.length) {
          return false;
        }
        if (!config.group.test(bracketsToCheck[0])) {
          return false;
        }
      }
      if (input.collatedBy.ready) {

We already know what brackets we’re looking for, match

        return checkBrackets(input.collatedBy.brackets, bracketsToCheck);
      }
      bracketId = bracketsToCheck.join(':');
      if (!bracketsAtPorts[bracketId]) {
        bracketsAtPorts[bracketId] = [];
      }
      if (bracketsAtPorts[bracketId].indexOf(port) === -1) {

Register that this port had these brackets

        bracketsAtPorts[bracketId].push(port);
      }
      if (config.inPorts.indexOf(port) !== config.inPorts.length - 1) {

To prevent deadlocks we see all bracket sets, and validate if at least one of them matches. This means we return true until the last inport where we actually check.

        return true;
      }
      if (bracketsAtPorts[bracketId].length !== config.inPorts.length) {

Brackets that are not in every port are invalid

        return false;
      }
      if (input.collatedBy.ready) {
        return false;
      }
      input.collatedBy.ready = true;
      input.collatedBy.brackets = bracketsToCheck;
      return true;
    };
  }
  if (config.field) {
    input.collatedBy = {
      field: void 0,
      ready: false
    };
  }
  checkPort = function(port) {
    var buf, dataBrackets, hasData, hasMatching, i, ip, len, portBrackets;
    if (!config.group && !config.field) {

Without collation rules any data packet is OK

      return input.hasData(port);
    }

With collation rules set we need can only work when we have full streams

    if (config.group) {
      portBrackets = [];
      dataBrackets = [];
      hasMatching = false;
      buf = input.ports[port].getBuffer(input.scope);
      for (i = 0, len = buf.length; i < len; i++) {
        ip = buf[i];
        if (ip.type === 'openBracket') {
          portBrackets.push(ip.data);
          continue;
        }
        if (ip.type === 'closeBracket') {
          portBrackets.pop();
          if (portBrackets.length) {
            continue;
          }
          if (!hasData) {
            continue;
          }
          hasMatching = true;
          continue;
        }
        hasData = checkPacket(ip, portBrackets);
        continue;
      }
      return hasMatching;
    }
    if (config.field) {
      return input.hasStream(port, function(ip) {

Use first data packet to define what to collate by

        if (!input.collatedBy.ready) {
          input.collatedBy.field = ip.data[config.field];
          input.collatedBy.ready = true;
          return true;
        }
        return ip.data[config.field] === input.collatedBy.field;
      });
    }
  };
  ref = config.inPorts;
  for (i = 0, len = ref.length; i < len; i++) {
    port = ref[i];
    if (input.ports[port].isAddressable()) {
      attached = input.attached(port);
      if (!attached.length) {
        return false;
      }
      withData = attached.filter(function(idx) {
        return checkPort([port, idx]);
      });
      if (config.arrayPolicy['in'] === 'all') {
        if (withData.length !== attached.length) {
          return false;
        }
        continue;
      }
      if (!withData.length) {
        return false;
      }
      continue;
    }
    if (!checkPort(port)) {
      return false;
    }
  }
  return true;
};

CustomError returns an Error object carrying additional properties.

exports.CustomError = function(message, options) {
  var err;
  err = new Error(message);
  return exports.CustomizeError(err, options);
};

CustomizeError sets additional options for an Error object.

exports.CustomizeError = function(err, options) {
  var key, val;
  for (key in options) {
    if (!hasProp.call(options, key)) continue;
    val = options[key];
    err[key] = val;
  }
  return err;
};

This page contains documentation generated automatically from NoFlo's Helpers.coffee file.