
Managing Containerized Data Pipeline Dependencies With Luigi

Written by Oleg Avdeev, October 15, 2015

This is the second article in a series that describes how we built AdRoll Prospecting. This time, we’ll talk about managing batch job dependencies.

Batch processing and data pipeline coordination are among the oldest applications of computer systems. In the late 1950s, the US Navy and NASA first started using computers for task scheduling, with the tasks being assembly jobs performed by people; coordinating people that way is what topological sorting algorithms were invented for. Later, in the mid-1960s, one of the killer features of IBM's novel operating system OS/360 was its extensive mainframe task management capabilities.

RTOS/360.

Back then, system-level limitations like memory and disk space were major concerns, so scheduling required a lot of manual hints and tweaks. Nowadays (well, over the last 40 years or so) this has become less of an issue. One can easily buy hundreds of gigabytes, even terabytes, of RAM and disk space, enough to store, say, bios and photos of every person on Earth, for the price of a family car.

Building effective and robust data processing pipelines has become much easier than it was in the 1960s; there are quite a few tools available for this today, too many to keep a comprehensive list.

Many of them fit the bill for a lot of cases, but there are still some seemingly trivial things that, in our experience, are easy to overlook and can get you into trouble. Specifically:

The problem with time-based scheduling

Time-based scheduling is something everyone starts with. For example, analysts want a revenue report to be generated at 9 AM daily. We can just use cron(8), or try something fancier like Chronos, for this. And that works fine for simple cases.


Next stop: dependencies

Not too long after setting up cron, it turns out that we need our task to run after a daily database ETL job. A simple solution comes to mind: just run the ETL job at 6 AM, then trigger the daily revenue report generation job as soon as the ETL job finishes.


And a bit later, but sooner than one may think, our pipeline will need to support multiple dependencies. Now these are not that easy to express in a time-based system: one cannot unconditionally trigger jobs downstream anymore, as multiple prerequisites must be met before they run.


Failures and reruns

It wouldn’t be that hard to work around these multiple dependencies if not for the fact that jobs fail. Or even worse, they don’t fail, but instead produce incorrect results due to a software bug somewhere upstream.


And with our fancy time-based scheduling, things get complicated: what if we need to rerun yesterday's report? Or rather, if yesterday's report failed and we rerun it, will we get today's report or yesterday's? Should we backfill failed jobs, or just abandon them and move on? What if that happens across midnight? Will it trigger "today's" jobs downstream, or "yesterday's"? What if the pipeline starts to take too long? Can we restart or disable part of it?

The correct answer is to sit back and reflect a little. Job pipelines are all about inputs, outputs, and dependencies, not about time; time can easily be made just another virtual dependency, expressed as "wallclock time > X."
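In other words, a time constraint can sit next to the data prerequisites instead of driving the whole pipeline. A minimal sketch of the idea (the exists callback and the paths are hypothetical, not code from our pipeline):

import time

def ready_to_run(input_paths, not_before_epoch, exists):
    """Wallclock time is just one more prerequisite: the job becomes
    runnable when all data inputs exist AND the time barrier has passed."""
    return (all(exists(path) for path in input_paths)
            and time.time() >= not_before_epoch)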


make(1)

Some 40 years ago, someone already solved this problem in UNIX. If you define your pipeline as a directed acyclic graph of dependencies, it becomes trivial to reason about. Simply follow a few make-like rules: every job declares its inputs and its outputs; a job runs only once all of its inputs exist; and a job whose output already exists is never rerun.

Following these principles makes things much simpler than your average Makefile, assuming you have enough disk space (which, as we know, is cheap) to write out all the outputs.
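In code, the whole scheduling decision reduces to a check like the following; a minimal sketch (using local files for simplicity, with hypothetical helper names rather than anything from our pipeline):

import os

def needs_run(inputs, output):
    """Make-style rule: a job is skipped when its output already exists,
    and it only becomes runnable once every one of its inputs exists."""
    if os.path.exists(output):
        return False  # output is already there, nothing to do
    return all(os.path.exists(path) for path in inputs)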

Luigi

That’s why we now use Spotify’s Luigi. It is simple enough, extensible in Python, and doesn’t have weird implicit rules like make(1)—make is a build system, after all, and has its own quirks. All inputs and outputs are in S3, which provides great persistence guarantees, unlimited storage space, and read bandwidth that scales more or less linearly with the number of readers.

Docker

We also wanted to give users the ability to use the tools they want for data processing: R comes to mind first; C, which we use extensively; and Lua and Haskell make appearances as well. This is where Docker comes in handy: we use Luigi to manage dependencies and schedule containerized jobs, but normally no data processing happens in the Luigi worker process itself. See our previous blog post for more details.

Here’s a typical Luigi job:

import json

import luigi
import luigi.s3  # home of S3Target in this Luigi version (luigi.contrib.s3 in newer ones)

import quentin   # client for Quentin, our in-house job queue (described below)


class PivotRunner(luigi.Task):
    blob_path = luigi.Parameter()
    out_path = luigi.Parameter()
    segments = luigi.Parameter()

    def requires(self):
        # Upstream dependency: the input blob must exist before we can pivot it.
        return BlobTask(blob_path=self.blob_path)

    def output(self):
        # The task counts as done once this S3 object exists.
        return luigi.s3.S3Target(self.out_path)

    def run(self):
        # No data crunching happens in the Luigi worker itself: we just submit
        # a containerized `pivot` job to Quentin and wait for it to finish.
        q = {
            "cmdline": ["pivot %s %s" % (self.out_path, self.segments)],
            "image": 'docker:5000/pivot:latest',
            "caps": "type=r3.4xlarge"
        }
        quentin.run_queries('pivot', [json.dumps(q)], max_retries=1)

As one can see, it is conceptually very similar to a Makefile, but with Python syntax.
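A task like this can be kicked off from the luigi command line or programmatically; a minimal sketch of the latter, with hypothetical S3 paths and segment list:

import luigi

# Build the graph and run it: Luigi resolves requires()/output() and skips
# any task whose output already exists in S3.
luigi.build(
    [PivotRunner(blob_path="s3://bucket/blobs/2015-10-15",   # hypothetical paths
                 out_path="s3://bucket/pivots/2015-10-15",
                 segments="all")],
    local_scheduler=True,
)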

The only missing piece is a system that would run containers on a cluster of EC2 instances, so that one doesn’t have to care about provisioning and scaling the cluster in the Luigi job. We built an in-house solution called Quentin; it is a simple job queue with a REST API and UI on top.

Quentin.

Quentin maintains the task queue and feeds metrics to CloudWatch so that AWS Auto Scaling can take care of scaling clusters up and down.
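The metric side of that loop is plain CloudWatch usage; roughly, it looks like the sketch below (boto3, with a made-up namespace and metric name; Quentin's actual code is not shown in this post):

import boto3

cloudwatch = boto3.client("cloudwatch")

def publish_queue_depth(pending_tasks):
    """Publish the current queue depth; an Auto Scaling policy watching
    this metric can then grow or shrink the worker cluster."""
    cloudwatch.put_metric_data(
        Namespace="Quentin",               # hypothetical namespace
        MetricData=[{
            "MetricName": "PendingTasks",  # hypothetical metric name
            "Value": pending_tasks,
            "Unit": "Count",
        }],
    )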

Nice to have: dynamic dependencies

One feature of Luigi that we found very useful is the ability to create nodes in the acyclic job graph dynamically. This makes it possible to shard jobs based on data volume, use Hadoop-like map-reduce patterns, and even build recursive reduce schemes, all with a few lines of code. We use this heavily, so our typical pipeline consists of thousands of containerized jobs orchestrated by Luigi, created dynamically by a higher-level, "virtual" job.

Luigi.
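In Luigi, such nodes are created by yielding new tasks from run(); a simplified sketch of the pattern (ShardTask, num_shards(), and the S3 paths are hypothetical stand-ins, not our actual jobs):

import luigi
import luigi.s3


class FanOut(luigi.Task):
    date = luigi.Parameter()

    def output(self):
        # A marker object written once all dynamically created shards are done.
        return luigi.s3.S3Target("s3://bucket/fanout/%s/_SUCCESS" % self.date)

    def run(self):
        # The number of shards is only known at runtime; yielding the tasks
        # here adds them to the dependency graph on the fly, and run()
        # resumes once they have all completed.
        yield [ShardTask(date=self.date, shard=i)
               for i in range(num_shards(self.date))]
        with self.output().open("w") as f:
            f.write("done")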

Not necessary: high availability

By following the practices above, making jobs (mostly) idempotent, and storing all inputs and outputs in a resilient store like S3, we find that it is not necessary for components like the Luigi server or the queue service (Quentin) to be highly available. Distributed systems are hard, and in this case it would cost us much more in development and operational effort to ensure zero-downtime survival of server and data center failures.

Currently, if anything fails, a new Luigi server comes up and continues from where it stopped, based on results that are already in S3. This works perfectly as long as the data pipeline is not very sensitive to small delays.
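This works because of Luigi's default notion of completeness: a task counts as done as soon as its output targets exist, so a freshly started server simply walks the graph and skips everything that is already in S3. Roughly:

import luigi
from luigi.task import flatten

def is_done(task):
    """Paraphrase of Luigi's default Task.complete(): a task is finished
    when every target returned by its output() already exists."""
    return all(target.exists() for target in flatten(task.output()))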

Conclusion

There are certainly things to improve in this ecosystem (as our friends at Pachyderm are trying to do), but even with the existing tools, it really pays off to keep things simple: S3 as the data store, a make-like dependency scheduler on top, a simple non-HA job queue, and Docker as the workhorse. This combination works very well for us in practice.

If you’d like more details on our experience, here are slides and video of the talk on our data pipeline at re:Invent 2015.
