Introduction to Logstash Pipelines

2025-06-09 by Mahfoud CHIKH AISSA

What is Logstash?

Let’s start with Elastic’s official definition:

Logstash is an open source data collection engine with real-time pipelining capabilities. Logstash can dynamically unify data from disparate sources and normalize the data into destinations of your choice. Cleanse and democratize all your data for diverse advanced downstream analytics and visualization use cases.

source: Logstash Doc


Based on this definition, we can say:

Logstash is a tool that helps us ingest data from different sources, apply transformations to it (for example, to structure it into a single model or to enrich it with data from a reference dataset), and finally send it to an output destination, most often a storage system (like Elasticsearch or a database).

So it’s all about pipelines! 😃


A Logstash pipeline is defined with a config file that contains at most three (3) parts: input, filter and output. Each of those parts contains plugins that do the actual work.


📜pipeline.conf

input { ... }

filter { ... }

output { ... }


Example of a simple .conf file for a pipeline:


input {
  file {
    path => "/tmp/logs.json"
    start_position => "beginning"
    codec => "json"
  }
}

filter {
  date {
    match => ["@timestamp", "ISO8601"]
    target => "@timestamp"
  }
  mutate {
    add_field => { "ingested_at" => "%{@timestamp}" }
  }
}

output {
  elasticsearch {
    hosts => ["http://localhost:9200"]
    index => "tmp-logs"
  }
}


This pipeline reads a JSON file located at /tmp/logs.json, adds a field ingested_at that contains the ingest time, and finally sends the events to Elasticsearch (to keep things simple, let's say it is running on the same machine).


So Logstash uses plugins to do all the work: ingesting data, transforming and aggregating it, and sending it to other pipelines or other systems/applications.

Here are some examples of plugins:


Input Plugins:

  1. file: Reads logs from files (e.g. /var/log/syslog, application logs).
  2. beats: Listens for data from Beats agents such as Filebeat and Metricbeat.
  3. tcp and udp: Expose a specific TCP/UDP port and listen for incoming events (see the sketch after this list).
  4. jdbc: Runs scheduled queries that read data from a relational database (like PostgreSQL, Oracle DB…).
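
For instance, a minimal tcp input could look like the sketch below (the port number and the codec are illustrative assumptions, not values taken from a real setup):

input {
  tcp {
    port => 5000           # illustrative port to listen on for incoming TCP connections
    codec => json_lines    # assume each incoming line is a JSON document
  }
}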


Filter Plugins:

  1. grok: Parses unstructured text (like application logs) into structured fields using regular-expression patterns.
  2. date: Parses timestamp strings and converts them into date objects.
  3. mutate: Manipulates fields (renaming, adding, deleting, type conversion, etc.); see the sketch after this list.
  4. json: Parses strings into fields just like grok, but specifically for JSON strings.
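
As an illustration, here is a minimal mutate sketch with hypothetical field names:

filter {
  mutate {
    rename => { "host" => "source_host" }        # rename a field
    convert => { "status_code" => "integer" }    # change a field's type
    remove_field => ["tmp_field"]                # drop a temporary field
  }
}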


Output Plugins:

  1. kafka: Sends data to Kafka topics in a Kafka cluster.
  2. elasticsearch: Ingests data into Elasticsearch for indexing and storage.
  3. http: Sends events to a remote HTTP endpoint via POST requests (see the sketch after this list).
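
And a minimal http output sketch (the URL is just a placeholder):

output {
  http {
    url => "https://example.com/ingest"   # placeholder endpoint
    http_method => "post"                 # send each event as an HTTP POST request
    format => "json"                      # serialize the event body as JSON
  }
}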


A more advanced pipeline example:

Let’s imagine a pipeline that gets an application’s logs from a server (using Filebeat and the beats input plugin).

For more about Filebeat: Filebeat Doc



In the filter part, we will use the grok plugin to parse the lines of the log files and create typed fields based on the format of the initial data.

We also have a date plugin that parses the extracted timestamp, interpreting it in a given timezone (e.g. the Paris timezone), and stores it in @timestamp.

Last but not least, the output plugin is kafka, which sends the data to a Kafka cluster, from where it can be forwarded, for example, to Elasticsearch for storage.


input {
  beats {
    port => 5044
  }
}

filter {
  grok {
    match => {
      "message" => "%{TIMESTAMP_ISO8601:timestamp} \[%{LOGLEVEL:level}\] %{GREEDYDATA:log_message}"
    }
  }

  date {
    match => ["timestamp", "ISO8601"]
    timezone => "Europe/Paris"
    target => "@timestamp"
  }

  mutate {
    remove_field => ["timestamp"]
  }
}

output {
  kafka {
    bootstrap_servers => "kafka_host_name:9092"
    topic_id => "app-logs"
    codec => json
  }

  stdout {
    codec => rubydebug
  }
}

@timestamp is one of the metadata fields* added automatically to every event by Logstash; it contains the ingest time (the time when the event is processed by the pipeline). In our example we overwrote its value with the timestamp extracted from the event (interpreted in the Paris timezone), and then we deleted the timestamp field because it would be a duplicate.


* Logstash metadata
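
To illustrate (the values are made up and the fields added by Beats are omitted), an event printed by the stdout/rubydebug output of this pipeline might look roughly like this:

{
     "@timestamp" => 2025-05-25T13:32:02.014Z,
       "@version" => "1",
          "level" => "ERROR",
    "log_message" => "Request failed with status 404 for /api/unknown",
        "message" => "2025-05-25T15:32:02.014 [ERROR] Request failed with status 404 for /api/unknown"
}

Note that @timestamp holds the time parsed from the log line, interpreted as Paris local time and stored in UTC.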


Let’s say you want an advanced, custom transformation and you can’t find a suitable plugin among the 200+ built-in ones. In that case you can use a magical plugin called “ruby”.

Ruby is a scripting language, comparable to Python, and the ruby plugin in a Logstash pipeline lets us add our own scripts to perform custom transformations.

Example:

Let’s take our latest example. Imagine that when an error occurs, the log message contains only the error code, and we want to create a new field “error_msg” whose value depends on that error code. To do this transformation we will call on the ruby plugin:

filter {
  ruby {
    code => "
      http_errors = {
        400 => 'Bad Request',
        401 => 'Unauthorized',
        403 => 'Forbidden',
        404 => 'Not Found',
        500 => 'Internal Server Error'
      }

      code = event.get('error_code')
      if code && http_errors.key?(code.to_i)
        event.set('error_msg', http_errors[code.to_i])
      else
        event.set('error_msg', 'Unknown Error')
      end
    "
  }
}

So a final event will contain something like this:

{
  "timestamp": "2025-05-25 15:32:02.014",
  "level": "ERROR",
  "pid": "12345",
  "thread": "http-nio-8080-exec-5",
  "logger": "org.springframework.web.servlet.PageNotFound",
  "message": "Request failed with status 404 for /api/unknown",
  "error_code": 404,
  "error_msg": "Not Found"
}


But before we apply our filter, we need to make a slight change to the grok pattern so that it extracts the error code field from the message:

grok {
  match => {
    "message" => "%{TIMESTAMP_ISO8601:timestamp} \[%{LOGLEVEL:level}\] %{INT:error_code} %{GREEDYDATA:log_text}"
  }
}

And the final pipeline code will be:


📜my-pipeline.conf

input {
  beats {
    port => 5044
  }
}

filter {
  grok {
    match => {
      "message" => "%{TIMESTAMP_ISO8601:timestamp} \[%{LOGLEVEL:level}\] %{INT:error_code} %{GREEDYDATA:log_text}"
    }
  }

  date {
    match => ["timestamp", "ISO8601"]
    timezone => "Europe/Paris"
    target => "@timestamp"
  }

  mutate {
    remove_field => ["timestamp"]
  }

  ruby {
    code => "
      http_errors = {
        400 => 'Bad Request',
        401 => 'Unauthorized',
        403 => 'Forbidden',
        404 => 'Not Found',
        500 => 'Internal Server Error'
      }

      code = event.get('error_code')
      if code && http_errors.key?(code.to_i)
        event.set('error_msg', http_errors[code.to_i])
      else
        event.set('error_msg', 'Unknown Error')
      end
    "
  }
}

output {
  kafka {
    bootstrap_servers => "kafka1:9092,kafka2:9092"
    topic_id => "app-logs"
    codec => json
  }

  stdout {
    codec => rubydebug
  }
}


Our pipeline is ready to be started. To do so, we can either pass its path as an argument to an installed Logstash instance, like this:

logstash -f /path/to/my-pipeline.conf

Or, if we need to launch it along with other pipelines, we can register its config file in the pipelines.yml file and then run Logstash without specifying a file.
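
A pipelines.yml entry for our pipeline could look something like this sketch (the pipeline ids and paths are just examples):

- pipeline.id: app-logs
  path.config: "/path/to/my-pipeline.conf"
- pipeline.id: another-pipeline
  path.config: "/path/to/another-pipeline.conf"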

For more: https://www.elastic.co/docs/reference/logstash/multiple-pipelines


This was a brief presentation of Logstash and its pipelines, and a glimpse of the possibilities this tool gives us.


See you soon with more advanced, real-world experience reports on using Logstash. Until then, stay safe and keep learning 😉


Ciao!
