Legacy
Index
- WirePattern vs Process
- Async Components
- WirePattern
- Translation
- Port Events
- Porting
- Advanced Porting
WirePattern vs process
Firing
WirePattern | Process |
---|---|
It is triggered when all of the inPorts listed in the WirePattern `in` parameter, and all required param ports, have data. | It is fired every time new data comes into any of the ports; you decide whether the preconditions are met. |
State
WirePattern | Process |
---|---|
to keep state, either properties are set on the component, or params are used depending on the situation. | data can be kept in the buffer by using the buffer helper methods and removing it when done. |
Request Isolation
WirePattern | Process |
---|---|
Difficult; see https://github.com/noflo/noflo/blob/master/src/lib/Helpers.coffee#L63 and https://github.com/noflo/noflo/blob/master/spec/Helpers.coffee#L1460 | Request isolation comes for free with packet scope. |
Accessing ports
WirePattern | Process |
---|---|
Anywhere that has access to the component, using `component.outPorts.portname` or `component.inPorts.portname`. | Inside the process api function, using `output.ports.portname` or `input.ports.portname`, or implicitly when accessing input or when sending. |
Asynchronous components
Asynchronous components process data and send output some time later. The outputs are sent in the order in which they finish processing, which might be different from the order in which the inputs were received.
Previously, we used a dedicated class named `AsyncComponent`, which would be extended.
Now, process api is asynchronous by default.
WirePattern can be used for async by setting the `async` property (see WirePattern Async).
WirePattern
1) in & out array
exports.getComponent = ->
  c = new noflo.Component
  # When `in` is an array, the received data is put into one object whose
  # properties are named after the inPorts (here `data.in` and `data.eh`).
  # When `out` is an array, `out` is an object whose properties are the
  # outPorts (here `out.out` and `out.canada`).
  noflo.helpers.WirePattern c,
    in: ['in', 'eh'] # the inPorts to receive from
    out: ['out', 'canada'] # the outPorts to send out of
    async: true # whether the component is asynchronous or not
  , (data, groups, out, done) ->
    # `data` is keyed by inPort name, so the packet from the `in` port is
    # `data.in` (there is no `data.out`)
    out.out.send data.in
    out.canada.send data.eh
    done()
2) in & out singular
exports.getComponent = ->
  component = new noflo.Component
  # With a single string for `in`, the handler receives just the data sent to that port
  # With a single string for `out`, `out` is just the port to send out of
  wireConfig =
    in: 'eh'
    out: 'canada'
    async: true
  noflo.helpers.WirePattern component, wireConfig, (payload, groups, port, finish) ->
    port.send payload
    finish()
3) forwardGroups
exports.getComponent = ->
  c = new noflo.Component
  # `forwardGroups` is similar to using forwardBrackets in process api: it takes
  # the groups from each `in` sent to it and sends them to each `out`, wrapping
  # the data in them.
  # It forwards group IPs from the inputs to the output, sending them along
  # with the data. This option also accepts string or array values, if you
  # want to forward groups from specific port(s) only.
  # By default group forwarding is `false`.
  noflo.helpers.WirePattern c,
    in: ['eh', 'moose']
    out: 'canada'
    async: true
    forwardGroups: true
  , (data, groups, out, done) ->
    # `in` is an array, so `data` is an object keyed by inPort name
    out.send data.eh
    done()
4) group
exports.getComponent = ->
  c = new noflo.Component
  # `group` ensures that WirePattern only fires when each `in` has received
  # data wrapped in the same group (i.e. `eh` receives the same groups as `moose`)
  noflo.helpers.WirePattern c,
    in: ['eh', 'moose']
    out: 'canada'
    async: true
    forwardGroups: true
    group: true
  , (data, groups, out, done) ->
    # `in` is an array, so `data` is an object keyed by inPort name
    out.send data.eh
    done()
5) ordered
exports.getComponent = ->
  c = new noflo.Component
  # `ordered` makes sure the data is received in the order the ports are
  # listed in `in`; in this case, `eh` before `moose`.
  # NOTE(review): confirm against the WirePattern source whether `ordered`
  # governs input order (as stated here) or the order of the outputs.
  #
  # With `config.group = true` it checks incoming group IPs and collates
  # data with matching group IPs. By default this kind of grouping is `false`.
  # Set `config.group` to a RegExp object to correlate inputs only if the
  # group matches the expression (e.g. `^req_`). For non-matching groups
  # the component will act normally.
  noflo.helpers.WirePattern c,
    in: ['eh', 'moose']
    out: 'canada'
    async: true
    forwardGroups: true
    group: true
    ordered: true
  , (data, groups, out, done) ->
    # `in` is an array, so `data` is an object keyed by inPort name
    out.send data.eh
    done()
