Documentation

0.7.8 API docs

Legacy

Index

WirePattern vs process

Firing

WirePattern Process
WirePattern: it is triggered when all of the inPorts listed in the `in` WirePattern parameter and the required param ports have data. Process: it is fired every time new data comes into any of the ports; you decide whether the preconditions are met.

State

WirePattern Process
WirePattern: to keep state, either properties are set on the component, or params are used, depending on the situation. Process: data can be kept in the buffer by using the buffer helper methods and removed when done.

Request Isolation

WirePattern Process
WirePattern: difficult, see https://github.com/noflo/noflo/blob/master/src/lib/Helpers.coffee#L63 and
https://github.com/noflo/noflo/blob/master/spec/Helpers.coffee#L1460. Process: request isolation comes for free with packet scope.

Accessing ports

WirePattern Process
WirePattern: anywhere that has access to the component, use component.outPorts.portname or component.inPorts.portname. Process: inside the process api function, use output.ports.portname or input.ports.portname, or refer to the port by name when accessing input or when sending.

Asynchronous components

Asynchronous components process data and send output some time later. The outputs are sent in the order that they are processed, which might be different from the order in which the inputs were received.

Previously, we used a dedicated class named AsyncComponent which would be extended.

Now, process api is asynchronous by default.

WirePattern can be used for async by setting the async property. WirePattern Async

WirePattern

exports.getComponent = ->
  c = new noflo.Component
  # when you are finished processing, call `done`
  noflo.helpers.WirePattern c, (data, groups, out, done) ->
    done()

1) in & out array

exports.getComponent = ->
  c = new noflo.Component

  # When `in` is an array, the received data is put into one object, with one property per inport
  # When `out` is an array, `out` is an object with one property per outport
  noflo.helpers.WirePattern c,
    in: ['in', 'eh'] # the inPorts to receive from
    out: ['out', 'canada'] # the outPorts to send out of
    async: true # whether the component is asynchronous or not
  , (data, groups, out, done) ->
    # `data` is keyed by inport name (`in` and `eh`), `out` by outport name
    out.out.send data.in
    out.canada.send data.eh
    done()

2) in & out singular

exports.getComponent = ->
  c = new noflo.Component

  # When `in` is a single string, the handler receives just the data sent to that port
  # When `out` is a single string, `out` is just the port to send out of
  noflo.helpers.WirePattern c,
    in: 'eh'
    out: 'canada'
    async: true
  , (eh, groups, out, done) ->
    out.send eh
    done()

3) forwardGroups

exports.getComponent = ->
  c = new noflo.Component

  # `forwardGroups` is similar to using forwardBrackets in process api: it takes groups from each `in` sent to it and sends them to each `out`, wrapping the data.
  # It forwards group IPs from
  # inputs to the output, sending them along with the data. This option also
  # accepts string or array values, if you want to forward groups from specific
  # port(s) only. By default group forwarding is `false`.
  noflo.helpers.WirePattern c,
    in: ['eh', 'moose']
    out: 'canada'
    async: true
    forwardGroups: true
  , (data, groups, out, done) ->
    # `in` is an array, so the packets arrive as properties of `data`
    out.send data.eh
    done()

4) group

exports.getComponent = ->
  c = new noflo.Component

  # `group` will ensure that WirePattern only fires when each `in` has received data wrapped in the same group (i.e. `eh` receives the same groups as `moose`)
  noflo.helpers.WirePattern c,
    in: ['eh', 'moose']
    out: 'canada'
    async: true
    forwardGroups: true
    group: true
  , (data, groups, out, done) ->
    # `in` is an array, so the packets arrive as properties of `data`
    out.send data.eh
    done()

5) ordered

exports.getComponent = ->
  c = new noflo.Component

  # using `ordered` will make sure the data is received in the order you defined it in the `in`, in this case, `eh` before `moose`
  # NOTE(review): confirm this description of `ordered` against the WirePattern helper documentation

  # With `config.group = true` it checks incoming group IPs and collates
  # data with matching group IPs. By default this kind of grouping is `false`.
  # Set `config.group` to a RegExp object to correlate inputs only if the
  # group matches the expression (e.g. `^req_`). For non-matching groups
  # the component will act normally.
  noflo.helpers.WirePattern c,
    in: ['eh', 'moose']
    out: 'canada'
    async: true
    forwardGroups: true
    group: true
    ordered: true
  , (data, groups, out, done) ->
    # `in` is an array, so the packets arrive as properties of `data`
    out.send data.eh
    done()

