Why are Reports Important?
Imagine you’re holding an account or a loan in a bank but with no provision to get a statement. Wouldn’t that leave you clueless and scared? That is exactly how our customers would feel if they didn’t get the right reports, especially when money is involved.
Not only are reports important, they also have to be accurate. Imagine that your customer paid ₹100.50 and you refund only ₹100. Do you think your customer would visit your site again? How many times have you visited a shop or petrol pump which consistently didn’t give you back a balance of ₹1?
Unlike other use cases such as performance reports, procurement reports or stock status reports, any report representing money has to be extremely precise. Not being accurate leads to disputes and tax problems. Imagine a million cases of 50 paise being unaccounted for. What a nightmare!
Traditional Solutions – Why and When they don’t Work?
Solution A: Use the same OLTP RDBMS
In this option, the same RDBMS (such as MySQL, PostgreSQL, Oracle, etc.) that’s used for the production OLTP (Online Transaction Processing) system is also used for generating reports.
This may not work for the following reasons:
- The query patterns of the OLTP system and of the reporting system don’t have to be the same, and neither do the indices they need. It’s costly to add the indices needed for reports to the OLTP RDBMS: adding them may slow down OLTP queries, while not adding them slows down report generation and sometimes makes it impossible.
- Most often reporting systems require data to be aggregated. This leaves us with two choices:
- Create those aggregations at runtime, which is costly and will impact the entire data store. Sometimes, it is not possible to generate this at runtime for huge data/ long time ranges.
- Create and store those aggregations, which makes it difficult to keep them up to date, and also consumes storage and CPU on the OLTP RDBMS.
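To make the second choice concrete, here is a minimal sketch of pre-computing and storing a daily aggregation. The field layout is illustrative (amounts are kept in paise as integers to avoid floating-point rounding, in the spirit of the accuracy requirement above):

```python
from collections import defaultdict
from datetime import date

# Hypothetical raw OLTP rows: (transaction date, amount in paise).
transactions = [
    (date(2021, 3, 1), 10050),   # Rs. 100.50
    (date(2021, 3, 1), 20000),   # Rs. 200.00
    (date(2021, 3, 2), 9950),    # Rs.  99.50
]

def rollup_daily(rows):
    """Pre-compute per-day totals so reports never scan raw rows."""
    totals = defaultdict(int)
    for day, amount_paise in rows:
        totals[day] += amount_paise
    return dict(totals)

daily = rollup_daily(transactions)
print(daily[date(2021, 3, 1)])  # 30050 paise, i.e. Rs. 300.50
```

The trade-off described above shows up immediately: every new transaction must also update the stored rollup, or the report drifts out of date.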
Solution B: Replicate the data from OLTP RDBMS to another RDBMS
Here, the OLTP RDBMS is unaffected, as the data is replicated to a replica (slave) RDBMS. Both problems mentioned above still apply, except that they no longer impact the OLTP system.
Additionally, keeping the slave database up to date with the primary database is a challenge.
Solution C: Replicate the data to a NoSQL data store
This is the third most common solution you see. Here, the data is replicated to a NoSQL data store such as MongoDB, Cassandra or Elasticsearch/Solr.
This is also the most viable solution if employed efficiently. In a short while, we will see the techniques for employing it efficiently and the challenges that arise in their absence.
The Challenges
In summary, the following challenges surface when the reporting datastore is poorly chosen or implemented:
- Slow, and fails for reports that require a high volume of data
- Inconsistent/missing data (if you choose solution B or C mentioned above), as data goes out of sync with the primary data store
- Multiple services/teams need/generate reports
- Rigid dimensions which lead to bad customer experience (if you choose pre-computed aggregation)
- Reports for long time range leads to failures
- Addition of new/derived fields is not supported without significant changes in primary or secondary data store (if you choose pre-computed aggregation)
How Did We Solve it?
Design considerations
- Should be different from primary OLTP data store
- Should be able to define new indices
- Should be able to aggregate and store
- Should work for high volume data (both the search space and result set)
- Should have fast query time
- Should be resilient
Since not all ACID properties are mandatory for this use case, we decided to go with a NoSQL data store. We had two options: MongoDB and Elasticsearch.
Before we dive into what we selected, let’s see why we selected it:
The Problem of Scale
Is 10,000 qps a scaling problem? It’s subjective. Let’s look at the examples below.
Example – 1
Having 1,000 nodes to handle 10,000 qps is not a scaling problem at all, as each node has to handle only 10 requests per second.
Example – 2
Having 10 nodes to handle 10,000 qps is a scaling problem, as each node has to handle 1,000 requests per second.
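The arithmetic behind both examples is the same; a minimal sketch, using the numbers from above:

```python
def per_node_qps(total_qps, nodes):
    """Load per node is simply total traffic divided by node count."""
    return total_qps / nodes

print(per_node_qps(10_000, 1_000))  # 10.0  -> not a scaling problem
print(per_node_qps(10_000, 10))     # 1000.0 -> a scaling problem
```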
The Problem of Search Space
In most search data stores/systems, search performance is dictated by the volume of data that you have, and NOT by the search space that you are interested in.
Confusing? Let’s take an example.
You’re looking for data that spans across two days in a year.
Is searching two days of data not simple? Well, it’s subjective in traditional systems.
Example – 1
Facts
- Each day has 10 Million records
- Searching 10 Million records takes 1 minute
- Overall you have 20 Million records in your search system
So, this search will take approximately 2 minutes.
Example – 2
Facts
- Each day has 10 Million records
- Searching 10 Million records takes 1 minute
- Overall you have 1 Trillion records in your search system
So, technically this search should not take more than 2 minutes. But in reality, depending on your hardware capacity, the search can go on almost forever, as you are searching across 1 trillion records.

A moment ago, what seemed like an insane option is now doable in finite time. Isn’t that awesome?
Most of the time our data tells something unique that we don’t pay attention to.
In a fintech company, the date dimension has an important role to play. Let’s look at some example reports:
- All the transactions that were received between two dates
- All the settlements that were made today, yesterday or on a specific date
- All the refunds that were processed on a given date
- The sum of all adjustments, transactions and settlements for a given day
In these examples, the data can be bucketed into days (apart from other dimensions). Since you know which buckets you are interested in, your search space is reduced drastically, as in the two-day search example above.
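Day-bucketing can be sketched as follows: if each day’s data lives in its own index, a query only needs to touch the indices covering the requested date range, no matter how large the overall system is. The index-naming scheme here is illustrative, not our actual one:

```python
from datetime import date, timedelta

def indices_for_range(prefix, start, end):
    """Return only the daily indices covering [start, end].
    Everything outside this list is simply never searched."""
    days = (end - start).days + 1
    return [f"{prefix}-{(start + timedelta(d)).isoformat()}" for d in range(days)]

# A two-day report touches exactly two buckets, even with years of data stored.
print(indices_for_range("orders", date(2021, 3, 1), date(2021, 3, 2)))
# ['orders-2021-03-01', 'orders-2021-03-02']
```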
Let’s now explore our two options: MongoDB and Elasticsearch.
Both support sharding of the data. Elasticsearch lets you shard the data (via indices) at any granularity you want, and dynamically at that; many shards (indices) can reside on one node. Sharding MongoDB dynamically, however, isn’t practical, and you can refer to this page for its limitations.
How is Data Published to Elasticsearch?
Let’s dive into the data ingestion part. We use Maxwell, a CDC (Change Data Capture) tool, to capture changes in the data and publish them to Kafka. From Kafka, the data is consumed, transformed and published to Elasticsearch by the application.
As highlighted at the beginning of this article, the data dimensions for reports can differ from what an OLTP application stores/wants. Since we know the type of data and the family of reports we need, we denormalize the data to a certain extent before it is written to Elasticsearch.
This has the following advantages and disadvantages:
Advantage:
- Reading the data is faster. No complex joins are required. In fact, joins that are impossible in the OLTP RDBMS can also be achieved here.
Disadvantage:
- Writing is a bit slow.
But that is inevitable; either the read side or the write side has to absorb the cost. In this use case, writing happens once while reading happens numerous times. In fact, most of the data becomes immutable a few days after it is written, and some of it is immutable immediately.
So we chose to incur the cost at write and improve the read speed.
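The write-time join can be sketched as below: the child record (say, a payment) absorbs the parent fields (from its order) before indexing, so no join is ever needed at read time. Field names are illustrative, not our actual schema:

```python
def denormalize(payment, orders_by_id):
    """Join once at write time so reads need no join at all."""
    order = orders_by_id[payment["order_id"]]
    doc = dict(payment)                         # copy the child record
    doc["merchant_id"] = order["merchant_id"]   # pull parent fields in
    doc["order_created_at"] = order["created_at"]
    return doc                                  # this flat doc is what gets indexed

orders = {7: {"merchant_id": "m-1", "created_at": "2021-03-01T10:00:00Z"}}
payment = {"id": 99, "order_id": 7, "amount": 10050}
doc = denormalize(payment, orders)
print(doc["merchant_id"])  # m-1
```

The one extra lookup per write is exactly the cost we chose to pay once, instead of paying a join on every read.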

To denormalize and further improve the write speed, we used Infinispan, an in-memory distributed and replicated cache. We also use Apache ActiveMQ for parallel consumption of messages (we have some limitations on our Kafka implementation) and to support retries.
Maxwell
There are two options to capture the data.
- Let the application publish the data.
- This approach works for targeted use cases where a small amount of data/events is published from the application. It also has reliability concerns and requires a two-phase commit
- But this won’t work for the reporting use case, as the entire datastore is needed for reports in most cases
- Replicate the DB using Change Data Capture (CDC) tools such as Debezium or Maxwell
We went with Maxwell as it has simplified integration for MySQL and Kafka.
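Maxwell emits each row change as a JSON event carrying the database, table, change type and row data (the event below is simplified; real events carry more metadata). A minimal sketch of consuming one, with a hypothetical topic-naming convention of our own:

```python
import json

# A simplified Maxwell-style change event for an inserted row.
event_json = '''{"database": "payments", "table": "orders",
                 "type": "insert", "ts": 1614556800,
                 "data": {"id": 42, "amount": 10050}}'''

def route(event):
    """Pick a destination topic from the source table and return the row.
    The 'cdc.<db>.<table>' naming here is illustrative."""
    return f"cdc.{event['database']}.{event['table']}", event["data"]

topic, row = route(json.loads(event_json))
print(topic)  # cdc.payments.orders
```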
Kafka
Tens of millions of updates happen on the DB every hour, and Kafka is known to stream them well. Alternatives such as Apache Artemis can also be used.
You can read more about the messaging systems here.
- https://softwaremill.com/mqperf/#activemq-artemis
- https://medium.com/double-pointer/kafka-vs-activemq-vs-rabbitmq-vs-amazon-sns-vs-amazon-sqs-vs-google-pub-sub-4b57976438db
Note: we are not referring to Apache ActiveMQ Classic when we say Apache Artemis.
Infinispan Cache
Most often, when you denormalize tables from an RDBMS, you need to look up the record in the parent table whenever you want to append a child record to the parent. This is a costly operation; imagine 10 million such lookups in 15 minutes. This would kill Elasticsearch.
Hence we wanted to add a caching layer. Among the choices we had, we wanted something that is in-process, distributed and replicated to maximize lookup performance. So we went with Infinispan.
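The role the cache plays can be sketched with a plain dictionary standing in for Infinispan (the real thing is distributed and replicated across nodes; this only shows the read-through pattern):

```python
class ParentCache:
    """Stand-in for the Infinispan layer: serve parent lookups from
    memory and hit the backing datastore only on a cache miss."""
    def __init__(self, fetch_from_db):
        self._cache = {}
        self._fetch = fetch_from_db  # fallback for misses

    def get(self, parent_id):
        if parent_id not in self._cache:
            self._cache[parent_id] = self._fetch(parent_id)
        return self._cache[parent_id]

db_calls = []
def fetch(parent_id):
    """Pretend datastore lookup; records every call made."""
    db_calls.append(parent_id)
    return {"id": parent_id, "merchant_id": "m-1"}

cache = ParentCache(fetch)
cache.get(7)
cache.get(7)           # second lookup is served from memory
print(len(db_calls))   # 1
```

With 10 million child records referencing a much smaller set of parents, almost all lookups become in-process memory reads.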
ActiveMQ
The common issues that you encounter with Kafka are:
- The parallelism of the consumers is controlled by the number of partitions on a topic. Each partition on each topic adds load on Kafka, so we couldn’t go with many partitions for all the topics we had.
- The granularity of the batch size. Kafka is known for batch consumption, unlike traditional message brokers such as Artemis, ActiveMQ or RabbitMQ, which enable per-message acknowledgment. The problem with a batch is that you either acknowledge the entire batch or reprocess it all. In the denormalization scheme of things, this is not suitable: if you NACK the entire batch, you may end up reprocessing messages multiple times, overwhelming Elasticsearch.
For the above-mentioned reasons, we employed ActiveMQ Classic (though we aspired to use ActiveMQ Artemis) in the following scenarios:
- For any reasonably sized topic that we couldn’t partition in Kafka, we let one of the application nodes consume the messages from that topic and publish them to ActiveMQ, so that all the other nodes (the number of consumer nodes can now be changed dynamically) can process the messages concurrently.
- For any topic that is consumed directly by the application and sent to Elasticsearch, if any message from a batch fails, we acknowledge the entire batch but publish the failed messages to ActiveMQ, which has its own redelivery policy defined.
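The second scenario can be sketched as below: the whole batch is acknowledged exactly once, and only the individual failures are diverted to a retry queue (ActiveMQ in our setup; a plain list stands in for it here):

```python
def consume_batch(batch, index_doc, retry_queue):
    """Process a Kafka batch; divert per-message failures to a retry
    queue instead of NACKing (and reprocessing) the entire batch."""
    for msg in batch:
        try:
            index_doc(msg)
        except Exception:
            retry_queue.append(msg)  # redelivered later with its own policy
    return True  # acknowledge the whole batch exactly once

def index_doc(msg):
    """Pretend Elasticsearch indexing that rejects one message."""
    if msg == "bad":
        raise ValueError("mapping error")

retries = []
consume_batch(["a", "bad", "b"], index_doc, retries)
print(retries)  # ['bad']
```

The good messages "a" and "b" are never reprocessed, which is the whole point of splitting the failure path out of the batch acknowledgment.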
JGroups
We also used JGroups to elect a master among the application nodes to carry out certain tasks and to support the first ActiveMQ scenario described above:
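Conceptually, the election is deterministic over the current cluster view; in JGroups, the coordinator is the first member of the view, which a simplified sketch can mimic by picking the smallest member id (real membership changes arrive as view updates, which this omits):

```python
def elect_leader(members):
    """Every node sees the same view, so every node independently
    agrees on the same leader without extra coordination."""
    return min(members)

view = ["node-3", "node-1", "node-2"]
print(elect_leader(view))  # node-1

# If the leader leaves, the next view elects a new one the same way.
view.remove("node-1")
print(elect_leader(view))  # node-2
```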

Partitioning data
Each table has a different temporal level, i.e., a different point at which to create a new index. For example, we receive millions of orders a day, so it is good to roll the order index daily, whereas payments are in the thousands per day, so we can roll the payment index once a month.
After a certain number of days/months, some of these indices can be made immutable and moved to a warm node.
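A sketch of per-table rollover granularity (the naming scheme is illustrative, not our actual one):

```python
from datetime import date

def index_name(table, day, granularity):
    """Roll high-volume tables daily and low-volume ones monthly,
    so every index stays at a manageable size."""
    if granularity == "day":
        return f"{table}-{day:%Y-%m-%d}"
    return f"{table}-{day:%Y-%m}"

d = date(2021, 3, 15)
print(index_name("orders", d, "day"))      # orders-2021-03-15
print(index_name("payments", d, "month"))  # payments-2021-03
```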

Conclusion
We have reached the end of the story. Here is the outcome of the solution we discussed, where a similar system could easily have (a) timed out or (b) taken a few hours to generate similar reports.

Do the solutions that we are building for the fintech space sound exciting to you? Well then, there are some great opportunities in store for intrepid engineers just like you!
