D is for Data Science

Written by Andrew Pascoe, November 17, 2014

The D programming language has quickly become our language of choice on the Data Science team for any task that requires efficiency, and is now the keystone language for our critical infrastructure. Why? Because D has a lot to offer.


A Brief Introduction

One of the clearest advantages of D over other typical data science tools is that it compiles to machine code. Without an interpreter or virtual machine layer, we can rip through data significantly faster than tools like a Java Hadoop framework, R, or Python would allow. And D’s compiler is fast enough that in many cases a program can be run as if it were a script. Let’s compare with Python by generating a million uniform random variates, sorting them, and printing the deciles:

from random import uniform

variates = [uniform(0.0, 1.0) for i in range(1000000)]
variates.sort()
for i in range(0, 1000000, 100000):
    print(variates[i])
%> time python deciles.py > some_deciles

real	0m0.857s
user	0m0.825s
sys	0m0.032s

And similarly for D:

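import std.algorithm : sort;
import std.random;
import std.stdio;

void main(string[] args) {
    double[] variates = new double[1_000_000];
    // Fill the array in place with uniform variates on [0, 1).
    foreach(ref double v; variates) {
        v = uniform(0.0, 1.0);
    }
    variates.sort();
    // Print every 100,000th element of the sorted array.
    foreach(int i; 0..10) {
        writeln(variates[i * 100_000]);
    }
}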
%> time rdmd deciles.d > some_deciles

real	0m0.929s
user	0m0.725s
sys	0m0.177s

Wait… what? It took longer, but that’s because I ran it just once, and the time includes compilation. If I don’t change anything and run it again:

%> time rdmd deciles.d > some_deciles

real	0m0.293s
user	0m0.291s
sys	0m0.004s

That’s better. rdmd won’t bother to recompile if there’s no code change. These savings can add up quite significantly once your code becomes more computationally complex and you need to perform many computations over a substantial amount of data.

But of course there’s no real revelation here. If we want hyper-efficient code, we know that it’s best to drop into a compiled language. The key thing here that separates D from other efficient languages like the oft-suggested C or C++ is that D frees you to program in the style you feel most comfortable with at the given time.

The code above shows how we can write a quick “script” in D without incurring any additional headache over the simpler Python. But D is also just as clean if you want to start writing some object-oriented code:

import std.algorithm; // map and sort live here
import std.random;
import std.range;
import std.stdio;

class Rectangle {
    double width;
    double height;

    this(double width, double height) {
        this.width = width;
        this.height = height;
    }

    double area() {
        return width * height;
    }
}

void main(string[] args) {
    Rectangle[] rs = new Rectangle[1_000_000];
    rs = rs.map!(x => new Rectangle(uniform(0.0, 1.0),
                                    uniform(0.0, 1.0))).array;
    rs.sort!((x, y) => x.width < y.width);
    foreach(int i; 0..10) {
        writefln("%0.4f\t%0.4f\t%0.4f",
                 rs[i * 100_000].width,
                 rs[i * 100_000].height,
                 rs[i * 100_000].area());
    }
}
%> rdmd rectangle_deciles.d
0.0000	0.9382	0.0000
0.1000	0.2020	0.0202
0.1996	0.3612	0.0721
0.2995	0.3947	0.1182
0.3994	0.7440	0.2972
0.4993	0.8733	0.4360
0.5997	0.0221	0.0132
0.6997	0.6624	0.4634
0.7997	0.6204	0.4961
0.9003	0.4640	0.4177

And D is as ready as you are for your high-performance needs. For example, if we want to calculate a fast inverse square root, we can use some pointer voodoo (very lightly modified from the linked Wikipedia article):

import std.conv;
import std.stdio;

void main(string[] args) {
    //computes 1/sqrt(x)
    auto x = to!float(args[1]);
    int i;
    float x2, y;
    const float threehalves = 1.5f;

    x2 = x * 0.5f;
    y = x;
    i = *cast(int*)&y;
    i = 0x5f3759df - ( i >> 1 );
    y = *cast(float*)&i;
    y = y * (threehalves - (x2 * y * y));

    writeln(y);
}
%> rdmd invsqrt.d 1
0.998307
%> rdmd invsqrt.d 2
0.70693
%> rdmd invsqrt.d 4
0.499154
%> rdmd invsqrt.d 16
0.249577
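
As a rough sanity check on the approximation, the sketch below wraps the same bit trick in a function and compares it against 1/sqrt(x) computed with std.math. The name fast_inv_sqrt and the 0.2% tolerance are choices made for this illustration, not part of the original program.

```d
import std.math : abs, sqrt;
import std.stdio;

// Same bit trick as above, wrapped in a function (the name is illustrative).
float fast_inv_sqrt(float x) {
    float x2 = x * 0.5f;
    float y = x;
    int i = *cast(int*)&y;          // reinterpret the float's bits as an int
    i = 0x5f3759df - (i >> 1);      // initial guess from the magic constant
    y = *cast(float*)&i;            // reinterpret the bits as a float again
    return y * (1.5f - (x2 * y * y)); // one Newton-Raphson refinement step
}

void main() {
    foreach (float x; [1.0f, 2.0f, 4.0f, 16.0f]) {
        float exact = 1.0f / sqrt(x);
        float approx = fast_inv_sqrt(x);
        float rel_err = abs(approx - exact) / exact;
        // Columns: input, approximation, relative error.
        writefln("%g\t%g\t%g", x, approx, rel_err);
        assert(rel_err < 0.002f);
    }
}
```

For the four inputs used above, the relative error stays under roughly 0.2%, which is the usual trade-off this trick makes for speed.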

D will even let you write inline assembly if you really want to squeeze the most performance out of it. But this is all just fun and games. How can D help in a real-world scenario?


Ripping Through Data

During the course of our work at AdRoll, we had some infrastructure in D that was running fine for a while, but at a certain point, when our data problems exceeded the scope this code was designed for, we had to optimize. And, believe it or not, the problem was as banal as pulling some fields out of delimited files. This is the gist of what we did.

This particular log file contains some ad data delimited by the ASCII record separator. Let’s say we want to pull out some timestamps and the country whence these data come. As you can probably imagine by now, the naïve D solution is quite readable, but is perhaps not as snappy as we’d like:

import std.stdio;
import std.string;

static immutable uint TIMESTAMP_INDEX = 19;
static immutable uint COUNTRY_INDEX = 42;
static immutable char DELIMITER = cast(char)30;

void main(string[] args) {
    auto file = File(args[1], "r");
    foreach(char[] line; file.byLine()) {
        char[][] fields = split(line, DELIMITER);
        writefln("%s\t%s",
                 fields[TIMESTAMP_INDEX],
                 fields[COUNTRY_INDEX]);
    }
    file.close();
}
%> time rdmd parser.d log.txt > country_info

real	0m13.421s
user	0m13.270s
sys	0m0.153s

Thirteen seconds? That’s an eternity! This one log file is but a mere sample of what we’re producing at AdRoll: uncompressed, we generate about 130TB of log files each day. Ok, we could potentially distribute the problem, but spinning up clusters takes time, and one of our mottos is to “do more with less.” How can we improve the performance here to keep our scaling needs down?

One of the best things we can do is minimize the amount of memory we’re allocating; we allocate a new char[] every time we read a line. Beyond that, we read through the line once to put it into this char[], and then read through it again to split it by our record separator. This splitting also creates an array of all of the fields in the line, and allocates memory for each and every one of them. From an efficiency standpoint, this clean, straightforward code is actually quite a mess.

To address our first concern with memory allocation, we can have a buffer that’s already allocated to read into. The trick here is that the next line will immediately follow the previous line in the same buffer, and the end of it may not fit into the buffer. Worse, we may only get a partial bit of a field that we care about.

The solution is to have two buffers which we swap between. The fields we care about may straddle both buffers, so we’ll also construct a double-length buffer for simple reconstruction. When we reach the end of the current buffer, we’ll make it the “last buffer,” and then load more data into the other buffer, promoting it to the current buffer. The caveat is that we need to make sure that none of our lines has a length longer than both of these buffers combined.
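
To make the straddling case concrete, here is a minimal sketch of stitching one field back together from the tail of the old buffer and the head of the new one. The tiny buffers, the '|' delimiter, and the hard-coded start and end positions are stand-ins chosen for readability; the real reader discovers those positions while scanning.

```d
import std.stdio;

void main() {
    // Two tiny stand-in buffers; '|' plays the role of the record separator.
    char[] last_buffer = "ts=1416|United ".dup;
    char[] current_buffer = "States|imp\n".dup;
    char[] double_buffer = new char[last_buffer.length + current_buffer.length];

    size_t start = 8; // the field begins here in last_buffer
    size_t end = 6;   // its closing delimiter sits here in current_buffer

    // Copy the tail of the old buffer, then the head of the new one.
    size_t size_end = last_buffer.length - start;
    double_buffer[0 .. size_end] = last_buffer[start .. $];
    double_buffer[size_end .. size_end + end] = current_buffer[0 .. end];
    char[] field = double_buffer[0 .. size_end + end];

    writeln(field);
    assert(field == "United States");
}
```

Because double_buffer is allocated once and reused, the reconstruction costs two slice copies rather than a fresh allocation per field.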

Our second concern had to do with the inefficiency of splitting. Instead of breaking apart the line in totality, once it’s in memory, let’s just sequentially read through it. We’ll keep track of our progress through both the buffer (index of the array) and our current line (number of delimiters we’ve seen). Once we hit the right number of delimiters, we just need to find the next delimiter, and we know our field is the contents in between. If we hit the end of a line by reading a newline, we’ll reset our line progress.
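
Setting the buffer swapping aside for a moment, the delimiter-counting idea can be sketched on a single in-memory line. The helper nth_field and the comma delimiter are hypothetical, introduced only for this illustration; the function returns a slice of its input and allocates nothing.

```d
import std.stdio;

// Hypothetical helper: returns the field_id-th delimited field of `line`
// as a slice of the input, or null if the line has too few fields.
const(char)[] nth_field(const(char)[] line, uint field_id, char delimiter) {
    size_t start = 0;
    uint num_del = 0;
    // Count delimiters until we stand at the start of the wanted field.
    while (num_del < field_id) {
        if (start == line.length)
            return null;
        if (line[start] == delimiter)
            num_del++;
        start++;
    }
    // The field runs until the next delimiter or the end of the line.
    size_t end = start;
    while (end < line.length && line[end] != delimiter)
        end++;
    return line[start .. end];
}

void main() {
    string line = "a,bb,ccc,dddd";
    writeln(nth_field(line, 2, ',')); // the third field
    assert(nth_field(line, 0, ',') == "a");
    assert(nth_field(line, 2, ',') == "ccc");
    assert(nth_field(line, 7, ',') is null);
}
```

Unlike split, this touches only the characters up to the end of the requested field and never builds an array of all fields.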

Finally, once we’ve collected all our fields, we just need to rip through the buffer until we find the next newline.

Enough chit-chat; let’s look at some code:

import std.stdio;

immutable static ulong READ_BUFFER_SIZE = 32768;
immutable static char DELIMITER = cast(char)30;
immutable static char NEWLINE = cast(char)10;

immutable static uint TIMESTAMP_INDEX = 19;
immutable static uint COUNTRY_INDEX = 42;

immutable static char[] UNKNOWN_FIELD = "unknown";

class FastReader {
    File file;

    char[] bufferA;
    char[] bufferB;
    char[] double_buffer;
    char[] current_buffer;
    char[] last_buffer;
    ulong num_buf;

    ulong index;
    uint num_del;
    bool line_end;

    this(string filename) {
        file = File(filename, "r");
        bufferA = new char[READ_BUFFER_SIZE];
        bufferB = new char[READ_BUFFER_SIZE];
        double_buffer = new char[2 * READ_BUFFER_SIZE];
        current_buffer = bufferA;
        last_buffer = bufferB;
        num_buf = 0;

        index = 0;
        num_del = 0;
        line_end = false;
    }

    void reset_progress() {
        num_del = 0;
        line_end = false;
    }

    void swap_and_load() {
        last_buffer = current_buffer;
        num_buf++;
        if((num_buf & 1) == 0)
            current_buffer = bufferA;
        else
            current_buffer = bufferB;
        current_buffer = file.rawRead(current_buffer);
        index = 0;
    }

    void get_field(uint field_id, ref char[] field) {
        if(line_end) {
            field = UNKNOWN_FIELD.dup;
            return;
        }
        ulong start = index;
        while(num_del < field_id) {
            if(start == current_buffer.length) {
                swap_and_load();
                start = 0;
            }
            if(current_buffer[start] == NEWLINE) {
                line_end = true;
                break;
            }
            if(current_buffer[start] == DELIMITER)
                num_del++;
            start++;
        }
        if(start == current_buffer.length) {
            swap_and_load();
            start = 0;
        }
        if(line_end) {
            field = UNKNOWN_FIELD.dup;
            index = start + 1;
            return;
        }
        ulong end = start;
        bool swapped = false;
        while(current_buffer[end] != DELIMITER) {
            end++;
            if(end == current_buffer.length) {
                swap_and_load();
                swapped = true;
                end = 0;
            }
            if(current_buffer[end] == NEWLINE) {
                line_end = true;
                num_del--; //Don't count as delimiter
                break;
            }
        }
        num_del++;

        if(!swapped)
            field = current_buffer[start .. end];
        else {
            ulong size_end = last_buffer.length - start;
            double_buffer[0 .. size_end] = last_buffer[start .. $];
            double_buffer[size_end .. (size_end + end)] =
                current_buffer[0 .. end];
            field = double_buffer[0 .. (size_end + end)];
        }
        index = end + 1;
    }

    void advance_to_next() {
        ulong start = index;
        if(start == current_buffer.length) {
            swap_and_load();
            start = 0;
        }
        while(current_buffer[start] != NEWLINE) {
            start++;
            if(start == current_buffer.length) {
                swap_and_load();
                start = 0;
            }
        }
        index = start + 1;
        reset_progress();
    }

    bool eof() {
        return file.eof && (index == current_buffer.length);
    }

    void close() {
        file.close();
    }
}

void process_file(string file) {
    char[] timestamp;
    char[] country;
    FastReader frd = new FastReader(file);
    while(!frd.eof) {
        frd.get_field(TIMESTAMP_INDEX, timestamp);
        frd.get_field(COUNTRY_INDEX, country);
        frd.advance_to_next();
        writefln("%s\t%s", timestamp, country);
    }
    frd.close();
}

void main(string[] args) {
    process_file(args[1]);
}

This really beefs up our simple program from before to accomplish the same task. Let’s break it down a bit. bufferA and bufferB are the two backing buffers, and they will be pointed to by either current_buffer or last_buffer, depending on our state, which we track with num_buf. To keep track of our progress through current_buffer we use index, and to keep track of our progress through a line we use num_del to represent the number of delimiters we’ve seen thus far. Finally, we want to know if we’ve hit the end of a line with line_end.

The instantiation of FastReader is straightforward: we start with bufferA. reset_progress() gets called when we hit the start of a new line and it just updates the state to reflect that. swap_and_load() is our toggling method. Note that the num_buf operation to determine our current_buffer is equivalent to num_buf % 2.
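
A tiny sketch of the toggle, using a loop counter in place of real reads: the low bit of num_buf flips on every increment, so the two buffers take turns.

```d
import std.stdio;

void main() {
    ulong num_buf = 0;
    foreach (step; 0 .. 4) {
        num_buf++;
        // Testing the low bit is the same as taking the remainder mod 2.
        assert((num_buf & 1) == (num_buf % 2));
        writeln(num_buf, " -> ", (num_buf & 1) == 0 ? "bufferA" : "bufferB");
    }
}
```

Since the reader starts on bufferA with num_buf at 0, the first swap selects bufferB, the second returns to bufferA, and so on.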

There’s also a quick catch here with the file.rawRead() method: though it takes a char[] in and populates it from the File, if we hit an end of file, the char[] will have its previous contents past the EOF. For example:

import std.stdio;

void main(string[] args) {
    auto file = File("test.txt", "r");
    char[] buffer = "0123456789".dup;

    writeln("buffer length:\t", buffer.length);
    writeln("buffer content:\t", buffer);

    file.rawRead(buffer);

    writeln("buffer length:\t", buffer.length);
    writeln("buffer content:\t", buffer);
}
%> echo -n "test" > test.txt
%> rdmd test.d
buffer length:	10
buffer content:	0123456789
buffer length:	10
buffer content:	test456789

For our purposes, this is undesirable behavior because we want to know when we’ve actually hit the end of the File and the end of the last line. It turns out that file.rawRead() also returns a char[], which is the slice of the passed-in array that has new content. Just changing the line to buffer = file.rawRead(buffer) fixes the issue:

%> rdmd test.d
buffer length:	10
buffer content:	0123456789
buffer length:	4
buffer content:	test

We use the same construction in swap_and_load(), finally resetting our progress through current_buffer to 0.

get_field() is the trickiest method, but still intelligible. If we’ve already hit the end of the line, there’s no field to find, so we write out that it’s unknown. Starting from where we are in current_buffer, we just start counting delimiters. If we hit the end of current_buffer, it’s time to swap_and_load(). Once we’ve hit the correct number of delimiters, we need to find the next one. This is essentially the same code, but we need to know if we swap buffers during this process. If we hit a newline, we count that as the end of the field.

Constructing the field is simple: if we didn’t swap, it’s just the slice of current_buffer from our start to end indices. Otherwise, we piece it together from both last_buffer and current_buffer.

The advance_to_next() method works in a similar way to searching out delimiters, but instead we look for newline characters, move into the next line, and then reset_progress().

There are a couple of catches at this point. First, there’s our eof() method. It’s possible that we hit EOF but there are more lines to read. We check for this by ensuring that we’re only truly done when our index is the same as our current_buffer length. Finally, in our process_file() call, we make sure that we move on to the next line after getting all the fields we require. Critically, we search for the fields in their numerical order. We need to do this because we run through our lines in sequence and never look back.
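
The ordering requirement is easy to see with a simplified, in-memory stand-in for the reader. LineCursor below is hypothetical and skips the buffer swapping entirely; it only mimics the forward-only bookkeeping of index and num_del, with '|' as a readable delimiter.

```d
import std.stdio;

// Hypothetical stand-in for FastReader: a cursor over one line that,
// like get_field(), only ever moves forward.
struct LineCursor {
    const(char)[] line;
    size_t index;   // progress through the line
    uint num_del;   // delimiters seen so far

    const(char)[] get_field(uint field_id, char delimiter) {
        size_t start = index;
        while (num_del < field_id) {
            if (start == line.length) {
                index = start;
                return "unknown";
            }
            if (line[start] == delimiter)
                num_del++;
            start++;
        }
        size_t end = start;
        while (end < line.length && line[end] != delimiter)
            end++;
        if (end < line.length) {
            num_del++;       // consume the closing delimiter
            index = end + 1;
        } else {
            index = end;
        }
        return line[start .. end];
    }
}

void main() {
    auto in_order = LineCursor("a|b|c|d");
    assert(in_order.get_field(1, '|') == "b");
    assert(in_order.get_field(3, '|') == "d");

    auto out_of_order = LineCursor("a|b|c|d");
    assert(out_of_order.get_field(3, '|') == "d");
    // The cursor is already past field 1, so this cannot return "b".
    assert(out_of_order.get_field(1, '|') != "b");
    writeln("ascending requests work; descending requests do not");
}
```

Asking for field 1 after field 3 fails silently rather than loudly, which is why process_file() requests the timestamp before the country.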

Ok, so our code gained some weight. But fortunately, it gained weight in terms of muscle mass instead of bloat. Check it out:

%> time rdmd parser_fast.d log.txt > country_info_fast

real   0m2.159s
user   0m2.093s
sys    0m0.068s

Hey, that’s better than a 6x improvement! We can now run over way more data in the same amount of time. Just to make sure nothing fishy is going on:

%> diff country_info country_info_fast
%>

Awesome. But hey, y’know what? It would be really cool if we could exploit the fact that I’m running on a multi-core machine and read in multiple files at once. How would we do that?

import std.parallelism;
import std.stdio;
import std.string;

/* Then it's all the same until main()... */

void main(string[] args) {
    auto files = split(args[1], ",");
    foreach(string file; parallel(files)) {
        process_file(file);
    }
}

Well, that was pretty easy. What type of performance do we get?

%> for i in 2 3 4; do cp log.txt log$i.txt; done
%> time rdmd parser_parallel.d log.txt > more_country_info

real	0m2.219s
user	0m2.153s
sys	0m0.070s
%> time rdmd parser_parallel.d log.txt,log2.txt > more_country_info

real	0m2.308s
user	0m4.340s
sys	0m0.228s
%> time rdmd parser_parallel.d log.txt,log2.txt,log3.txt > more_country_info

real	0m2.458s
user	0m6.821s
sys	0m0.354s
%> time rdmd parser_parallel.d log.txt,log2.txt,log3.txt,log4.txt > more_country_info

real	0m2.634s
user	0m9.213s
sys	0m0.781s

Wow! That’s over five times faster than our original solution while running over four times the data, for a 20x performance boost! And we’re not done yet…

rdmd doesn’t perform as many optimizations as it could. Once our code is in the state that we want it, it makes sense to compile with dmd rather than running it under a scripting idiom. To be fair, we’ll also compile down our original naïve solution and our non-parallelized one:

%> dmd -O -release -inline parser.d
%> time ./parser log.txt > country_info

real	0m8.352s
user	0m8.224s
sys	0m0.128s
%> dmd -O -release -inline parser_fast.d
%> time ./parser_fast log.txt > country_info_fast

real	0m0.697s
user	0m0.637s
sys	0m0.060s
%> dmd -O -release -inline parser_parallel.d
%> time ./parser_parallel log.txt > more_country_info

real	0m0.950s
user	0m0.759s
sys	0m0.181s
%> time ./parser_parallel log.txt,log2.txt > more_country_info

real	0m0.857s
user	0m1.467s
sys	0m0.196s
%> time ./parser_parallel log.txt,log2.txt,log3.txt > more_country_info

real	0m1.127s
user	0m2.469s
sys	0m0.617s
%> time ./parser_parallel log.txt,log2.txt,log3.txt,log4.txt > more_country_info

real	0m1.527s
user	0m3.792s
sys	0m1.264s

After compiling everything down, our fast solution has nearly a 12x performance boost over the naïve one, and our parallelized solution, when run over four times the data, has nearly a 22x boost.
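
For readers who want to check the arithmetic, this short sketch recomputes both figures from the wall-clock ("real") times reported above. The variable names are illustrative; the numbers are copied from the timings of the compiled binaries.

```d
import std.stdio;

void main() {
    // Wall-clock seconds from the compiled runs above.
    double naive = 8.352;      // ./parser, one file
    double fast = 0.697;       // ./parser_fast, one file
    double parallel4 = 1.527;  // ./parser_parallel, four files

    double fast_boost = naive / fast;
    // Four files in parallel do four times the work of one naive run.
    double parallel_boost = 4 * naive / parallel4;

    writefln("%.1fx\t%.1fx", fast_boost, parallel_boost);
    assert(fast_boost > 11.9 && fast_boost < 12.0);
    assert(parallel_boost > 21.8 && parallel_boost < 21.9);
}
```

The parallel figure is a throughput comparison: it scales the single-file naive time by four before dividing by the four-file parallel time.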

At AdRoll Data Science, we’ve become big fans of D, and it’s easy to see why. We can rapidly prototype new infrastructure and analysis tasks, and when efficiency becomes a core concern, we have the ability to refactor that same code base to squeeze as much performance out as possible. If you’re interested in tackling big data problems with an eye for lean, mean code, you should let us know.