API Documentation

On GitHub

InPort API

NoFlo - Flow-Based Programming for JavaScript
(c) 2014-2017 Flowhub UG
NoFlo may be freely distributed under the MIT license
import BasePort from './BasePort';

NoFlo inport

Input Port (inport) implementation for NoFlo components. These ports are the way a component receives Information Packets.

/**
 * @typedef InPortOptions
 * @property {boolean} [control]
 * @property {boolean} [triggering]
 */
/**
 * @typedef {import("./BasePort").BaseOptions & InPortOptions} PortOptions
 */

export default class InPort extends BasePort {
  /**
   * @param {PortOptions} [options]
   */
  constructor(options = {}) {
    const opts = options;
    if (opts.control == null) { opts.control = false; }
    if (opts.scoped == null) { opts.scoped = true; }
    if (opts.triggering == null) { opts.triggering = true; }

    super(opts);

    this.nodeInstance = null;

    this.prepareBuffer();
  }

Assign a delegate for retrieving data should this inPort

  attachSocket(socket, localId = null) {

have a default value.

    if (this.hasDefault()) {
      socket.setDataDelegate(() => this.options.default);
    }

    socket.on('connect', () => this.handleSocketEvent('connect', socket, localId));
    socket.on('begingroup', (group) => this.handleSocketEvent('begingroup', group, localId));
    socket.on('data', (data) => {
      this.validateData(data);
      return this.handleSocketEvent('data', data, localId);
    });
    socket.on('endgroup', (group) => this.handleSocketEvent('endgroup', group, localId));
    socket.on('disconnect', () => this.handleSocketEvent('disconnect', socket, localId));
    socket.on('ip', (ip) => this.handleIP(ip, localId));
  }

  handleIP(packet, index) {
    if (this.options.control && (packet.type !== 'data')) { return; }
    const ip = packet;
    ip.owner = this.nodeInstance;
    if (this.isAddressable()) { ip.index = index; }
    if (ip.datatype === 'all') {

Stamp non-specific IP objects with port datatype

      ip.datatype = this.getDataType();
    }
    if (this.getSchema() && !ip.schema) {

Stamp non-specific IP objects with port schema

      ip.schema = this.getSchema();
    }

    const buf = this.prepareBufferForIP(ip);
    buf.push(ip);
    if (this.options.control && (buf.length > 1)) { buf.shift(); }

    this.emit('ip', ip, index);
  }

  handleSocketEvent(event, payload, id) {

Emit port event

    if (this.isAddressable()) {
      return this.emit(event, payload, id);
    }
    return this.emit(event, payload);
  }

  hasDefault() {
    return this.options.default !== undefined;
  }

  prepareBuffer() {
    if (this.isAddressable()) {
      if (this.options.scoped) {
        /** @type {Object<string,Object<number,Array<import("./IP").default>>>} */
        this.indexedScopedBuffer = {};
      }
      /** @type {Object<number,Array<import("./IP").default>>} */
      this.indexedIipBuffer = {};
      /** @type {Object<number,Array<import("./IP").default>>} */
      this.indexedBuffer = {};
      return;
    }
    if (this.options.scoped) {
      /** @type {Object<string,Array<import("./IP").default>>} */
      this.scopedBuffer = {};
    }
    /** @type {Array<import("./IP").default>} */
    this.iipBuffer = [];
    /** @type {Array<import("./IP").default>} */
    this.buffer = [];
  }

