Ad Hoc Analytics with MongoDB's Aggregation Framework

Fair warning: this post is really long, has a lot of code, and I tend to ramble a lot. If you're looking for the graphs, they're at the bottom. :)

Background

While working on some recent projects, I needed to run some basic dashboard analytics against moderate volumes of machine-generated data. Already having some experience with MongoDB (and being quite the fan of it), I decided to do some research on real-time analytics with MongoDB.

A quick search turns up dozens of articles and presentations on how this can be achieved. However, after reading through quite a few of them, it became clear that most of the existing how-tos on the subject are based on pre-Aggregation Framework techniques, relying largely on MongoDB's atomic upsert, $inc, and $set operations.

These techniques are still largely useful, and are powering several successful applications. Unfortunately, however, they tend to be lacking when it comes to the ad hoc side of things - specifically, once multiple values from distinct events have been aggregated into a single value, the ability to slice and dice the results becomes limited. Additionally, these techniques typically require pre-aggregating at multiple levels to support pre-determined aggregation durations, rely on MapReduce, or delegate some re-reducing labor to the application itself.
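
For reference, a typical increment-on-write update looks something like this (a minimal sketch - the collection and field names are illustrative, not taken from any particular application):

// Pre-aggregate page views into an hourly bucket document. The upsert
// flag (third argument) creates the bucket on the first write.
db.stats.update(
    { page: '/about', hour: ISODate('2012-11-01T12:00:00Z') },
    {
        $inc: { views: 1 },             // atomically bump the counter
        $set: { lastView: new Date() }  // remember the most recent hit
    },
    true // upsert
);

Fast and cheap on the write side - but once events have been collapsed into that single counter, you can no longer ask new questions of the underlying data.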

Given that we now have the Aggregation Framework available to us (introduced in the 2.1 development releases and stable as of MongoDB 2.2), I decided to run some tests to see how feasible it is to achieve real-time, interactive, ad hoc, dashboard analytics with MongoDB.

Note that this article is intended to be platform agnostic, so all tests are implemented as MongoDB shell scripts. If you're not familiar with scripting the shell, the MongoDB documentation covers it.

Defining our Terms

Several of the terms I used above seem to have wildly varying definitions, so let me take a moment to define how they're being used within the scope of this article:

Real-time
Analytics must be up-to-date; results should be accurate for the data as it is now, not for what the data looked like 2 hours ago.
Interactive
Queries should respond with "web latency". If you're tempted to go browse Hacker News while you wait for a response, we're too slow.
Ad hoc
In general, we should be able to support the above two requirements without one-off indexes or other performance measures that require a priori knowledge of the queries themselves.
Dashboard analytics
This term really has no standard definition, but it should be understood that we're talking about relatively superficial analytics here: mins, maxes, averages, counts, etc. This isn't the be-all and end-all solution to BI.

Test Data

I initially reached for the ubiquitous "server request log" test data, but as I began drafting this post, Hurricane Sandy hit and I thought it would be interesting to work with event data that contains some of the metrics I imagine an offshore buoy might send back. Here's what I came up with:

{
    ts:                 // time of the event (Date)
    buoy:               // string buoy identifier
    airTempF:           // air temperature (°F)
    waterTempF:         // water temperature (°F)
    atmosPressureHPA:   // atmospheric pressure (hPa)
    windSpeedMPH:       // wind speed (mph)
}

To see how things behave over a reasonable volume of events, we're going to create 1 million events that represent a week's worth of data from 25 distinct buoys. Since we'll be comparing 3 different schemata, I've created a file that will act as our "test runner" and "event generator" across all 3:

// ./runner.js

function TestRunner(test) {
    this.index = test.index;
    this.insert = test.insert;
    this.queries = test.queries;
    this.db = connect('localhost:27017/dashboard-schema-test');
}

TestRunner.prototype.run = function() {
    print('Dropping database...');
    this.db.dropDatabase();
    print('... done');

    print('Creating index...');
    this.db.events.ensureIndex(this.index);
    print('... done');

    print('Inserting events...');
    this.time(true);
    var generator = new EventGenerator('11/01/2012', '11/08/2012', 1000000),
        count = 0, event;
    while ((event = generator.getNext())) {
        this.insert(this.db, event);
        ++count % 100000 || print(count + ' events created'); // progress marker every 100k events
    }
    print('...done (' + this.time() + ')');

    for (var key in this.queries) {
        print('Running "' + key + '" query...');

        var pipeline = this.queries[key];

        // warm up: run the pipeline a few times first so caches are primed
        for (var i = 5; i--;) {
            this.db.events.aggregate(pipeline);
        }

        // timed queries
        var numQueries = 100;
        this.time(true);
        for (var i = numQueries; i--;) {
            this.db.events.aggregate(pipeline);
        }
        var duration = this.time();
        print('... done (' + duration + ')');
        print((duration / numQueries).toFixed(4) + ' msec per query');

        // uncomment to see results:
        // var res = this.db.events.aggregate(pipeline);
        // printjson(res);
    }

    print('Database stats:');
    printjson(this.db.events.stats());
};

TestRunner.prototype.time = function(reset) {
    if (reset) return this._ts = Date.now();
    return Date.now() - this._ts;
};

function EventGenerator(start, end, num) {
    this.startTime = new Date(start).getTime();
    this.endTime = new Date(end).getTime();
    this.numEvents = num;
    this.count = 0;
}

EventGenerator.prototype.getNext = function() {
    if (this.count >= this.numEvents) return null;

    var duration = this.endTime - this.startTime,
        percentageComplete = this.count / this.numEvents;

    var ret = {
        ts: new Date(this.startTime + duration * percentageComplete),
        buoy: 'buoy-' + Math.ceil(Math.random() * 25),
        airTempF: 50 + Math.random() * 40,
        waterTempF: 40 + Math.random() * 30,
        atmosPressureHPA: 1000 + Math.random() * 20,
        windSpeedMPH: 5 + Math.random() * 50
    };
    ++this.count;
    return ret;
};

We'll simply load this file at the top of our individual test cases, which will allow the actual test files to focus on defining the indexes, insert statements, and queries. Also note that this data is in no way realistic - values are purely random and can jump from one extreme to the other. They're just here to give us something to crunch over.

Three MongoDB Real-time Analytics Schemata

There are undoubtedly more ways to crack this nut, but I settled on three different schemata to compare. Each one is evaluated with respect to its insert speed, storage requirements, and performance against 4 different queries.

Document Per Event

This first schema is the simplest of the three. For each distinct event that occurs, we'll create a corresponding document. This schema serves as our naive baseline - no significant thought has gone into it. Here's what an individual document looks like when printed from the shell:

> db.events.find().limit(1).pretty()
{
    "_id" : ObjectId("50a0578f8c549debecb9e691"),
    "ts" : ISODate("2012-11-01T04:00:00Z"),
    "buoy" : "buoy-11",
    "airTempF" : 80.89462532567612,
    "waterTempF" : 43.71565700140737,
    "atmosPressureHPA" : 1004.3505328172837,
    "windSpeedMPH" : 50.95050802701652
}

Here's what our actual test file looks like for this schema:

// ./doc-per-event.js

load('./runner.js');

var test = new TestRunner({
    index: { buoy: 1, ts: 1 },
    insert: function(db, event) {
        db.events.insert(event);
    },
    queries: {
        'buoy-10 1 hour averages': [
            // filter by buoy and time range
            {
                $match: {
                    buoy: 'buoy-10',
                    ts: {
                        $gte: new Date('11/01/2012 12:00:00'),
                        $lt: new Date('11/01/2012 13:00:00')
                    }
                }
            },
            // calculate our averages
            {
                $group: {
                    _id: 'averages',
                    avgAirTempF: { $avg: '$airTempF' },
                    avgWaterTempF: { $avg: '$waterTempF' },
                    avgAtmosPressureHPA: { $avg: '$atmosPressureHPA' },
                    avgWindSpeedMPH: { $avg: '$windSpeedMPH' }
                }
            }
        ],
        'buoy-7 1 day hourly averages': [
            // filter by buoy and time range
            {
                $match: {
                    buoy: 'buoy-7',
                    ts: {
                        $gte: new Date('11/01/2012'),
                        $lt: new Date('11/02/2012')
                    }
                }
            },
            // calculate our averages per hour
            {
                $group: {
                    _id: { $hour: '$ts' },
                    avgAirTempF: { $avg: '$airTempF' },
                    avgWaterTempF: { $avg: '$waterTempF' },
                    avgAtmosPressureHPA: { $avg: '$atmosPressureHPA' },
                    avgWindSpeedMPH: { $avg: '$windSpeedMPH' }
                }
            }
        ],
        'Average wind speeds grouped by buoy': [
            // filter by buoy and time range
            {
                $match: {
                    ts: {
                        $gte: new Date('11/01/2012'),
                        $lt: new Date('11/02/2012')
                    }
                }
            },
            // calculate average wind speed per buoy
            {
                $group: {
                    _id: '$buoy',
                    avgWindSpeedMPH: { $avg: '$windSpeedMPH' }
                }
            }
        ],
        'All buoys average water temp when air temp > 85': [
            // filter by time range and air temperature
            {
                $match: {
                    ts: {
                        $gte: new Date('11/01/2012'),
                        $lt: new Date('11/02/2012')
                    },
                    airTempF: { $gt: 85 }
                }
            },
            // calculate our average water temperature
            {
                $group: {
                    _id: 'averages',
                    avgWaterTempF: { $avg: '$waterTempF' }
                }
            }
        ]
    }
});

test.run();

As a quick reminder, MongoDB shell scripts are invoked like this: $ mongo ./doc-per-event.js

Here's what our raw numbers look like for this approach:

Number of Events:       1,000,000
Number of Documents:    1,000,000
Data Size:              133.50 MB
Storage Size:           166.60 MB
Total Index Size:       94.39 MB
Inserts Per Second:     16,706.76
Average Query 1 Time:   2.158 msec
Average Query 2 Time:   48.17 msec
Average Query 3 Time:   965.14 msec
Average Query 4 Time:   490.93 msec

(Comparison graphs are at the bottom of the article.)

Document Per Buoy Per Hour

This second approach is an attempt at seeing if the Aggregation Framework can unwind and aggregate sub documents faster than it can operate against multiple root level documents (as we tried above).
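
If you haven't used it before, $unwind takes a document containing an array and emits one output document per array element, with the array field replaced by that element. A quick sketch of the effect (documents abbreviated):

// Given a single bucket document:
//   { buoy: 'buoy-4', events: [ e1, e2, e3 ] }
db.events.aggregate([
    { $unwind: '$events' }
]);
// ... the stage emits three documents:
//   { buoy: 'buoy-4', events: e1 }
//   { buoy: 'buoy-4', events: e2 }
//   { buoy: 'buoy-4', events: e3 }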

To test this, we'll create a single root level document per buoy per hour, with all associated events being pushed onto an array. Here's what an example document will look like if you print it from the shell:

> db.events.find().limit(1).pretty()
{
    "_id" : ObjectId("50a13aa5a5b45b0249117b0f"),
    "buoy" : "buoy-4",
    "events" : [
        {
            "ts" : ISODate("2012-11-01T04:00:09.734Z"),
            "buoy" : "buoy-4",
            "airTempF" : 52.9108469568815,
            "waterTempF" : 57.63903529300773,
            "atmosPressureHPA" : 1006.2795177300849,
            "windSpeedMPH" : 22.919841655371517
        },
        /* truncated - actual array had 236 events */
    ],
    "ts" : ISODate("2012-11-01T04:00:00Z")
}

Here's what our actual test file looks like for this schema:

// ./doc-per-buoy-per-hour.js

load('./runner.js');

var BUCKET_SIZE = 60 * 60 * 1000; // one hour, in milliseconds

var test = new TestRunner({
    index: { buoy: 1, ts: 1 },
    insert: function(db, event) {
        // round the event time down to the start of its hour-long bucket
        var bucketTime = event.ts - (event.ts % BUCKET_SIZE);
        db.events.update({
                buoy: event.buoy,
                ts: new Date(bucketTime)
            },
            {
                $push: {
                    events: event
                }
            },
            true // upsert
        );
    },
    queries: {
        'buoy-10 1 hour averages': [
            // filter by buoy and time range
            {
                $match: {
                    buoy: 'buoy-10',
                    ts: new Date('11/01/2012 12:00:00')
                }
            },
            // unwind array of events into distinct documents
            {
                $unwind: '$events'
            },
            // calculate our averages
            {
                $group: {
                    _id: 'averages',
                    avgAirTempF: { $avg: '$events.airTempF' },
                    avgWaterTempF: { $avg: '$events.waterTempF' },
                    avgAtmosPressureHPA: { $avg: '$events.atmosPressureHPA' },
                    avgWindSpeedMPH: { $avg: '$events.windSpeedMPH' }
                }
            }
        ],
        'buoy-7 1 day hourly averages': [
            // filter by buoy and time range
            {
                $match: {
                    buoy: 'buoy-7',
                    ts: {
                        $gte: new Date('11/01/2012'),
                        $lt: new Date('11/02/2012')
                    }
                }
            },
            // unwind array of events into distinct documents
            {
                $unwind: '$events'
            },
            // calculate our averages per hour
            {
                $group: {
                    _id: { $hour: '$ts' },
                    avgAirTempF: { $avg: '$events.airTempF' },
                    avgWaterTempF: { $avg: '$events.waterTempF' },
                    avgAtmosPressureHPA: { $avg: '$events.atmosPressureHPA' },
                    avgWindSpeedMPH: { $avg: '$events.windSpeedMPH' }
                }
            }
        ],

        'Average wind speeds grouped by buoy': [
            // filter by time range
            {
                $match: {
                    ts: {
                        $gte: new Date('11/01/2012'),
                        $lt: new Date('11/02/2012')
                    }
                }
            },
            // unwind array of events into distinct documents
            {
                $unwind: '$events'
            },
            // calculate average wind speed per buoy
            {
                $group: {
                    _id: '$buoy',
                    avgWindSpeedMPH: { $avg: '$events.windSpeedMPH' }
                }
            }
        ],
        'Average water temp when air temp > 85': [
            // filter by time range
            {
                $match: {
                    ts: {
                        $gte: new Date('11/01/2012'),
                        $lt: new Date('11/02/2012')
                    }
                }
            },
            // unwind array of events into distinct documents
            {
                $unwind: '$events'
            },
            // filter by air temperature
            {
                $match: {
                    'events.airTempF': { $gt: 85 }
                }
            },
            // calculate our average water temperature
            {
                $group: {
                    _id: 'averages',
                    avgWaterTempF: { $avg: '$events.waterTempF' }
                }
            }
        ]
    }
});

test.run();

Here's what our raw numbers look like for this approach:

Number of Events:       1,000,000
Number of Documents:    4,225
Data Size:              122.79 MB
Storage Size:           600.45 MB
Total Index Size:       0.44 MB
Inserts Per Second:     5,224.17
Average Query 1 Time:   1.60 msec
Average Query 2 Time:   29.53 msec
Average Query 3 Time:   592.16 msec
Average Query 4 Time:   793.90 msec

(Comparison graphs are at the bottom of the article.)

Hybrid: Increment-On-Write / Aggregation Framework

This last approach is an attempt at combining the already popular upsert/increment-on-write technique with the awesome power of the Aggregation Framework. Granted, this approach suffers from the same flaw I criticized at the start of this post, namely a loss of resolution. It does hold some appeal when it comes to high volume feeds, though. Specifically, it allows us to set a "maximum resolution" for our data. In the example below, we'll pre-aggregate on a minute-by-minute basis, so it won't matter whether we're consuming 5 events per minute or 5,000; we essentially get a configurable slider with low latency and storage requirements on one side, and increased granularity on the other. All we have to do is find the sweet spot for the task at hand.

With that said, the only improvement this example makes to the standard formula is to rely on the Aggregation Framework to perform in-database analytics over our pre-aggregated documents, rather than using MapReduce or in-application logic.

To test this approach, we'll create a single root level document per buoy per minute, which will store the sum of each metric for that period. We'll also keep a count of how many events each document represents, allowing us to provide counts and averages. Here's what an example document will look like if you print it from the shell:

> db.events.find().limit(1).pretty()
{
    "_id" : ObjectId("50a19cf1a5b45b0249119a8e"),
    "airTempFSum" : 285.50538671695347,
    "atmosPressureHPASum" : 4038.3247183726226,
    "buoy" : "buoy-1",
    "count" : 4,
    "ts" : ISODate("2012-11-01T04:00:00Z"),
    "waterTempFSum" : 240.47156178830446,
    "windSpeedMPHSum" : 151.12045503418437
}

Notice that at the rate we're generating events, we only have, on average, 4 events per minute per buoy. Even so, as we'll see below in the comparisons, we already start to notice some nice performance gains. These margins only improve as the rate of events grows relative to the duration we pre-aggregate over.
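
To put rough numbers on that: with minute-level buckets, the document count is bounded by buoys × minutes rather than by event volume. Our week spans 7 × 24 × 60 = 10,080 minutes, so 25 buoys can never produce more than 25 × 10,080 = 252,000 bucket documents - whether they emit our ~4 events per minute (1 million events) or 5,000 events per minute (over 1.2 billion).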

Here's what our actual test file looks like for our final schema:

// ./increment-on-write.js

load('./runner.js');

var BUCKET_SIZE = 60 * 1000; // one minute, in milliseconds

var test = new TestRunner({
    index: { buoy: 1, ts: 1 },
    insert: function(db, event) {
        // round the event time down to the start of its minute-long bucket
        var bucketTime = event.ts - (event.ts % BUCKET_SIZE);
        db.events.update({
                buoy: event.buoy,
                ts: new Date(bucketTime)
            },
            {
                $inc: {
                    count: 1,
                    'airTempFSum': event.airTempF,
                    'waterTempFSum': event.waterTempF,
                    'atmosPressureHPASum': event.atmosPressureHPA,
                    'windSpeedMPHSum': event.windSpeedMPH
                }
            },
            true // upsert
        );
    },
    queries: {
        'buoy-10 1 hour averages': [
            // filter by buoy and time range
            {
                $match: {
                    buoy: 'buoy-10',
                    ts: {
                        $gte: new Date('11/01/2012 12:00:00'),
                        $lt: new Date('11/01/2012 13:00:00')
                    }
                }
            },
            // calculate our "sum of sums"
            {
                $group: {
                    _id: 'sums',
                    airTempFSum: { $sum: '$airTempFSum' },
                    waterTempFSum: { $sum: '$waterTempFSum' },
                    atmosPressureHPASum: { $sum: '$atmosPressureHPASum' },
                    windSpeedMPHSum: { $sum: '$windSpeedMPHSum' },
                    count: { $sum: '$count' }
                }
            },
            // calculate our averages
            {
                $project: {
                    avgAirTempF: { $divide: ['$airTempFSum', '$count'] },
                    avgWaterTempF: { $divide: ['$waterTempFSum', '$count'] },
                    avgAtmosPressureHPA: { $divide: ['$atmosPressureHPASum', '$count'] },
                    avgWindSpeedMPH: { $divide: ['$windSpeedMPHSum', '$count'] }
                }
            }
        ],
        'buoy-7 1 day hourly averages': [
            // filter by buoy and time range
            {
                $match: {
                    buoy: 'buoy-7',
                    ts: {
                        $gte: new Date('11/01/2012'),
                        $lt: new Date('11/02/2012')
                    }
                }
            },
            // calculate our "sum of sums"
            {
                $group: {
                    _id: 'sums',
                    airTempFSum: { $sum: '$airTempFSum' },
                    waterTempFSum: { $sum: '$waterTempFSum' },
                    atmosPressureHPASum: { $sum: '$atmosPressureHPASum' },
                    windSpeedMPHSum: { $sum: '$windSpeedMPHSum' },
                    count: { $sum: '$count' }
                }
            },
            // calculate our averages
            {
                $project: {
                    avgAirTempF: { $divide: ['$airTempFSum', '$count'] },
                    avgWaterTempF: { $divide: ['$waterTempFSum', '$count'] },
                    avgAtmosPressureHPA: { $divide: ['$atmosPressureHPASum', '$count'] },
                    avgWindSpeedMPH: { $divide: ['$windSpeedMPHSum', '$count'] }
                }
            }
        ],
        'Average wind speeds grouped by buoy': [
            // filter by time range
            {
                $match: {
                    ts: {
                        $gte: new Date('11/01/2012'),
                        $lt: new Date('11/02/2012')
                    }
                }
            },
            // calculate our "sum of sums" per buoy
            {
                $group: {
                    _id: '$buoy',
                    windSpeedMPHSum: { $sum: '$windSpeedMPHSum' },
                    count: { $sum: '$count' }
                }
            },
            // calculate our averages
            {
                $project: {
                    avgWindSpeedMPH: { $divide: ['$windSpeedMPHSum', '$count'] }
                }
            }
        ],
        'Average water temp when air temp > 85': [
            // filter by time range
            {
                $match: {
                    ts: {
                        $gte: new Date('11/01/2012'),
                        $lt: new Date('11/02/2012')
                    }
                }
            },
            // calculate our average air temperature for each bucket
            {
                $project: {
                    avgAirTempF: { $divide: ['$airTempFSum', '$count'] },
                    waterTempFSum: '$waterTempFSum',
                    count: '$count'
                }
            },
            // filter by air temperature
            {
                $match: {
                    avgAirTempF: { $gt: 85 }
                }
            },
            // calculate our "sum of sums"
            {
                $group: {
                    _id: 'sums',
                    waterTempFSum: { $sum: '$waterTempFSum' },
                    count: { $sum: '$count' }
                }
            },
            // calculate our average water temperature
            {
                $project: {
                    avgWaterTempF: { $divide: [ '$waterTempFSum', '$count' ] }
                }
            }
        ]
    }
});

test.run();

Here's what our raw numbers look like for this approach:

Number of Events:       1,000,000
Number of Documents:    248,964
Data Size:              39.55 MB
Storage Size:           55.73 MB
Total Index Size:       23.47 MB
Inserts Per Second:     11,447.80
Average Query 1 Time:   0.98 msec
Average Query 2 Time:   11.25 msec
Average Query 3 Time:   316.71 msec
Average Query 4 Time:   427.89 msec

Results

Well, it took a long time getting here (you did read everything, right?). Let's see what we got:

[Comparison graphs: documents created, data/storage/index sizes, inserts per second, and average query times for each of the three schemata]

You can see the raw numbers for each schema in the tables above.

Observations

Wherein I hastily slap some explanations on top of these results. Any additional insight or corrections in the comments would be very welcome.

First of all, you'll probably notice that several optimizations are being left on the table with respect to the queries being run. This is intentional: the purpose of this article is to demonstrate performance against ad hoc queries, where pre-optimizing is not an option and not every query will be fine-tuned in a development environment.

Secondly, overall, MongoDB is holding up pretty well here. It would be interesting to compare these against, e.g., MySQL, but as a very non-scientific opinion based on past experience, MongoDB has become quite competitive in this space.

Thirdly, as MongoDB's aggregation capabilities mature, developers are being given a lot of options for how to approach a given challenge. As always, each approach comes with its own strengths and weaknesses:

  • In the first 3 queries, our second schema handily beats our first. The primary difference between these query pipelines is that the first relies solely on scanning our index, whereas the second relies more heavily on unwinding sub documents. It appears that in these simple cases, then, embedding sub documents that can later be unwound is generally worthwhile.
  • The exception to the above is query 4. In the last query, our second schema is over 60% slower than our first schema. What changed here is that we introduced a $match expression after the $unwind. Even though the field we're matching against isn't being indexed, MongoDB seems to be able to perform this operation much more efficiently against root level documents.
  • In all 4 queries, as expected, our third schema benefits from the pre-aggregation, which significantly reduces the number of documents to re-aggregate.
  • We can also see trade-offs in other areas, such as insert speed and storage requirements. While all 3 schemata performed nicely on insert speed (our worst approach still manages over 5.2k inserts a second), we had a clear winner here, with our first schema achieving over 16.7k inserts per second (and that's with a compound index!).

Lastly, just to try and wrap this thing up, the Aggregation Framework, despite being quite flexible, is extremely verbose. Personally, I'm a fan of the pipeline model - the query semantics were immediately clear to me, whereas SQL took much longer to grok. I have to wonder, though, if the true strength of this pipeline model will be found in higher level UIs targeting business users. Given that the query semantics are indeed so simple, the impedance mismatch from database to front-end can be effectively dispensed with here, which is quite promising.

Well, I have to end this somewhere, so I better just do it now. If you've made it all the way to the end, thanks for reading - you're a far better person than I am. We've covered quite a bit of code and comparisons, and I'm quite sure I've excluded or screwed up some obvious takeaways, so please have at it in the comments!
