
Logstash 7 Working with Structured Data

Guangzhou, China

Working with CSV Data

Data source: see GitHub/Coralogix.

Create a file payment.csv with the following content in /opt/logstash:

1,2019-08-29T01:53:12Z,Amex,Yuki Calvaire,Female,,Industrial,Canada,55
2,2019-11-16T14:55:13Z,Mastercard,Misaki Zelretch,Male,,Clothing,Japan,32
3,2019-10-07T03:52:52Z,Amex,Michaella Gerrietz,Female,,Computers,Taiwan,32
4,2019-07-05T22:58:10Z,Mastercard,Renee Markov,Male,,Toys,Estonia,51
5,2019-06-26T08:53:59Z,Visa,Sion Fiori,Male,,Computers,Paraguay,25

Now create the /opt/logstash/pipeline/logstash.conf file:

input {
  file {
    path => "/usr/share/logstash/payment.csv"
    start_position => "beginning"
    sincedb_path => "/dev/null"
  }
}

filter {
  csv {
    separator => ","
    skip_header => "true"
    columns => ["id","timestamp","paymentType","name","gender","ip_address","purpose","country","age"]
  }
}

output {
  elasticsearch {
    hosts => "http://localhost:9200"
    index => "payment-csv"
  }

  stdout {}
}
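The csv filter above maps each line onto the listed column names. As an illustration only (plain Python, not how Logstash implements it), the sketch below emulates that step for a single line. `parse_csv_line` is a hypothetical helper and the column list is copied from the `columns` setting. Every value comes out as a string, which is also what you will see in the Logstash output until a conversion is configured.

```python
import csv
import io

# Column names copied from the csv filter's "columns" setting.
COLUMNS = ["id", "timestamp", "paymentType", "name", "gender",
           "ip_address", "purpose", "country", "age"]


def parse_csv_line(line, columns=COLUMNS, separator=","):
    """Split one CSV line and pair each value with its column name.

    All values stay strings; empty fields (like ip_address here) become "".
    zip() stops at the shorter list, so this sketch silently ignores extra
    values, whereas the real filter has its own rules for those.
    """
    values = next(csv.reader(io.StringIO(line), delimiter=separator))
    return dict(zip(columns, values))


if __name__ == "__main__":
    print(parse_csv_line(
        "4,2019-07-05T22:58:10Z,Mastercard,Renee Markov,Male,,Toys,Estonia,51"))
```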


Now run the Logstash container:

chown -R 1000:1000 /opt/logstash
docker run \
--name logstash \
--net=host \
--rm -it \
-v /opt/logstash/pipeline/logstash.conf:/usr/share/logstash/pipeline/logstash.conf \
-v /opt/logstash/payment.csv:/usr/share/logstash/payment.csv \
-e "ELASTIC_HOST=localhost:9200" \

Logstash will now ingest the CSV file. Once it is done, open Kibana, create an index pattern payment-csv, and set the Time Field to timestamp.


Data Mutation

We can also use Logstash to modify the data before forwarding it to Elasticsearch:

input {
  file {
    path => "/usr/share/logstash/payment.csv"
    start_position => "beginning"
    sincedb_path => "/dev/null"
  }
}

filter {
  csv {
    separator => ","
    skip_header => "true"
    columns => ["id","timestamp","paymentType","name","gender","ip_address","purpose","country","age"]
  }
  mutate {
    convert => {
      age => "integer"
    }
    remove_field => ["message","@timestamp","path","host","@version"]
  }
}

output {
  elasticsearch {
    hosts => "http://localhost:9200"
    index => "payment-csv-clean"
  }

  stdout {}
}


The mutate section converts the age field to an integer and removes the metadata fields (message, @timestamp, path, host, @version) that the file input and Logstash itself add to every event. Before the mutation, an event looks like this:

{
    "message" => "4,2019-07-05T22:58:10Z,Mastercard,Renee Markov,Male,,Toys,Poland,51",
    "path" => "/usr/share/logstash/payment.csv",
    "@version" => "1",
    "paymentType" => "Mastercard",
    "country" => "Poland",
    "@timestamp" => 2021-07-31T11:54:37.530Z,
    "host" => "debian11",
    "id" => "4",
    "purpose" => "Toys",
    "timestamp" => "2019-07-05T22:58:10Z",
    "age" => "51",
    "ip_address" => "",
    "name" => "Renee Markov",
    "gender" => "Male"
}

After the mutation, only the CSV columns are left and age is a number:

{
    "paymentType" => "Mastercard",
    "gender" => "Male",
    "purpose" => "Toys",
    "country" => "Poland",
    "age" => 51,
    "id" => "4",
    "ip_address" => "",
    "timestamp" => "2019-07-05T22:58:10Z",
    "name" => "Renee Markov"
}
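A rough Python emulation of the two mutate operations, for illustration only. `mutate` and `META_FIELDS` are hypothetical names, and Python's `int()` is just a stand-in for Logstash's integer conversion, whose handling of edge cases (for example decimal strings) may differ.

```python
# Metadata fields listed in remove_field above.
META_FIELDS = ["message", "@timestamp", "path", "host", "@version"]


def mutate(event):
    """Return a cleaned copy of `event`: age as int, metadata fields removed."""
    cleaned = dict(event)  # copy, so the input event is left untouched
    if "age" in cleaned:
        # stand-in for convert => { age => "integer" }
        cleaned["age"] = int(cleaned["age"])
    for field in META_FIELDS:
        # like remove_field, silently skip fields that are not present
        cleaned.pop(field, None)
    return cleaned


if __name__ == "__main__":
    before = {
        "message": "4,2019-07-05T22:58:10Z,Mastercard,Renee Markov,Male,,Toys,Poland,51",
        "path": "/usr/share/logstash/payment.csv",
        "@version": "1",
        "@timestamp": "2021-07-31T11:54:37.530Z",
        "host": "debian11",
        "id": "4",
        "age": "51",
        "name": "Renee Markov",
    }
    print(mutate(before))
```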

To verify the mapping (age should now be an integer type, which Elasticsearch reports as long), run the following command:

curl -XGET 'http://localhost:9200/payment-csv-clean/_mapping/field/age?pretty=true'

{
  "payment-csv-clean" : {
    "mappings" : {
      "age" : {
        "full_name" : "age",
        "mapping" : {
          "age" : {
            "type" : "long"
          }
        }
      }
    }
  }
}
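If you want to check the mapping in a script rather than by eye, a small Python sketch like the one below can pull the type out of the response body. `field_type` is a hypothetical helper, and `RESPONSE` is simply the sample output shown above. Inside `"mapping"` the key is the last part of the field name, which for a top-level field such as age is just `"age"`.

```python
import json

# Sample body of the _mapping/field/age response shown above.
RESPONSE = """
{
  "payment-csv-clean" : {
    "mappings" : {
      "age" : {
        "full_name" : "age",
        "mapping" : {
          "age" : {
            "type" : "long"
          }
        }
      }
    }
  }
}
"""


def field_type(response_body, index, field):
    """Return the mapped type of `field` from a get-field-mapping response."""
    entry = json.loads(response_body)[index]["mappings"][field]
    # The inner key is the leaf name of the field (e.g. "age").
    leaf = field.split(".")[-1]
    return entry["mapping"][leaf]["type"]


if __name__ == "__main__":
    print(field_type(RESPONSE, "payment-csv-clean", "age"))
```

To check a live cluster, you could pass the body returned by the curl command above to `field_type` instead of the sample string.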

Working with JSON Data

Now let's do the same with a JSON file, /opt/logstash/payment.json:

{"id":1,"timestamp":"2019-09-12T13:43:42Z","paymentType":"WeChat","name":"Jenna Starr","gender":"Female","ip_address":"","purpose":"Toys","country":"Canada","age":23}
{"id":2,"timestamp":"2019-08-11T17:55:56Z","paymentType":"WeChat","name":"Yuki Calvaire","gender":"Female","ip_address":"","purpose":"Shoes","country":"Japan","age":34}
{"id":3,"timestamp":"2019-07-14T04:48:25Z","paymentType":"AliPay","name":"Misaki Zelretch","gender":"Female","ip_address":"","purpose":"Sports","country":"Taiwan","age":53}
{"id":4,"timestamp":"2020-02-29T12:41:59Z","paymentType":"WeChat","name":"Renee Markov","gender":"Male","ip_address":"","purpose":"Home","country":"Estonia","age":47}
{"id":5,"timestamp":"2019-08-03T19:37:51Z","paymentType":"Mastercard","name":"Sion Fiori","gender":"Female","ip_address":"","purpose":"Health","country":"Paraguay","age":37}

Now create the /opt/logstash/pipeline/logstash.conf file:

input {
  file {
    path => "/usr/share/logstash/payment.json"
    start_position => "beginning"
    sincedb_path => "/dev/null"
  }
}

filter {
  json {
    source => "message"
  }
}

output {
  elasticsearch {
    hosts => "http://localhost:9200"
    index => "payment-json"
  }

  stdout {}
}
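Unlike the CSV pipeline, the json filter keeps JSON numbers as numbers, so `id` and `age` arrive as integers without any mutate conversion. The Python sketch below (a hypothetical `json_filter` helper, illustration only) shows the idea: without a `target` setting, the parsed keys are merged into the top level of the event, and the original `message` string is kept.

```python
import json


def json_filter(event, source="message"):
    """Parse the JSON string in `source` and merge its keys into the event."""
    parsed = json.loads(event[source])
    merged = dict(event)   # keep message, path, host, ... as they are
    merged.update(parsed)  # parsed keys land at the top level
    return merged


if __name__ == "__main__":
    line = ('{"id":2,"timestamp":"2019-08-11T17:55:56Z","paymentType":"WeChat",'
            '"name":"Yuki Calvaire","age":34}')
    print(json_filter({"message": line, "path": "/usr/share/logstash/payment.json"}))
```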


And run Logstash with the new configuration file:

docker run \
--name logstash \
--net=host \
--rm -it \
-v /opt/logstash/pipeline/logstash.conf:/usr/share/logstash/pipeline/logstash.conf \
-v /opt/logstash/payment.json:/usr/share/logstash/payment.json \
-e "ELASTIC_HOST=localhost:9200" \

From the Logstash STDOUT we can now see that the data was ingested:

{
    "path" => "/usr/share/logstash/payment.json",
    "@timestamp" => 2021-07-31T13:10:41.370Z,
    "timestamp" => "2019-08-11T17:55:56Z",
    "@version" => "1",
    "paymentType" => "WeChat",
    "gender" => "Female",
    "age" => 34,
    "country" => "Japan",
    "ip_address" => "",
    "id" => 2,
    "purpose" => "Shoes",
    "host" => "debian11",
    "message" => "{\"id\":2,\"timestamp\":\"2019-08-11T17:55:56Z\",\"paymentType\":\"WeChat\",\"name\":\"Yuki Calvaire\",\"gender\":\"Female\",\"ip_address\":\"\",\"purpose\":\"Shoes\",\"country\":\"Japan\",\"age\":34}",
    "name" => "Yuki Calvaire"
}

You can also query the new index directly:

curl 'localhost:9200/payment-json/_search?pretty=true'

Data Mutation

Again, we can drop the metadata fields we don't need with a mutate section, and add an if statement that discards the payment types we don't care about (here: Mastercard):

input {
  file {
    path => "/usr/share/logstash/payment.json"
    start_position => "beginning"
    sincedb_path => "/dev/null"
  }
}

filter {
  json {
    source => "message"
  }
  # Discard Mastercard payments entirely
  if [paymentType] == "Mastercard" {
    drop {}
  }
  # Strip the metadata fields from all remaining events
  mutate {
    remove_field => ["message", "@timestamp", "path", "host", "@version"]
  }
}

output {
  elasticsearch {
    hosts => "http://localhost:9200"
    index => "payment-json-clean"
  }

  stdout {}
}


And this cleaned up our index nicely:

{
    "paymentType" => "WeChat",
    "country" => "Japan",
    "gender" => "Female",
    "age" => 34,
    "purpose" => "Shoes",
    "timestamp" => "2019-08-11T17:55:56Z",
    "id" => 2,
    "ip_address" => "",
    "name" => "Yuki Calvaire"
}
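The exclusion described above can be sketched in Python as a generator: events whose paymentType is Mastercard are skipped (what Logstash's drop filter does), and the rest lose their metadata fields. `filter_events` is a hypothetical helper for illustration, not part of Logstash. As with the Logstash conditional, an event without a paymentType field simply does not match.

```python
# Metadata fields listed in remove_field.
META_FIELDS = {"message", "@timestamp", "path", "host", "@version"}


def filter_events(events):
    """Yield cleaned events, skipping the Mastercard ones."""
    for event in events:
        if event.get("paymentType") == "Mastercard":
            continue  # the event is discarded, as the drop filter would do
        yield {key: value for key, value in event.items()
               if key not in META_FIELDS}


if __name__ == "__main__":
    sample = [
        {"paymentType": "Mastercard", "id": 5, "message": "..."},
        {"paymentType": "WeChat", "id": 2, "message": "...", "host": "debian11"},
    ]
    print(list(filter_events(sample)))
```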

Splitting Arrays

If your JSON data contains arrays (see pastEvents), as in this /opt/logstash/payment-array.json file:

{"id":1,"timestamp":"2019-06-19T23:04:47Z","paymentType":"Mastercard","name":"Ardis Shimuk","gender":"Female","ip_address":"","purpose":"Home","country":"France","pastEvents":[{"eventId":1,"transactionId":"trx14224"},{"eventId":2,"transactionId":"trx23424"}],"age":34}
{"id":2,"timestamp":"2019-11-26T15:40:56Z","paymentType":"Amex","name":"Benoit Urridge","gender":"Male","ip_address":"","purpose":"Shoes","country":"Brazil","pastEvents":[{"eventId":3,"transactionId":"63323-064"},{"eventId":4,"transactionId":"0378-3120"}],"age":51}
{"id":3,"timestamp":"2019-05-08T16:24:25Z","paymentType":"Visa","name":"Lindsy Ketchell","gender":"Female","ip_address":"","purpose":"Home","country":"Brazil","pastEvents":[{"eventId":5,"transactionId":"68151-3826"},{"eventId":6,"transactionId":"52125-611"}],"age":26}
{"id":4,"timestamp":"2019-06-10T18:01:32Z","paymentType":"Visa","name":"Cary Boyes","gender":"Male","ip_address":"","purpose":"Grocery","country":"Pakistan","pastEvents":[{"eventId":7,"transactionId":"63941-950"},{"eventId":8,"transactionId":"55926-0011"}],"age":46}
{"id":5,"timestamp":"2020-02-18T12:27:35Z","paymentType":"Visa","name":"Betteanne Diament","gender":"Female","ip_address":"","purpose":"Computers","country":"Brazil","pastEvents":[{"eventId":9,"transactionId":"76436-101"},{"eventId":10,"transactionId":"55154-3330"}],"age":41}

Create the /opt/logstash/pipeline/logstash.conf file with an additional split section that turns each element of the array into its own event:

input {
  file {
    #type => "json"
    path => "/usr/share/logstash/payment-array.json"
    start_position => "beginning"
    sincedb_path => "/dev/null"
  }
}

filter {
  json {
    source => "message"
  }
  # Emit one event per element of the pastEvents array
  split {
    field => "[pastEvents]"
  }
  mutate {
    remove_field => ["message", "@timestamp", "path", "host", "@version"]
  }
}

output {
  elasticsearch {
    hosts => "http://localhost:9200"
    index => "payment-json-array"
  }

  stdout {}
}


And run Logstash with the new configuration file:

docker run \
--name logstash \
--net=host \
--rm -it \
-v /opt/logstash/pipeline/logstash.conf:/usr/share/logstash/pipeline/logstash.conf \
-v /opt/logstash/payment-array.json:/usr/share/logstash/payment-array.json \
-e "ELASTIC_HOST=localhost:9200" \

As a result, every entry is split into two events, one per element of pastEvents:

{
    "country" => "Brazil",
    "purpose" => "Home",
    "age" => 26,
    "ip_address" => "",
    "pastEvents" => {
        "eventId" => 5,
        "transactionId" => "68151-3826"
    },
    "gender" => "Female",
    "timestamp" => "2019-05-08T16:24:25Z",
    "name" => "Lindsy Ketchell",
    "paymentType" => "Visa",
    "id" => 3
}
{
    "country" => "Brazil",
    "purpose" => "Home",
    "age" => 26,
    "ip_address" => "",
    "pastEvents" => {
        "eventId" => 6,
        "transactionId" => "52125-611"
    },
    "gender" => "Female",
    "timestamp" => "2019-05-08T16:24:25Z",
    "name" => "Lindsy Ketchell",
    "paymentType" => "Visa",
    "id" => 3
}
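The split filter clones the event once per array element, replaces the array with that single element in each clone, and discards the original event, so all other fields are duplicated. A minimal Python sketch of that behaviour, using a hypothetical `split` helper (illustration only):

```python
import copy


def split(event, field="pastEvents"):
    """Yield one copy of `event` per element of the list in `field`."""
    for element in event[field]:
        clone = copy.deepcopy(event)  # every other field is duplicated
        clone[field] = element        # the list is replaced by one element
        yield clone


if __name__ == "__main__":
    entry = {
        "id": 3,
        "name": "Lindsy Ketchell",
        "pastEvents": [
            {"eventId": 5, "transactionId": "68151-3826"},
            {"eventId": 6, "transactionId": "52125-611"},
        ],
    }
    for event in split(entry):
        print(event)
```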

We can now also extract eventId and transactionId into top-level fields with add_field. Note that the pastEvents key itself only goes away if you also add "pastEvents" to the remove_field list:

input {
  file {
    #type => "json"
    path => "/usr/share/logstash/payment-array.json"
    start_position => "beginning"
    sincedb_path => "/dev/null"
  }
}

filter {
  json {
    source => "message"
  }
  split {
    field => "[pastEvents]"
  }
  mutate {
    add_field => {
      "eventId" => "%{[pastEvents][eventId]}"
      "transactionId" => "%{[pastEvents][transactionId]}"
    }
    remove_field => ["message", "@timestamp", "path", "host", "@version"]
  }
}

output {
  elasticsearch {
    hosts => "http://localhost:9200"
    index => "payment-json-array"
  }

  stdout {}
}


And the result is:

{
    "ip_address" => "",
    "timestamp" => "2019-05-08T16:24:25Z",
    "purpose" => "Home",
    "gender" => "Female",
    "age" => 26,
    "country" => "Brazil",
    "id" => 3,
    "pastEvents" => {
        "transactionId" => "68151-3826",
        "eventId" => 5
    },
    "transactionId" => "68151-3826",
    "eventId" => "5",
    "paymentType" => "Visa",
    "name" => "Lindsy Ketchell"
}
{
    "ip_address" => "",
    "timestamp" => "2019-05-08T16:24:25Z",
    "purpose" => "Home",
    "gender" => "Female",
    "age" => 26,
    "country" => "Brazil",
    "id" => 3,
    "pastEvents" => {
        "transactionId" => "52125-611",
        "eventId" => 6
    },
    "transactionId" => "52125-611",
    "eventId" => "6",
    "paymentType" => "Visa",
    "name" => "Lindsy Ketchell"
}
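Notice that the top-level eventId is a string ("5") while the nested one is still a number (5): add_field builds its value from a %{...} template, and the result is text. The Python sketch below illustrates this with hypothetical `sprintf` and `promote` helpers. It only handles plain `%{name}` and `%{[a][b]}` references, and, like Logstash, it leaves a reference untouched when the field does not exist. The optional `drop_nested` flag mimics also listing "pastEvents" in remove_field, which Logstash applies after add_field within the same filter, so the values are copied before the nested key is removed.

```python
import re

# A whole %{...} reference; group 1 is what sits inside the braces.
REFERENCE = re.compile(r"%\{([^}]+)\}")
# The individual [name] parts of a reference such as [pastEvents][eventId].
PART = re.compile(r"\[([^\]]+)\]")


def sprintf(event, template):
    """Replace %{[a][b]} and %{name} references with field values as text."""

    def lookup(match):
        keys = PART.findall(match.group(1)) or [match.group(1)]
        value = event
        for key in keys:
            if not isinstance(value, dict) or key not in value:
                return match.group(0)  # missing field: keep the reference
            value = value[key]
        return str(value)  # always a string, e.g. 5 -> "5"

    return REFERENCE.sub(lookup, template)


def promote(event, drop_nested=False):
    """Copy pastEvents.eventId/transactionId to the top level, like add_field."""
    result = dict(event)
    result["eventId"] = sprintf(event, "%{[pastEvents][eventId]}")
    result["transactionId"] = sprintf(event, "%{[pastEvents][transactionId]}")
    if drop_nested:
        # remove_field runs after add_field, so the values are already copied
        result.pop("pastEvents", None)
    return result


if __name__ == "__main__":
    event = {"id": 3, "pastEvents": {"eventId": 5, "transactionId": "68151-3826"}}
    print(promote(event))
    print(promote(event, drop_nested=True))
```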