  /**
   * @param {import("./IP").default} ip
   * @returns {Array<import("./IP").default>}
   */
  prepareBufferForIP(ip) {
    if (this.isAddressable()) {
      if ((ip.scope != null) && this.options.scoped) {
        if (!(ip.scope in this.indexedScopedBuffer)) { this.indexedScopedBuffer[ip.scope] = []; }
        if (!(ip.index in this.indexedScopedBuffer[ip.scope])) {
          this.indexedScopedBuffer[ip.scope][ip.index] = [];
        }
        return this.indexedScopedBuffer[ip.scope][ip.index];
      }
      if (ip.initial) {
        if (!(ip.index in this.indexedIipBuffer)) { this.indexedIipBuffer[ip.index] = []; }
        return this.indexedIipBuffer[ip.index];
      }
      if (!(ip.index in this.indexedBuffer)) { this.indexedBuffer[ip.index] = []; }
      return this.indexedBuffer[ip.index];
    }
    if ((ip.scope != null) && this.options.scoped) {
      if (!(ip.scope in this.scopedBuffer)) {
        this.scopedBuffer[ip.scope] = [];
      }
      return this.scopedBuffer[ip.scope];
    }
    if (ip.initial) {
      return this.iipBuffer;
    }
    return this.buffer;
  }

  validateData(data) {
    if (!this.options.values) { return; }
    if (this.options.values.indexOf(data) === -1) {
      throw new Error(`Invalid data='${data}' received, not in [${this.options.values}]`);
    }
  }

  /**
   * @param {string|null} scope
   * @param {number|null} index
   * @param {boolean} [initial]
   * @returns {Array<import("./IP").default>}
   */
  getBuffer(scope, index, initial = false) {
    if (this.isAddressable()) {
      if ((scope != null) && this.options.scoped) {
        if (!(scope in this.indexedScopedBuffer)) { return undefined; }
        if (!(index in this.indexedScopedBuffer[scope])) { return undefined; }
        return this.indexedScopedBuffer[scope][index];
      }
      if (initial) {
        if (!(index in this.indexedIipBuffer)) { return undefined; }
        return this.indexedIipBuffer[index];
      }
      if (!(index in this.indexedBuffer)) { return undefined; }
      return this.indexedBuffer[index];
    }
    if ((scope != null) && this.options.scoped) {
      if (!(scope in this.scopedBuffer)) { return undefined; }
      return this.scopedBuffer[scope];
    }
    if (initial) {
      return this.iipBuffer;
    }
    return this.buffer;
  }

  /**
   * @param {string|null} scope
   * @param {number|null} index
   * @param {boolean} [initial]
   * @returns {import("./IP").default|void}
   */
  getFromBuffer(scope, index, initial = false) {
    const buf = this.getBuffer(scope, index, initial);
    if (!(buf != null ? buf.length : undefined)) {
      return undefined;
    }
    if (this.options.control) {
      return buf[buf.length - 1];
    }
    return buf.shift();
  }

Fetches a packet from the port

  get(scope, index) {
    const res = this.getFromBuffer(scope, index);
    if (res !== undefined) { return res; }

Try to find an IIP instead

    return this.getFromBuffer(null, index, true);
  }

  hasIPinBuffer(scope, index, validate, initial = false) {
    const buf = this.getBuffer(scope, index, initial);
    if (!(buf != null ? buf.length : undefined)) { return false; }
    for (let i = 0; i < buf.length; i += 1) {
      if (validate(buf[i])) { return true; }
    }
    return false;
  }

  hasIIP(index, validate) {
    return this.hasIPinBuffer(null, index, validate, true);
  }

Returns true if port contains packet(s) matching the validator

  has(scope, index, validate) {
    let valid = validate;
    let idx = index;
    if (!this.isAddressable()) {
      valid = idx;
      idx = null;
    }
    if (this.hasIPinBuffer(scope, idx, valid)) { return true; }
    if (this.hasIIP(idx, valid)) { return true; }
    return false;
  }

Returns the number of data packets in an inport

  length(scope, index) {
    const buf = this.getBuffer(scope, index);
    if (!buf) { return 0; }
    return buf.length;
  }

Tells if buffer has packets or not

  ready(scope) {
    return this.length(scope) > 0;
  }

Clears inport buffers

  clear() {
    return this.prepareBuffer();
  }
}

This page contains documentation generated automatically from NoFlo's InPort.js file.