AsCallback API
NoFlo - Flow-Based Programming for JavaScript
(c) 2017-2018 Flowhub UG
NoFlo may be freely distributed under the MIT license
/* eslint-disable
no-param-reassign,
import/prefer-default-export,
*/
import { Graph } from 'fbp-graph';
import { ComponentLoader } from './ComponentLoader';
import { Network } from './Network';
import IP from './IP';
import * as internalSocket from './InternalSocket';
asCallback embedding API
asCallback is a helper for embedding NoFlo components or graphs in other JavaScript programs.
By using the noflo.asCallback
function, you can turn any
NoFlo component or NoFlo Graph instance into a regular,
Node.js-style JavaScript function.
Each call to that function starts a new NoFlo network where the given input arguments are sent as IP objects to matching inports. Once the network finishes, the IP objects received from the network will be sent to the callback function.
If there was anything sent to an error
outport, this will
be provided as the error argument to the callback.
Option normalization
Here we handle the input valus given to the asCallback
function. This allows passing things like a pre-initialized
NoFlo ComponentLoader, or giving the component loading
baseDir context.
/**
* @typedef {Graph | string} AsCallbackComponent
*/
/**
* @typedef {Object} AsCallbackOptions
* @property {string} [name] - Name for the wrapped network
* @property {ComponentLoader} [loader] - Component loader instance to use, if any
* @property {string} [baseDir] - Project base directory for component loading
* @property {Object} [flowtrace] - Flowtrace instance to use for tracing this network run
* @property {NetworkCallback} [networkCallback] - Access to Network instance
* @property {boolean} [raw] - Whether the callback should operate on raw noflo.IP objects
*/
/**
* @param {AsCallbackOptions} options
* @param {AsCallbackComponent} component
* @returns {AsCallbackOptions}
*/
function normalizeOptions(options, component) {
if (!options) { options = {}; }
if (!options.name && typeof component === 'string') {
options.name = component;
}
if (options.loader) {
options.baseDir = options.loader.baseDir;
}
if (!options.baseDir && process && process.cwd) {
options.baseDir = process.cwd();
}
if (options.baseDir && !options.loader) {
options.loader = new ComponentLoader(options.baseDir);
}
if (!options.raw) {
options.raw = false;
}
return options;
}
Network preparation
Each invocation of the asCallback-wrapped NoFlo graph creates a new network. This way we can isolate multiple executions of the function in their own contexts.
/**
* @param {AsCallbackComponent} component
* @param {AsCallbackOptions} options
* @returns {Promise<Network>}
*/
function prepareNetwork(component, options) {
If we were given a graph instance, then just create a network
if (typeof component === 'object') {
This is a graph object
const network = new Network(component, {
...options,
componentLoader: options.loader,
});
Wire the network up
return network.connect();
}
if (!options.loader) {
return Promise.reject(new Error('No component loader provided'));
}
Start by loading the component
return options.loader.load(component, {})
.then((instance) => {
Prepare a graph wrapping the component
const graph = new Graph(options.name);
const nodeName = options.name || 'AsCallback';
graph.addNode(nodeName, component);
Expose ports
const inPorts = instance.inPorts.ports;
const outPorts = instance.outPorts.ports;
Object.keys(inPorts).forEach((port) => {
graph.addInport(port, nodeName, port);
});
Object.keys(outPorts).forEach((port) => {
graph.addOutport(port, nodeName, port);
});
Prepare network
const network = new Network(graph, {
...options,
componentLoader: options.loader,
});
Wire the network up and start execution
return network.connect();
});
}
Network execution
Once network is ready, we connect to all of its exported in and outports and start the network.
Input data is sent to the inports, and we collect IP packets received on the outports.
Once the network finishes, we send the resulting IP objects to the callback.
/**
* @param {Network} network
* @param {any} inputs
* @returns {Promise<any>}
*/
function runNetwork(network, inputs) {
return new Promise((resolve, reject) => {
Prepare inports
/** @type {Object<string, import("./InternalSocket").InternalSocket>} */
let inSockets = {};
Subscribe outports
/** @type {Array<Object<string, IP>>} */
const received = [];
const outPorts = Object.keys(network.graph.outports);
/** @type {Object<string, import("./InternalSocket").InternalSocket>} */
let outSockets = {};
outPorts.forEach((outport) => {
const portDef = network.graph.outports[outport];
const process = network.getNode(portDef.process);
if (!process || !process.component) {
return;
}
outSockets[outport] = internalSocket.createSocket();
network.subscribeSocket(outSockets[outport]);
process.component.outPorts[portDef.port].attach(outSockets[outport]);
outSockets[outport].from = {
process,
port: portDef.port,
};
outSockets[outport].on('ip', (ip) => {
/** @type Object<string, IP> */
const res = {};
res[outport] = ip;
received.push(res);
});
});
Subscribe to process errors
let onEnd = null;
let onError = null;
onError = (err) => {
reject(err.error);
network.removeListener('end', onEnd);
};
network.once('process-error', onError);
Subscribe network finish
onEnd = () => {
Clear listeners
Object.keys(outSockets).forEach((port) => {
const socket = outSockets[port];
socket.from.process.component.outPorts[socket.from.port].detach(socket);
});
outSockets = {};
inSockets = {};
resolve(received);
network.removeListener('process-error', onError);
};
network.once('end', onEnd);
Start network
network.start()
.then(() => {
Send inputs
for (let i = 0; i < inputs.length; i += 1) {
const inputMap = inputs[i];
const keys = Object.keys(inputMap);
for (let j = 0; j < keys.length; j += 1) {
const port = keys[j];
const value = inputMap[port];
if (!inSockets[port]) {
const portDef = network.graph.inports[port];
if (!portDef) {
reject(new Error(`Port ${port} not available in the graph`));
return;
}
const process = network.getNode(portDef.process);
if (!process || !process.component) {
reject(new Error(`Process ${portDef.process} for port ${port} not available in the graph`));
return;
}
inSockets[port] = internalSocket.createSocket();
network.subscribeSocket(inSockets[port]);
inSockets[port].to = {
process,
port,
};
process.component.inPorts[portDef.port].attach(inSockets[port]);
}
try {
if (IP.isIP(value)) {
inSockets[port].post(value);
} else {
inSockets[port].post(new IP('data', value));
}
} catch (e) {
reject(e);
network.removeListener('process-error', onError);
network.removeListener('end', onEnd);
return;
}
}
}
}, reject);
});
}
function getType(inputs, network) {
Scalar values are always simple inputs
if (typeof inputs !== 'object' || !inputs) { return 'simple'; }
if (Array.isArray(inputs)) {
const maps = inputs.filter((entry) => getType(entry, network) === 'map');
If each member if the array is an input map, this is a sequence
if (maps.length === inputs.length) { return 'sequence'; }
Otherwise arrays must be simple inputs
return 'simple';
}
Empty objects can’t be maps
const keys = Object.keys(inputs);
if (!keys.length) { return 'simple'; }
for (let i = 0; i < keys.length; i += 1) {
const key = keys[i];
if (!network.graph.inports[key]) { return 'simple'; }
}
return 'map';
}
function prepareInputMap(inputs, inputType, network) {
Sequence we can use as-is
if (inputType === 'sequence') { return inputs; }
We can turn a map to a sequence by wrapping it in an array
if (inputType === 'map') { return [inputs]; }
Simple inputs need to be converted to a sequence
let inPort = Object.keys(network.graph.inports)[0];
if (!inPort) {
return {};
}
If we have a port named “IN”, send to that
if (network.graph.inports.in) { inPort = 'in'; }
const map = {};
map[inPort] = inputs;
return [map];
}
function normalizeOutput(values, options) {
if (options.raw) { return values; }
const result = [];
let previous = null;
let current = result;
values.forEach((packet) => {
if (packet.type === 'openBracket') {
previous = current;
current = [];
previous.push(current);
}
if (packet.type === 'data') {
current.push(packet.data);
}
if (packet.type === 'closeBracket') {
current = previous;
}
});
if (result.length === 1) {
return result[0];
}
return result;
}
function sendOutputMap(outputs, resultType, options) {
First check if the output sequence contains errors
const errors = outputs.filter((map) => map.error != null).map((map) => map.error);
if (errors.length) {
return Promise.reject(normalizeOutput(errors, options));
}
if (resultType === 'sequence') {
return Promise.resolve(outputs.map((map) => {
const res = {};
Object.keys(map).forEach((key) => {
const val = map[key];
if (options.raw) {
res[key] = val;
return;
}
res[key] = normalizeOutput([val], options);
});
return res;
}));
}
Flatten the sequence
const mappedOutputs = {};
outputs.forEach((map) => {
Object.keys(map).forEach((key) => {
const val = map[key];
if (!mappedOutputs[key]) { mappedOutputs[key] = []; }
mappedOutputs[key].push(val);
});
});
const outputKeys = Object.keys(mappedOutputs);
const withValue = outputKeys.filter((outport) => mappedOutputs[outport].length > 0);
if (withValue.length === 0) {
No output
return Promise.resolve(null);
}
if ((withValue.length === 1) && (resultType === 'simple')) {
Single outport
return Promise.resolve(normalizeOutput(mappedOutputs[withValue[0]], options));
}
const result = {};
Object.keys(mappedOutputs).forEach((port) => {
const packets = mappedOutputs[port];
result[port] = normalizeOutput(packets, options);
});
return Promise.resolve(result);
}
/**
* @callback ResultCallback
* @param {Error | null} err
* @param {any} [output]
* @returns {void}
*/
/**
* @callback NetworkAsCallback
* @param {any} input
* @param {ResultCallback} callback
* @returns void
*/
/**
* @callback NetworkAsPromise
* @param {any} input
* @returns {Promise<any>}
*/
/**
* @callback NetworkCallback
* @param {Network} network
* @returns void
*/
/**
* @param {Graph | string} component - Graph or component to load
* @param {Object} options
* @param {string} [options.name] - Name for the wrapped network
* @param {ComponentLoader} [options.loader] - Component loader instance to use, if any
* @param {string} [options.baseDir] - Project base directory for component loading
* @param {Object} [options.flowtrace] - Flowtrace instance to use for tracing this network run
* @param {NetworkCallback} [options.networkCallback] - Access to Network instance
* @param {boolean} [options.raw] - Whether the callback should operate on raw noflo.IP objects
* @returns {NetworkAsPromise}
*/
export function asPromise(component, options) {
if (!component) {
throw new Error('No component or graph provided');
}
options = normalizeOptions(options, component);
return (inputs) => prepareNetwork(component, options)
.then((network) => {
if (options.networkCallback) {
options.networkCallback(network);
}
const resultType = getType(inputs, network);
const inputMap = prepareInputMap(inputs, resultType, network);
return runNetwork(network, inputMap)
.then((outputMap) => sendOutputMap(outputMap, resultType, options));
});
}
/**
* @param {AsCallbackComponent} component - Graph or component to load
* @param {AsCallbackOptions} options
* @returns {NetworkAsCallback}
*/
export function asCallback(component, options) {
const promised = asPromise(component, options);
return (inputs, callback) => {
promised(inputs)
.then((output) => {
callback(null, output);
}, callback);
};
}
This page contains documentation generated automatically from NoFlo's AsCallback.js file.