6) params
exports.getComponent = ->
  c = new noflo.Component
  # `params` keeps the last data sent to each listed port under
  # `c.params.<name>`; here that is `c.params.igloo`.
  # Important to note: it will not wait for params before firing.
  noflo.helpers.WirePattern c,
    in: ['eh', 'moose']
    params: ['igloo']
    out: 'canada'
    async: true
    forwardGroups: true
    group: true
    ordered: true
  , (data, groups, out, done) ->
    console.log c.params.igloo
    # `in` is an array, so `data` is an object keyed by inPort name
    out.send data.eh
    done()
`dropInput`: drops all input that arrives before the required params have been received.
`in` port: WirePattern will use the inPort named `in` by default if it exists; the same goes for `out`.
Ports and events
Being a flow-based programming environment, the main action in NoFlo happens through ports and their connections. All actions a component does should be triggered via input port events. There are several events that can be associated with ports:
- Attach: there is a connection to the port
- Connect: the port has started sending or receiving a data transmission
- BeginGroup: the data stream after this event is associated with a given named group. Components may or may not utilize this information
- Data: an individual data packet in a transmission. There might be multiple depending on how a component operates
- EndGroup: A particular grouped stream of data ends
- Disconnect: end of data transmission
- Detach: A connection to the port has been removed
- IP: An Information Packet has been sent; its type can be `data`, `openBracket`, or `closeBracket`. This is the modern way, and usually the only thing that should be listened for.
It depends on the nature of the component how these events may be handled. Most typical components do operations on a whole transmission, meaning that they should wait for the disconnect event on inports before they act, but some components can also act on single data packets coming in.
When a port has no connections, meaning that it was initialized without a connection, or a detach event has happened, it should do no operations regarding that port.
Translation
In process api, there are no `connect` and `disconnect` events inside of the `process` function. However, for backwards compatibility, they are translated when sending out.
Old to new
- Connect: is translated into OpenBracket
- BeginGroup: is translated into OpenBracket
- EndGroup: is translated into CloseBracket
- Disconnect: is translated into CloseBracket
New to old
- OpenBracket (aside from the outermost): is translated into BeginGroup
- CloseBracket (aside from the outermost): is translated into EndGroup
This means if you send out events using the new API, you can use the old way of listening to it, and vice versa.
Porting
1) Initial
fs = require 'fs'
noflo = require 'noflo'
exports.getComponent = ->
  # Legacy-style definition: create an empty component, then attach the
  # metadata and each port one statement at a time.
  component = new noflo.Component
  component.icon = 'copy'
  component.description = 'Copy a file'
  component.inPorts.add 'source', datatype: 'string'
  component.inPorts.add 'destination', datatype: 'string'
  component.outPorts.add 'out',
    datatype: 'string'
    required: false
  component.outPorts.add 'error',
    datatype: 'object'
    required: false

  # Fire once both `source` and `destination` have data
  wireOptions =
    in: ['source', 'destination']
    out: 'out'
    forwardGroups: true
    async: true

  noflo.helpers.WirePattern component, wireOptions, (data, groups, out, callback) ->
    # Every stream error is reported through the callback
    onStreamError = (err) ->
      return callback err if err.code is 'EMFILE'
      callback err

    reader = fs.createReadStream data.source
    writer = fs.createWriteStream data.destination
    reader.on 'error', onStreamError
    writer.on 'error', onStreamError
    reader.pipe writer
    # Once the source has been read to the end, send the destination path and finish
    reader.on 'end', ->
      out.send data.destination
      callback()

  component
