Process API
The main idea behind the Process API is that all port events arrive in one place, and all output is sent from that same place.
First, let's look at what a component defined using the Process API looks like:
```coffeescript
noflo = require 'noflo'

exports.getComponent = ->
  new noflo.Component
    description: 'Forward received data'
    inPorts:
      in:
        datatype: 'all'
        description: 'An input port'
    outPorts:
      out:
        datatype: 'all'
        description: 'An output port'
    process: (input, output) ->
      # Precondition: wait until a packet has arrived on `in`
      return unless input.has 'in'
      data = input.getData 'in'
      output.send out: data
      output.done()
```
Note the `process: (input, output) ->` method. It is the main method of the Process API and gives the API its name. `input` is a reference to all incoming IPs to the component, and `output` is a reference to all outgoing IPs from the component.
The process function is called asynchronously for each IP that arrives. If it returns without calling `output.done`, it will be called again when the next IP arrives, and the IPs it did not read keep accumulating in the port buffers.
`component.process` returns the component instance, so you don't have to return it explicitly from your component definition.
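The calling convention above can be pictured with a toy model. The sketch below is plain JavaScript (the language NoFlo runs on) and is not NoFlo's implementation; `ToyComponent`, `receive` and the `buffers` layout are hypothetical names chosen for illustration. Each arriving IP is appended to its port buffer and the process function runs again. When the precondition fails, the function returns early and the IP simply stays buffered.

```javascript
// Toy model of the Process API calling convention (hypothetical; not NoFlo's code).
// Each arriving IP is appended to its port buffer, then process() is invoked.
class ToyComponent {
  constructor(processFn) {
    this.buffers = {};   // port name -> array of buffered IPs
    this.sent = [];      // everything passed to output.send
    this.calls = 0;      // how many times process() ran
    this.processFn = processFn;
  }

  receive(port, ip) {
    (this.buffers[port] = this.buffers[port] || []).push(ip);
    this.calls += 1;
    const input = {
      // true only if every named port holds at least one IP
      has: (...ports) => ports.every((p) => (this.buffers[p] || []).length > 0),
      // removes the first IP from the port buffer and returns its payload
      getData: (p) => this.buffers[p].shift().data,
    };
    const output = {
      send: (data) => this.sent.push(data),
      done: () => {}, // nothing to finalize in this toy
    };
    this.processFn(input, output);
  }
}

// A process function that needs packets on both `a` and `b` before firing.
const component = new ToyComponent((input, output) => {
  if (!input.has('a', 'b')) return; // precondition not met: IPs stay buffered
  output.send(input.getData('a') + input.getData('b'));
  output.done();
});

component.receive('a', { type: 'data', data: 1 });
component.receive('a', { type: 'data', data: 2 });
component.receive('b', { type: 'data', data: 10 });
console.log(component.calls, component.sent, component.buffers.a.length);
// 3 [ 11 ] 1
```

Three IPs arrive, so the process function runs three times, but it sends only once. The second packet on `a` is still waiting in the buffer for a partner on `b`.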
Index
Component States
- 1) The component starts.
- 2) The component's process preconditions are checked.
- 3) Once the preconditions pass, the component begins processing.
- 4) Finally, once the component finishes processing and calls `output.done`, it is done.
2) Preconditions
Given that `input` represents all the input received by a component, it is common to check some preconditions before processing. Preconditions usually sit at the beginning of the process function.
They check that the inports have the required packets, that those packets match certain criteria, or anything else the component needs before it can begin processing.
A common operation is to check whether `input` has a packet arriving at some port:
```coffeescript
input.has 'portname'
```
Using `has` with multiple arguments checks that all of those inports have packets:
```coffeescript
input.has 'portname', 'secondportname'
```
An important detail about `input.has` is that it matches any packet type (`openBracket`, `data`, or `closeBracket`). To be more specific, `input.has` can be given a callback that inspects any IP attributes and returns whether the IP matches your requirements.
For example, to accept only `data` IPs, filter them with the callback:
```coffeescript
hasHoldData = input.has 'hold', (ip) -> ip.type is 'data'
```
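As a rough illustration of these semantics, here is a hypothetical `has` helper in plain JavaScript that works on a map of port buffers. It sketches the behaviour described above (every named port must hold an IP, and an optional trailing callback narrows which IPs count); it is not NoFlo's source.

```javascript
// Toy version of input.has semantics (hypothetical helper, not NoFlo's code).
// Every named port must hold at least one IP; an optional trailing callback
// narrows "an IP" to "an IP the callback accepts".
function has(buffers, ...args) {
  const validate = typeof args[args.length - 1] === 'function'
    ? args.pop()
    : () => true;
  return args.every((port) => (buffers[port] || []).some(validate));
}

const buffers = {
  hold: [{ type: 'openBracket', data: 'group' }],
  in: [{ type: 'data', data: 42 }],
};

console.log(has(buffers, 'hold'));                             // true
console.log(has(buffers, 'hold', (ip) => ip.type === 'data')); // false
console.log(has(buffers, 'hold', 'in'));                       // true
console.log(has(buffers, 'in', 'missing'));                    // false
```

The second call shows why the callback matters: `hold` is not empty, but the only thing in it is a bracket.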
3) Processing
Once a component has passed the preconditions, it begins processing. Processing is where you get the data, possibly operate on it, and send IPs out before calling `output.done`.
Receiving
Input
Consider `input` as the representation of all data received by the component.
Get
To receive data in a component, call `input.get`, which returns the first IP from the buffer of the given port.
For example, if the incoming packet flow on an `in` inport is:
1) openBracket
2) data
3) closeBracket
If `input.get 'in'` is called, it first receives the `openBracket` (remember that an `openBracket` is also an IP). If called again, it receives the `data`, and the call after that receives the `closeBracket`.
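Since `input.get` removes an IP from the buffer each time it is called, you can call it repeatedly until you have what you need. For example, to skip ahead to the first `data` IP:
```coffeescript
# Make sure a data IP is buffered, otherwise input.get would run out
return unless input.has 'in', (ip) -> ip.type is 'data'
ip = input.get 'in'
until ip.type is 'data'
  ip = input.get 'in'
```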
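The consuming behaviour of `input.get` can be modelled with an array and `shift()`. This toy JavaScript sketch assumes a single port buffer holding the bracket/data/bracket flow from the example above. The `ip &&` guard stops the loop from failing if the buffer runs out before a data IP appears; none of these names are NoFlo APIs.

```javascript
// Toy model of input.get on one port buffer (hypothetical, not NoFlo's code):
// each call removes and returns the first IP, or undefined when empty.
const buffer = [
  { type: 'openBracket', data: null },
  { type: 'data', data: 'eh' },
  { type: 'closeBracket', data: null },
];
const get = () => buffer.shift();

// Skip brackets until the first data IP (or until the buffer is empty).
let ip = get();
while (ip && ip.type !== 'data') {
  ip = get();
}
console.log(ip.data, buffer.length); // eh 1
```

Only the `closeBracket` is left in the buffer afterwards; the `openBracket` was consumed on the way to the data.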
GetData
`input.getData portname` is a shortcut for `input.get(portname).data`.
If no port name is passed as an argument, it reads from the `in` inport, so `input.getData()` is the same as `input.getData 'in'`.
Calling `input.get` or `input.getData` on a `control` port does not reset that port's buffer, because control data is meant to persist until new data is sent to the port. `control` ports also only accept `data` IPs; bracket IPs sent to them are dropped silently.
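A toy control port, sketched in plain JavaScript under the rules above: brackets are ignored, the newest data replaces the old value, and reads do not consume it. `ToyControlPort` is a hypothetical class for illustration, not part of NoFlo.

```javascript
// Toy model of a control port (hypothetical, not NoFlo's code).
class ToyControlPort {
  constructor() {
    this.ip = undefined;
  }
  receive(ip) {
    if (ip.type !== 'data') return; // brackets are silently dropped
    this.ip = ip;                   // newest data replaces the old value
  }
  get() {
    return this.ip;                 // no removal: the value persists
  }
  getData() {
    return this.ip && this.ip.data;
  }
}

const mode = new ToyControlPort();
mode.receive({ type: 'openBracket', data: null });
mode.receive({ type: 'data', data: 'fast' });
console.log(mode.getData(), mode.getData()); // fast fast
mode.receive({ type: 'data', data: 'slow' });
console.log(mode.getData());                 // slow
```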
As mentioned, `input.getData` accepts one or more port names as parameters.
Passing in one port name returns that port's data:
```coffeescript
data = input.getData portname
```
Passing in multiple port names returns an array of their data (here unpacked with destructuring):
```coffeescript
[canada, eh] = input.getData 'canada', 'eh'
```
`input.get` and `input.getData` remove the retrieved item from the buffer.
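The default-port and multi-port behaviour of `getData` can be sketched with a small hypothetical `makeInput` helper in plain JavaScript. It models each port buffer as an array of IPs and is not NoFlo's implementation.

```javascript
// Toy getData (hypothetical, not NoFlo's code): defaults to the `in` port,
// returns one value for one port and an array for several.
function makeInput(buffers) {
  const getData = (...ports) => {
    if (ports.length === 0) ports = ['in'];
    const values = ports.map((port) => {
      const ip = (buffers[port] || []).shift(); // removes the IP from the buffer
      return ip && ip.data;
    });
    return ports.length === 1 ? values[0] : values;
  };
  return { getData };
}

const input = makeInput({
  in: [{ type: 'data', data: 'hello' }],
  canada: [{ type: 'data', data: 'maple' }],
  eh: [{ type: 'data', data: '?' }],
});

console.log(input.getData());              // hello
const [canada, eh] = input.getData('canada', 'eh');
console.log(canada, eh);                   // maple ?
console.log(input.getData('in'));          // undefined (already consumed)
```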
Sending
An output map is an object whose keys are the ports to send to, and whose values are the data to send to those ports.
Output
Consider `output` as the representation of all data sent by the component.
If you are sending multiple IPs that belong together, you should make them a stream, meaning they are wrapped in an `openBracket` and a `closeBracket`:
```coffeescript
output.send out: new noflo.IP 'openBracket'
for data in ['eh', 'igloo']
  output.send out: data
output.send out: new noflo.IP 'closeBracket'
output.done()
```
If you're just sending one packet out of one port, it is usually best to use `output.sendDone`, which is a shortcut for `output.send` + `output.done`:
```coffeescript
output.sendDone out: 'data'
```
This is the same as:
```coffeescript
output.send out: 'data'
output.done()
```
It is also possible to send to multiple ports at once using an output map:
```coffeescript
output.sendDone
  first: 'foo'
  second: 'bar'
```
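To show how an output map is routed, here is a hypothetical `makeOutput` helper in plain JavaScript. Each key of the map names an outport, and `sendDone` is simply `send` followed by `done`. It is a toy model rather than NoFlo's code, and rejecting unknown port names is a choice made for this sketch.

```javascript
// Toy output (hypothetical, not NoFlo's code) that routes an output map:
// each key names an outport and each value is sent to that port.
function makeOutput(portNames) {
  const sent = Object.fromEntries(portNames.map((name) => [name, []]));
  let finished = false;
  const send = (outputMap) => {
    for (const [port, value] of Object.entries(outputMap)) {
      if (!sent[port]) throw new Error(`No outport named ${port}`);
      sent[port].push(value);
    }
  };
  const done = () => { finished = true; };
  const sendDone = (outputMap) => { send(outputMap); done(); };
  return { send, done, sendDone, sent, isDone: () => finished };
}

const output = makeOutput(['first', 'second']);
output.sendDone({ first: 'foo', second: 'bar' });
console.log(output.sent, output.isDone());
// { first: [ 'foo' ], second: [ 'bar' ] } true
```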
The following is an example of a component that sends to multiple ports at once, or sends an error if something goes wrong:
```coffeescript
noflo = require 'noflo'

exports.getComponent = ->
  c = new noflo.Component
    description: 'Take in data on one inport, repeat that data out of one port and send other data out of another'
    inPorts:
      eh:
        datatype: 'string'
    outPorts:
      canadian:
        datatype: 'all'
      igloo:
        datatype: 'all'
      error:
        datatype: 'object'
  # Forward brackets arriving on `eh` to both data outports
  c.forwardBrackets =
    eh: ['canadian', 'igloo']
  c.process (input, output) ->
    return unless input.has 'eh'
    eh = input.getData 'eh'
    unless eh?
      # will send to `error` outport
      output.sendDone new Error('inPort `eh` did not have any real data!')
      return
    output.sendDone
      canadian: '+1'
      igloo: eh
```
Sending IPs
There are generally two cases in which you'd want to send a `noflo.IP` explicitly:
- When sending an `openBracket` or `closeBracket`
- When specifying IP metadata, such as `scope`, `index`, or application-specific metadata.
```coffeescript
noflo = require 'noflo'

exports.getComponent = ->
  c = new noflo.Component
    description: 'get data on an inport, send out as a specifically scoped IP'
    inPorts:
      in:
        datatype: 'all'
    outPorts:
      out:
        datatype: 'all'
  c.process (input, output) ->
    return unless input.has 'in'
    data = input.getData 'in'
    # Wrap the payload in an IP so metadata such as `scope` can be attached
    output.sendDone out: new noflo.IP('data', data, scope: 'example-scope')
```
For more examples of sending, including streams, see the FindEhs component in the writing-your-own-project guide.
4) Done
When you are done processing your data, call `output.done()` (or `output.sendDone`, if that fits how you're sending).
An `Error` can be passed to `output.sendDone` or `output.done`, which will send the `Error` to the `error` port. If there is no `error` port defined, the error propagates back up. The same happens if you simply throw an `Error`.
```coffeescript
output.sendDone new Error('we have a problem')
```
In the future, it may emit a `ProcessError`.
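The error-routing rule described above can be sketched as follows. This is a plain JavaScript toy with a hypothetical `makeOutput` helper, not NoFlo's implementation. An `Error` goes to the `error` outport when one exists and is thrown to the caller otherwise.

```javascript
// Toy error routing (hypothetical, not NoFlo's code): an Error passed to
// sendDone goes to the `error` outport when it exists, otherwise it is thrown
// so it propagates back up to the caller.
function makeOutput(portNames) {
  const sent = Object.fromEntries(portNames.map((name) => [name, []]));
  const sendDone = (value) => {
    if (value instanceof Error) {
      if (!sent.error) throw value; // no error port: propagate
      sent.error.push(value);
      return;
    }
    for (const [port, data] of Object.entries(value)) sent[port].push(data);
  };
  return { sendDone, sent };
}

const withErrorPort = makeOutput(['out', 'error']);
withErrorPort.sendDone(new Error('we have a problem'));
console.log(withErrorPort.sent.error[0].message); // we have a problem

const withoutErrorPort = makeOutput(['out']);
try {
  withoutErrorPort.sendDone(new Error('we have a problem'));
} catch (err) {
  console.log('propagated:', err.message);        // propagated: we have a problem
}
```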
Brackets
Brackets are Information Packets used to group things. Read more about their different types in Information Packets Types.
BracketForwarding
By default, brackets are forwarded from the `in` inport to the outports `out` and `error` (if those ports exist).
Bracket forwarding is a way to pass on brackets so that you don't have to deal with brackets coming from that inport in the process function.
If an inport receives an `openBracket`, `data`, and `closeBracket`, and bracket forwarding is configured for that inport (with the `forwardBrackets` option), you can get the `data`, process it and send IPs out. Whatever you send out will be wrapped in the `openBracket` and `closeBracket`.
For example, given some IPs coming into an inport:
1) openBracket ('name')
2) data
3) closeBracket ('name')
and the component handling the IPs:
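```coffeescript
noflo = require 'noflo'

exports.getComponent = ->
  c = new noflo.Component
    inPorts:
      eh:
        datatype: 'all'
    outPorts:
      canada:
        datatype: 'object'
    # Brackets arriving on `eh` are re-sent around the output on `canada`
    forwardBrackets:
      eh: ['canada']
    process: (input, output) ->
      return unless input.has 'eh'
      # Consume the IP that triggered this call so it is not processed again
      input.getData 'eh'
      output.send canada: 'one'
      output.send canada: 'two'
      output.done()
```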
a socket listening to the `canada` outport would receive:
1) openBracket ('name')
2) 'one'
3) 'two'
4) closeBracket ('name')
When sending brackets as a group, the `openBracket` and `closeBracket` should contain the same data.
Control ports are not wrapped in brackets; they only deal with `data`.
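The wrapping effect of bracket forwarding can be modelled in a few lines of plain JavaScript. The hypothetical `runWithForwarding` function below passes only data IPs to the process callback and copies brackets straight through, so whatever the callback sends ends up between the forwarded `openBracket` and `closeBracket`. It reproduces the `canada` example's output sequence, but it is a simplification, not how NoFlo schedules forwarding internally.

```javascript
// Toy bracket forwarding (hypothetical, not NoFlo's code): brackets arriving
// on a forwarding inport bypass the process callback and are re-emitted
// around whatever the callback sends for the data inside them.
function runWithForwarding(inputIPs, processData) {
  const out = [];
  for (const ip of inputIPs) {
    if (ip.type === 'data') {
      processData(ip.data, (value) => out.push({ type: 'data', data: value }));
    } else {
      out.push(ip); // openBracket / closeBracket forwarded as-is
    }
  }
  return out;
}

const result = runWithForwarding(
  [
    { type: 'openBracket', data: 'name' },
    { type: 'data', data: 'x' },
    { type: 'closeBracket', data: 'name' },
  ],
  (data, send) => { send('one'); send('two'); },
);
console.log(result.map((ip) => `${ip.type}:${ip.data}`).join(' '));
// openBracket:name data:one data:two closeBracket:name
```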
A more advanced example using sub-streams (should be avoided if possible because they add unnecessary complexity):
1) openBracket, '$outtermost'
2) openBracket, '$sub1'
3) data, 'eh'
4) closeBracket, '$sub1'
5) openBracket, '$sub2'
6) data, 'canada'
7) data, 'igloo'
8) closeBracket, '$sub2'
9) closeBracket, '$outtermost'
and using the following component to handle the stream:
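```coffeescript
noflo = require 'noflo'

exports.getComponent = ->
  c = new noflo.Component
    inPorts:
      in:
        datatype: 'all'
    outPorts:
      out:
        datatype: 'object'
    process: (input, output) ->
      # Only fire once a data IP is buffered, so the loop below cannot run dry
      return unless input.has 'in', (ip) -> ip.type is 'data'
      ip = input.get 'in'
      # Skip any brackets still sitting in front of the data
      until ip.type is 'data'
        ip = input.get 'in'
      output.send out: ip.data + ' eh!'
      output.done()
```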
the output stream would be:
1) openBracket, '$outtermost'
2) openBracket, '$sub1'
3) data, 'eh eh!'
4) closeBracket, '$sub1'
5) openBracket, '$sub2'
6) data, 'canada eh!'
7) data, 'igloo eh!'
8) closeBracket, '$sub2'
9) closeBracket, '$outtermost'
What's happening here is that bracket forwarding takes the brackets, sends the `openBracket`, waits until the `data` inside the stream has been sent, and then sends the matching `closeBracket`s.
Resources
An example of bracket forwarding can be found in: Loading Components inline.
A fairly simple usage: SplitObject
And here in the Canadianness project: FindWords
Example
```coffeescript
noflo = require 'noflo'

exports.getComponent = ->
  c = new noflo.Component
    inPorts: in: datatype: 'all'
    outPorts: out: datatype: 'all'
    process: (input, output) ->
      # Fire once per packet on `in`, consuming it so it is not reprocessed
      return unless input.has 'in'
      input.get 'in'
      fruits = ['apples', 'bananas', 'grapes', 'oranges']
      veggies = ['broccoli', 'cabbage', 'celery']
      output.send out: new noflo.IP 'openBracket', 'fruit'
      for fruit in fruits
        output.send out: fruit
      output.send out: new noflo.IP 'closeBracket', 'fruit'
      output.send out: new noflo.IP 'openBracket', 'veggies'
      for veggie in veggies
        output.send out: veggie
      output.send out: new noflo.IP 'closeBracket', 'veggies'
      output.done()
```
Avoid
To keep things simple, it is usually best practice to avoid brackets unless they are necessary.
Brackets can quickly overcomplicate things. When a stream of things is required, an object can often be substituted for brackets:
```coffeescript
output.ports.name.send
  name: 'fruits'
  data: fruits
output.ports.name.send
  name: 'veggies'
  data: veggies
```
or
```coffeescript
output.ports.name.send
  names: ['fruits', 'veggies']
  data:
    fruits: fruits
    veggies: veggies
```
Stream Helpers
hasStream
Checks whether an input port has a full stream in its buffer. A full stream is all the IPs surrounded by a matching pair of open/close brackets.
```coffeescript
input.hasStream portname
```
It can also take multiple port names as arguments:
```coffeescript
input.hasStream 'eh', 'canada'
```
getStream
Gets the full stream and then resets the buffer state for that port.
```coffeescript
stream = input.getStream portname
```
It can also take multiple port names as arguments:
```coffeescript
[eh, canada] = input.getStream 'eh', 'canada'
```
For an example of how to use streams, see DetermineEmotion component.
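One way to think about `hasStream` and `getStream` is as bracket-depth counting over a port buffer. The plain JavaScript sketch below makes that concrete. The function names mirror the helpers above but are hypothetical stand-ins, not NoFlo's code, and treating a lone data IP as a complete stream is an assumption based on the unwrapped full-stream case in Firing Patterns.

```javascript
// Toy hasStream/getStream over one port buffer (hypothetical, not NoFlo's
// code). A stream is complete once brackets balance back to zero depth;
// a lone data IP with no brackets also counts as a complete stream.
function hasStream(buffer) {
  let depth = 0;
  for (const ip of buffer) {
    if (ip.type === 'openBracket') depth += 1;
    if (ip.type === 'closeBracket') depth -= 1;
    if (depth === 0) return true;
  }
  return false;
}

function getStream(buffer) {
  if (!hasStream(buffer)) return undefined;
  let depth = 0;
  for (let i = 0; i < buffer.length; i += 1) {
    const ip = buffer[i];
    if (ip.type === 'openBracket') depth += 1;
    if (ip.type === 'closeBracket') depth -= 1;
    if (depth === 0) return buffer.splice(0, i + 1); // removes the stream
  }
  return undefined;
}

const buffer = [
  { type: 'openBracket', data: 'g' },
  { type: 'data', data: 1 },
];
console.log(hasStream(buffer)); // false: the closeBracket has not arrived
buffer.push({ type: 'closeBracket', data: 'g' }, { type: 'data', data: 2 });
console.log(hasStream(buffer)); // true
const stream = getStream(buffer);
console.log(stream.length, buffer.length); // 3 1
```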
Data Stream Helpers
Data Stream helpers let a component receive a full stream while dealing only with the data IPs, leaving the brackets to bracket forwarding. The data stream helpers are mainly used for ports that receive Flat Streams.
hasDataStream
Data streams are used by setting the `data: true` attribute on an inport and letting bracket forwarding handle the brackets behind the scenes, so that the process function only has to deal with the `data`.
`hasDataStream` only works if the port's `data` property is `true`.
`hasStream` checks whether every `openBracket` has a matching `closeBracket`. However, when `forwardBrackets` is enabled for a port, IPs that are not `data` are removed from the buffer, so a separate value has to track the non-data IPs that come in. This is why `hasStream` does not work on a port that uses bracket forwarding.
When using `forwardBrackets`, the process function is triggered before the last `closeBracket` arrives; setting `data: true` changes it to be triggered after.
getDataStream
Often when using streams, all that is required is the `data` from the stream.
Using only `getStream`, that might look like this:
```coffeescript
c.process (input, output) ->
  return unless input.hasStream 'in'
  # Keep only data IPs, then unwrap their payloads
  stream = input.getStream 'in'
    .filter (ip) -> ip.type is 'data'
    .map (ip) -> ip.data
  console.log stream
  output.done()
```
This can be achieved much more easily using the `getDataStream` helper:
```coffeescript
c.process (input, output) ->
  return unless input.hasStream 'in'
  stream = input.getDataStream 'in'
  console.log stream
  output.done()
```
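Conceptually, `getDataStream` does the same filter-and-unwrap as the `getStream` version above. A hypothetical plain JavaScript equivalent, operating on an already complete stream (not NoFlo's implementation):

```javascript
// Toy getDataStream (hypothetical, not NoFlo's code): from a complete
// stream, keep only the data IPs and unwrap their payloads.
function getDataStream(stream) {
  return stream
    .filter((ip) => ip.type === 'data')
    .map((ip) => ip.data);
}

const stream = [
  { type: 'openBracket', data: 'fruit' },
  { type: 'data', data: 'apples' },
  { type: 'data', data: 'bananas' },
  { type: 'closeBracket', data: 'fruit' },
];
console.log(getDataStream(stream)); // [ 'apples', 'bananas' ]
```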
Flat Streams
Data streams handled with `hasDataStream` are usually only beneficial with flat streams. A flat stream has only one pair of open/close brackets; in other words, there are no nested sub-streams, as in the following example:
1) openBracket, $outtermost
2) data, 'eh'
3) data, 'canada'
4) data, 'igloo'
5) closeBracket, $outtermost
The following is a non-flat stream, because the data is wrapped in sub-streams. When input comes in, packets are sometimes wrapped in sub-streams like this (the text after the comma is the data in the packet):
1) openBracket, $outtermost
2) openBracket, $sub-stream-a
3) data, 'eh'
4) closeBracket, $sub-stream-a
5) openBracket, $sub-stream-b
6) data, 'canada'
7) data, 'igloo'
8) closeBracket, $sub-stream-b
9) closeBracket, $outtermost
If this example stream is handled with the Data Stream helpers by the following component:
```coffeescript
noflo = require 'noflo'

exports.getComponent = ->
  c = new noflo.Component
    description: 'takes in data, and appends "eh" to the end of each string'
    inPorts:
      in:
        datatype: 'string'
        data: true
    outPorts:
      out:
        datatype: 'string'
    process: (input, output) ->
      return unless input.hasDataStream 'in'
      # Only the data payloads; brackets are left to bracket forwarding
      stream = input.getDataStream 'in'
      for data in stream
        output.send out: data + ' eh!'
      output.done()
```
The resulting output stream would have lost the sub-stream brackets:
1) openBracket, $outtermost
2) data, 'eh eh!'
3) data, 'canada eh!'
4) data, 'igloo eh!'
5) closeBracket, $outtermost
See noflo-packets/Compact for an example using hasDataStream and getDataStream.
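Whether a stream is flat can be checked by tracking bracket depth. The `isFlat` function below is a hypothetical plain JavaScript helper, not part of NoFlo. It treats a complete stream as flat when its brackets never nest more than one level deep, and it is run on the two example streams above.

```javascript
// Toy flatness check (hypothetical helper, not part of NoFlo): a complete
// stream is flat when its brackets never nest more than one level deep.
function isFlat(stream) {
  let depth = 0;
  let maxDepth = 0;
  for (const ip of stream) {
    if (ip.type === 'openBracket') depth += 1;
    if (ip.type === 'closeBracket') depth -= 1;
    maxDepth = Math.max(maxDepth, depth);
  }
  return maxDepth <= 1;
}

const open = (value) => ({ type: 'openBracket', data: value });
const close = (value) => ({ type: 'closeBracket', data: value });
const data = (value) => ({ type: 'data', data: value });

const flat = [open('$outtermost'), data('eh'), data('canada'), data('igloo'), close('$outtermost')];
const nested = [
  open('$outtermost'),
  open('$sub-stream-a'), data('eh'), close('$sub-stream-a'),
  open('$sub-stream-b'), data('canada'), data('igloo'), close('$sub-stream-b'),
  close('$outtermost'),
];
console.log(isFlat(flat), isFlat(nested)); // true false
```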
Firing Patterns
There are two standard firing patterns. The first is the full stream, where data is sent to a port in order, with or without a surrounding `openBracket` and `closeBracket`:
1) openBracket (optional)
2) data
3) data
4) closeBracket (optional)
The second is per packet, where each `data` IP is processed individually. Both patterns and examples of their use are shown in the following sections.
Full Stream
# get everything from here...
1) openBracket
2) data
3) data
4) closeBracket
# ...to here
or, if there are no wrapping brackets:
# get everything from here...
1) data
# ...to here
example implementation:
```coffeescript
noflo = require 'noflo'

exports.getComponent = ->
  c = new noflo.Component
    icon: 'gear'
    inPorts:
      eh:
        datatype: 'all'
        required: true
    outPorts:
      canada:
        datatype: 'object'
        required: true
    process: (input, output) ->
      # Wait for a complete stream on `eh`, then take all of it at once
      return unless input.hasStream 'eh'
      stream = input.getStream 'eh'
      # ...do stuff with the stream...
      output.sendDone canada: stream
```
Per Packet
1) openBracket # don't get this
2) data # get only this
3) data # and then only this
4) closeBracket # don't get this
example:
```coffeescript
noflo = require 'noflo'

exports.getComponent = ->
  c = new noflo.Component
    icon: 'gear'
    inPorts:
      eh:
        datatype: 'all'
        required: true
    outPorts:
      canada:
        datatype: 'object'
        required: true
    # Brackets on `eh` are forwarded to `canada`, so process only sees data
    forwardBrackets:
      eh: ['canada']
    process: (input, output) ->
      return unless input.has 'eh', (ip) -> ip.type is 'data'
      data = input.getData 'eh'
      # ...do stuff with the data...
      output.sendDone canada: data
```
Ordering
Ordered
The `ordered` component option makes the component maintain the order between `input` and `output` regardless of streams (default is `false`).
For example, a `KnexDbSelect` component with `ordered` enabled outputs rowsets in the same order it receives queries, regardless of how long each query takes.
The `ordered` option will not work unless `autoOrdering` is disabled.
AutoOrdering
The `autoOrdering` component option groups the output sending (default is `true`).
`autoOrdering` temporarily enables `ordered` for components that still have `ordered: false`, to make them stream-safe.
If order is important and you want to use `ordered` yourself, disable `autoOrdering` by setting `component.autoOrdering = false`. See an example of `autoOrdering`.
What `autoOrdering` does is automatically turn `ordered` on when it sees a stream coming, so it makes sure (or at least tries to) that the output stream is the result of processing exactly the same sequence as the input stream.
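The effect of `ordered` can be sketched with a queue of result slots. In this plain JavaScript toy (the `OrderedOutput` class and its `start` method are hypothetical, not NoFlo's code), each input reserves a slot on arrival, results may finish in any order, and they are released only from the front of the queue. This mirrors the `KnexDbSelect` example, where a slow first query holds back a faster second one.

```javascript
// Toy model of the `ordered` option (hypothetical, not NoFlo's code):
// results may finish in any order, but they are released strictly in the
// order their inputs arrived.
class OrderedOutput {
  constructor() {
    this.pending = []; // one slot per input, in arrival order
    this.emitted = [];
  }
  start() {
    const slot = { done: false, value: undefined };
    this.pending.push(slot);
    // Call the returned function when this input's result is ready.
    return (value) => {
      slot.done = true;
      slot.value = value;
      this.flush();
    };
  }
  flush() {
    // Release finished results from the front only.
    while (this.pending.length > 0 && this.pending[0].done) {
      this.emitted.push(this.pending.shift().value);
    }
  }
}

const output = new OrderedOutput();
const finishQuery1 = output.start();
const finishQuery2 = output.start();
finishQuery2('rows for query 2'); // query 1 still running: held back
console.log(output.emitted);      // []
finishQuery1('rows for query 1'); // releases both, in input order
console.log(output.emitted);      // [ 'rows for query 1', 'rows for query 2' ]
```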
Buffer
If you need to do something advanced that the Get and Stream helpers cannot do, you can read information directly from the buffer using the `input.buffer` helpers.
To get the current buffer:
```coffeescript
currentBuffer = input.buffer.get()
```
To get the current buffer for a specific port:
```coffeescript
currentInBuffer = input.buffer.get 'in'
```
To get the current buffers for multiple ports:
```coffeescript
[inBuffer, ehBuffer] = input.buffer.get 'in', 'eh'
```
To find IPs matching criteria on a certain port:
```coffeescript
openBracketAndDataForIn = input.buffer.find 'in', (ip) ->
  (ip.type is 'data' or ip.type is 'openBracket') and ip.data?
```
To completely reset the buffer of a port:
```coffeescript
input.buffer.set portname, []
```
or to filter it based on your own conditions (in this case, keeping only `data` IPs):
```coffeescript
input.buffer.filter portname, (ip) -> ip.type is 'data'
```
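To make these helpers concrete, here is a toy buffer object in plain JavaScript with `get`, `find`, `set` and `filter` methods shaped like the calls above. `makeBuffer` is hypothetical, and the sketch assumes `find` returns all matching IPs without removing them; it is not NoFlo's implementation.

```javascript
// Toy port buffers with get/find/set/filter helpers (hypothetical, modelled
// on the input.buffer calls above; not NoFlo's code).
function makeBuffer(initial) {
  const ports = { ...initial };
  return {
    get: (...names) => {
      if (names.length === 0) return ports; // all buffers
      const found = names.map((name) => ports[name] || []);
      return names.length === 1 ? found[0] : found;
    },
    find: (name, cb) => (ports[name] || []).filter(cb), // read-only lookup
    set: (name, ips) => { ports[name] = ips; },
    filter: (name, cb) => { ports[name] = (ports[name] || []).filter(cb); },
  };
}

const buffer = makeBuffer({
  in: [
    { type: 'openBracket', data: 'g' },
    { type: 'data', data: 1 },
    { type: 'closeBracket', data: 'g' },
  ],
  eh: [{ type: 'data', data: '?' }],
});

console.log(buffer.find('in', (ip) => ip.type === 'data').length); // 1
buffer.filter('in', (ip) => ip.type === 'data'); // keep only data IPs
console.log(buffer.get('in').length);             // 1
buffer.set('eh', []);                             // reset one port
const [inBuffer, ehBuffer] = buffer.get('in', 'eh');
console.log(inBuffer.length, ehBuffer.length);    // 1 0
```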
An example usage is to leave one port's buffer intact while consuming another, triggering on every IP.
```coffeescript
noflo = require 'noflo'

exports.getComponent = ->
  c = new noflo.Component
    icon: 'gear'
    inPorts:
      eh:
        datatype: 'all'
        required: true
      igloo:
        datatype: 'all'
    outPorts:
      canada:
        datatype: 'object'
    process: (input, output) ->
      # Trigger only once a full stream has arrived on `eh`
      return unless input.hasStream 'eh'
      # getStream consumes the `eh` stream from its buffer
      stream = input.getStream 'eh'
      streamData = stream.filter (ip) -> ip.type is 'data'
      data = 'ehs=' + streamData.length
      if input.has 'igloo'
        # buffer.find only reads, so the `igloo` buffer is left intact
        igloos = input.buffer.find 'igloo', (ip) -> ip.type is 'data'
        for igloo in igloos
          data += '&' + igloo.data
      console.log data
      output.sendDone canada: data

# Drive the component manually through internal sockets
c = exports.getComponent()
eh = noflo.internalSocket.createSocket()
igloo = noflo.internalSocket.createSocket()
c.inPorts.eh.attach eh
c.inPorts.igloo.attach igloo

eh.send new noflo.IP 'openBracket'
eh.send 'eh?'
eh.send 'eh!'
eh.send 'eh!?'
igloo.send 'cold'
igloo.send 'message'
eh.send 'eh'
eh.send new noflo.IP 'closeBracket'
# @TODO: NEEDS A FIX
eh.send new noflo.IP 'closeBracket'

eh.send new noflo.IP 'openBracket'
eh.send '...eh...'
eh.send new noflo.IP 'closeBracket'
# @TODO: NEEDS A FIX
eh.send new noflo.IP 'closeBracket'
```