22 December 2018

Abstract
We tackle Kevin Lynagh’s Shipping Puzzle challenge in Differential Dataflow.

The puzzle itself is not that difficult to solve. We need to greedily join legs together into routes until no two of the remaining routes can be joined. Whatever number of routes we end up with is the number of planes we will need to service all legs according to plan.

Differential is a programming framework for constructing data-parallel dataflow computations that a) update quickly in response to arbitrary input changes and b) can be — if needed — distributed across multiple worker threads and machines. Differential itself is written in Rust.

Even without worrying about dataflows or incremental computation, we know that we will be dealing with some data.

#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
enum Day { M, T, W, R, F }

/// Legs are tuples of (id, dow, from, to).
/// e.g. (123, Day::W, "LOS_ANGELES", "CHARLOTTE")
type Leg = (u64,Day,String,String);

/// Routes are sequences of leg ids with some bookkeeping data.
type Route = (Vec<u64>,String,bool);

We will initialize all legs as single-leg routes. It is easy to see that two routes can be joined iff the last leg of one arrives at the airport from which the first leg of the other departs, and that departure falls on the day after the arriving leg was flown.
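The joinability condition can be written as a small predicate. This is a sketch rather than code from the post: `next_day` is used further down but never defined, so its `Option<Day>` return type (with nothing following Friday) is an assumption, and `can_join` is a hypothetical name. A leg flown on day d is treated as being at its destination on `next_day(d)`, matching the indexing shown below.

```rust
/// Days of the working week, as in the post's `Day` enum.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
enum Day { M, T, W, R, F }

/// Hypothetical helper: the day after `day`, or `None` after Friday.
/// (Assumption: the schedule does not wrap around the weekend.)
fn next_day(day: Day) -> Option<Day> {
    match day {
        Day::M => Some(Day::T),
        Day::T => Some(Day::W),
        Day::W => Some(Day::R),
        Day::R => Some(Day::F),
        Day::F => None,
    }
}

/// Legs are tuples of (id, dow, from, to), as in the post.
type Leg = (u64, Day, String, String);

/// True if a route ending with `last` can continue with a route
/// starting with `first`: same airport, and `first` departs on the
/// day after `last` was flown.
fn can_join(last: &Leg, first: &Leg) -> bool {
    next_day(last.1) == Some(first.1) && last.3 == first.2
}
```

With this definition, a Monday leg into Seattle can be followed by a Tuesday leg out of Seattle, but not the other way around.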

On any given day of the week, we can split the set of all active legs into arriving and departing ones. Finding joinable routes is a matter of indexing departing legs like so (in slightly simplified Rust):

// Departing legs at day d will be at their source on that day.
departing.map(|(id,day,start,end)| ((day,start),vec![id]))

arriving ones like so:

// Arriving legs at day d will be at their destination 
// on the following day, next_day(d).
arriving.map(|(id,day,start,end)| ((next_day(day),end),vec![id]))

and to group them together by those keys:

arriving
    .concat(departing)
    .group(|(day,place), input, output| {
        // @TODO pair-up joinable routes 
    })
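Outside of a dataflow, the same concat-then-group step can be imitated with an ordinary map. The sketch below is plain standard-library Rust, not Differential: the `Side` tag and the `group_by_key` function are hypothetical names, and a `BTreeMap` stands in for the grouping operator.

```rust
use std::collections::BTreeMap;

#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
enum Day { M, T, W, R, F }

/// Hypothetical tag recording which side of a join a route is on.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum Side { Arriving, Departing }

/// The (day, place) at which routes could meet.
type Key = (Day, String);

/// Collect tagged routes (lists of leg ids) under their meeting key,
/// preserving the order in which they were supplied.
fn group_by_key(
    records: Vec<(Key, Side, Vec<u64>)>,
) -> BTreeMap<Key, Vec<(Side, Vec<u64>)>> {
    let mut groups: BTreeMap<Key, Vec<(Side, Vec<u64>)>> = BTreeMap::new();
    for (key, side, route) in records {
        groups.entry(key).or_default().push((side, route));
    }
    groups
}
```

Each value in the resulting map plays the role of the `input` handed to the group closure for one key.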

The group closure will receive a list of joinable routes as input. In order to pair them up correctly, we need to re-introduce the distinction between arriving and departing, zip them together, and deal with any remaining ones (pseudo-ish code again).

let (arrivals,departures) = input.partition(/* is-arriving? */);
let pairs = arrivals.zip(departures);

for ((arriving,_,_),(departing,to,_)) in pairs {
    // Append the departing route's legs to the arriving route.
    let mut route = arriving.clone();
    route.extend(departing);

    output.push((((next_day(day),to.clone()),(route,to,false)), 1));
}

if arrivals.len() > departures.len() {
    for (route,to,_) in arrivals.drain(departures.len()..) {
        output.push((((day,place),(route,to,false)), 1));
    }
} else if departures.len() > arrivals.len() {
    for (route,to,_) in departures.drain(arrivals.len()..) {
        output.push((((next_day(day),to),(route,to,false)), 1));
    }
}
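For readers who want something that compiles on its own, the pairing step for a single (day, place) key might look roughly like this. It is a sketch under simplifying assumptions: the route type drops the post's boolean bookkeeping flag, the re-keying of outputs is left out, and `pair_up` is a hypothetical name.

```rust
/// A route as a list of leg ids plus its final destination
/// (a simplification of the post's `Route` type).
type Route = (Vec<u64>, String);

/// Pair arriving routes with departing routes at one (day, place) key.
/// Returns (joined routes, unmatched arrivals, unmatched departures).
fn pair_up(
    mut arrivals: Vec<Route>,
    mut departures: Vec<Route>,
) -> (Vec<Route>, Vec<Route>, Vec<Route>) {
    let n = arrivals.len().min(departures.len());
    // Whatever exceeds the shorter side cannot be paired at this key.
    let spare_arrivals = arrivals.split_off(n);
    let spare_departures = departures.split_off(n);

    // Zip the remaining equal-length sides and concatenate their legs;
    // the joined route ends where the departing route ends.
    let joined = arrivals
        .into_iter()
        .zip(departures)
        .map(|((mut legs, _), (more, to))| {
            legs.extend(more);
            (legs, to)
        })
        .collect();

    (joined, spare_arrivals, spare_departures)
}
```

At most one of the two spare lists is non-empty, which corresponds to the `if` / `else if` branches above.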

Our working code will have to be slightly messier, because we need to keep track of which routes are arriving and which are departing. We also need to repeat the above for each day of the week.

Evaluation

> cargo run --release --example shipping

Finished release [optimized + debuginfo] target(s) in 0.9s 
Running `target/release/examples/shipping`

7.112284ms	loaded
found ((), 0, 2414) routes
47.437305ms	stable

Our solution finds 2414 routes in the 10’000-leg dataset in ~40ms, running on a single thread on my 2015 MacBook (2.7 GHz). This is about an order of magnitude slower than Kevin’s Rust solution and around 20x slower than Kevin’s improved Rust solution (using references rather than owned values).

So what has using Differential here bought us?

Updating The Computation

Two things. The first becomes apparent when we add or remove legs after computing the initial result:

// Add a new leg.
legs_in.insert((20000, Day::W, "LOS_ANGELES", "CHARLOTTE"));
legs_in.advance_to(2);
legs_in.flush();

// Remove an existing leg.
legs_in.remove((206, Day::T, "DETROIT", "SEATTLE"));
legs_in.advance_to(3);
legs_in.flush();

This results in the following output.

> cargo run --release --example shipping

Finished release [optimized + debuginfo] target(s) in 0.10s
Running `target/release/examples/shipping`

6.996085ms	loaded
found ((), 0, 2414) routes
48.665294ms	stable
found ((), 1, -1) routes
49.503192ms	stable
found ((), 2, -1) routes
52.384948ms	stable

Computing the initial result again takes around 40ms, but updating the computation is far cheaper: going by the timestamps above, the insertion takes ~0.8ms and the removal ~2.9ms! We can see that inserting an additional leg from Los Angeles to Charlotte has indeed allowed us to join two existing routes into one, reducing the number of planes required by one.

Removing the leg 206 from Detroit to Seattle had the same effect, which means that leg 206 could not be joined with any other route previously.
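Reading each `found` line as (data, time, difference), as done above, the absolute number of routes at each time is the running sum of the differences. A minimal sketch of that arithmetic, assuming one update per time and updates in time order (`running_totals` is a hypothetical helper, not part of Differential):

```rust
/// Fold (time, diff) updates into the total that is current at each time.
/// Assumes the updates are sorted by time, with one update per time.
fn running_totals(updates: &[(u64, i64)]) -> Vec<(u64, i64)> {
    let mut total = 0;
    updates
        .iter()
        .map(|&(time, diff)| {
            total += diff;
            (time, total)
        })
        .collect()
}
```

Fed with the three updates from the log, (0, 2414), (1, -1) and (2, -1), this yields 2414, 2413 and 2412 routes.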

At some point, we expect each individual (time,place) combination to saturate as we add more legs. E.g. if instead of adding a single new Los Angeles - Charlotte leg, we add 12:

legs_in.insert((20000, Day::W, "LOS_ANGELES", "CHARLOTTE"));
legs_in.insert((20001, Day::W, "LOS_ANGELES", "CHARLOTTE"));
// ...
legs_in.insert((20009, Day::W, "LOS_ANGELES", "CHARLOTTE"));
legs_in.insert((20010, Day::W, "LOS_ANGELES", "CHARLOTTE"));
legs_in.insert((20011, Day::W, "LOS_ANGELES", "CHARLOTTE"));

legs_in.advance_to(2);
legs_in.flush();

we receive the following output:

> cargo run --release --example shipping

Finished release [optimized + debuginfo] target(s) in 0.09s
Running `target/release/examples/shipping`

6.703326ms	loaded
found ((), 0, 2414) routes
45.915438ms	stable
found ((), 1, -10) routes
47.853393ms	stable

The -10 tells us that by adding 12 new legs (which took ~2ms), we only managed to reduce the number of planes required by 10.
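One way to see where saturation comes from: at each (day, place) we can make at most min(arrivals, departures) joins, and every join saves one plane. The sketch below counts planes that way. It assumes a plane must continue on the day right after it lands (no waiting), numbers the days 0 (Monday) to 4 (Friday) instead of using the `Day` enum, and uses the hypothetical name `planes_needed`; it is not how the Differential version is written.

```rust
use std::collections::HashMap;

/// Planes needed = legs - joins, with at most min(arrivals, departures)
/// joins per (day, place). Legs are (day, from, to), days numbered 0..=4.
fn planes_needed(legs: &[(u8, &str, &str)]) -> usize {
    // (arrivals, departures) available at each (day, place).
    let mut counts: HashMap<(u8, &str), (usize, usize)> = HashMap::new();
    for &(day, from, to) in legs {
        // A leg departs from its source on `day` ...
        counts.entry((day, from)).or_default().1 += 1;
        // ... and is at its destination on the following day.
        counts.entry((day + 1, to)).or_default().0 += 1;
    }
    let joins: usize = counts.values().map(|&(a, d)| a.min(d)).sum();
    legs.len() - joins
}
```

With one leg into Los Angeles on Tuesday and one out of Charlotte on Thursday, a single Wednesday leg from Los Angeles to Charlotte links both and brings the count from 2 down to 1. A second identical Wednesday leg finds nothing left to pair with and pushes the count back up to 2.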

Distributing The Computation

The second thing Differential allows us to do is distribute the computation across multiple workers, such that each worker processes a slice of the input data. E.g. to run across two worker threads:

> cargo run --release --example shipping -- -w2

Finished release [optimized + debuginfo] target(s) in 0.08s
Running `target/release/examples/shipping -w2`

5.101823ms	loaded
5.086969ms	loaded
found ((), 0, 2414) routes
33.379149ms	stable
33.405318ms	stable

We receive the same output, with each worker finishing its part of the computation after ~28ms. That is a nice improvement, even though we would not expect to benefit much from distributed execution on this particular problem, due to the small dataset and the inherently sequential, day-by-day processing of legs.
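The idea of giving each worker a slice of the data can be illustrated by routing records according to a hash of their key, so that everything sharing a (day, place) key ends up on the same worker. The sketch below is a stand-in built on the standard library's `DefaultHasher`. It is not Differential's actual exchange code, and `worker_for` and `partition` are hypothetical names.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Pick the worker responsible for `key` by hashing it.
fn worker_for<K: Hash>(key: &K, workers: usize) -> usize {
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    (hasher.finish() % workers as u64) as usize
}

/// Split keyed records into one bucket per worker, so that all
/// records with the same key land in the same bucket.
fn partition<K: Hash, V>(records: Vec<(K, V)>, workers: usize) -> Vec<Vec<(K, V)>> {
    let mut buckets: Vec<Vec<(K, V)>> = (0..workers).map(|_| Vec::new()).collect();
    for (key, value) in records {
        let w = worker_for(&key, workers);
        buckets[w].push((key, value));
    }
    buckets
}
```

Because grouping only ever needs the records of one key at a time, each worker can pair up routes for its own keys without looking at the others' buckets.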

Conclusions

Your author is of the opinion that small puzzles like this one are great for exploring a non-trivial problem from many different angles (Kevin’s posts reference multiple solutions in Clojure, Rust, Carp, Alloy, and MiniZinc).

Differential dataflow computations provide just another angle, but (again in your author’s opinion) an especially fun one that at the same time results in straightforward, functional code, attractive performance characteristics, and a great story for scaling to the yet-to-be-gathered 10 billion leg dataset.

Sources