2) Basics
fs = require 'fs'
noflo = require 'noflo'
exports.getComponent = ->
  # Port definitions and descriptions now live in the constructor options.
  # Ports that must be connected are marked `required: true`; any port
  # without that mark is not required.
  inPorts =
    source:
      datatype: 'string'
      description: 'source of the file to copy'
      required: true
    destination:
      datatype: 'string'
      description: 'where to copy the file to'
      required: true
  outPorts =
    out:
      datatype: 'string'
      description: 'contents of the file to send'
    error:
      datatype: 'object'
      description: 'if something goes wrong copying or writing the file'

  component = new noflo.Component
    icon: 'copy'
    description: 'Copy a file'
    inPorts: inPorts
    outPorts: outPorts

  wireOptions =
    in: ['source', 'destination']
    out: 'out'
    forwardGroups: true
    async: true

  # WirePattern returns the component, so its result doubles as the return
  # value of getComponent and no trailing `component` line is needed.
  noflo.helpers.WirePattern component, wireOptions, (data, groups, out, callback) ->
    # Every stream error is reported through the callback
    onStreamError = (err) ->
      return callback err if err.code is 'EMFILE'
      callback err

    reader = fs.createReadStream data.source
    writer = fs.createWriteStream data.destination
    reader.on 'error', onStreamError
    writer.on 'error', onStreamError
    reader.pipe writer
    reader.on 'end', ->
      out.send data.destination
      callback()
3) Process
fs = require 'fs'
noflo = require 'noflo'
exports.getComponent = ->
  c = new noflo.Component
    icon: 'copy'
    description: 'Copy a file'
    inPorts:
      source:
        datatype: 'string'
        description: 'source of the file to copy'
        required: true
      destination:
        datatype: 'string'
        description: 'where to copy the file to'
        required: true
    outPorts:
      out:
        datatype: 'string'
        description: 'contents of the file to send'
      error:
        datatype: 'object'
        description: 'if something goes wrong copying or writing the file'

  # WirePattern had the `forwardGroups` firing definition, which sends every
  # bracket received to every outport. In process api you manually define
  # which brackets go where.
  c.forwardBrackets =
    source: ['out', 'error']
    destination: ['out', 'error']

  # change from WirePattern to process
  # WirePattern had the `async` firing definition; process api is async by default
  c.process (input, output) ->
    # WirePattern had the `in` firing input port definition. In process the
    # function gets called on any event, so we just return unless there are
    # packets for both of these ports.
    return unless input.has 'source', 'destination'

    # Instead of WirePattern passing in a `data` object, we get out the data
    # we want. The destination is kept in a local because it is needed again
    # when the copy finishes (there is no `data` object in process api).
    source = input.getData 'source'
    destination = input.getData 'destination'
    rs = fs.createReadStream source
    ws = fs.createWriteStream destination

    # The error is handed to sendDone, which sends it out of the `error` port
    # because it is an Error object. It is called through a wrapper so that
    # `sendDone` is always invoked as a method of `output`.
    rs.on 'error', (err) -> output.sendDone err
    ws.on 'error', (err) -> output.sendDone err
    rs.pipe ws
    rs.on 'end', ->
      # in WirePattern you had to send the data and call `callback`
      # in process, there is a function to do both
      output.sendDone out: destination
