Pipes
What is a Pipe?
A Pipe is a self-contained data processing unit. Pipes are written in the Pipes language, a custom DSL developed by Panoptix.
Pipes are associated with Targets: they can run on a server, on distributed servers (often referred to as collectors), or even in the cloud, practically anywhere. Currently they run either as Linux systemd services or as Docker services. This flexibility allows a distributed processing strategy, where functionality can be moved to the edge, or back to the centre, without rewriting definitions.
A Pipe is divided into three discrete steps: Inputs, Actions and Outputs. Data is read from an input, passes through processing actions, and is finally written to an output.
The essential parts of a Pipe definition are:
- name
- input
- actions (an array of actions)
- output
Or, as expressed in the Pipes language:
name: uptime
input:
  exec:
    command: uptime
    interval: 10s
actions:
- extract:
    input-field: _raw
    remove: true
    pattern: 'load average: (\S+), (\S+), (\S+)'
    output-fields: [m1, m5, m15]
output:
  write: console
In this example, input data comes from executing the Unix uptime command every 10 seconds; the load averages (1, 5 and 15 minutes) are extracted from the input line with a regular expression, using the extract action. We then write the results out to the console.
So from a typical input '09:07:36 up 16 min, 1 user, load average: 0.08, 0.23, 0.31' we get a JSON record that looks like {"m1":"0.08","m5":"0.23","m15":"0.31"}.
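To make the data flow concrete, here is a minimal Python sketch of what the extract step above does to the sample line. It is not how Pipes is implemented; the extract function, its arguments, and its pass-through behaviour on non-matching lines are assumptions made for illustration.

```python
import json
import re

# Hypothetical model of the extract action, for illustration only:
# regex capture groups from one field become new named fields.
PATTERN = re.compile(r"load average: (\S+), (\S+), (\S+)")


def extract(record, input_field, pattern, output_fields, remove=False):
    """Copy the regex groups found in record[input_field] into output_fields."""
    match = pattern.search(record[input_field])
    if match is None:
        return record  # assumption: records that do not match pass through unchanged
    out = dict(record)
    if remove:
        del out[input_field]  # models `remove: true`, which drops the source field
    out.update(zip(output_fields, match.groups()))
    return out


line = "09:07:36 up 16 min, 1 user, load average: 0.08, 0.23, 0.31"
result = extract({"_raw": line}, "_raw", PATTERN, ["m1", "m5", "m15"], remove=True)
print(json.dumps(result, separators=(",", ":")))
# {"m1":"0.08","m5":"0.23","m15":"0.31"}
```

Note that the extracted values are still strings at this point; the regular expression only splits the text.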
Time series events normally have an associated time, and their fields must be in an appropriate form (for example, numbers rather than strings) to be processed and queried by data analytics systems. Suppose our actions were:
- extract:
    input-field: _raw
    remove: true
    pattern: 'load average: (\S+), (\S+), (\S+)'
    output-fields: [m1, m5, m15]
- convert:
  - m1: num
  - m5: num
  - m15: num
- time:
    output-field: '@timestamp'
Then the result looks like this:
{"m1":0.08,"m5":0.23,"m15":0.31,"@timestamp":"2018-08-21T09:30:36.000Z"}
These records can then be indexed by Elasticsearch, and queried with tools such as Kibana or Grafana.
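Continuing the illustration, the convert and time actions can be modelled as two more record-to-record functions. This is a Python sketch, not the Pipes implementation: convert_num, add_time, the use of float for num, and the fixed clock are all assumptions. The timestamp format is chosen to match the result shown above.

```python
import json
from datetime import datetime, timezone


def convert_num(record, fields):
    """Hypothetical model of `convert` with `num`: parse the listed string fields as numbers."""
    out = dict(record)
    for field in fields:
        out[field] = float(out[field])  # assumption: num maps to a floating-point value
    return out


def add_time(record, output_field, now=None):
    """Hypothetical model of `time`: stamp the record with the current UTC time."""
    if now is None:
        now = datetime.now(timezone.utc)
    out = dict(record)
    # ISO 8601 with millisecond precision and a trailing "Z", matching the sample output
    out[output_field] = now.strftime("%Y-%m-%dT%H:%M:%S") + f".{now.microsecond // 1000:03d}Z"
    return out


record = {"m1": "0.08", "m5": "0.23", "m15": "0.31"}
record = convert_num(record, ["m1", "m5", "m15"])
# A fixed clock so the output is reproducible
record = add_time(record, "@timestamp", now=datetime(2018, 8, 21, 9, 30, 36, tzinfo=timezone.utc))
print(json.dumps(record, separators=(",", ":")))
# {"m1":0.08,"m5":0.23,"m15":0.31,"@timestamp":"2018-08-21T09:30:36.000Z"}
```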
So a pipe is "input -> action1 -> action2 -> ... -> output". Note that actions are optional: the only required top-level fields are name, input and output.
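The "input -> actions -> output" shape is essentially function composition. A minimal Python sketch, with made-up lambda functions standing in for real Pipes actions (run_pipe and the actions are hypothetical, for illustration only):

```python
from functools import reduce
from typing import Any, Callable, Dict, Iterable, List

Record = Dict[str, Any]
Action = Callable[[Record], Record]


def run_pipe(source: Iterable[Record], actions: List[Action], sink: Callable[[Record], None]) -> None:
    """Feed each input record through the actions in order, then hand it to the output."""
    for record in source:
        # reduce applies action1, then action2, ...; with no actions the record is unchanged
        sink(reduce(lambda rec, action: action(rec), actions, record))


collected: List[Record] = []
run_pipe(
    [{"_raw": "a"}, {"_raw": "bc"}],
    [
        lambda r: {**r, "len": len(r["_raw"])},       # hypothetical action 1
        lambda r: {**r, "upper": r["_raw"].upper()},  # hypothetical action 2
    ],
    collected.append,
)
run_pipe([{"_raw": "d"}], [], collected.append)  # actions are optional
print(collected)
```

Because each action takes a record and returns a record, steps can be added, removed or reordered without touching the input or output.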
Most action steps work with JSON data. Although real-world data usually arrives as raw text, the exec input will by default 'quote' it as JSON, like {"_raw":"output line"}.
The various extraction steps work on this raw data and generate sensible event fields.
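The point of the quoting is that arbitrary text, including characters such as double quotes, becomes a valid JSON string. A small Python sketch of that wrapping, assuming one record per output line (quote_raw is a hypothetical helper, not part of Pipes):

```python
import json
from typing import List


def quote_raw(text: str) -> List[str]:
    """Hypothetical model of exec's default quoting: one {"_raw": ...} record per output line."""
    return [json.dumps({"_raw": line}, separators=(",", ":")) for line in text.splitlines()]


records = quote_raw('output line\na line with "quotes"\n')
for r in records:
    print(r)
# {"_raw":"output line"}
# {"_raw":"a line with \"quotes\""}
```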
The available inputs and outputs, as well as the possible processing actions, are documented in their own sections.
Adding Pipes to the Hotrod System
Assuming you are logged in, first add some targets:
$ hotrod targets add Joburg
target Joburg has id e94ccdca-f379-447b-8c90-6976e77652ec
$ hotrod targets add Durban
target Durban has id 8a1a0a29-d8f8-4098-a016-9d08f841f9a4
$ hotrod targets list
name   | id                                   | tags | pipes | last seen
-------+--------------------------------------+------+-------+-----------
Joburg | e94ccdca-f379-447b-8c90-6976e77652ec |      |       |
Durban | 8a1a0a29-d8f8-4098-a016-9d08f841f9a4 |      |       |
Unless you explicitly specify --id with targets add, new targets will be assigned a UUID as their unique identifier. Target names and ids must be unique.
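The two rules above (generate a UUID unless an id is given, and keep both names and ids unique) can be pictured with a small in-memory registry. This is a hypothetical Python model for illustration; TargetRegistry and its error messages are invented here and are not Hotrod's implementation.

```python
import uuid
from typing import Dict, Optional


class TargetRegistry:
    """Hypothetical in-memory model of the target naming rules, not Hotrod's code."""

    def __init__(self) -> None:
        self.by_name: Dict[str, str] = {}  # target name -> target id

    def add(self, name: str, target_id: Optional[str] = None) -> str:
        if target_id is None:
            target_id = str(uuid.uuid4())  # no explicit id given: assign a fresh UUID
        if name in self.by_name:
            raise ValueError(f"target name {name!r} already exists")
        if target_id in self.by_name.values():
            raise ValueError(f"target id {target_id!r} already exists")
        self.by_name[name] = target_id
        return target_id


registry = TargetRegistry()
joburg_id = registry.add("Joburg")       # gets a generated UUID
registry.add("Durban", "my-durban-id")   # explicit id, similar in spirit to --id
try:
    registry.add("Joburg")               # a duplicate name is rejected
except ValueError as err:
    print(err)
```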
A Pipe definition is a file NAME.yml, where NAME is the name of the Pipe. Pipe names must be unique, and the hotrod tool also enforces that the file name matches the Pipe name. To allow versioning or annotation of a pipe definition, an additional full stop followed by some text is allowed before the extension, for example NAME.testing.yml or NAME.0.1.yml.
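A rough Python sketch of the file naming rule as described above. The actual check in the hotrod tool may differ; pipe_file_matches is a hypothetical helper, and whether a .yaml extension is also accepted is not stated, so this sketch rejects it.

```python
import re
from pathlib import Path


def pipe_file_matches(path: str, pipe_name: str) -> bool:
    """Hypothetical check: accept NAME.yml or NAME.<annotation>.yml for the given pipe name."""
    filename = Path(path).name
    # The optional annotation starts with a full stop and may itself contain dots (e.g. 0.1)
    return re.fullmatch(re.escape(pipe_name) + r"(\..+)?\.yml", filename) is not None


for candidate in ["uptime.yml", "uptime.testing.yml", "pipes/uptime.0.1.yml", "uptimes.yml", "uptime.yaml"]:
    print(candidate, pipe_file_matches(candidate, "uptime"))
```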
A pipe is loaded with the pipes add subcommand:
$ hotrod pipes add --file uptime.yml
$ hotrod pipes list
name
------
uptime
$ hotrod pipes show uptime
name: uptime
input:
  exec:
    command: uptime
    interval: 2s
actions:
- extract:
    input-field: _raw
    remove: true
    pattern: 'load average: (\S+), (\S+), (\S+)'
    output-fields: [m1, m5, m15]
- convert:
  - m1: num
  - m5: num
  - m15: num
- time:
    output-field: '@timestamp'
output:
  write: console
This Pipe is not yet associated with any target, so we update a particular target:
$ hotrod targets update Joburg --add-pipe uptime
$ hotrod targets list
name   | id                                   | tags | pipes  | last seen
-------+--------------------------------------+------+--------+-----------
Joburg | e94ccdca-f379-447b-8c90-6976e77652ec |      | uptime |
Durban | 8a1a0a29-d8f8-4098-a016-9d08f841f9a4 |      |        |
The Pipe is now staged for the "Joburg" target, and will shortly be deployed by the configured Hotrod Agent.
If you run hotrod targets update Joburg --remove-pipe uptime, the Pipe will be removed from the staging area and stopped on the Target by the Hotrod Agent.