Information Packets (IP)
The IP object
Each packet that a component sends or receives is wrapped in an instance of the IP class. IP objects are normally created and obtained through the Ports API (see the Ports API section below). This section describes the important properties and methods of the object.
Primary properties
Any IP object has 2 mandatory properties: .type and .data.
type
There are 3 types of IP objects:
- data - a normal data packet
- openBracket - a packet indicating the beginning of a substream
- closeBracket - a packet indicating the end of a substream
data
This property is the actual value that a packet carries. The default value is null. Bracket IPs may also carry non-null data, the semantics of which are user-defined.
Optional properties
Arbitrary properties can be set on an IP object. The following properties are recognized by the system and have a special meaning:
- groups - a list of groups for a packet. This is the replacement for the earlier begingroup events, which are now obsolete.
- scope - a token string that identifies a scope of availability for a packet. See the Scope section below for more information.
- owner - a reference to the process which currently owns the packet or was the last one to own it.
- index - the index of the addressable port on which the IP was received.
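To make the shape of a packet concrete, here is a minimal sketch in plain JavaScript that models an IP as an ordinary object. It does not use NoFlo: makeIP and IP_TYPES are hypothetical names introduced only for illustration, and only the property names (type, data, groups, scope) come from the description above.

```javascript
// Hypothetical stand-in for an IP object; this is NOT the noflo.IP class.
const IP_TYPES = ['data', 'openBracket', 'closeBracket'];

function makeIP(type, data = null, options = {}) {
  if (!IP_TYPES.includes(type)) {
    throw new Error(`Unknown IP type: ${type}`);
  }
  // Optional properties are spread first so they can never
  // overwrite the two mandatory ones.
  return { ...options, type, data };
}

const plain = makeIP('data', 'bar');
const scoped = makeIP('data', 'bar', { scope: 'a1b2c3d4', groups: ['tier1'] });
const open = makeIP('openBracket'); // data falls back to null

console.log(plain, scoped, open);
```

The sketch only mirrors the property layout; real IP objects are normally obtained through the Ports API rather than built by hand.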
Ports API
A process uses its ports to send and receive IPs. The Ports API has been updated so that new components can work with IP objects, while old components still see them as raw events and raw data.
Input
A component accepting IP objects as its input should use process to handle those IPs:
noflo = require 'noflo'

exports.getComponent = ->
  c = new noflo.Component
    inPorts:
      in:
        datatype: 'string'
  c.process (input, output) ->
    # input.ip is the packet that triggered this call
    console.log '<' if input.ip.type is 'openBracket'
    console.log '>' if input.ip.type is 'closeBracket'
    console.log input.ip.data
For addressable ports, the index property contains the index of the array port on which the IP was received:
exports.getComponent = ->
  c = new noflo.Component
    inPorts:
      in:
        datatype: 'string'
        addressable: true
  c.process (input, output) ->
    console.log input.ip.data, "on in[#{input.ip.index}]"
Output
Output ports provide handy methods for sending data enveloped in IP objects.
The most common case is sending data using the data method, which has the signature:
data: (data, options = {}, index = null) ->
- data - the actual value to be carried
- options - map of any optional IP properties such as groups, scope, etc.
- index - index for array output ports
There are similar methods to send openBracket and closeBracket packets:
openBracket: (data = null, options = {}, index = null) ->
closeBracket: (data = null, options = {}, index = null) ->
- data - optional data carried with the bracket
- options - map of optional IP properties
- index - index for array output ports
All 3 methods are chainable: a chain of calls produces a series of packets sent one after another.
The most basic usage example is sending just a data value:
component.outPorts.foo.data 'bar'
An example of sending a packet with optional properties:
component.outPorts.foo.data 'bar',
  scope: 'a1b2c3d4'
  groups: ['tier1']
Sending a substream via a chain of calls:
component.outPorts.foo.openBracket 'accounts'
  .data accountRecord1
  .data accountRecord2
  .closeBracket 'accounts'
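The chaining behaviour can be illustrated with a small plain-JavaScript model. This is only a sketch under stated assumptions: RecordingPort is a hypothetical class that records packets instead of sending them, and it is not NoFlo's output port implementation. It borrows just the three method names and their parameter order from the signatures above.

```javascript
// Hypothetical model of an output port; NOT NoFlo's OutPort implementation.
class RecordingPort {
  constructor() {
    this.sent = []; // packets in the order they were "sent"
  }

  send(type, data, options, index) {
    this.sent.push({ ...options, type, data, index });
    return this; // returning the port is what makes the calls chainable
  }

  data(data, options = {}, index = null) {
    return this.send('data', data, options, index);
  }

  openBracket(data = null, options = {}, index = null) {
    return this.send('openBracket', data, options, index);
  }

  closeBracket(data = null, options = {}, index = null) {
    return this.send('closeBracket', data, options, index);
  }
}

const port = new RecordingPort();
port
  .openBracket('accounts')
  .data({ id: 1 })
  .data({ id: 2 })
  .closeBracket('accounts');

// Packet types, in sending order.
const types = port.sent.map((ip) => ip.type);
console.log(types.join(' '));
// prints "openBracket data data closeBracket"
```

Each method returns the port itself, so the packets leave in exactly the order in which the calls are chained.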
Scope
scope is a token string that identifies a scope of availability for a packet. Packets with the same scope are visible across the network(s) within the same context, while packets with different scopes do not overlap. This is useful, for example, for isolating data that belongs to different requests.
To check the scope of an IP:
c.process (input, output) ->
  # input.get returns the whole IP object, so its scope is available
  data = input.get 'in'
  console.log data.scope
If you need to access the scope of the latest IP that was sent in:
c.process (input, output) ->
  console.log input.scope
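To set a scope, for example to an Express request id:
express = require 'express'
noflo = require 'noflo'
app = express()
app.get '/eh', (req, res) ->
  # attachedInPortSocket is a socket already attached to the target in port;
  # req.id is assumed to be set by request-id middleware
  attachedInPortSocket.post new noflo.IP 'data', req, scope: req.id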
When you have to keep state and cannot keep things in the buffer, store the state on the component in a map indexed by scope, and don't forget to reset the state when it is no longer needed.
c.example = {}
c.process (input, output) ->
  return unless input.ip.type is 'data'
  data = input.get 'in'
  # keep the state for each scope under its own key
  c.example[data.scope] = data.data
  output.done()
  # placeholder: replace with the condition under which the state must be reset
  if true
    delete c.example[data.scope]
Concurrency problems can arise in graphs. The following is an example of such a situation:
1) a packet (named $1) is sent to start
2) start sends $1 packet to `eh`
3) start sends $1 packet to `canada`
This is where it gets tricky. Sometimes canada takes longer than eh, and sometimes eh takes longer than canada. In addition, canada might use the database from previous requests.
So for this run, it might then continue as follows:
4) `eh` sends a packet $1 to `merge`
5) a new packet (named $2) is sent to start
6) start sends $2 packet to `eh`
7) start sends $2 packet to `canada`
8) `canada` sends $2 packet to `merge`
9) now `merge` sends out data, but it has $1 from `eh` but $2 from `canada`!
10) `eh` sends $2 to `merge`, and `canada` finally sends the delayed $1 to `merge`
11) now, `merge` sends out data, but it has $2 from `eh`, but $1 from `canada`!
When scopes are used, packets are matched only with other packets of the same scope, so even if the same order of operations happens, merge uses the data from the correct request.
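The effect of scoping on the scenario above can be sketched in plain JavaScript. This is an illustration under stated assumptions, not NoFlo code: createScopedMerge is a hypothetical helper that pairs one packet from eh with one packet from canada, keeping pending packets in a map keyed by scope. The packets arrive in the same problematic order as in the walkthrough, with canada's answer for $1 arriving last.

```javascript
// Hypothetical merge step; NOT a NoFlo component. It pairs one packet from
// `eh` with one packet from `canada`, but only within the same scope.
function createScopedMerge(onMerged) {
  const pending = new Map(); // scope -> { eh?, canada? }

  return function receive(port, ip) {
    const slot = pending.get(ip.scope) || {};
    slot[port] = ip.data;
    pending.set(ip.scope, slot);

    if ('eh' in slot && 'canada' in slot) {
      pending.delete(ip.scope); // reset the state for this scope
      onMerged({ scope: ip.scope, eh: slot.eh, canada: slot.canada });
    }
  };
}

const merged = [];
const receive = createScopedMerge((result) => merged.push(result));

// Interleaved arrival: canada's answer for $1 is delayed until the end.
receive('eh', { scope: '$1', data: 'eh-1' });
receive('canada', { scope: '$2', data: 'canada-2' });
receive('eh', { scope: '$2', data: 'eh-2' });
receive('canada', { scope: '$1', data: 'canada-1' });

console.log(merged);
```

Because the pending packets are stored per scope, the $2 pair completes first and the $1 pair completes afterwards, and neither result mixes data from the two requests.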