API Documentation

On GitHub

ProcessOutput API

NoFlo - Flow-Based Programming for JavaScript
(c) 2013-2020 Flowhub UG
(c) 2011-2012 Henri Bergius, Nemein
NoFlo may be freely distributed under the MIT license
/* eslint-disable no-underscore-dangle */
import debug from 'debug';
import IP from './IP';

const debugComponent = debug('noflo:component');

Checks if a value is an Error, or a non-empty array whose first element is an Error

/**
 * Tells whether a value should be handled as an error payload.
 *
 * A value qualifies when it is an `Error` instance, or when it is a non-empty
 * array whose first element is an `Error` instance. Only the first element of
 * an array is inspected.
 *
 * @param {any} err - Value to inspect
 * @returns {boolean} `true` for an Error or an Error-led array, else `false`
 */
function isError(err) {
  if (err instanceof Error) {
    return true;
  }
  if (!Array.isArray(err) || err.length === 0) {
    return false;
  }
  const [first] = err;
  return first instanceof Error;
}

/**
 * Output handle given to a component's process function. It wraps the
 * component's outports together with the current processing context and
 * offers helpers for sending packets, reporting errors and finishing the
 * activation.
 */
export default class ProcessOutput {
  /**
   * Stores the outports and context, and caches the context's node instance,
   * input IP, result object and scope on the instance.
   *
   * @param {import("./Ports").OutPorts} ports - Component outports
   * @param {import("./ProcessContext").default} context - Processing context
   */
  constructor(ports, context) {
    this.ports = ports;
    this.context = context;
    this.nodeInstance = this.context.nodeInstance;
    this.ip = this.context.ip;
    this.result = this.context.result;
    this.scope = this.context.scope;
  }

  /**
   * Sends an error object
   *
   * If the component has an `error` outport that is attached or not required,
   * the errors are sent there; several errors are wrapped in an
   * openBracket/closeBracket pair. Otherwise the first error is thrown.
   *
   * @param {Error|Error[]} err
   * @returns {void}
   * @throws {Error} The first given error when no usable `error` port exists
   */
  error(err) {
    const errs = Array.isArray(err) ? err : [err];
    if (this.ports.ports.error
      && (this.ports.ports.error.isAttached() || !this.ports.ports.error.isRequired())) {
      if (errs.length > 1) { this.sendIP('error', new IP('openBracket')); }
      errs.forEach((e) => { this.sendIP('error', e); });
      if (errs.length > 1) { this.sendIP('error', new IP('closeBracket')); }
    } else {
      // Throwing aborts the iteration, so only the first error is raised
      errs.forEach((e) => { throw e; });
    }
  }

  /**
   * Sends a single IP object to a port
   *
   * Values that are not IP objects are wrapped into a `data` IP. An IP
   * without a scope inherits the scope of this output.
   *
   * @param {string} port - Port to send to
   * @param {IP|any} packet - IP or data to send
   * @returns {void}
   * @throws {Error} If the node has no such outport, or the port is
   *   addressable and the IP carries no index
   */
  sendIP(port, packet) {
    const ip = IP.isIP(packet) ? packet : new IP('data', packet);
    if ((this.scope !== null) && (ip.scope === null)) { ip.scope = this.scope; }

    if (!this.nodeInstance.outPorts.ports[port]) {
      throw new Error(`Node ${this.nodeInstance.nodeId} does not have outport ${port}`);
    }

    // eslint-disable-next-line max-len
    const portImpl = /** @type {import("./OutPort").default} */ (this.nodeInstance.outPorts.ports[port]);

    if (portImpl.isAddressable() && (ip.index === null)) {
      throw new Error(`Sending packets to addressable port ${this.nodeInstance.nodeId} ${port} requires specifying index`);
    }

    if (this.nodeInstance.isOrdered()) {
      // Ordered nodes collect the packet into the result instead of sending it now
      this.nodeInstance.addToResult(this.result, port, ip);
      return;
    }
    if (!portImpl.options.scoped) {
      // Ports that are not scoped send the packet without scope information
      ip.scope = null;
    }
    portImpl.sendIP(ip);
  }