6) params

exports.getComponent = ->
  c = new noflo.Component

  # using `params` will keep the last data that was sent to each listed port under `c.params.name`. For example here, `c.params.igloo`. Important to note: it will not wait for params before firing.
  noflo.helpers.WirePattern c,
    in: ['eh', 'moose']
    params: ['igloo']
    out: 'canada'
    async: true
    forwardGroups: true
    group: true
    ordered: true
  , (data, groups, out, done) ->
    console.log c.params.igloo
    # `in` is an array, so the packets arrive as properties of `data`
    out.send data.eh
    done()

dropInput makes WirePattern drop all input that arrives before the required params have been received.

If you do not specify an `in` port, it will use the inPort named `in` by default if it exists; the same goes for `out`.
Ports and events

Being a flow-based programming environment, the main action in NoFlo happens through ports and their connections. All actions a component does should be triggered via input port events. There are several events that can be associated with ports:

  • Attach: there is a connection to the port
  • Connect: the port has started sending or receiving a data transmission
  • BeginGroup: the data stream after this event is associated with a given named group. Components may or may not utilize this information
  • Data: an individual data packet in a transmission. There might be multiple depending on how a component operates
  • EndGroup: A particular grouped stream of data ends
  • Disconnect: end of data transmission
  • Detach: A connection to the port has been removed
  • IP: An Information Packet has been sent, could be with a type of data, openBracket, or closeBracket. This is the modern way, and usually the only thing that should be listened for.

It depends on the nature of the component how these events may be handled. Most typical components do operations on a whole transmission, meaning that they should wait for the disconnect event on inports before they act, but some components can also act on single data packets coming in.

When a port has no connections, meaning that it was initialized without a connection, or a detach event has happened, it should do no operations regarding that port.

Translation

In process api, there are no connect and disconnect events inside of the process function. However, for backwards compatibility, they are translated when sending out.

Old to new

  • Connect: is translated into OpenBracket
  • BeginGroup: is translated into OpenBracket
  • EndGroup: is translated into CloseBracket
  • Disconnect: is translated into CloseBracket

New to old

  • OpenBracket (aside from the outermost): are translated into BeginGroup
  • CloseBracket (aside from the outermost): are translated into EndGroup

This means if you send out events using the new API, you can use the old way of listening to it, and vice versa.


Porting

1) Initial

  fs = require 'fs'
  noflo = require 'noflo'

  # Starting point of the port: a WirePattern-based component that copies
  # the file named on `source` to the path named on `destination`.
  exports.getComponent = ->
    c = new noflo.Component
    c.icon = 'copy'
    c.description = 'Copy a file'

    # ports are added one by one after the component has been created
    c.inPorts.add 'source',
      datatype: 'string'
    c.inPorts.add 'destination',
      datatype: 'string'
    c.outPorts.add 'out',
      datatype: 'string'
      required: false
    c.outPorts.add 'error',
      datatype: 'object'
      required: false

    noflo.helpers.WirePattern c,
      in: ['source', 'destination']
      out: 'out'
      forwardGroups: true
      async: true
    , (data, groups, out, callback) ->
      # shared error handler for both streams; note that both branches
      # end up passing the error to `callback`
      handleError = (err) ->
        if err.code is 'EMFILE'
          return callback err
        callback err

      rs = fs.createReadStream data.source
      ws = fs.createWriteStream data.destination
      rs.on 'error', handleError
      ws.on 'error', handleError

      rs.pipe ws
      # once the read stream has ended, send the destination path and finish
      rs.on 'end', ->
        out.send data.destination
        do callback

    # return the component instance
    c