Advanced Porting
1) Initial
noflo = require "noflo"
_ = require "underscore"
{ deepCopy } = require "owl-deepcopy"
# Legacy class-based component: it keeps its state on the instance and reacts
# to per-port events registered in the constructor.
class FilterProperty extends noflo.Component
icon: 'filter'
description: "Filter out some properties by matching RegExps
against the keys of incoming objects"
# Sets up the default state, declares the ports and registers the port listeners.
constructor: ->
# RegExps built from the packets received on the `key` port
@keys = []
@recurse = false
@keep = false
# Switched on as soon as anything arrives on the `accept` or `regexp` port
@legacy = false
# Legacy mode
@accepts = []
@regexps = []
@inPorts = new noflo.InPorts
in:
datatype: 'object'
description: 'Object to filter properties from'
key:
datatype: 'string'
description: 'Keys to filter (one key per IP)'
recurse:
datatype: 'boolean'
description: '"true" to recurse on the object\'s values'
keep:
datatype: 'boolean'
description: '"true" if matching properties must be kept, otherwise removed'
# Legacy mode
accept:
datatype: 'all'
regexp:
datatype: 'all'
@outPorts = new noflo.OutPorts
out:
datatype: 'object'
# Only a value whose string form is exactly "true" enables the option
@inPorts.keep.on "data", (keep) =>
@keep = String(keep) is "true"
@inPorts.recurse.on "data", (data) =>
@recurse = String(data) is "true"
# A new connection on `key` starts a fresh list of keys
@inPorts.key.on "connect", =>
@keys = []
# Each key packet is compiled into a global RegExp
@inPorts.key.on "data", (key) =>
@keys.push new RegExp key, "g"
# Legacy mode
@inPorts.accept.on "data", (data) =>
@legacy = true
@accepts.push data
@inPorts.regexp.on "data", (data) =>
@legacy = true
@regexps.push data
# Groups received on `in` are passed on to `out`
@inPorts.in.on "begingroup", (group) =>
@outPorts.out.beginGroup group
@inPorts.in.on "data", (data) =>
# Legacy mode
if @legacy
@filterData data
else
if _.isObject data
# Work on a copy, because `filter` deletes properties
data = deepCopy data
@filter data
@outPorts.out.send data
@inPorts.in.on "endgroup", (group) =>
@outPorts.out.endGroup()
@inPorts.in.on "disconnect", =>
@outPorts.out.disconnect()
# Deletes properties of `object` based on whether their key matches one of
# the RegExps in @keys; values of properties that were not deleted may be
# filtered recursively when @recurse is set.
filter: (object) ->
return if _.isEmpty object
for key, value of object
isMatched = false
for filter in @keys
match = key.match filter
# Delete on a match when not keeping, or on a non-match when keeping
if not @keep and match or
@keep and not match
delete object[key]
isMatched = true
break
if not isMatched and _.isObject(value) and @recurse
@filter value
# Legacy mode
# Copies the properties whose name is listed in @accepts, or matches an
# expression in @regexps, into a new object; that object is sent to `out`
# unless nothing matched.
filterData: (object) ->
newData = {}
match = false
for property, value of object
if @accepts.indexOf(property) isnt -1
newData[property] = value
match = true
continue
for expression in @regexps
regexp = new RegExp expression
if regexp.exec property
newData[property] = value
match = true
return unless match
@outPorts.out.send newData
exports.getComponent = -> new FilterProperty
2) Basics
# change to use single quotes
noflo = require 'noflo'
_ = require 'underscore'
{ deepCopy } = require 'owl-deepcopy'
# change from class to function
# Builds the FilterProperty component as a plain function instead of a class.
# State that used to live on the class instance (`@keys`, `@keep`, ...) now
# lives on the component `c`, and the former methods become functions
# attached to `c`.
exports.getComponent = ->
  c = new noflo.Component
  c.icon = 'filter'
  c.description = 'Filter out some properties by matching RegExps
  against the keys of incoming objects'

  # Former constructor body: default state
  c.keys = []
  c.recurse = false
  c.keep = false
  c.legacy = false
  # Legacy mode
  c.accepts = []
  c.regexps = []

  c.inPorts = new noflo.InPorts
    in:
      datatype: 'object'
      description: 'Object to filter properties from'
    key:
      datatype: 'string'
      description: 'Keys to filter (one key per IP)'
    recurse:
      datatype: 'boolean'
      description: '"true" to recurse on the object\'s values'
    keep:
      datatype: 'boolean'
      description: '"true" if matching properties must be kept, otherwise removed'
    # Legacy mode
    accept:
      datatype: 'all'
    regexp:
      datatype: 'all'
  c.outPorts = new noflo.OutPorts
    out:
      datatype: 'object'

  # Only a value whose string form is exactly 'true' enables the option
  c.inPorts.keep.on 'data', (keep) ->
    c.keep = String(keep) is 'true'
  c.inPorts.recurse.on 'data', (data) ->
    c.recurse = String(data) is 'true'

  # A new connection on `key` starts a fresh list of keys
  c.inPorts.key.on 'connect', ->
    c.keys = []
  c.inPorts.key.on 'data', (key) ->
    c.keys.push new RegExp key, 'g'

  # Legacy mode
  c.inPorts.accept.on 'data', (data) ->
    c.legacy = true
    c.accepts.push data
  c.inPorts.regexp.on 'data', (data) ->
    c.legacy = true
    c.regexps.push data

  # Groups received on `in` are passed on to `out`
  c.inPorts.in.on 'begingroup', (group) ->
    c.outPorts.out.beginGroup group
  c.inPorts.in.on 'data', (data) ->
    # Legacy mode
    if c.legacy
      c.filterData data
    else
      # NOTE(review): nesting reconstructed from flattened text; confirm that
      # non-object data is meant to be dropped rather than forwarded.
      if _.isObject data
        # Work on a copy, because `c.filter` deletes properties
        data = deepCopy data
        c.filter data
        c.outPorts.out.send data
  c.inPorts.in.on 'endgroup', (group) ->
    c.outPorts.out.endGroup()
  c.inPorts.in.on 'disconnect', ->
    c.outPorts.out.disconnect()

  # Deletes properties of `object` in place depending on whether their key
  # matches one of the RegExps in `c.keys`.
  c.filter = (object) ->
    return if _.isEmpty object
    for key, value of object
      isMatched = false
      for filter in c.keys
        match = key.match filter
        # Delete on a match when not keeping, or on a non-match when keeping
        if not c.keep and match or
        c.keep and not match
          delete object[key]
          isMatched = true
          break
      if not isMatched and _.isObject(value) and c.recurse
        c.filter value

  # Legacy mode
  # Copies the accepted / regexp-matched properties into a new object and
  # sends it, unless nothing matched.
  c.filterData = (object) ->
    newData = {}
    match = false
    for property, value of object
      if c.accepts.indexOf(property) isnt -1
        newData[property] = value
        match = true
        continue
      for expression in c.regexps
        regexp = new RegExp expression
        if regexp.exec property
          newData[property] = value
          match = true
    return unless match
    c.outPorts.out.send newData

  # There is no class any more, so the component itself is returned
  c