  /**
   * Sends packets for each port as a key in the map
   * or sends Error or a list of Errors if passed such
   *
   * If none of the argument's keys names a port and the component has exactly
   * one non-error outport, the whole argument is sent to that port.
   *
   * @param {Error|Array<Error>|Object<string, any>} outputMap
   * @returns {void}
   * @throws {Error} If there are several outports and the argument names none
   */
  send(outputMap) {
    if (isError(outputMap)) {
      const errors = /** @type {Error|Array<Error>} */ (outputMap);
      this.error(errors);
      return;
    }

    const portNames = Object.keys(this.ports.ports);
    /** @type {Array<string>} */
    const componentPorts = portNames.filter(
      (port) => (port !== 'error') && (port !== 'ports') && (port !== '_callbacks'),
    );
    // Read the keys of the map once rather than once per port
    /** @type {Array<string>} */
    const mapKeys = ((outputMap != null) && (typeof outputMap === 'object'))
      ? Object.keys(outputMap)
      : [];
    // Any port name counts here, including `error`
    const mapIsInPorts = portNames.some((port) => mapKeys.includes(port));

    if ((componentPorts.length === 1) && !mapIsInPorts) {
      this.sendIP(componentPorts[0], outputMap);
      return;
    }

    if ((componentPorts.length > 1) && !mapIsInPorts) {
      throw new Error('Port must be specified for sending output');
    }

    Object.keys(outputMap).forEach((port) => {
      const packet = outputMap[port];
      this.sendIP(port, packet);
    });
  }

  /**
   * Sends the argument via `send()` and marks activation as `done()`
   *
   * @param {Error|Array<Error>|Object<string, any>} outputMap
   * @returns {void}
   */
  sendDone(outputMap) {
    this.send(outputMap);
    this.done();
  }

  /**
   * Makes a map-style component pass a result value to `out`
   * keeping all IP metadata received from `in`,
   * or modifying it if `options` is provided
   *
   * The IP stored on this output is mutated in place: every key of `options`
   * is copied onto it and its `data` is replaced, then it is sent to `out`
   * and the activation is finished with `done()`.
   *
   * @param {any} data
   * @param {Object<string, any>} [options]
   * @returns {void}
   * @throws {Error} If the outports have no `out` port
   */
  pass(data, options = {}) {
    if (!('out' in this.ports)) {
      throw new Error('output.pass() requires port "out" to be present');
    }
    Object.keys(options).forEach((key) => {
      this.ip[key] = options[key];
    });
    this.ip.data = data;
    this.sendIP('out', this.ip);
    this.done();
  }

  /**
   * Finishes process activation gracefully
   *
   * Marks the result as resolved, calls `activate` on the node instance,
   * forwards the optional error through `error()`, picks up dangling
   * closeBrackets when this is the last running activation of an ordered
   * node, and finally calls `deactivate` on the node instance.
   *
   * @param {Error|Array<Error>} [error]
   * @returns {void}
   */
  done(error) {
    this.result.__resolved = true;
    this.nodeInstance.activate(this.context);
    if (error) { this.error(error); }

    const isLast = () => {
      // We only care about real output sets with processing data
      const resultsOnly = this.nodeInstance.outputQ.filter((q) => {
        if (!q.__resolved) { return true; }
        // Resolved entries holding nothing but closing brackets are not real output
        if ((Object.keys(q).length === 2) && q.__bracketClosingAfter) {
          return false;
        }
        return true;
      });
      const pos = resultsOnly.indexOf(this.result);
      const len = resultsOnly.length;
      const {
        load,
      } = this.nodeInstance;
      if (pos === (len - 1)) { return true; }
      if ((pos === -1) && (load === (len + 1))) { return true; }
      if ((len <= 1) && (load === 1)) { return true; }
      return false;
    };
    if (this.nodeInstance.isOrdered() && isLast()) {
      // We're doing bracket forwarding. See if there are
      // dangling closeBrackets in buffer since we're the
      // last running process function.
      Object.keys(this.nodeInstance.bracketContext.in).forEach((port) => {
        const contexts = this.nodeInstance.bracketContext.in[port];
        if (!contexts[this.scope]) { return; }
        const nodeContext = contexts[this.scope];
        if (!nodeContext.length) { return; }
        const context = nodeContext[nodeContext.length - 1];

        // eslint-disable-next-line max-len
        const inPorts = /** @type {import("./InPort").default} */ (this.nodeInstance.inPorts.ports[context.source]);
        const buf = inPorts.getBuffer(context.ip.scope, context.ip.index);
        // Consume leading closeBrackets and queue their contexts on the result
        while (buf.length > 0 && buf[0].type === 'closeBracket') {
          const ip = inPorts.get(context.ip.scope, context.ip.index);
          const ctx = nodeContext.pop();
          ctx.closeIp = ip;
          if (!this.result.__bracketClosingAfter) { this.result.__bracketClosingAfter = []; }
          this.result.__bracketClosingAfter.push(ctx);
        }
      });
    }

    debugComponent(`${this.nodeInstance.nodeId} finished processing ${this.nodeInstance.load}`);

    this.nodeInstance.deactivate(this.context);
  }
}

This page contains documentation generated automatically from NoFlo's ProcessOutput.js file.