2) Basics

  fs = require 'fs'
  noflo = require 'noflo'

  exports.getComponent = ->

    # We can move the icon, description and port definitions into the constructor options,
    # add port descriptions, and mark the required ports explicitly; ports without `required: true` are not required.
    c = new noflo.Component
      icon: 'copy'
      description: 'Copy a file'
      inPorts:
        source:
          datatype: 'string'
          description: 'source of the file to copy'
          required: true
        destination:
          datatype: 'string'
          description: 'where to copy the file to'
          required: true
      outPorts:
        out:
          datatype: 'string'
          description: 'contents of the file to send'
        error:
          datatype: 'object'
          description: 'if something goes wrong copying or writing the file'

    # since WirePattern returns the component, we do not need to return it at the end.
    noflo.helpers.WirePattern c,
      in: ['source', 'destination']
      out: 'out'
      forwardGroups: true
      async: true
    , (data, groups, out, callback) ->
      # shared error handler for both streams; note that both branches
      # end up passing the error to `callback`
      handleError = (err) ->
        if err.code is 'EMFILE'
          return callback err
        callback err

      rs = fs.createReadStream data.source
      ws = fs.createWriteStream data.destination
      rs.on 'error', handleError
      ws.on 'error', handleError

      rs.pipe ws
      # once the read stream has ended, send the destination path and finish
      rs.on 'end', ->
        out.send data.destination
        do callback

3) Process

  fs = require 'fs'
  noflo = require 'noflo'

  exports.getComponent = ->
    c = new noflo.Component
      icon: 'copy'
      description: 'Copy a file'
      inPorts:
        source:
          datatype: 'string'
          description: 'source of the file to copy'
          required: true
        destination:
          datatype: 'string'
          description: 'where to copy the file to'
          required: true
      outPorts:
        out:
          datatype: 'string'
          description: 'contents of the file to send'
        error:
          datatype: 'object'
          description: 'if something goes wrong copying or writing the file'

    # WirePattern had the `forwardGroups` firing definition, which sends every bracket received to every outport; in process api you can manually define which brackets go where
    c.forwardBrackets =
      source: ['out', 'error']
      destination: ['out', 'error']

    # change from WirePattern to process
    # WirePattern had the `async` firing definition, process api is async by default
    c.process (input, output) ->
      # WirePattern had the `in` firing input port definition, in process it gets called on any event so we just return unless it has packets for these
      return unless input.has 'source', 'destination'

      # instead of having the data passed in as with WirePattern, we get out the data we want;
      # the destination is kept in a local because it is sent out again when the copy has finished
      source = input.getData 'source'
      destination = input.getData 'destination'
      rs = fs.createReadStream source
      ws = fs.createWriteStream destination

      # an Error object given to sendDone is sent out of the `error` port;
      # the handlers are wrapped in functions so `sendDone` is still called as a method of `output`
      rs.on 'error', (err) -> output.sendDone err
      ws.on 'error', (err) -> output.sendDone err

      rs.pipe ws
      rs.on 'end', ->
        # in WirePattern you had to send the data and call `callback`
        # in process, there is a function to do both
        output.sendDone out: destination

Advanced Porting

