Logstash Best Practices (my REX)

2025-08-30 by Mahfoud CHIKH AISSA

Logstash is a real-time data processing engine that uses pipelines to receive, transform, and send data to different destinations. For more details about Logstash and how it is used, you can check my previous article HERE.

In this one, we will go over some tips and good practices to make your life easier when using the tool, because on a big project things can quickly become messy and hard to manage.


Best practices for using Logstash:


1- Creating plugins / local scripts


Suppose you have a snippet of code that you need to reuse across multiple pipelines, so that you can follow the DRY principle and simplify future modifications to your pipelines. In this case, one option is to create a Logstash plugin and host it in your local Logstash gem registry (Doc).
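If you go the plugin route, the gem you build can then be installed locally with the logstash-plugin tool. A minimal sketch (the gem name and path are placeholders):

# Install a locally built plugin gem (placeholder name/path)
bin/logstash-plugin install /path/to/logstash-filter-check-status-0.1.0.gem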

An easier method, especially if you only need the script within a single Logstash project rather than across several, is to use Ruby scripts.

Let’s take an example:

Let’s imagine a Logstash server that receives logs from other servers through Filebeat on port 5044, checks the status of the service running on those servers, and sends the result to Elasticsearch.

So, a basic example of such a pipeline would be:

input {
  beats {
    port => 5044
    ssl => true
    ssl_certificate => "/path_to_your_cert/logstash.crt"
    ssl_key => "/path_to_your_key/logstash.key"
  }
}

filter {
  if ![status] or [status] == "" {
    drop { }
  } else {
    ruby {
      code => '
        valid_status_list = ["running", "stopped", "failed"]
        unless valid_status_list.include?(event.get("status"))
          event.tag("status_unknown")
        end
      '
    }
  }
}

output {
  elasticsearch {
    hosts => ["https://your-es-server:9200"]
    index => "service-monitoring-events"
    user => "elastic"
    password => "aStrongPassword"
    ssl => true
    cacert => "/path_to_ca/ca.crt"
  }
}


Now, if we want to reuse the code in the filter part, we can create a Ruby script file check_status_name.rb:

# check_status_name.rb

def filter(event)
  valid_status_list = ["running", "stopped", "failed"]

  # Drop if empty or missing
  if event.get("status").nil? || event.get("status").strip.empty?
    event.cancel
  else
    # Add tag if not in valid list
    unless valid_status_list.include?(event.get("status"))
      event.tag("status_unknown")
    end
  end

  return [event]
end

Note that we rewrote the snippet in pure Ruby: for example, instead of the drop plugin we call event.cancel, because inside a ruby filter script we can only manipulate the event object. The pipeline can then reference this script by path, as shown below.
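A minimal sketch of how the pipeline can load this script through the ruby filter's path option (the script location here is just an assumption; adapt it to wherever you store your scripts):

filter {
  ruby {
    # Load the reusable script from disk instead of inlining its code
    # (path is a placeholder)
    path => "/etc/logstash/scripts/check_status_name.rb"
  }
}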


2- Separate the pipeline conf file into multiple bricks

Another interesting approach is to break your pipeline into multiple files, each containing one part of the pipeline: for example, one file for the input, N files for the filters, and a last one for the output.

To do this, name your files so that their alphabetical order matches the order in which you want Logstash to assemble them (Logstash reads config files from a directory in alphabetical order), and give Logstash the directory where your files are.

Example:

📁 /etc/logstash/conf.d/pipeline-date-parser
├── 1_Input.conf
├── 21_Filter_null_events.conf
├── 22_Format_load_date.conf
└── 3_Save_into_file.conf


# 1_Input.conf

input {
  http {
    port => 10450
    codec => json
  }
}


# 21_Filter_null_events.conf

filter {
  if ![message] or [message] == "" {
    drop {}
  }
}


# 22_Format_load_date.conf

filter {
  date {
    match => ["load_date", "yyyy-MM-dd HH:mm:ss"]
    timezone => "Europe/Paris"
    target => "parsed_load_date"
  }
}


# 3_Save_into_file.conf

output {
  file {
    path => "/apps/nas/events-from-server-%{+YYYY-MM-dd}.log"
    codec => json_lines
  }
}

Lastly, we need to declare the pipeline in the pipelines.yml file as follows.

# pipelines.yml

- pipeline.id: "my-pipeline-example"
  path.config: "/etc/logstash/conf.d/pipeline-date-parser"

The benefit is that we can easily reuse code by duplicating the shared files across multiple pipelines instead of copy-pasting, and we can separate the standard parts from the custom parts by keeping custom code in its own file and limiting modifications to the standard blocks.

Taking the previous example, say we also want to send events to Elasticsearch: we just need to create a new file “4_Save_to_elasticsearch.conf” containing our output and add it to the directory, for example:
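A minimal sketch of such a file (the host and index name are placeholders):

# 4_Save_to_elasticsearch.conf

output {
  elasticsearch {
    # Placeholder connection settings; reuse your real ones here
    hosts => ["https://your-es-server:9200"]
    index => "date-parser-events-%{+YYYY.MM.dd}"
  }
}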

Splitting complex pipelines this way also makes them easier to maintain.


3- Separate pipeline logs

By logs here I mean Logstash’s own technical logs; by default, the application writes them all to a single file.

By default the path could be:

/var/log/logstash/logstash-plain.log or <LOGSTASH_HOME>/logs/logstash-plain.log, depending on the version and how you installed it.

We can change this behavior with the following parameter in the logstash.yml file:

# logstash.yml

pipeline.separate_logs: true

Or with the argument: --pipeline.separate_logs

For a pipelines.yml file like this:

# pipelines.yml

- pipeline.id: pipeline1
  path.config: "/etc/logstash/conf.d/pipeline1.conf"

- pipeline.id: pipeline2
  path.config: "/etc/logstash/conf.d/pipeline2.conf"

The log files will be like this:

/var/log/logstash/pipeline_pipeline1.log

/var/log/logstash/pipeline_pipeline2.log

Separating logs is useful when you have many pipelines and want to look for specific messages or errors from a single one.

Another tip when exploring the logs: to know whether a pipeline has started, search for the message “Pipeline started” together with the pipeline id (log level: INFO).


4- Use a Filebeat agent to get Logstash logs in Elasticsearch

Looking for an error or a message in Logstash log files can be very tedious, especially if you are searching through a big log file.

You can instead install a Filebeat agent on your Logstash server and configure it to send the logs to Elasticsearch, so that you can benefit from the power of Kibana to run queries and build dashboards.

In the filebeat.yml file, you need to specify the log file path:


# filebeat.yml

filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/logstash/*.log


For the output part:

# filebeat.yml

output.elasticsearch:
  hosts: ["https://your-es-server:9200"]


5- Use the Logstash API to check the state of Logstash

The Logstash API is another way to monitor your Logstash service and make sure everything is OK.

It exposes multiple endpoints such as:

The health report API can be used to get indicators about how the Logstash pipelines are running.

The node stats API gives more detailed information about the pipelines: the number of events received and sent, errors on plugins, etc.
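For a quick check from the Logstash host itself, you can query these endpoints directly. A sketch, assuming the API is exposed on its default port 9600:

# Health report for the Logstash instance and its pipelines
curl -XGET 'http://localhost:9600/_health_report?pretty'

# Detailed per-pipeline statistics (events in/out, plugin errors, queue info, ...)
curl -XGET 'http://localhost:9600/_node/stats/pipelines?pretty'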


6- Using persistent queues

As a Logstash user, you need to think about the resiliency of the service. Persistent queues can be used for this need.

A Logstash persistent queue is a mechanism that temporarily holds data on the local file system when Logstash can’t deliver it, and resumes sending it once the issue is resolved. The issue can be on the filter or output side, for example a loss of connection to Elasticsearch on the output.

In simple setups, it can even replace external buffering tools used for the same purpose, such as Apache Kafka.

To activate the persistent queue, you should update the following settings in logstash.yml:

# logstash.yml

queue.type: persisted                 # Activate the persisted queue (memory by default)
path.queue: /var/lib/logstash/queue   # Path to the queue data on disk
queue.max_bytes: 1gb                  # Maximum size of the queue
queue.checkpoint.acks: 1024           # Number of ACKed events before a checkpoint
queue.checkpoint.writes: 1024         # Number of written events before a checkpoint
queue.checkpoint.interval: 1000       # Milliseconds between checkpoints
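Once enabled, one way to confirm that the queue is really persisted is the node stats API mentioned earlier (again assuming the default port 9600):

# Each pipeline's "queue" section should report "type": "persisted"
curl -XGET 'http://localhost:9600/_node/stats/pipelines?pretty'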


Those were some of the tips to keep in mind when using Logstash. I hope you learned something new.


Don’t hesitate to check out the other articles on my blog ;)


Stay safe and keep learning.
