
0.7.8 API docs

Process API

The main idea behind the Process API is that all port events arrive in one place, and all output is sent from that same place.

First, let's look at what a component defined using the Process API looks like:

noflo = require 'noflo'

exports.getComponent = ->
  new noflo.Component
    description: "Forward received data"
    inPorts:
      in:
        datatype: 'all'
        description: 'An input port'
    outPorts:
      out:
        datatype: 'all'
        description: 'An output port'
    process: (input, output) ->
      return unless input.has 'in'
      data = input.getData 'in'
      output.send data
      output.done()

Note the process: (input, output) -> function: it is the main method of the Process API and gives the API its name. The input argument is a reference to all IPs coming into the component, and the output argument is a reference to all IPs going out of it.

The process function is asynchronous and gets called for each IP that arrives. Until output.done is called, the process function keeps getting called as new IPs arrive, and those IPs keep getting appended to the buffer.

The component.process method returns the component instance, so when a c.process call is the last expression of getComponent, you don't have to return the component explicitly.
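
To make the call-per-IP behavior concrete, here is a toy model in plain CoffeeScript. ToyComponent, receive, and the trimmed-down input and output objects are hypothetical stand-ins, not NoFlo's implementation; the sketch only assumes that each arriving IP is appended to its port's buffer and then triggers process. It shows an IP that fails the precondition staying buffered until a later call can use it.

```coffeescript
# Hypothetical toy model of the Process API call cycle (not NoFlo's code):
# every arriving IP is appended to its port buffer, then process() runs.
class ToyComponent
  constructor: (@process) ->
    @buffers = {}
    @sent = []

  receive: (port, ip) ->
    (@buffers[port] ?= []).push ip
    # Minimal stand-ins for NoFlo's input and output objects
    input =
      has: (name) => @buffers[name]?.length > 0
      getData: (name = 'in') => @buffers[name].shift().data
    output =
      send: (data) => @sent.push data
      done: ->
    @process input, output

calls = 0
c = new ToyComponent (input, output) ->
  calls += 1
  # Precondition: wait until both ports have a packet
  return unless input.has('in') and input.has('eh')
  output.send "#{input.getData('in')} #{input.getData('eh')}"
  output.done()

c.receive 'in', {type: 'data', data: 'igloo'}  # returns early; IP stays buffered
c.receive 'eh', {type: 'data', data: 'cold'}   # now both ports have data
console.log calls, c.sent   # 2 [ 'igloo cold' ]
```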



Component States

1) The component starts.
2) The component's preconditions are checked.
3) Once the preconditions pass, the component begins processing.
4) Finally, once the component finishes processing and calls output.done, it is done.

2) Preconditions

Given that input represents all the input received by a component, it is common to check for some preconditions before processing.

Preconditions usually sit at the beginning of the process function. They check that the inports have the required packets, that those packets match certain criteria, or anything else the component requires before it can begin processing.

A common operation is to check if input has a packet arriving at some port:

input.has 'portname'

Using has with multiple arguments will check if all those inports have packets:

input.has 'portname', 'secondportname'

An important detail about input.has is that it checks for packets of any type (openBracket, data, or closeBracket). For finer control, input.has accepts a callback that inspects the IP attributes and returns whether the IP matches your custom requirements. For example, to accept only data IPs:

hasHoldData = input.has 'hold', (ip) -> ip.type is 'data'
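
A rough sketch of how such checks could be evaluated, assuming port buffers are plain arrays of IP-like objects. The has function and the buffers object here are hypothetical illustrations of the behavior described above, not the real input.has.

```coffeescript
# Hypothetical sketch of has-style checks over plain-array port buffers.
buffers =
  hold: [{type: 'openBracket', data: null}]
  in: [{type: 'data', data: 'eh'}]

has = (args...) ->
  # An optional trailing callback decides which IPs count
  validate = if typeof args[args.length - 1] is 'function' then args.pop() else (-> true)
  # Every named port needs at least one IP that passes the callback
  args.every (port) -> (buffers[port] ? []).some validate

hasIn = has 'in'                                      # true
hasBoth = has 'in', 'hold'                            # true: both ports hold some IP
hasHoldData = has 'hold', (ip) -> ip.type is 'data'   # false: only a bracket is buffered
```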

3) Processing

Once a component has passed its preconditions, it begins processing. Processing is where you get the data, possibly operate on it, and send IPs out before calling output.done.

Receiving

Input

Think of input as the representation of all data received by the component.


Get

To receive data in a component, call input.get, which takes the first IP from the buffer of the given port.

For example, if the incoming packet flow on the in inport is:

1) openBracket
2) data
3) closeBracket

If input.get 'in' is called, it first returns the openBracket (remember that an openBracket is also an IP). Called again, it returns the data, and a third call returns the closeBracket.

Since input.get removes an IP from the buffer each time it is called, you can call it repeatedly until you have what you need. For example, to skip ahead to the first data IP:

data = input.get 'in'
# Stop at the first data IP, or when the buffer runs out
until not data? or data.type is 'data'
  data = input.get 'in'
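
The consuming behavior of get can be modeled with an array used as a queue. The buffer array and get function below are a hypothetical simplification of a single port's buffer, not NoFlo's internals.

```coffeescript
# Hypothetical model of one port buffer: get removes and returns the first IP.
buffer = [
  {type: 'openBracket', data: 'group'}
  {type: 'data', data: 'eh'}
  {type: 'closeBracket', data: 'group'}
]
get = -> buffer.shift()

seen = []
# Keep calling get until the buffer is exhausted
while (ip = get())?
  seen.push ip.type
console.log seen   # [ 'openBracket', 'data', 'closeBracket' ]
```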

GetData

The input.getData portname is a shortcut for input.get(portname).data.

If no port name is passed, it reads from the in inport, so input.getData() is the same as input.getData 'in'.

Calling input.get or input.getData on a control port does not reset that port's buffer, because control data is meant to persist until new data is sent to the port. Control ports also accept only data IPs; bracket IPs sent to them are silently dropped.

As mentioned, input.getData accepts one or more port names as parameters. Passing in one port name returns its data:

data = input.getData portname

Passing in multiple port names will give an array of the data (using destructuring):

[canada, eh] = input.getData 'canada', 'eh'

Both input.get and input.getData remove the retrieved IP from the buffer (control ports being the exception noted above).
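
The rules above (default port, an array result for several ports, persistent control data) can be sketched with plain arrays. getOne, getData, buffers, and controlPorts are hypothetical names for this illustration only; in NoFlo, control behavior is configured on the inport itself.

```coffeescript
# Hypothetical sketch of getData semantics over plain-array buffers.
buffers =
  in: [{type: 'data', data: 'igloo'}]
  canada: [{type: 'data', data: 'cold'}]
  eh: [{type: 'data', data: 'eh!'}]
controlPorts = ['eh']   # assumption: 'eh' behaves as a control port

getOne = (port) ->
  # Control ports keep their latest value; other ports have it removed
  if port in controlPorts then buffers[port][0].data else buffers[port].shift().data

getData = (ports...) ->
  ports = ['in'] if ports.length is 0   # no argument means the in port
  values = (getOne(port) for port in ports)
  if values.length is 1 then values[0] else values

first = getData()                       # 'igloo', read from in
[canada, eh] = getData 'canada', 'eh'   # 'cold' and 'eh!'
again = getData 'eh'                    # still 'eh!': control data persists
```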

Sending

An output map is an object whose keys are the ports it sends to and whose values are the data to send to those ports.

Output

Think of output as the representation of all data sent by the component.


If you take one input and send multiple IPs in response, you should group them into a stream by wrapping them in an openBracket and a closeBracket:

output.send out: new noflo.IP('openBracket')

for data in ['eh', 'igloo']
  output.send out: data

output.send out: new noflo.IP('closeBracket')
output.done()

If you're just sending one packet out of one port, it is usually best to use output.sendDone which is a shortcut for output.send + output.done:

output.sendDone out: 'data'

Which is the same as:

output.send out: 'data'
output.done()

It is also possible to send to multiple ports at a time using an output map like:

output.sendDone
  first: 'foo'
  second: 'bar'
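
Conceptually, an output map fans one call out to several ports. The makeOutput helper below is a hypothetical recorder, not NoFlo's output object; it only illustrates that each key names a port and that sendDone amounts to send followed by done.

```coffeescript
# Hypothetical stand-in for output that records what each port receives.
makeOutput = ->
  sent = {}
  finished = false
  output =
    sent: sent
    isDone: -> finished
    # Each key of the output map names an outport; its value is the data
    send: (map) ->
      for own port, data of map
        (sent[port] ?= []).push data
      return
    done: -> finished = true
    # sendDone is simply send followed by done
    sendDone: (map) ->
      output.send map
      output.done()
  output

out = makeOutput()
out.sendDone first: 'foo', second: 'bar'
console.log out.sent, out.isDone()   # { first: [ 'foo' ], second: [ 'bar' ] } true
```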

The following is an example of a component that sends to multiple ports at a time, or to the error port if there is an error:

exports.getComponent = ->
  c = new noflo.Component
    description: 'Take in data on 1 inport, repeat that data out a port
      and send other data out of another'
    inPorts:
      eh:
        datatype: 'string'
    outPorts:
      canadian:
        datatype: 'all'
      igloo:
        datatype: 'all'
      error:
        datatype: 'object'

  c.forwardBrackets =
    eh: ['canadian', 'igloo']

  c.process (input, output) ->
    return unless input.has 'eh'

    eh = input.getData 'eh'

    unless eh?
      # will send to `error` outport
      output.sendDone new Error('inPort `eh` did not have any real data!')
      return

    output.sendDone
      canadian: '+1'
      igloo: eh

Sending IPs

There are generally two cases when you'd want to send a noflo.IP explicitly:

  • When sending openBracket or closeBracket
  • When specifying IP metadata, such as scope, index, or application-specific metadata

exports.getComponent = ->
  c = new noflo.Component
    description: 'get data on an inport, send out as a specifically scoped IP'
    inPorts:
      in:
        datatype: 'all'
    outPorts:
      out:
        datatype: 'all'

  c.process (input, output) ->
    return unless input.has 'in'

    data = input.getData 'in'
    output.sendDone out: new noflo.IP('data', data, scope: 'example-scope')

For more examples of sending, including streams, see the FindEhs component in the writing your own project guide.


4) Done

When you are done processing your data, call output.done() (or output.sendDone, if that fits how you're sending).

An Error can be sent to output.sendDone or output.done, which will send it to the error port. If no error port is defined, the error propagates back up. The same happens if you throw an Error.

output.sendDone new Error('we have a problem')

In the future, it may emit a `ProccessError`.
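
The routing rule for errors can be summarized in a few lines. routeError is a hypothetical helper that takes a list of outport names; it sketches the documented behavior and is not how NoFlo implements it.

```coffeescript
# Hypothetical sketch of where an Error given to done/sendDone ends up.
routeError = (outPorts, err) ->
  # With an error outport, the Error is simply sent there
  return {port: 'error', data: err} if 'error' in outPorts
  # Without one, it propagates up, just like a thrown Error
  throw err

routed = routeError ['out', 'error'], new Error 'we have a problem'
console.log routed.port   # error

propagated = null
try
  routeError ['out'], new Error 'no error port'
catch e
  propagated = e.message
console.log propagated    # no error port
```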

Brackets

Brackets are Information Packets used to group things. Read more about their different types in Information Packets Types.

BracketForwarding


By default, brackets are automatically forwarded from the in inport to the out and error outports (if those ports exist).

Bracket forwarding passes brackets on for you, so you don't have to deal with brackets coming from that inport in the process function.

If an inport receives an openBracket, data, and a closeBracket, and forwardBrackets is configured for that inport, you can get the data, process it, and send IPs out. Whatever you send out will be wrapped in that openBracket and closeBracket.

For example, given some IPs coming into an inport:

1) openBracket ('name')
2) data
3) closeBracket ('name')

and the component handling the IPs:

exports.getComponent = ->
  c = new noflo.Component
    inPorts:
      eh:
        datatype: 'all'
    outPorts:
      canada:
        datatype: 'object'
    forwardBrackets:
      eh: ['canada']
    process: (input, output) ->
      return unless input.has 'eh'
      output.send canada: 'one'
      output.send canada: 'two'
      output.done()

a socket listening to the canada outport would receive:

1) openBracket ('name')
2) 'one'
3) 'two'
4) closeBracket ('name')

When sending brackets as a group, the openBracket and closeBracket should contain the same data.

Control ports are not wrapped with brackets; they only deal with data.
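
A simplified model of bracket forwarding helps show where the brackets end up. forwardBrackets and handle are hypothetical names; the real mechanism, as described here, waits for the data inside the stream to be sent before sending the closeBracket, while this sketch simply processes a complete, already-received stream to picture the result.

```coffeescript
# Hypothetical, synchronous model of bracket forwarding: brackets from the
# inport pass through, and whatever handle returns for a data IP is emitted
# between them.
forwardBrackets = (incoming, handle) ->
  result = []
  for ip in incoming
    if ip.type is 'data'
      result.push {type: 'data', data: d} for d in handle(ip.data)
    else
      result.push ip
  result

incoming = [
  {type: 'openBracket', data: 'name'}
  {type: 'data', data: 'anything'}
  {type: 'closeBracket', data: 'name'}
]
outgoing = forwardBrackets incoming, -> ['one', 'two']
labels = ("#{ip.type}:#{ip.data}" for ip in outgoing)
console.log labels   # [ 'openBracket:name', 'data:one', 'data:two', 'closeBracket:name' ]
```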

A more advanced example uses sub-streams (which should be avoided if possible, because they add unnecessary complexity):

1) openBracket, '$outtermost'

2) openBracket, '$sub1'
3) data, 'eh'
4) closeBracket, '$sub1'

5) openBracket, '$sub2'
6) data, 'canada'
7) data, 'igloo'
8) closeBracket, '$sub2'

9) closeBracket, '$outtermost'

and using the following component to handle the stream:

exports.getComponent = ->
  c = new noflo.Component
    inPorts:
      in:
        datatype: 'all'
    outPorts:
      out:
        datatype: 'object'
    process: (input, output) ->
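      # Only run once a data IP is available on the in port
      return unless input.has 'in', (ip) -> ip.type is 'data'
      data = input.get 'in'
      until data.type is 'data'
        data = input.get 'in'

      # data is an IP; its payload is in data.data
      output.send out: data.data + ' eh!'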
      output.done()

the output stream would be:

1) openBracket, '$outtermost'

2) openBracket, '$sub1'
3) data, 'eh eh!'
4) closeBracket, '$sub1'

5) openBracket, '$sub2'
6) data, 'canada eh!'
7) data, 'igloo eh!'
8) closeBracket, '$sub2'

9) closeBracket, '$outtermost'

What's happening here is that bracket forwarding takes the brackets, sends the openBracket, waits until the data inside the stream has been sent, and then sends the matching closeBrackets.

Resources

An example of bracket forwarding can be found in Loading Components inline.

A fairly simple usage: SplitObject

And here in the Canadianness project: FindWords

Example

exports.getComponent = ->
  c = new noflo.Component
    inPorts: in: datatype: 'all'
    outPorts: out: datatype: 'all'
    process: (input, output) ->
      # Wait for a packet on the in port, then consume it
      return unless input.has 'in'
      input.get 'in'
      fruits =  ['apples', 'bananas', 'grapes', 'oranges']
      veggies = ['broccoli', 'cabbage', 'celery']

      output.send out: new noflo.IP('openBracket', 'fruit')
      for fruit in fruits
        output.send out: fruit
      output.send out: new noflo.IP('closeBracket', 'fruit')

      output.send out: new noflo.IP('openBracket', 'veggies')
      for veggie in veggies
        output.send out: veggie
      output.send out: new noflo.IP('closeBracket', 'veggies')
      output.done()

Avoid

To keep things simple, it is best practice to avoid brackets unless they are necessary.

Brackets can quickly overcomplicate things. When a stream of things is required, an object can often be substituted for brackets:

output.ports.name.send
  name: 'fruits'
  data: fruits

output.ports.name.send
  name: 'veggies'
  data: veggies

or

output.ports.name.send
  names: ['fruits', 'veggies']
  data:
    fruits: fruits
    veggies: veggies

Stream Helpers

hasStream

Checks whether an input port has a full stream. A full stream is all the IPs surrounded by a matching pair of open/close brackets.

input.hasStream portname

It can also take multiple port names as arguments:

input.hasStream 'eh', 'canada'
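
Here is a hypothetical sketch of the check behind hasStream for a single port buffer held as an array: it counts bracket depth and reports true once the first group closes. A lone data IP with no brackets counts as complete, matching the unbracketed full stream described under Firing Patterns. This is an illustration, not NoFlo's code.

```coffeescript
# Hypothetical sketch of a hasStream-style check on one port buffer.
hasStream = (buffer) ->
  return false if buffer.length is 0
  depth = 0
  for ip in buffer
    depth += 1 if ip.type is 'openBracket'
    depth -= 1 if ip.type is 'closeBracket'
    # Back at depth 0: the first bracket group (or lone data IP) is complete
    return true if depth is 0
  false

partial = [{type: 'openBracket'}, {type: 'data', data: 'eh'}]
complete = partial.concat [{type: 'closeBracket'}]
console.log hasStream(partial), hasStream(complete)   # false true
```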

getStream

Gets the full stream and then resets the buffer state for that port.

stream = input.getStream portname

It can also take multiple port names as arguments:

[eh, canada] = input.getStream 'eh', 'canada'

For an example of how to use streams, see the DetermineEmotion component.
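
Under the same simplified model, getStream can be sketched as removing the stream's IPs from the front of the buffer. The getStream function below is a hypothetical stand-in operating on a plain array; IPs after the stream stay buffered for a later call.

```coffeescript
# Hypothetical sketch of getStream on one port buffer: remove and return the
# IPs up to the closeBracket that balances the first openBracket.
getStream = (buffer) ->
  depth = 0
  for ip, i in buffer
    depth += 1 if ip.type is 'openBracket'
    depth -= 1 if ip.type is 'closeBracket'
    return buffer.splice(0, i + 1) if depth is 0
  []   # incomplete stream: leave the buffer untouched

buffer = [
  {type: 'openBracket', data: 'a'}
  {type: 'data', data: 'eh'}
  {type: 'closeBracket', data: 'a'}
  {type: 'data', data: 'next'}
]
stream = getStream buffer
console.log stream.length, buffer.length   # 3 1
```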


Data Stream Helpers

Data stream helpers let a component receive a full stream while dealing only with the data IPs, leaving the brackets to bracket forwarding. They are mainly used for ports that receive Flat Streams.

hasDataStream

Data streams are used by setting the data: true attribute on an inport and letting bracket forwarding handle the brackets behind the scenes, so that the process function only has to deal with the data.

hasDataStream only works if the port's data property is true.

hasStream checks if every openBracket has a closeBracket.

However, when forwardBrackets is enabled for a port, non-data IPs are removed from the buffer, so a separate value has to track the non-data IPs that come in. This is why hasStream does not work on a port that uses bracket forwarding.

When using forwardBrackets, the process function is triggered before the last closeBracket arrives; setting data: true makes it trigger after the last closeBracket instead.

getDataStream

Often when using streams, all that is required is getting the data from the stream.

Using only getStream, that might look like this:

c.process (input, output) ->
  return unless input.hasStream 'in'
  stream = input.getStream 'in'
    .filter (ip) -> ip.type is 'data'
    .map (ip) -> ip.data
  console.log stream

This can be achieved much more easily with the getDataStream helper:

c.process (input, output) ->
  return unless input.hasStream 'in'
  stream = input.getDataStream 'in'
  console.log stream
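
In terms of plain data, the helper's result is what the filter and map chain above produces. This one-line getDataStream is a hypothetical equivalent written over an array of IP-like objects, not the real input.getDataStream.

```coffeescript
# Hypothetical sketch of what getDataStream returns: the data values of a
# complete stream, with bracket IPs filtered out.
getDataStream = (stream) -> (ip.data for ip in stream when ip.type is 'data')

stream = [
  {type: 'openBracket', data: 'words'}
  {type: 'data', data: 'eh'}
  {type: 'data', data: 'igloo'}
  {type: 'closeBracket', data: 'words'}
]
console.log getDataStream stream   # [ 'eh', 'igloo' ]
```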

Flat Streams

Data streams read with hasDataStream are usually only beneficial with flat streams. A flat stream has only one pair of open/close brackets; in other words, it has no nested sub-streams, as in the following example:

1) openBracket, $outtermost
2) data, 'eh'
3) data, 'canada'
4) data, 'igloo'
5) closeBracket, $outtermost

The following stream is not flat, because its data is wrapped in sub-streams. When input comes in, packets are sometimes wrapped in sub-streams like this (the value after the comma is the data in the packet):

1) openBracket, $outtermost

  2) openBracket, $sub-stream-a
  3) data, 'eh'
  4) closeBracket, $sub-stream-a

  5) openBracket, $sub-stream-b
  6) data, 'canada'
  7) data, 'igloo'
  8) closeBracket, $sub-stream-b

9) closeBracket, $outtermost

If this stream is handled with the data stream helpers, for example by the following component:

exports.getComponent = ->
  c = new noflo.Component
    description: 'takes in data and appends " eh!" to the end of each string'
    inPorts:
      in:
        datatype: 'string'
        data: true
    outPorts:
      out:
        datatype: 'string'
    process: (input, output) ->
      return unless input.hasDataStream 'in'
      stream = input.getDataStream 'in'
      for data in stream
        output.send data + ' eh!'
      output.done()

The resulting output stream would have lost the sub-stream brackets:

1) openBracket, $outtermost
2) data, 'eh eh!'
3) data, 'canada eh!'
4) data, 'igloo eh!'
5) closeBracket, $outtermost

See noflo-packets/Compact for an example using hasDataStream and getDataStream.
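
The flattening shown above can be reproduced with a small depth counter. flattenStream and transform are hypothetical names; the sketch assumes a complete stream is already in hand and models only the shape of the result, not how noflo-packets/Compact or NoFlo produce it.

```coffeescript
# Hypothetical model of the data stream helpers on a nested stream: outermost
# brackets survive, sub-stream brackets are dropped, data is transformed.
flattenStream = (stream, transform) ->
  depth = 0
  result = []
  for ip in stream
    switch ip.type
      when 'openBracket'
        depth += 1
        result.push ip if depth is 1
      when 'closeBracket'
        result.push ip if depth is 1
        depth -= 1
      else
        result.push {type: 'data', data: transform(ip.data)}
  result

nested = [
  {type: 'openBracket', data: '$outtermost'}
  {type: 'openBracket', data: '$sub-stream-a'}
  {type: 'data', data: 'eh'}
  {type: 'closeBracket', data: '$sub-stream-a'}
  {type: 'openBracket', data: '$sub-stream-b'}
  {type: 'data', data: 'canada'}
  {type: 'data', data: 'igloo'}
  {type: 'closeBracket', data: '$sub-stream-b'}
  {type: 'closeBracket', data: '$outtermost'}
]
flat = flattenStream nested, (s) -> s + ' eh!'
labels = ("#{ip.type}, #{ip.data}" for ip in flat)
console.log labels
```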


Firing Patterns

There are two standard firing patterns. The first is the full stream, where data is sent to a port in order, with or without a surrounding openBracket and closeBracket:

1) openBracket (optional)
2) data
3) data
4) closeBracket (optional)

The second is per packet, where each data IP is processed individually. Both patterns, with example implementations, are shown in the following sections.

Full Stream

# get everything from here...
1) openBracket
2) data
3) data
4) closeBracket
# ...to here

or, if there are no wrapping brackets:

# get everything from here...
1) data
# ...to here

example implementation:

exports.getComponent = ->
  c = new noflo.Component
    icon: 'gear'
    inPorts:
      eh:
        datatype: 'all'
        required: true
    outPorts:
      canada:
        datatype: 'object'
        required: true
    process: (input, output) ->
      return unless input.hasStream 'eh'
      stream = input.getStream 'eh'
      # ...do stuff with the stream...
      output.sendDone canada: stream

Per Packet

1) openBracket # don't get this
2) data # get only this
3) data # and then only this
4) closeBracket # don't get this

example implementation:

exports.getComponent = ->
  c = new noflo.Component
    icon: 'gear'
    inPorts:
      eh:
        datatype: 'all'
        required: true
    outPorts:
      canada:
        datatype: 'object'
        required: true
    forwardBrackets:
      eh: ['canada']
    process: (input, output) ->
      # Only trigger when a data IP is available
      return unless input.has 'eh', (ip) -> ip.type is 'data'
      data = input.get 'eh'
      # Skip any bracket IPs still ahead of the data IP
      until data.type is 'data'
        data = input.get 'eh'
      # ...do stuff with data.data...
      output.sendDone canada: data.data
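
To contrast the two patterns, this hypothetical sketch lists what each activation of process would see for the same four IPs. It models the grouping only; in a real component, the per-packet case relies on forwardBrackets to handle the brackets.

```coffeescript
# Hypothetical comparison of the two firing patterns: which IPs each
# activation of process would see for the same input.
incoming = [
  {type: 'openBracket', data: 'group'}
  {type: 'data', data: 'eh'}
  {type: 'data', data: 'canada'}
  {type: 'closeBracket', data: 'group'}
]

# Full stream: a single activation receives every IP, brackets included
fullStream = [incoming.map((ip) -> ip.type)]

# Per packet: one activation per data IP; brackets are left to forwarding
perPacket = ([ip.data] for ip in incoming when ip.type is 'data')

console.log fullStream.length, perPacket.length   # 1 2
```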


Ordering

Ordered

The ordered component option makes the component maintain the order between input and output, regardless of streams (default: false).

By default, a component's outport is ordered when using output.send.

For example, a KnexDbSelect component could use ordered to output rowsets in the same order it receives queries, regardless of how long each query takes.

The ordered option does not take effect unless autoOrdering is disabled.

AutoOrdering

The autoOrdering component option groups the sending of output (default: true).

autoOrdering temporarily enables ordered for components that still have ordered: false, to make them stream-safe.

If order is important and you want to control it with the ordered option, autoOrdering can be disabled by setting component.autoOrdering = false. See an example of autoOrdering.

autoOrdering automatically turns ordered on when it sees a stream coming in, so it makes sure (or at least tries to make sure) that the output stream is the result of processing exactly the same sequence as the input stream.
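
The idea behind ordered output can be sketched as a queue of reserved slots: results may finish in any order but leave only in arrival order, as in the KnexDbSelect example. OrderedOutput, reserve, and flush are hypothetical names; this is not NoFlo's ordering implementation.

```coffeescript
# Hypothetical sketch of ordered output: results may finish in any order,
# but are released only in the order their inputs arrived.
class OrderedOutput
  constructor: ->
    @pending = []   # one slot per input, in arrival order
    @emitted = []   # results actually sent downstream

  # Reserve a slot when an input arrives; returns a function that fills it
  reserve: ->
    slot = {ready: false, value: null}
    @pending.push slot
    (value) =>
      slot.ready = true
      slot.value = value
      @flush()

  # Release results from the front while the oldest slot is ready
  flush: ->
    while @pending.length > 0 and @pending[0].ready
      @emitted.push @pending.shift().value
    return

out = new OrderedOutput
finishQuery1 = out.reserve()   # query 1 arrives first
finishQuery2 = out.reserve()   # query 2 arrives second
finishQuery2 'rows for query 2'
heldBack = out.emitted.length  # 0: query 1 is not finished yet
finishQuery1 'rows for query 1'
console.log out.emitted        # [ 'rows for query 1', 'rows for query 2' ]
```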


Buffer

If you need to do something advanced that the Get and Stream helpers cannot do, you can read information directly from the buffer using the input.buffer helpers.

Reading the buffer with these helpers does not reset it automatically, so you have to update the buffer yourself when you are finished processing.

To get the current buffer:

currentBuffer = input.buffer.get()

To get the current buffer for a specific port:

currentInBuffer = input.buffer.get 'in'

To get the current buffer for multiple ports:

[inBuffer, ehBuffer] = input.buffer.get 'in', 'eh'

To find IPs matching criteria for a certain port:

openBracketAndDataForIn = input.buffer.find 'in', (ip) ->
  (ip.type is 'data' or ip.type is 'openBracket') and ip.data?

To completely reset the buffer for a port:

input.buffer.set portname, []

or, to filter it based on your own conditions (in this case, keeping only data IPs):

input.buffer.filter portname, (ip) -> ip.type is 'data'
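
For experimenting with these operations outside a component, here is a hypothetical ToyBuffer class that mirrors the get, find, set, and filter calls above over a plain map of arrays. It is a simplified stand-in, not the real input.buffer object.

```coffeescript
# Hypothetical stand-in for the input.buffer helpers, over a per-port map.
class ToyBuffer
  constructor: (@ports = {}) ->

  # No names: the whole map; one name: that list; several: an array of lists
  get: (names...) ->
    return @ports if names.length is 0
    lists = ((@ports[name] ? []) for name in names)
    if lists.length is 1 then lists[0] else lists

  find: (name, test) -> (@ports[name] ? []).filter test
  set: (name, ips) -> @ports[name] = ips
  filter: (name, test) -> @ports[name] = (@ports[name] ? []).filter test

buffer = new ToyBuffer
  in: [{type: 'openBracket', data: 'x'}, {type: 'data', data: 1}]
  eh: [{type: 'data', data: 'eh'}]

found = buffer.find 'in', (ip) -> ip.type is 'data'
buffer.filter 'in', (ip) -> ip.type is 'data'   # keep only data IPs
buffer.set 'eh', []                              # reset eh completely
[inBuffer, ehBuffer] = buffer.get 'in', 'eh'
console.log found.length, inBuffer.length, ehBuffer.length   # 1 1 0
```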

An example use case is keeping one port's buffer intact while resetting another, and triggering on every IP:

noflo = require 'noflo'

exports.getComponent = ->
  c = new noflo.Component
    icon: 'gear'
    inPorts:
      eh:
        datatype: 'all'
        required: true
      igloo:
        datatype: 'all'
    outPorts:
      canada:
        datatype: 'object'
    process: (input, output) ->
      return unless input.hasStream 'eh'
      stream = input.getStream 'eh'
      streamData = stream.filter (ip) -> ip.type is 'data'

      data = 'ehs=' + streamData.length
      if input.has 'igloo'
        igloos = input.buffer.find 'igloo', (ip) -> ip.type is 'data'
        for igloo in igloos
          data += '&' + igloo.data

      console.log data
      output.sendDone canada: data

c = exports.getComponent()
eh = noflo.internalSocket.createSocket()
igloo = noflo.internalSocket.createSocket()
c.inPorts.eh.attach eh
c.inPorts.igloo.attach igloo

eh.send new noflo.IP 'openBracket'
eh.send 'eh?'
eh.send 'eh!'
eh.send 'eh!?'
igloo.send 'cold'
igloo.send 'message'
eh.send 'eh'
eh.send new noflo.IP 'closeBracket'

# @TODO: NEEDS A FIX
eh.send new noflo.IP 'closeBracket'

eh.send new noflo.IP 'openBracket'
eh.send '...eh...'
eh.send new noflo.IP 'closeBracket'

# @TODO: NEEDS A FIX
eh.send new noflo.IP 'closeBracket'