1) Initial

  noflo = require "noflo"
  _ = require "underscore"
  { deepCopy } = require "owl-deepcopy"

  # Starting point of the port: a class-based component that filters the
  # properties of incoming objects by matching RegExps against their keys.
  # All state (keys, flags, legacy lists) lives on the instance and is
  # updated by per-port event listeners.
  class FilterProperty extends noflo.Component

    icon: 'filter'

    description: "Filter out some properties by matching RegExps
    against the keys of incoming objects"

    constructor: ->
      # state filled in by the `key`, `recurse` and `keep` ports
      @keys = []
      @recurse = false
      @keep = false

      @legacy = false
      # Legacy mode
      @accepts = []
      @regexps = []

      @inPorts = new noflo.InPorts
        in:
          datatype: 'object'
          description: 'Object to filter properties from'
        key:
          datatype: 'string'
          description: 'Keys to filter (one key per IP)'
        recurse:
          datatype: 'boolean'
          description: '"true" to recurse on the object\'s values'
        keep:
          datatype: 'boolean'
          description: '"true" if matching properties must be kept, otherwise removed'
        # Legacy mode
        accept:
          datatype: 'all'
        regexp:
          datatype: 'all'
      @outPorts = new noflo.OutPorts
        out:
          datatype: 'object'

      # the flags are compared as strings, so both `true` and "true" enable them
      @inPorts.keep.on "data", (keep) =>
        @keep = String(keep) is "true"

      @inPorts.recurse.on "data", (data) =>
        @recurse = String(data) is "true"

      # a new connection on `key` resets the list of key filters
      @inPorts.key.on "connect", =>
        @keys = []
      @inPorts.key.on "data", (key) =>
        @keys.push new RegExp key, "g"

      # Legacy mode
      # any data on `accept` or `regexp` switches the component to legacy mode
      @inPorts.accept.on "data", (data) =>
        @legacy = true
        @accepts.push data
      @inPorts.regexp.on "data", (data) =>
        @legacy = true
        @regexps.push data

      # groups received on `in` are passed through to `out`
      @inPorts.in.on "begingroup", (group) =>
        @outPorts.out.beginGroup group

      @inPorts.in.on "data", (data) =>
        # Legacy mode
        if @legacy
          @filterData data
        else
          # only objects are filtered; a deep copy is filtered and sent
          if _.isObject data
            data = deepCopy data
            @filter data
            @outPorts.out.send data

      @inPorts.in.on "endgroup", (group) =>
        @outPorts.out.endGroup()

      @inPorts.in.on "disconnect", =>
        @outPorts.out.disconnect()

    # Deletes properties from `object` in place according to @keys and @keep,
    # descending into object values when @recurse is set.
    filter: (object) ->
      return if _.isEmpty object

      for key, value of object
        isMatched = false

        for filter in @keys
          match = key.match filter
          # delete on a match when not keeping, or on a non-match when keeping
          if not @keep and match or
             @keep and not match
            delete object[key]
            isMatched = true
            break

        if not isMatched and _.isObject(value) and @recurse
          @filter value

    # Legacy mode
    # Builds a new object from the properties named in @accepts or matching
    # one of @regexps, and sends it only if at least one property matched.
    filterData: (object) ->
      newData = {}
      match = false
      for property, value of object
        if @accepts.indexOf(property) isnt -1
          newData[property] = value
          match = true
          continue

        for expression in @regexps
          regexp = new RegExp expression
          if regexp.exec property
            newData[property] = value
            match = true

      return unless match
      @outPorts.out.send newData

  exports.getComponent = -> new FilterProperty

2) Basics

  # change to use single quotes
  noflo = require 'noflo'
  _ = require 'underscore'
  { deepCopy } = require 'owl-deepcopy'

  # change from class to function
  # Builds the filter component as a plain function instead of a class:
  # what used to be `@` properties and methods now live on the instance `c`.
  exports.getComponent = ->
    c = new noflo.Component
    c.icon = 'filter'
    c.description = 'Filter out some properties by matching RegExps
    against the keys of incoming objects'

    # the former constructor body now runs directly inside getComponent
    c.keys = []
    c.recurse = false
    c.keep = false

    c.legacy = false
    # Legacy mode
    c.accepts = []
    c.regexps = []

    c.inPorts = new noflo.InPorts
      in:
        datatype: 'object'
        description: 'Object to filter properties from'
      key:
        datatype: 'string'
        description: 'Keys to filter (one key per IP)'
      recurse:
        datatype: 'boolean'
        description: '"true" to recurse on the object\'s values'
      keep:
        datatype: 'boolean'
        description: '"true" if matching properties must be kept, otherwise removed'
      # Legacy mode
      accept:
        datatype: 'all'
      regexp:
        datatype: 'all'
    c.outPorts = new noflo.OutPorts
      out:
        datatype: 'object'

    c.inPorts.keep.on "data", (keep) =>
      c.keep = String(keep) is "true"

    c.inPorts.recurse.on "data", (data) =>
      c.recurse = String(data) is "true"

    # a new connection on `key` resets the list of key filters
    c.inPorts.key.on "connect", =>
      c.keys = []
    c.inPorts.key.on "data", (key) =>
      c.keys.push new RegExp key, "g"

    # Legacy mode
    c.inPorts.accept.on "data", (data) =>
      c.legacy = true
      c.accepts.push data
    c.inPorts.regexp.on "data", (data) =>
      c.legacy = true
      c.regexps.push data

    c.inPorts.in.on "begingroup", (group) =>
      c.outPorts.out.beginGroup group

    c.inPorts.in.on "data", (data) =>
      # Legacy mode
      if c.legacy
        c.filterData data
      else
        if _.isObject data
          data = deepCopy data
          c.filter data
          c.outPorts.out.send data

    c.inPorts.in.on "endgroup", (group) =>
      c.outPorts.out.endGroup()

    c.inPorts.in.on "disconnect", =>
      c.outPorts.out.disconnect()

    # the former class methods become functions assigned on the component
    c.filter = (object) ->
      return if _.isEmpty object

      for key, value of object
        isMatched = false

        for filter in c.keys
          match = key.match filter
          if not c.keep and match or
             c.keep and not match
            delete object[key]
            isMatched = true
            break

        if not isMatched and _.isObject(value) and c.recurse
          c.filter value

    # Legacy mode
    c.filterData = (object) ->
      newData = {}
      match = false
      for property, value of object
        if c.accepts.indexOf(property) isnt -1
          newData[property] = value
          match = true
          continue

        for expression in c.regexps
          regexp = new RegExp expression
          if regexp.exec property
            newData[property] = value
            match = true

      return unless match
      c.outPorts.out.send newData

    # getComponent has to return the component instance
    c

