Elasticsearch is becoming more and more popular. Besides its original search functionality, I have found that Elasticsearch can be
- used as a logging container. That is what the ELK stack was created for.
- utilized as a JSON server with rich APIs, which can be combined with Kibana as a BI server.
This is the data store I see every day:
- 10PB of stored data
- an average of 30TB of incoming data every day
- various data sources, including binary files such as PDF
- very complicated SQL queries (fortunately no stored procedures)
- millions of JSON documents created daily
People want to know what is going on with such data. So a business intelligence or OLAP system is needed to visualize and aggregate the data and its flow. Since Elasticsearch is so easy to scale out, in my view it beats the other big data solutions on the market for this purpose.
1. Batch Worker
There are many options for implementing a batch worker. In the end the decision came down to either Spring Batch or writing a library from scratch in Python.
1.1 Spring Batch vs. Python
Spring Batch
- Pros:
  - is a full framework that includes rollback, notification and scheduler features
  - provides great design patterns for dependency injection, such as factory and singleton, which help several people work together. For example:
@Bean
public Step IndexMySQLJob01() {
    return stepBuilderFactory.get("IndexMySQLJob01")
            .<Data, Data>chunk(10)
            .reader(reader())
            .processor(processor())
            .writer(writer())
            .build();
}
Python
- Pros:
  - less code; flexible
  - very easy to go from a dictionary to JSON
  - has official or third-party libraries for everything
- Cons:
  - you create your own library/framework
  - if a pattern like the one in Spring Batch is adopted, the dependencies have to be injected manually, such as:
class IndexMySQLJob01(object):
    def __init__(self, reader, processor, writer, listener):
        self.reader = reader
        self.processor = processor
        self.writer = writer
        self.listener = listener
    ...
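The manual wiring can stay small. The following is a minimal sketch, assuming the reader, processor and writer are plain callables; `BatchJob`, `read_rows`, `to_upper` and `ListWriter` are hypothetical names used only for illustration and are not part of the original library.

```python
class BatchJob(object):
    """Hypothetical job that receives its collaborators through the constructor."""

    def __init__(self, reader, processor, writer):
        self.reader = reader
        self.processor = processor
        self.writer = writer

    def run(self):
        # Pull items lazily from the reader, transform them, hand them to the writer.
        count = 0
        for item in self.reader():
            self.writer(self.processor(item))
            count += 1
        return count


def read_rows():
    # Stand-in for a MySQL cursor; yields one dict per row.
    yield {"id": 1, "name": "alice"}
    yield {"id": 2, "name": "bob"}


def to_upper(row):
    # Example processor: returns a new dict with the name upper-cased.
    return dict(row, name=row["name"].upper())


class ListWriter(object):
    """Stand-in writer that collects documents in memory instead of indexing them."""

    def __init__(self):
        self.docs = []

    def __call__(self, doc):
        self.docs.append(doc)


writer = ListWriter()
job = BatchJob(reader=read_rows, processor=to_upper, writer=writer)
processed = job.run()
```

Because every collaborator is passed in, a test can swap the real reader or writer for a stand-in like the ones here without touching the job itself.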
In the end Python was picked, because the overall scenario is algorithm-bound rather than language-bound.
1.2 The algorithms
Since the data size is pretty big, both time and space always have to be considered. The direct way to decrease the time complexity is to use hash tables, as long as the memory can hold the data. For example, a join between an N-row table and an M-row table can be optimized from O(M*N) to O(M+N): building the hash table on one side costs O(N), and probing it with the other side costs O(M).
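A hash join of this kind can be sketched in a few lines. The function name `hash_join` and the sample `orders` and `users` rows are made up for illustration; the sketch assumes the right-hand table fits in memory.

```python
def hash_join(left_rows, right_rows, key):
    """Inner-join two iterables of dicts on `key` using an in-memory hash table."""
    # Build phase: index the right side by key, one pass over its N rows.
    lookup = {}
    for row in right_rows:
        lookup.setdefault(row[key], []).append(row)
    # Probe phase: one hash lookup per left row, one pass over its M rows.
    for row in left_rows:
        for match in lookup.get(row[key], []):
            merged = dict(row)
            merged.update(match)
            yield merged


orders = [
    {"user_id": 1, "item": "book"},
    {"user_id": 2, "item": "pen"},
    {"user_id": 3, "item": "cup"},
]
users = [{"user_id": 1, "name": "alice"}, {"user_id": 2, "name": "bob"}]
joined = list(hash_join(orders, users, "user_id"))
```

The left side is never materialized, so it can be an arbitrarily long stream; only the right side has to fit in memory.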
To save space, a generator chain is used to stream data from start to end, instead of materializing sizable objects.
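class JsonTask01(object):
    ...
    def get_json(self, generator1, hashtable1):
        # Stream rows one at a time and enrich each with its hash-table match.
        for each_dict in generator1:
            key = each_dict.get('key')
            # Fall back to an empty dict so a missing key does not raise TypeError.
            each_dict.update(hashtable1.get(key, {}))
            yield each_dict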
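To show how such stages chain together end to end, here is a self-contained sketch. The stage names `read_lines`, `enrich` and `to_actions` are hypothetical. The final stage emits dicts in the `_index`/`_source` shape accepted by `elasticsearch.helpers.bulk`; whether the original worker uses that helper is an assumption.

```python
import json


def read_lines(lines):
    # Stage 1: parse one JSON document per non-empty line, lazily.
    for line in lines:
        line = line.strip()
        if line:
            yield json.loads(line)


def enrich(docs, lookup):
    # Stage 2: merge in extra fields from a hash table keyed by 'key'.
    for doc in docs:
        doc.update(lookup.get(doc.get("key"), {}))
        yield doc


def to_actions(docs, index_name):
    # Stage 3: wrap each document in a bulk-style action dict.
    for doc in docs:
        yield {"_index": index_name, "_source": doc}


raw = ['{"key": "a", "value": 1}', "", '{"key": "b", "value": 2}']
lookup = {"a": {"label": "first"}}
actions = to_actions(enrich(read_lines(raw), lookup), "big_data_index")
first = next(actions)  # only the first document has been parsed at this point
rest = list(actions)   # draining the chain processes the remaining documents
```

Nothing runs until the last generator is consumed, so memory use stays at roughly one document per stage regardless of the input size.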
1.3 The scheduler
A scheduler is a must: cron is enough for simple tasks, while a bigger system requires a workflow. Airflow is the tool that helps organize and schedule the jobs. It has a web UI and is written in Python, which makes it easy to integrate with the batch worker.
1.4 High availability with zero downtime
Indexing a large quantity of data has a significant impact on the cluster. For mission-critical indexes that need 100% uptime, a zero-downtime scheme is implemented: we keep two copies of an index for maximum safety, and the alias switches between the two copies once the indexing is finished.
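def add_alias(self, idx):
    # Point the public alias (self.index) at the freshly built index copy.
    LOGGER.warning("The alias {} will point to {}.".format(self.index, idx))
    self.es.indices.put_alias(index=idx, name=self.index)

def delete_alias(self, idx):
    # Detach the public alias from the stale index copy.
    LOGGER.warning("The alias {} will be removed from {}.".format(self.index, idx))
    self.es.indices.delete_alias(index=idx, name=self.index)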
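With two separate calls there is a short moment when the alias points to both copies or to neither. Elasticsearch also exposes an `_aliases` endpoint that applies a list of remove and add actions atomically. The sketch below only builds that request body; `build_alias_swap` and the index names are hypothetical, and it is offered as an alternative, not as what the original worker does.

```python
def build_alias_swap(alias, old_index, new_index):
    """Return the request body for an atomic alias switch via POST /_aliases."""
    return {
        "actions": [
            {"remove": {"index": old_index, "alias": alias}},
            {"add": {"index": new_index, "alias": alias}},
        ]
    }


body = build_alias_swap("big_data_index", "big_data_index_a", "big_data_index_b")
# Assumption: with an elasticsearch-py client `es`, this body would be sent to
# the _aliases endpoint, e.g. es.indices.update_aliases(body=body) on older
# client versions.
```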
2. Elasticsearch Cluster
2.1 Three kinds of nodes
An Elasticsearch node can take one or more of three roles: master node, data node and ingest node. (A node with all three roles disabled acts as a coordinating-only node, which was previously called a client node.) It is common to see a single node act as master, ingest and data node all together. For a large system, it is always helpful to assign the three roles to different machines/VMs, so that when a node goes down or comes back up, failover or recovery is quicker.
elasticsearch-head can clearly visualize the transfer of the shards once an accident occurs.
As the number of cluster nodes grows, deployment becomes painful. I feel the best tool so far is ansible-elasticsearch. With ansible-playbook -i hosts ./your-playbook.yml -c paramiko, the cluster is up and running.
2.2 Memory is cheap but heap is expensive
The rules of thumb for Elasticsearch are:
- Give (less than) Half Your Memory to Lucene
- Don’t Cross 32 GB!
These rules lead to an awkward situation: if a machine has more than 64GB of memory, the additional memory cannot be used for the heap of a single Elasticsearch instance. It can therefore make sense to run two or more Elasticsearch instances side by side to make better use of the hardware. For example, take a machine with 96GB of memory. We can allocate 31GB to an ingest node, 31GB to a data node and leave the rest to the OS. However, two data nodes on a single machine will compete for disk IO, which damages performance, while a master node and a data node on the same machine will increase the risk of downtime.
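The two rules of thumb reduce to a one-line calculation for a single instance. This is a rough sketch: `recommended_heap_gb` is a hypothetical helper, and the 31GB cap is taken from the allocation example in the text, not from a measured limit on any particular JVM.

```python
def recommended_heap_gb(total_memory_gb, cap_gb=31):
    """Heap for one instance: at most half the RAM, and below the ~32GB line."""
    return min(total_memory_gb // 2, cap_gb)


for ram in (16, 64, 96, 128):
    # Beyond 64GB of RAM the suggested heap no longer grows.
    print(ram, "GB RAM ->", recommended_heap_gb(ram), "GB heap")
```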
A great thing about Elasticsearch is that it provides rich REST APIs, such as http://localhost:9200/_nodes/stats?pretty. We could use X-Pack (paid) or other customized tools to monitor them. I feel that the three most important statistics for the heap, and therefore for performance, are:
The heap usage and the old GC duration
The two statistics are intertwined. High heap usage, such as 75%, will trigger a GC, while a GC under high heap usage will take longer. We have to keep both numbers as low as possible.
"jvm" : {
  "mem" : {
    "heap_used_percent" : 89,
    ...
  },
  "gc" : {
    "collectors" : {
      ...
      "old" : {
        "collection_count" : 225835,
        "collection_time_in_millis" : 22624857
      }
    }
  }
}
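A custom monitor only needs a few lines to turn this fragment into numbers worth plotting. The sketch below assumes it is handed one node's entry from the `_nodes/stats` response as a dict; `heap_and_old_gc` is a hypothetical helper name, and the sample reuses the values shown above.

```python
def heap_and_old_gc(node_stats):
    """Extract heap usage and the average old-generation GC time for one node."""
    jvm = node_stats["jvm"]
    old = jvm["gc"]["collectors"]["old"]
    count = old["collection_count"]
    # Guard against a node that has not run an old GC yet.
    avg_ms = old["collection_time_in_millis"] / float(count) if count else 0.0
    return {
        "heap_used_percent": jvm["mem"]["heap_used_percent"],
        "old_gc_count": count,
        "old_gc_avg_millis": avg_ms,
    }


# Sample shaped like one entry under "nodes" in the _nodes/stats response.
sample = {
    "jvm": {
        "mem": {"heap_used_percent": 89},
        "gc": {
            "collectors": {
                "old": {
                    "collection_count": 225835,
                    "collection_time_in_millis": 22624857,
                }
            }
        },
    }
}
summary = heap_and_old_gc(sample)
```

For the sample above the average old GC takes roughly 100 ms; since both counters are cumulative, a monitor would normally compare two snapshots to see the recent trend.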
Thread pools
Each thread pool reports three key statistics: active, queue and rejected. It is useful to visualize how they change in real time. Once there are a lot of queued or rejected requests, it is a good time to think about scaling up or scaling out.
"thread_pool" : {
  "bulk" : {
    "threads" : 4,
    "queue" : 0,
    "active" : 0,
    "rejected" : 0,
    "largest" : 4,
    "completed" : 53680
  },
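A simple check over the `thread_pool` section can flag the pools that need attention. In this sketch `pools_under_pressure` and the `max_queue` threshold of 100 are assumptions chosen for illustration, and the `search` entry in the sample is invented to show a pool being flagged.

```python
def pools_under_pressure(thread_pool_stats, max_queue=100):
    """Return the names of pools with rejections or a queue above max_queue."""
    flagged = []
    for name, stats in sorted(thread_pool_stats.items()):
        if stats.get("rejected", 0) > 0 or stats.get("queue", 0) > max_queue:
            flagged.append(name)
    return flagged


sample = {
    "bulk": {"threads": 4, "queue": 0, "active": 0, "rejected": 0},
    "search": {"threads": 7, "queue": 250, "active": 7, "rejected": 12},
}
flagged = pools_under_pressure(sample)
```

The rejected counter only ever grows while the node is up, so an alert would more usefully fire on the difference between two polls than on the raw value.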
The number and size of the segments
The segments are the in-memory inverted indexes corresponding to the indexes on the hard disk. They stay resident in physical memory, and GC has no effect on them. Each segment leaves a footprint on every search thread, so the size of the segments is important because it is multiplied by a factor of the number of threads.
"segments" : {
  "count" : 215,
  "memory_in_bytes" : 15084680,
  ...
},
The number of shards actually controls the number of segments: as the shards increase, the size of the segments decreases and their number increases. So we cannot increase the number of shards as much as we want. If there are many small segments, the heap usage will get much higher. The solution is a force merge, which is time-consuming but effective.
3. Kibana as BI dashboard
3.1 Plugins
Kibana integrates DevTools (previously the Sense UI) for free. DevTools has code assistance and is a powerful tool for debugging. If the budget is not an issue, X-Pack is also highly recommended. As for Elasticsearch, since 5.0 ingest-geoip is a plugin. We have to add it to the Ansible YAML, such as:
es_plugins:
  - plugin: ingest-geoip
3.2 Instant Aggregation
There are quite a few KPIs that need system-wide terms aggregations. Since 5.0 the request cache is enabled by default for all requests with size:0.
For example:
POST /big_data_index/data/_search
{
  "size": 0,
  "query": {
    "bool": {
      "must_not": {
        "exists": {
          "field": "interesting_field"
        }
      }
    }
  }
}
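The same size:0 idea applies to a terms aggregation. The following sketch builds such a request body in Python; `terms_aggregation_body` and the aggregation name `top_values` are hypothetical, and the field name is reused from the request above.

```python
import json


def terms_aggregation_body(field, top_n=10, agg_name="top_values"):
    """Build a search body that returns only aggregation buckets, no hits."""
    return {
        "size": 0,  # no hits are returned, only the aggregation result
        "aggs": {
            agg_name: {
                "terms": {"field": field, "size": top_n}
            }
        },
    }


body = terms_aggregation_body("interesting_field", top_n=5)
print(json.dumps(body, indent=2))
```

The printed JSON can be pasted into DevTools after a POST /big_data_index/_search line to try it out.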
The force merge mentioned above, such as POST /_forcemerge?max_num_segments=1, will combine the segments and dramatically increase the aggregation speed.
3.3 Proxy
Nginx is possibly the best proxy to put in front of Kibana. There are two advantages: first, the proxy can cache the static resources of Kibana; second, we can always check the Nginx logs to figure out what causes problems for Kibana.
Conclusion
Elasticsearch and Kibana together provide high availability and high scalability for a large BI system.
If you have any comments, please email me at wm@sasanalysis.com