3) Class to function
# change to use single quotes
noflo = require 'noflo'
_ = require 'underscore'
{ deepCopy } = require 'owl-deepcopy'
# change from class to function
# Function-based FilterProperty component. The former class methods are
# functions attached to the component `c`, and `c` is returned directly.
exports.getComponent = ->
  c = new noflo.Component
  c.icon = 'filter'
  c.description = 'Filter out some properties by matching RegExps
  against the keys of incoming objects'

  # remove state
  # These properties are the state kept on the component; they are what goes
  # away when the component is moved to the process api.
  c.keys = []
  c.recurse = false
  c.keep = false
  c.legacy = false
  # Legacy mode
  c.accepts = []
  c.regexps = []

  c.inPorts = new noflo.InPorts
    in:
      datatype: 'object'
      description: 'Object to filter properties from'
    key:
      datatype: 'string'
      description: 'Keys to filter (one key per IP)'
    recurse:
      datatype: 'boolean'
      description: '"true" to recurse on the object\'s values'
    keep:
      datatype: 'boolean'
      description: '"true" if matching properties must be kept, otherwise removed'
    # Legacy mode
    accept:
      datatype: 'all'
    regexp:
      datatype: 'all'
  c.outPorts = new noflo.OutPorts
    out:
      datatype: 'object'

  # Only a value whose string form is exactly 'true' enables the option
  c.inPorts.keep.on 'data', (keep) ->
    c.keep = String(keep) is 'true'
  c.inPorts.recurse.on 'data', (data) ->
    c.recurse = String(data) is 'true'

  # A new connection on `key` starts a fresh list of keys
  c.inPorts.key.on 'connect', ->
    c.keys = []
  c.inPorts.key.on 'data', (key) ->
    c.keys.push new RegExp key, 'g'

  # Legacy mode
  c.inPorts.accept.on 'data', (data) ->
    c.legacy = true
    c.accepts.push data
  c.inPorts.regexp.on 'data', (data) ->
    c.legacy = true
    c.regexps.push data

  # Groups received on `in` are passed on to `out`
  c.inPorts.in.on 'begingroup', (group) ->
    c.outPorts.out.beginGroup group
  c.inPorts.in.on 'data', (data) ->
    # Legacy mode
    if c.legacy
      c.filterData data
    else
      # NOTE(review): nesting reconstructed from flattened text; confirm that
      # non-object data is meant to be dropped rather than forwarded.
      if _.isObject data
        # Work on a copy, because `c.filter` deletes properties
        data = deepCopy data
        c.filter data
        c.outPorts.out.send data
  c.inPorts.in.on 'endgroup', (group) ->
    c.outPorts.out.endGroup()
  c.inPorts.in.on 'disconnect', ->
    c.outPorts.out.disconnect()

  # Deletes properties of `object` in place depending on whether their key
  # matches one of the RegExps in `c.keys`.
  c.filter = (object) ->
    return if _.isEmpty object
    for key, value of object
      isMatched = false
      for filter in c.keys
        match = key.match filter
        # Delete on a match when not keeping, or on a non-match when keeping
        if not c.keep and match or
        c.keep and not match
          delete object[key]
          isMatched = true
          break
      if not isMatched and _.isObject(value) and c.recurse
        c.filter value

  # Legacy mode
  # Copies the accepted / regexp-matched properties into a new object and
  # sends it, unless nothing matched.
  c.filterData = (object) ->
    newData = {}
    match = false
    for property, value of object
      if c.accepts.indexOf(property) isnt -1
        newData[property] = value
        match = true
        continue
      for expression in c.regexps
        regexp = new RegExp expression
        if regexp.exec property
          newData[property] = value
          match = true
    return unless match
    c.outPorts.out.send newData

  # There is no class any more, so the component itself is returned
  c
