How to incorporate external utility scripts into Logstash Pipeline

Overview

Logstash is a great tool for processing logs and extracting valuable data from them. There are many useful Logstash filter plugins that make it easy to process raw log data. However, sometimes an external utility is required to process the data in a more complicated way than the existing filter plugins allow.

It's possible to code your own filter plugin in Ruby, but what if you already have the filter implemented in some other programming language and want to reuse it in Logstash?

In that case it's easier to communicate with the external filter from Logstash. This article demonstrates the simplest way of incorporating an external application into the Logstash pipeline:

  1. Logstash launches the external program and delivers the input data to it through command-line arguments and stdin.
  2. The external program writes its results to stdout in any format understood by Logstash filters (e.g., JSON).
  3. Logstash parses the output of the external program and continues to handle it in the pipeline.
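Outside Logstash, the three steps above can be sketched in plain Ruby. This is a minimal stand-alone sketch: the inline `ruby -e` one-liner stands in for the external program, which in practice would be your own utility.

```ruby
require "open3"
require "json"

# Step 1: launch an external program. A ruby one-liner emitting a small
# JSON document stands in for the real external filter.
cmd = ["ruby", "-e", 'require "json"; puts({"score" => 42}.to_json)']

# Step 2: capture the program's stdout (and stderr, for error handling).
stdout, stderr, status = Open3.capture3(*cmd)

# Step 3: parse the output, the way the Logstash json filter would.
result = status.success? && stderr.empty? ? JSON.parse(stdout) : nil
puts result.inspect   # => {"score"=>42}
```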

Needless to say, this is not the best approach in terms of performance. For example, if the startup time of the external application is significant, you may consider launching the application once (as a daemon/service) and communicating with it via ØMQ or another high-performance message queue.

A detailed explanation and a usage example are given below.

Launching external program

We will use the ruby filter to launch the external application and capture its output:

filter {
    # <...> <- More filters are above
    # Launching external script to make a deeper analysis
    if [file_path] =~ /.+/ {
        ruby {
            code => 'require "open3"
                     file_path = event.get("file_path")
                     cmd = "/opt/bin/my_filter.py -f #{file_path}"
                     stdin, stdout, stderr, wait_thr = Open3.popen3(cmd)
                     stdin.close                  # no extra input here; signal EOF to the child
                     event.set("process_result", stdout.read)
                     err = stderr.read
                     wait_thr.join                # reap the child process
                     if err.to_s.empty?
                       filter_matched(event)
                     else
                       event.set("ext_script_err_msg", err)
                     end'
            remove_field => ["file_path"]
        }
    }
    # Parsing of process_result happens here (see the next section)
}

Note:

  • The external application /opt/bin/my_filter.py is launched only if the file_path field is not empty. This field must be populated earlier in the filter pipeline. Its value (#{file_path}) is interpolated into the command line that launches the external filter.
  • The stdin handle is accessible to our tiny Ruby script, so it can be used to send additional data to the external program (/opt/bin/my_filter.py).
  • If the application's stderr is not empty, the filter is not considered successful and the stderr content is recorded in the ext_script_err_msg field.
  • If processing was successful, the output of the external program is recorded in the process_result field and the file_path field is removed.
  • This config has been tested with logstash 5.3.0.
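The Open3 interaction used in the ruby filter can be exercised outside Logstash as well. In this sketch a Ruby one-liner that upcases its stdin stands in for /opt/bin/my_filter.py (a hypothetical stand-in, not the article's actual script); passing the command as an argument list instead of a single string also avoids going through the shell:

```ruby
require "open3"

# Stand-in for /opt/bin/my_filter.py: a ruby one-liner that upcases stdin.
cmd = ["ruby", "-e", "print STDIN.read.upcase"]

stdin, stdout, stderr, wait_thr = Open3.popen3(*cmd)
stdin.write("payload from the event")  # extra data sent via stdin, as mentioned in the notes
stdin.close                            # signal EOF so the child can finish
result = stdout.read
err = stderr.read
wait_thr.join                          # reap the child process

puts err.empty? ? result : "error: #{err}"   # => PAYLOAD FROM THE EVENT
```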

Parsing output of the external program (JSON)

The easiest way to deliver the data back to Logstash is to use one of the structured data formats understood by Logstash filters: JSON, XML, or the more old-fashioned key-value (kv) format.

Example with JSON:

  if [process_result] =~ /.+/ {
      json {
          source => "process_result"
          remove_field => [ "process_result" ]
      }
  }

Note:

  • The process_result field holds the output of the external application, which is expected to be in JSON format.
  • If parsing was successful, the JSON fields become event fields and the intermediate process_result field is removed.
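For the key-value case, a hypothetical analogue of the json filter above could look like this (assuming the external program prints pairs such as status=ok score=42; the kv filter splits them into event fields):

```
  if [process_result] =~ /.+/ {
      kv {
          source => "process_result"
          remove_field => [ "process_result" ]
      }
  }
```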

Several words about exec output plugin

If you only need to launch an external utility upon a matched Logstash event, you may consider a simpler approach – the exec output plugin.
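As a sketch, an exec output that runs a notification script for every event could look like this (/opt/bin/notify.sh is a hypothetical example path, not from this article; %{message} interpolates an event field into the command):

```
output {
    exec {
        command => "/opt/bin/notify.sh '%{message}'"
    }
}
```

Unlike the ruby-filter approach, the exec output cannot feed the program's results back into the event.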

If you liked this post, you can share it with your followers or follow me on Twitter!

#elasticsearch #logstash #ruby