Transactions and Transitions

The pipe model involves a stream of events coming from a command or some other source. Sometimes we need to separate that stream into groups.

Using exec-batch

Consider a pipe that monitors disk usage with df:

scratch$ df -x tmpfs
Filesystem     1K-blocks      Used Available Use% Mounted on
udev             8120300         0   8120300   0% /dev
/dev/sda2      122030736 102279252  13509644  89% /
/dev/sda1         523248      6228    517020   2% /boot/efi
/dev/sdb1      960380628 366946504 544579664  41% /home/steve/hd

This pipe runs df once a minute, and the goal is to extract the 'Mounted', 'Used' and 'Available' fields. Here exec's batch option marks the first and last lines of each run specially:

name: batch
input:
  exec:
    command: df -x tmpfs
    interval: 1m
    batch:
      begin-marker-field: begin
      end-marker-field: end
output:
    write: console
# .....
{"_raw":"Filesystem     1K-blocks      Used Available Use% Mounted on","begin":true}
{"_raw":"udev             8120300         0   8120300   0% /dev"}
{"_raw":"/dev/sda2      122030736 102287196  13501700  89% /"}
{"_raw":"/dev/sda1         523248      6228    517020   2% /boot/efi"}
{"_raw":"/dev/sdb1      960380628 367768744 543757424  41% /home/steve/hd","end":true}
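To make the marking concrete, here is a minimal Python sketch of what batch does to one run of a command. It assumes each output line becomes a {"_raw": ...} event, as shown above. mark_batch is a hypothetical helper for illustration and is not part of the pipe tool.

```python
import json


def mark_batch(lines, begin_field="begin", end_field="end"):
    """Turn one command run's output lines into events, flagging the first
    and last events the way the batch output above does.

    Hypothetical model for illustration; not the tool's implementation.
    """
    events = [{"_raw": line} for line in lines]
    if events:
        events[0][begin_field] = True  # first line of this run
        events[-1][end_field] = True   # last line of this run
    return events


df_output = """Filesystem     1K-blocks      Used Available Use% Mounted on
udev             8120300         0   8120300   0% /dev
/dev/sda1         523248      6228    517020   2% /boot/efi"""

for event in mark_batch(df_output.splitlines()):
    # compact separators to match the one-line JSON style above
    print(json.dumps(event, separators=(",", ":")))
```

Each interval produces a fresh batch, so downstream actions can tell where one df run ends and the next begins.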

expand can split these lines into fields with delim: ' ' (which matches any run of whitespace), and with header: true it takes the column names from the header line instead of treating that line as data. But that only works for the first run: how do we drop the column-name row every time df runs?

name: df-cols
input:
  exec:
    command: df -x tmpfs
    batch:
      begin-marker-field: begin
    interval: 2s
actions:
- expand:
    input-field: _raw
    delim: ' '
    remove: true
    begin-marker-field: begin
    csv:
      header: true
      relaxed-schema: true
output:
    write: console
# .....
{"Filesystem":"udev","1K-blocks":8120300,"Used":0,"Available":8120300,"Use%":"0%","Mounted":"/dev"}
{"Filesystem":"/dev/sda2","1K-blocks":122030736,"Used":102288448,"Available":13500448,"Use%":"89%","Mounted":"/"}
{"Filesystem":"/dev/sda1","1K-blocks":523248,"Used":6228,"Available":517020,"Use%":"2%","Mounted":"/boot/efi"}
{"Filesystem":"/dev/sdb1","1K-blocks":960380628,"Used":367768868,"Available":543757300,"Use%":"41%","Mounted":"/home/steve/hd"}

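expand's begin-marker-field tells it where each batch starts, so the column-name line at the start of every df run is passed over, not just the first one. (relaxed-schema stops the extra column "on", left over from "Mounted on", from being a problem.)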

You can now drop/rename any fields.
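The header handling described above can be modelled in a few lines of Python. This minimal sketch assumes two things: an event marked begin always carries the column names, and fields are separated by runs of whitespace. expand_batches and to_value are hypothetical names. The sketch models the effect, not how expand is written.

```python
import re


def to_value(text):
    # Integer-looking columns become ints, as in the JSON output above;
    # everything else (e.g. "89%", "/boot/efi") stays a string.
    return int(text) if re.fullmatch(r"-?\d+", text) else text


def expand_batches(events, begin_field="begin"):
    """Split whitespace-separated _raw lines into records.

    Every event flagged with begin_field is taken as the header row for its
    batch: its words become the field names and the row itself is dropped.
    Hypothetical sketch of the behaviour described above, not expand itself.
    """
    header = None
    records = []
    for event in events:
        words = event["_raw"].split()  # any run of whitespace separates fields
        if event.get(begin_field):
            header = words  # start of a new df run: re-read column names
            continue
        if header is None:
            continue  # no header seen yet, so the columns are unknown
        # zip stops at the shorter list, so the spare header word "on"
        # (from "Mounted on") is ignored, much as relaxed-schema allows
        records.append({name: to_value(w) for name, w in zip(header, words)})
    return records


header_line = "Filesystem     1K-blocks      Used Available Use% Mounted on"
events = [
    {"_raw": header_line, "begin": True},
    {"_raw": "udev             8120300         0   8120300   0% /dev"},
    {"_raw": header_line, "begin": True},  # second run of df
    {"_raw": "/dev/sda1         523248      6228    517020   2% /boot/efi"},
]
for record in expand_batches(events):
    print(record)
```

The header is state that is reset on each begin event. That is why the column row disappears on every run rather than only the first.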

Detecting Stalls and Grouping by Field

Not all data arrives on a schedule, as it does with exec or http-poll. With the tcp input, data can arrive at any time, so it is useful to know if and when it stops flowing.

name: stalled1
input:
  tcp:
    address: localhost:2020
    plain: true
actions:
- stalled:
    timeout: 2s
    marker: [STALL]
output:
    write: console

If more than 2s passes between events, stalled emits an extra event that looks like {"_marker":"STALL","streaming":"no"}. When the data starts flowing again, we get {"_marker":"STALL","streaming":"yes"}.
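The timeout rule can be modelled offline in Python, assuming each event comes with an arrival time in seconds. detect_stalls is a hypothetical helper. It replays a recorded stream, whereas the real action has to fire its marker from a timer while it waits for input.

```python
def detect_stalls(events, timeout, marker="STALL", now=None):
    """Offline model of the stalled action for a single stream.

    events: (timestamp_seconds, event_dict) pairs in arrival order.
    A "no" marker is emitted when the gap after an event exceeds timeout,
    and a "yes" marker when data arrives again. If now is given and the
    last event is older than timeout, a trailing "no" marker is added.
    Hypothetical sketch: the real action runs a live timer instead.
    """
    out = []
    last = None
    for t, event in events:
        if last is not None and t - last > timeout:
            out.append({"_marker": marker, "streaming": "no"})   # gap too long
            out.append({"_marker": marker, "streaming": "yes"})  # flowing again
        out.append(event)
        last = t
    if now is not None and last is not None and now - last > timeout:
        out.append({"_marker": marker, "streaming": "no"})
    return out


arrivals = [(0.0, {"a": 1}), (0.5, {"a": 2}), (3.0, {"a": 3})]
for item in detect_stalls(arrivals, timeout=2.0):
    print(item)
```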

Now, imagine that various sources are connecting and writing events to port 2020. Each of the sources identifies itself with a particular field value. The task is to detect when any particular source stops sending data. In this example, the field is b and there are two sources, "one" and "two", sending data every 100ms:

    {"a":1,"b":"one"}
    {"a":2,"b":"one"}
    {"a":3,"b":"one"}
    {"a":4,"b":"one"}
    {"a":5,"b":"two"}
    {"a":6,"b":"two"}
    {"a":7,"b":"two"}
    {"a":8,"b":"one"}
    {"a":9,"b":"one"}
    {"a":10,"b":"one"}
    {"a":11,"b":"one"}
####
    - stalled:
        timeout: 200ms
        marker: [stalled]
        group-by: b
####
    {"_marker":"stalled","streaming":"yes","stalled":"one"}
    {"a":1,"b":"one"}
    {"a":2,"b":"one"}
    {"a":3,"b":"one"}
    {"a":4,"b":"one"}
    {"_marker":"stalled","streaming":"yes","stalled":"two"}
    {"a":5,"b":"two"}
    {"a":6,"b":"two"}
    {"_marker":"stalled","streaming":"no","stalled":"one"}
    {"a":7,"b":"two"}
    {"_marker":"stalled","streaming":"yes","stalled":"one"}
    {"a":8,"b":"one"}
    {"a":9,"b":"one"}
    {"a":10,"b":"one"}
    {"_marker":"stalled","streaming":"no","stalled":"two"}
    {"a":11,"b":"one"}

The two sources are tracked as separate groups, each with its own timeout: "one" is reported as stalled while "two" is sending, and "two" is reported as stalled after "one" resumes.
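Per-group tracking can be sketched in Python by keeping a last-seen time and a flowing flag for each value of the group-by field. This sketch makes several assumptions:

- Timestamps are integer milliseconds, so boundary comparisons are exact.
- A group counts as stalled once it has been silent for strictly more than the timeout.
- The group's value goes in a "stalled" field, copied from the output above.

detect_group_stalls is hypothetical. Because it replays fixed timestamps, it will not reproduce the exact interleaving above, where real arrival times jitter.

```python
def detect_group_stalls(events, timeout_ms, key, marker="stalled", now_ms=None):
    """Offline model of stalled with group-by.

    events: (timestamp_ms, event_dict) pairs in arrival order. Each value of
    event[key] is tracked separately: a "yes" marker is emitted when a group
    first appears or resumes, and a "no" marker once that group has been
    silent for more than timeout_ms. Hypothetical sketch only.
    """
    out = []
    last_seen = {}  # group value -> time of its latest event
    flowing = {}    # group value -> True while the group is streaming

    def emit_stalls(t):
        # Groups whose deadline passed before time t, earliest deadline first.
        expired = sorted(
            (last_seen[g] + timeout_ms, g)
            for g, is_flowing in flowing.items()
            if is_flowing and last_seen[g] + timeout_ms < t
        )
        for _, g in expired:
            out.append({"_marker": marker, "streaming": "no", "stalled": g})
            flowing[g] = False

    for t, event in events:
        emit_stalls(t)
        group = event[key]
        if not flowing.get(group, False):
            out.append({"_marker": marker, "streaming": "yes", "stalled": group})
            flowing[group] = True
        last_seen[group] = t
        out.append(event)
    if now_ms is not None:
        emit_stalls(now_ms)
    return out


arrivals = [
    (0, {"a": 1, "b": "one"}),
    (100, {"a": 2, "b": "one"}),
    (200, {"a": 3, "b": "two"}),
    (600, {"a": 4, "b": "one"}),
]
for item in detect_group_stalls(arrivals, timeout_ms=200, key="b"):
    print(item)
```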

Collecting Groups Together with Transaction

transaction collects related records together. Here it takes the output of stalled above and collects each group's sequence of events into an array, closing the array when that group stalls:

    - transaction:
        timeout: [stalled]
        marker: [TRANS]
        group-by: b
    - remove: [duration,complete]
####
    {"_marker":"TRANS","recs":[
      {"a":1,"b":"one"},
      {"a":2,"b":"one"},
      {"a":3,"b":"one"},
      {"a":4,"b":"one"}
    ]}
    {"_marker":"TRANS","recs":[
      {"a":5,"b":"two"},
      {"a":6,"b":"two"},
      {"a":7,"b":"two"}
    ]}
    {"_marker":"TRANS","recs":[
      {"a":8,"b":"one"},
      {"a":9,"b":"one"},
      {"a":10,"b":"one"},
      {"a":11,"b":"one"}
    ]}

Note that transaction consumes the stall events!
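A minimal Python sketch of this behaviour follows. It assumes a group's array is closed by a "no" stall marker for that group; the docs above only show timeout: [stalled]. collect_transactions is a hypothetical helper. It is fed the stalled output above plus one extra marker for "one" going quiet after a:11, and under that assumption it yields the same three arrays.

```python
def collect_transactions(stream, key, stall_marker="stalled", marker="TRANS"):
    """Group events by event[key] and close a group's transaction when a
    "no" stall marker arrives for it. Stall markers themselves are consumed.

    Hypothetical sketch: the real transaction action also records fields
    such as duration and complete, which are left out here.
    """
    pending = {}  # group value -> events collected since its last stall
    out = []
    for event in stream:
        if event.get("_marker") == stall_marker:
            group = event.get("stalled")
            if event.get("streaming") == "no" and pending.get(group):
                out.append({"_marker": marker, "recs": pending.pop(group)})
            continue  # drop the marker either way
        pending.setdefault(event[key], []).append(event)
    # groups still open when the input ends are not flushed in this sketch
    return out


def rec(a, b):
    return {"a": a, "b": b}


def stall(state, group):
    return {"_marker": "stalled", "streaming": state, "stalled": group}


stream = (
    [stall("yes", "one")] + [rec(a, "one") for a in range(1, 5)]
    + [stall("yes", "two"), rec(5, "two"), rec(6, "two"), stall("no", "one"), rec(7, "two")]
    + [stall("yes", "one")] + [rec(a, "one") for a in range(8, 11)]
    + [stall("no", "two"), rec(11, "one"), stall("no", "one")]  # "one" goes quiet
)
for trans in collect_transactions(stream, key="b"):
    print(trans)
```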

The transaction model works well with sessions. We have a log like this:

  {"action":"LOGIN","user":"Bob"}
  {"action":"SEND","user":"Bob"}
  {"action":"LOGIN","user":"Alice"}
  {"action":"RECV","user":"Bob"}
  {"action":"SEND","user":"Alice"}
  {"action":"SEND-AGAIN","user":"Alice"}
  {"action":"SEND-MORE","user":"Alice"}
  {"action":"LOGOFF","user":"Alice"}

Users log in, perform actions, and then log out: this is a session. It starts when the field action matches "LOGIN" and ends when it matches "LOGOFF". start-end defines this pair of conditions, each given as a field name and a pattern that the field must match.

A real-world complication is that users often forget to log out, so we also need a timeout, supplied here by stalled grouped by user:

    - stalled:
        timeout: 200ms
        marker: [stalled]
        group-by: user
    - transaction:
        timeout: [stalled]
        marker: [TRANS]
        start-end:
            start: [action, LOGIN]
            end: [action, LOGOFF]
        group-by: user
    - remove: [duration]
####
  {"_marker":"TRANS","complete":false,"recs":[
    {"action":"LOGIN","user":"Bob"},
    {"action":"SEND","user":"Bob"},
    {"action":"RECV","user":"Bob"}
  ]}
  {"_marker":"TRANS","complete":true,"recs":[
    {"action":"LOGIN","user":"Alice"},
    {"action":"SEND","user":"Alice"},
    {"action":"SEND-AGAIN","user":"Alice"},
    {"action":"SEND-MORE","user":"Alice"},
    {"action":"LOGOFF","user":"Alice"}
  ]}

User Bob forgot to log out, so his session ended on the timeout and has `"complete":false`.
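Putting start-end and the timeout together, here is a hedged Python model of the session logic. It makes these assumptions:

- Patterns are matched with re.search.
- A "no" stall marker for a user closes that user's open session as incomplete.
- Bob's stall marker lands before Alice's LOGOFF, which the order of the output above suggests.

collect_sessions is a hypothetical helper, not the transaction action itself.

```python
import re


def collect_sessions(stream, key, start, end, stall_marker="stalled", marker="TRANS"):
    """Session-style transactions per event[key].

    start and end are (field, pattern) pairs such as ("action", "LOGIN").
    A start match opens a new session, an end match closes it with
    complete=True, and a "no" stall marker for the group closes it with
    complete=False. Patterns are matched with re.search (an assumption).
    Hypothetical sketch only.
    """
    start_field, start_pattern = start
    end_field, end_pattern = end
    sessions = {}  # group value -> events in the open session
    out = []
    for event in stream:
        if event.get("_marker") == stall_marker:
            group = event.get("stalled")
            if event.get("streaming") == "no" and group in sessions:
                # timed out before the end event: an incomplete session
                out.append({"_marker": marker, "complete": False,
                            "recs": sessions.pop(group)})
            continue  # stall markers are consumed
        group = event[key]
        if re.search(start_pattern, str(event.get(start_field, ""))):
            # a start event opens a fresh session (any unfinished one is discarded)
            sessions[group] = [event]
        elif group in sessions:
            sessions[group].append(event)
        else:
            continue  # events outside any session are dropped here
        if re.search(end_pattern, str(event.get(end_field, ""))):
            out.append({"_marker": marker, "complete": True,
                        "recs": sessions.pop(group)})
    return out


log = [
    {"action": "LOGIN", "user": "Bob"},
    {"action": "SEND", "user": "Bob"},
    {"action": "LOGIN", "user": "Alice"},
    {"action": "RECV", "user": "Bob"},
    {"action": "SEND", "user": "Alice"},
    {"action": "SEND-AGAIN", "user": "Alice"},
    {"_marker": "stalled", "streaming": "no", "stalled": "Bob"},  # Bob goes quiet
    {"action": "SEND-MORE", "user": "Alice"},
    {"action": "LOGOFF", "user": "Alice"},
]
for session in collect_sessions(log, "user", ("action", "LOGIN"), ("action", "LOGOFF")):
    print(session)
```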