3) Class to function

  # change to use single quotes
  noflo = require 'noflo'
  _ = require 'underscore'
  { deepCopy } = require 'owl-deepcopy'

  # change from class to function
  # Builds the filter component as a plain function instead of a class:
  # what used to be `@` properties and methods now live on the instance `c`.
  exports.getComponent = ->
    c = new noflo.Component
    c.icon = 'filter'
    c.description = 'Filter out some properties by matching RegExps
    against the keys of incoming objects'

    # remove state

    c.keys = []
    c.recurse = false
    c.keep = false

    c.legacy = false
    # Legacy mode
    c.accepts = []
    c.regexps = []

    c.inPorts = new noflo.InPorts
      in:
        datatype: 'object'
        description: 'Object to filter properties from'
      key:
        datatype: 'string'
        description: 'Keys to filter (one key per IP)'
      recurse:
        datatype: 'boolean'
        description: '"true" to recurse on the object\'s values'
      keep:
        datatype: 'boolean'
        description: '"true" if matching properties must be kept, otherwise removed'
      # Legacy mode
      accept:
        datatype: 'all'
      regexp:
        datatype: 'all'
    c.outPorts = new noflo.OutPorts
      out:
        datatype: 'object'

    c.inPorts.keep.on "data", (keep) =>
      c.keep = String(keep) is "true"

    c.inPorts.recurse.on "data", (data) =>
      c.recurse = String(data) is "true"

    # a new connection on `key` resets the list of key filters
    c.inPorts.key.on "connect", =>
      c.keys = []
    c.inPorts.key.on "data", (key) =>
      c.keys.push new RegExp key, "g"

    # Legacy mode
    c.inPorts.accept.on "data", (data) =>
      c.legacy = true
      c.accepts.push data
    c.inPorts.regexp.on "data", (data) =>
      c.legacy = true
      c.regexps.push data

    c.inPorts.in.on "begingroup", (group) =>
      c.outPorts.out.beginGroup group

    c.inPorts.in.on "data", (data) =>
      # Legacy mode
      if c.legacy
        c.filterData data
      else
        if _.isObject data
          data = deepCopy data
          c.filter data
          c.outPorts.out.send data

    c.inPorts.in.on "endgroup", (group) =>
      c.outPorts.out.endGroup()

    c.inPorts.in.on "disconnect", =>
      c.outPorts.out.disconnect()

    # the former class methods become functions assigned on the component
    c.filter = (object) ->
      return if _.isEmpty object

      for key, value of object
        isMatched = false

        for filter in c.keys
          match = key.match filter
          if not c.keep and match or
             c.keep and not match
            delete object[key]
            isMatched = true
            break

        if not isMatched and _.isObject(value) and c.recurse
          c.filter value

    # Legacy mode
    c.filterData = (object) ->
      newData = {}
      match = false
      for property, value of object
        if c.accepts.indexOf(property) isnt -1
          newData[property] = value
          match = true
          continue

        for expression in c.regexps
          regexp = new RegExp expression
          if regexp.exec property
            newData[property] = value
            match = true

      return unless match
      c.outPorts.out.send newData

    # getComponent has to return the component instance
    c

