Pipes

What is a Pipe?

A Pipe is a self-contained data processing unit. Pipes are developed using the Pipes language, a custom DSL developed by Panoptix.

Pipes are associated with Targets - they can run on a server, on distributed servers often referred to as collectors, or in the cloud - practically anywhere. Currently they run as either Linux systemd or Docker services. This flexibility allows a distributed processing strategy, where functionality can be moved to the edge, or back to the centre, without rewriting definitions.

A Pipe is divided into three discrete steps, namely Inputs, Actions and Outputs: data arrives from an input, passes through processing actions, and is finally written to an output.

The essential parts of a Pipe definition are:

  • name
  • input
  • actions (an array of actions)
  • output

Or as expressed in the pipes language:

name: uptime
input:
    exec:
        command: uptime
        interval: 10s
actions:
    - extract:
        input-field: _raw
        remove: true
        pattern: 'load average: (\S+), (\S+), (\S+)'
        output-fields: [m1, m5, m15]
output:
    write: console

In this example, input data comes from executing the Unix uptime command every 10 seconds; the load averages (1 min, 5 min, 15 min) are extracted from each input line by the extract action's regular expression. The results are then written to the console.

So from a typical input '09:07:36 up 16 min, 1 user, load average: 0.08, 0.23, 0.31' we get a JSON record that looks like {"m1":"0.08","m5":"0.23","m15":"0.31"}.
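For illustration, the extract step can be sketched in Python - this is a rough equivalent of what the action does, not the actual Pipes implementation:

```python
import json
import re

line = "09:07:36 up 16 min, 1 user, load average: 0.08, 0.23, 0.31"
pattern = r"load average: (\S+), (\S+), (\S+)"

# match the pattern and zip the capture groups with the output-fields names
match = re.search(pattern, line)
record = dict(zip(["m1", "m5", "m15"], match.groups()))
print(json.dumps(record, separators=(",", ":")))
# → {"m1":"0.08","m5":"0.23","m15":"0.31"}
```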

Time series events normally have an associated time, and must be in an appropriate form to be processed and queried by data analytics systems. If our action steps were:

- extract:
    input-field: _raw
    remove: true
    pattern: 'load average: (\S+), (\S+), (\S+)'
    output-fields: [m1, m5, m15]
- convert:
    - m1: num
    - m5: num
    - m15: num
- time:
    output-field: '@timestamp'

Then the result looks like this:

{"m1":0.08,"m5":0.23,"m15":0.31,"@timestamp":"2018-08-21T09:30:36.000Z"}

These records can then be indexed by Elasticsearch, and queried with tools such as Kibana or Grafana.
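The effect of the convert and time steps can be mimicked in Python. This is a sketch only; the field names come from the example above, and the millisecond-precision timestamp format is an assumption based on the example output:

```python
import json
from datetime import datetime, timezone

record = {"m1": "0.08", "m5": "0.23", "m15": "0.31"}

# convert: coerce the string fields to numbers
record = {k: float(v) for k, v in record.items()}

# time: attach the current time as an ISO-8601 '@timestamp' field
now = datetime.now(timezone.utc)
record["@timestamp"] = now.strftime("%Y-%m-%dT%H:%M:%S.") + f"{now.microsecond // 1000:03d}Z"

print(json.dumps(record))
```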

So a pipe is "input -> action1 -> action2 -> ... -> output". Note that actions are optional: the only required top-level fields are name, input, and output.
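For example, a minimal pipe with no actions at all might look like this (a hypothetical sketch reusing the exec input and console output from above; the name and command are our own):

name: heartbeat
input:
    exec:
        command: date
        interval: 60s
output:
    write: console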

Most action steps work with JSON data. Although real-world data usually arrives as raw text, the exec input will by default 'quote' it as JSON, like {"_raw":"output line"}. The various extraction steps work on this raw data and generate sensible event fields.
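That default quoting behaviour amounts to something like this Python sketch (an illustration, not the actual implementation):

```python
import json

def quote_raw(line: str) -> str:
    # Wrap a raw text line in a single-field JSON event,
    # as the exec input does by default.
    return json.dumps({"_raw": line}, separators=(",", ":"))

print(quote_raw("output line"))
# → {"_raw":"output line"}
```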

The available inputs, outputs, and processing steps are each documented separately.

Adding Pipes to the Hotrod System

Assuming you are logged in, then:

$ hotrod targets add Joburg
target Joburg has id e94ccdca-f379-447b-8c90-6976e77652ec
$ hotrod targets add Durban
target Durban has id 8a1a0a29-d8f8-4098-a016-9d08f841f9a4
$ hotrod targets list
name   | id                                   | tags | pipes | last seen
-------+--------------------------------------+------+-------+-----------
Joburg | e94ccdca-f379-447b-8c90-6976e77652ec |      |       |
Durban | 8a1a0a29-d8f8-4098-a016-9d08f841f9a4 |      |       |

Unless you explicitly specify --id with targets add, new targets are automatically assigned a UUID as their identifier. Target names and ids must be unique.

A Pipe definition is a file NAME.yml, where NAME is the provided name of the Pipe. Pipe names must be unique, and the hotrod tool will also enforce that the name of the file matches. To enable versioning or annotation of a pipe definition, an additional full stop and some text are allowed before the extension, for example: NAME.testing.yml or NAME.0.1.yml.
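A sketch of that naming rule as a Python check - the regular expression is our own illustration, not taken from the hotrod tool:

```python
import re

# NAME.yml, optionally with extra dotted segments before the extension,
# e.g. uptime.testing.yml or uptime.0.1.yml
PIPE_FILE = re.compile(r"^(?P<name>[^.]+)(\..+)?\.yml$")

for fname in ("uptime.yml", "uptime.testing.yml", "uptime.0.1.yml", "uptime.txt"):
    m = PIPE_FILE.match(fname)
    print(fname, "->", m.group("name") if m else "not a pipe definition")
```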

A pipe is loaded with the pipes add subcommand:

$ hotrod pipes add --file uptime.yml
$ hotrod pipes list
 name
------
 uptime
$ hotrod pipes show uptime
name: uptime
input:
    exec:
        command: uptime
        interval: 2s
actions:
    - extract:
        input-field: _raw
        remove: true
        pattern: 'load average: (\S+), (\S+), (\S+)'
        output-fields: [m1, m5, m15]
    - convert:
        - m1: num
        - m5: num
        - m15: num
    - time:
        output-field: '@timestamp'
output:
    write: console

This Pipe is not yet associated with any target, so we update a particular target:

$ hotrod targets update Joburg --add-pipe uptime
$ hotrod targets list
name   | id                                   | tags | pipes  | last seen
-------+--------------------------------------+------+--------+-----------
Joburg | e94ccdca-f379-447b-8c90-6976e77652ec |      | uptime |
Durban | 8a1a0a29-d8f8-4098-a016-9d08f841f9a4 |      |        |

The Pipe is now staged for the "Joburg" target, and will shortly be deployed by the configured Hotrod Agent.

If you run hotrod targets update Joburg --remove-pipe uptime, the Pipe is removed from the staging area and stopped on the Target by the Hotrod Agent.