4) Process
# remove need for underscorejs
noflo = require 'noflo'
{ deepCopy } = require 'owl-deepcopy'
# Process-api version of FilterProperty: no state is kept on the component;
# everything is read from the input on each activation.
exports.getComponent = ->
  c = new noflo.Component
  c.icon = 'filter'
  c.description = 'Filter out some properties by matching RegExps
  against the keys of incoming objects'
  c.inPorts = new noflo.InPorts
    in:
      datatype: 'object'
      description: 'Object to filter properties from'
      required: true
    key:
      datatype: 'string'
      description: 'Keys to filter (one key per IP)'
      required: true
    recurse:
      datatype: 'boolean'
      description: '"true" to recurse on the object\'s values'
      control: true
      default: false
    keep:
      datatype: 'boolean'
      description: '"true" if matching properties must be kept, otherwise removed'
      control: true
      default: false
    # Legacy mode
    accept:
      datatype: 'all'
    regexp:
      datatype: 'all'
  c.outPorts = new noflo.OutPorts
    out:
      datatype: 'object'

  # Deletes properties of `object` in place. `keys` is the list of RegExp
  # filters, `recurse` and `keep` are the option values; `input` is only
  # passed through to the recursive call.
  c.filter = (object, keys, recurse, keep, input) ->
    for key, value of object
      isMatched = false
      # the keys are filters we want to match in the object
      for filter in keys
        match = key.match filter
        # if they match, we delete them
        matchButDontKeep = not keep and match
        keepButDontMatch = keep and not match
        if matchButDontKeep or keepButDontMatch
          delete object[key]
          isMatched = true
          break
      if not isMatched and recurse and typeof value is 'object'
        c.filter value, keys, recurse, keep, input

  c.process (input, output) ->
    # because we only want to use non-brackets
    return input.buffer.get().pop() if input.ip.type isnt 'data'
    return unless input.has 'in', 'key'

    legacy = input.has('accept') or input.has('regexp')
    if legacy
      # Only one of the two legacy ports may have data, so read each with
      # getData (instead of dereferencing `.data` on a packet that may be
      # missing) and normalise the value to an array: a bare string would
      # otherwise be searched as a substring / iterated character by character.
      accepts = [].concat(input.getData('accept') ? [])
      regexps = [].concat(input.getData('regexp') ? [])

    # because we can have multiple data packets,
    # we want to get them all, and use just the data
    keys = input.buffer
      .find 'key', (ip) -> ip.type is 'data' and ip.data?
      .map (ip) -> new RegExp ip.data, "g"

    data = input.getData 'in'
    recurse = input.getData 'recurse'
    keep = input.getData 'keep'
    if keep? and typeof keep is 'object'
      keep = keep.pop()

    if legacy
      # Legacy mode
      newData = {}
      match = false
      for property, value of data
        if accepts.indexOf(property) isnt -1
          newData[property] = value
          match = true
          continue
        for expression in regexps
          regex = new RegExp expression
          if regex.exec property
            newData[property] = value
            match = true
      if match
        output.sendDone newData
      else
        # Nothing matched: send nothing, but still finish this activation
        output.done()
    else if typeof data is 'object'
      # Work on a copy, because `c.filter` deletes properties
      data = deepCopy data
      c.filter data, keys, recurse, keep, input
      output.sendDone data
    else
      # NOTE(review): nesting reconstructed from flattened text; non-object
      # data is dropped here. Confirm it should not be forwarded instead.
      output.done()

    # The keys were only inspected through the buffer, so clear them on every
    # path once this packet has been handled
    input.buffer.set 'key', []