This is a multi-part series on using Filebeat to ingest data into Elasticsearch. In Part 1, we successfully installed Elasticsearch 5.x (abbreviated es5 from here on) and Filebeat, then ran our first experiment ingesting a stocks data file (in csv format) using Filebeat. In Part 2, we will ingest the data file(s) and ship the data to es5; we will also create our first ingest pipeline on es5.
Sources for this blog are available at:
Output to Elastic
Last time we configured filebeat to output to the console; now the first thing we need to do is switch the output to es5 instead.
Navigate to the “output:” section, then uncomment the “elasticsearch:” block if necessary. The hosts setting takes an array of available es5 instances; by default it points to your localhost on port 9200. If you supply multiple es5 instances, filebeat outputs to them in a round-robin fashion — you can view this as a simple load-balancing feature. The index: value determines which es5 index the data is written to, in this case the “blog-test-yyyy.mm.dd” index.
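A minimal sketch of that output section in filebeat.yml — the host and index name here are illustrative, adjust them to your own setup:

```yaml
# filebeat.yml — elasticsearch output (sketch)
output:
  elasticsearch:
    # one or more es5 instances; filebeat round-robins across them
    hosts: ["localhost:9200"]
    # daily index; filebeat expands the date placeholder at publish time
    index: "blog-test-%{+yyyy.MM.dd}"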
Now let’s run our filebeat again!
Let’s check the results in Kibana by opening the url http://localhost:5601/app/console
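One quick way to check from the console is a count query against the daily indices — the index wildcard below assumes the “blog-test-” prefix configured earlier:

```
GET blog-test-*/_count
```

The response's count field should match the number of rows shipped from the csv file.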
You can see that there are 254 entries ingested from alphabet.csv — great!
perfect? some caveats…
At first sight, everything seems fine: we have 254 entries ingested, which matches the number of rows in the csv file. However, if you take a closer look at the _source and message fields, you will find that we didn’t break down the “message” into individual fields, which means the data is not in good shape for query or analytics.
Recall the difference between filebeat and logstash: filebeat is a shipper — it only moves data from the source to the destination, with no processing along the way, so the receiver is guaranteed to get the raw, untouched data. This means we need another mechanism to break the data (the message field) into individual fields ready for search and analytics.
ingest node and pipelines
Starting from 5.0, every node is an ingest node by default; the main feature of this node type is to pre-process documents before the actual indexing takes place. If you want to disable this feature, open elasticsearch.yml and set “node.ingest: false“.
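For reference, opting a node out of the ingest role is a one-line change:

```yaml
# elasticsearch.yml — disable the ingest role on this node
node.ingest: false
```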
Now, let’s try to create a simple pipeline that just applies a “lowercase” operation to a field named “hobbies”.
The name of our test pipeline is test-lowercase-pipeline; under the processors array we declare a single lowercase processor, which lowercases the field named hobbies.
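Putting that together as a console request (the description text is just illustrative):

```
PUT _ingest/pipeline/test-lowercase-pipeline
{
  "description": "lowercase the hobbies field",
  "processors": [
    { "lowercase": { "field": "hobbies" } }
  ]
}
```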
Testing time! We will use the “_simulate” API to test our pipeline.
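The _simulate API runs sample documents through the pipeline without indexing anything. A sketch, with a made-up sample document:

```
POST _ingest/pipeline/test-lowercase-pipeline/_simulate
{
  "docs": [
    { "_source": { "hobbies": "Reading, HIKING, Coding" } }
  ]
}
```

The response echoes each document as it would look after the processors have run.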
Voilà~ the values in the hobbies field are lowercased!
piping for the 1st time
So far, we have an idea of how pipelines and processors work together; now it is time to build our first pipeline for our stocks documents. I will use the grok processor to break down the “message” field, and the date processor to convert the first grokked field into a date data type.
Grok is built on top of regular expressions: it matches text against a library of named patterns and extracts the matched parts into fields for us. In our stocks documents, the “message” field has the following patterns:
We will extract the “date” field first. Create the pipeline named blog-test-stocks-pipeline:
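A sketch of that pipeline — the grok pattern here is reconstructed from the description that follows (grab the characters inside the first pair of double quotes as stock_date, keep the rest as remainings):

```
PUT _ingest/pipeline/blog-test-stocks-pipeline
{
  "description": "break down the stocks csv message",
  "processors": [
    {
      "grok": {
        "field": "message",
        "patterns": ["\"%{DATA:stock_date}\"%{GREEDYDATA:remainings}"]
      }
    }
  ]
}
```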
you can see that the grok pattern is:
DATA => .*? in regex — matches any characters, as few as possible (non-greedy)
GREEDYDATA => .* in regex — matches any characters, as many as possible (greedy)
We are matching any characters within the double quotes “ ” and placing them into a field named stock_date, putting the rest into another field named “remainings”. Run against a simulation, the results look like this:
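A simulation sketch — the sample message line below is made up to resemble one row of the stocks csv:

```
POST _ingest/pipeline/blog-test-stocks-pipeline/_simulate
{
  "docs": [
    { "_source": { "message": "\"2016/12/01\",789.64,797.50,789.01,796.42" } }
  ]
}
```

In the response, stock_date holds the quoted date string and remainings holds everything after the closing quote.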
Cool~ we’ve made it again — our first experience creating a partial pipeline to ingest our stocks documents! I will leave you to solve the rest of the puzzle in the meantime. In Part 3, we will see how to handle multiple sources (e.g. ingesting alphabet.csv and ibm.csv at the same time), stay tuned~
End of part 2