4) Process

  # remove need for underscorejs
  noflo = require 'noflo'
  { deepCopy } = require 'owl-deepcopy'

  # Process api version of the filter component: the per-request values
  # (keys, recurse, keep, legacy lists) are read from the input inside the
  # process function instead of being stored on the component.
  exports.getComponent = ->
    c = new noflo.Component
    c.icon = 'filter'
    c.description = 'Filter out some properties by matching RegExps
    against the keys of incoming objects'

    c.inPorts = new noflo.InPorts
      in:
        datatype: 'object'
        description: 'Object to filter properties from'
        required: true
      key:
        datatype: 'string'
        description: 'Keys to filter (one key per IP)'
        required: true
      recurse:
        datatype: 'boolean'
        description: '"true" to recurse on the object\'s values'
        control: true
        default: false
      keep:
        datatype: 'boolean'
        description: '"true" if matching properties must be kept, otherwise removed'
        control: true
        default: false
      # Legacy mode
      accept:
        datatype: 'all'
      regexp:
        datatype: 'all'
    c.outPorts = new noflo.OutPorts
      out:
        datatype: 'object'

    # Deletes properties from `object` in place. `keys` is the list of
    # RegExp filters, `keep` inverts the selection, and `recurse` makes it
    # descend into object values. `input` is only passed along.
    c.filter = (object, keys, recurse, keep, input) ->
      for key, value of object
        isMatched = false

        # the keys are filters we want to match in the object
        for filter in keys
          match = key.match filter

          # if they match, we delete them
          matchButDontKeep = not keep and match
          keepButDontMatch = keep and not match
          if matchButDontKeep or keepButDontMatch
            delete object[key]
            isMatched = true
            break

        if not isMatched and recurse and typeof value is 'object'
          c.filter value, keys, recurse, keep, input

    c.process (input, output) ->
      # because we only want to use non-brackets
      return input.buffer.get().pop() if input.ip.type isnt 'data'
      return unless input.has 'in', 'key'

      # Legacy mode is active as soon as either legacy port has data.
      # Each port is read only when it actually has a packet (the other one
      # may be empty), and the data is normalised to an array so that
      # `indexOf` and the `for ... in` loop below work on a list of entries
      # rather than on the characters of a single string.
      hasAccept = input.has 'accept'
      hasRegexp = input.has 'regexp'
      legacy = hasAccept or hasRegexp
      accepts = if hasAccept then [].concat(input.getData('accept')) else []
      regexp = if hasRegexp then [].concat(input.getData('regexp')) else []

      # because we can have multiple data packets,
      # we want to get them all, and use just the data
      keys = input.buffer
        .find 'key', (ip) -> ip.type is 'data' and ip.data?
        .map (ip) -> new RegExp ip.data, "g"

      data = input.getData 'in'
      recurse = input.getData 'recurse'
      keep = input.getData 'keep'

      if keep? and typeof keep is 'object'
        keep = keep.pop()

      unless legacy
        if typeof data is 'object'
          data = deepCopy data
          c.filter data, keys, recurse, keep, input
          output.sendDone data
      # Legacy mode
      else
        newData = {}
        match = false
        for property, value of data
          if accepts.indexOf(property) isnt -1
            newData[property] = value
            match = true
            continue

          for expression in regexp
            regex = new RegExp expression
            if regex.exec property
              newData[property] = value
              match = true

        return unless match
        output.sendDone newData
        # NOTE(review): the key buffer is only reset on this legacy path — confirm that is intended
        input.buffer.set 'key', []