Data processing is a common development task that can often be solved with a processing pipeline. As the name implies, a pipeline is made up of a series of pipes, where the output type of one pipe is the input type of the next. A simple, lightweight, and powerful language construct for building pipes is the coroutine, which can suspend and resume execution while maintaining state. If you're developing in a language with native coroutine support, like Python, then you can put that scaffolding to work building the processing pipeline of your dreams. If you're not fortunate enough to have access to a native coroutine construct, however, coroutines still serve as a great source of inspiration when developing your own solution.

The Anatomy of a Coroutine

It wouldn't be fair to dangle the coroutine mention and not take a brief detour to demonstrate its usage. But first, let's make one thing clear: coroutines should not be thought of solely as pipes; they're much more than that. Instead, we can think of a pipe as one concrete application of a coroutine. Let's look at a simple coroutine example in Python that is hopefully easy to follow regardless of your familiarity with the language:

def log_filter(pattern, out):
    fout = open(out, 'w+')
    try:
        while True:
            line = (yield)
            if pattern in line:
                fout.write(line)
                fout.write('\n')
    except GeneratorExit:
        fout.close()

# our generator reference
g = log_filter('pipe', '/var/log/logs-about-pipes.log')
# prime the generator
next(g)
# send input to our generator
g.send('did someone say pipe?')
# close the generator
g.close()

What's with the generator mentions? Well, technically a coroutine is a type of generator, but unlike a typical generator, a coroutine is primarily used to consume values rather than produce them. In this quick example we define a coroutine function called log_filter, which writes messages to the provided output file if they contain a particular pattern. Once the close method is called on the generator, it cleans up after itself by closing the open file.

Now that the basic principles of the coroutine are established, let's take the example one step further to see how a pipe is actually formed.

@coroutine
def log_filter(pattern, target):
    try:
        while True:
            line = (yield)
            if pattern in line:
                target.send(line)
    except GeneratorExit:
        target.close()

@coroutine
def sink():
    try:
        while True:
            line = (yield)
    except GeneratorExit:
        pass

# create our pipe
pipe = log_filter('pipe', log_filter('dream', sink()))

# push data through the pipe
with open('input/file.txt', 'r') as fin:
    for line in fin:
        pipe.send(line)

First off, we've added a decorator called coroutine (whose implementation I'm omitting here for simplicity) which automatically primes coroutines by making the first call to next. You'll also notice our log_filter coroutine now receives a target argument in place of the output file path; target is expected to be another pipe into which log_filter funnels the data that matches its pattern. And naturally, every pipe needs a sink at the end, so we've added a sink coroutine which endlessly receives values and does nothing with them. The input file is read line by line and pushed through the pipe, and only strings about pipe dreams find their way into the sink. How befitting.
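In case you're curious, the priming decorator itself is only a few lines. One common recipe (a sketch, not necessarily the only way to write it) wraps the generator function and advances it to its first yield:

```python
import functools

def coroutine(func):
    """Prime a coroutine by advancing it to its first (yield)."""
    @functools.wraps(func)
    def primed(*args, **kwargs):
        gen = func(*args, **kwargs)
        next(gen)  # run up to the first (yield) so send() works immediately
        return gen
    return primed

@coroutine
def collector(out):
    # Minimal coroutine for demonstration: appends everything it receives.
    while True:
        out.append((yield))

results = []
c = collector(results)
c.send('no priming call needed')  # the decorator already primed it
```

With the decorator in place, callers never have to remember the initial next call before sending values.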

This may not be the most useful pipe, but one could easily imagine something more practical. For example, we could construct a similar pipe to filter messages of different log levels emitted by a web application. The sink might be responsible for handling the filtered messages by notifying other system services or triggering email notifications. Perhaps instead of filtering data, we create a pipe that transforms it, contributing additional metadata to a context object, similar to middleware in a web framework.
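As a sketch of that last idea, a transforming pipe might enrich a context dict before passing it downstream. The names add_severity and sink here are purely illustrative, and the priming decorator is inlined so the example stands alone:

```python
def coroutine(func):
    # Priming decorator, as before.
    def primed(*args, **kwargs):
        gen = func(*args, **kwargs)
        next(gen)
        return gen
    return primed

@coroutine
def add_severity(target):
    """Transforming pipe: tags each context dict with a severity level."""
    while True:
        ctx = (yield)
        msg = ctx.get('message', '')
        ctx['severity'] = 'ERROR' if 'error' in msg.lower() else 'INFO'
        target.send(ctx)

@coroutine
def sink(handled):
    # End of the line: collect the enriched contexts.
    while True:
        handled.append((yield))

handled = []
pipe = add_severity(sink(handled))
pipe.send({'message': 'Error: disk full'})
pipe.send({'message': 'request served'})
```

Each stage stays small and single-purpose, and the pipe wiring composes them, much like middleware layers.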

The Coroutine Inspired Java Pipe

Let's start with a simple interface. We know our pipe should have send and close methods. Because we're working with a statically typed language, it also makes sense for the interface to be generic.

public interface Piped<I> extends java.io.Closeable {
    void send(I input);
}

Using this interface, we can construct an abstract class that encapsulates some more of the boilerplate functionality. We'll start by making a synchronous pipe.

public abstract class SyncPipe<I, O> implements Piped<I> {
    private final Piped<O> target;
    private boolean isOpen = true;

    SyncPipe(Piped<O> target) {
        this.target = target;
    }

    @Override
    public void send(I input) {
        if (!isOpen) {
            throw new IllegalStateException("cannot send to a closed pipe");
        }
        target.send(use(input));
    }

    protected abstract O use(I i);

    @Override
    public void close() {
        isOpen = false;
    }
}

When extending SyncPipe, one need only implement the use method, which is called each time a new input is sent to the pipe and is expected to return the correct output type to send along to the target pipe. Pretty straightforward. Of course, if we're going to create a synchronous pipe, we can't not create an asynchronous version as well. Since both share the same interface, one could easily create a system of pipes that mixes the two, choosing the most appropriate version for the task at hand.
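To make the contract concrete, here is a hypothetical subclass that upper-cases strings, along with a simple collecting sink. UpperCasePipe and ListSink are illustrative names, and the interface and abstract class are repeated so the sketch stands alone:

```java
import java.io.Closeable;
import java.util.ArrayList;
import java.util.List;

interface Piped<I> extends Closeable {
    void send(I input);
}

abstract class SyncPipe<I, O> implements Piped<I> {
    private final Piped<O> target;
    private boolean isOpen = true;

    SyncPipe(Piped<O> target) {
        this.target = target;
    }

    @Override
    public void send(I input) {
        if (!isOpen) {
            throw new IllegalStateException("cannot send to a closed pipe");
        }
        target.send(use(input));
    }

    protected abstract O use(I i);

    @Override
    public void close() {
        isOpen = false;
    }
}

// Hypothetical pipe: transforms each string to upper case before forwarding it.
class UpperCasePipe extends SyncPipe<String, String> {
    UpperCasePipe(Piped<String> target) {
        super(target);
    }

    @Override
    protected String use(String s) {
        return s.toUpperCase();
    }
}

// Hypothetical sink: collects whatever reaches the end of the pipe.
class ListSink<T> implements Piped<T> {
    final List<T> received = new ArrayList<>();

    @Override
    public void send(T input) {
        received.add(input);
    }

    @Override
    public void close() {
        // nothing to clean up
    }
}
```

Wiring it up mirrors the Python version: new UpperCasePipe(new ListSink<>()) builds the chain, and each send pushes a value through it.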

import java.util.concurrent.*;

public abstract class AsyncPipe<I, O> extends Thread implements Piped<I> {
    private final Piped<O> target;
    private volatile boolean isOpen = false;
    private final LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
    private final ExecutorService es = new ThreadPoolExecutor(4, 4, 0,
            TimeUnit.SECONDS, this.queue);
    private final LinkedBlockingQueue<Future<O>> futureQueue = new LinkedBlockingQueue<>();

    AsyncPipe(Piped<O> target) {
        setDaemon(true);
        this.target = target;
    }

    public void send(I input) {
        futureQueue.add(es.submit(use(input)));
    }

    public boolean hasMore() {
        return futureQueue.peek() != null;
    }

    protected O getNextOutput() {
        try {
            // Block until the oldest submitted task completes, preserving FIFO order.
            return futureQueue.take().get();
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
            return null;
        }
    }

    protected abstract Callable<O> use(I i);

    @Override
    public void run() {
        isOpen = true;
        while (isOpen || hasMore()) {
            O nextOutput = getNextOutput();
            if (nextOutput == null) continue;
            target.send(nextOutput);
        }
        es.shutdown();
    }

    @Override
    public void close() {
        isOpen = false;
    }
}

There are quite a few more moving parts here. The first change you'll notice is that we're extending the Thread class. This isn't strictly necessary, especially if you wanted to mix synchronous pipes with asynchronous ones; in fact, it could make managing a series of mixed pipes more of a headache. I've gone this route because it seems more likely one would construct a pipeline of entirely asynchronous pipes, where the blocking calls to the futureQueue would otherwise hinder performance.

The next big change is that our use method now returns a Callable&lt;O&gt;, which gets submitted to the ExecutorService. We could use something like an ExecutorCompletionService if the order in which outputs are generated didn't matter. However, since this is a pipeline, I'll assume order matters, and we'll adhere to a FIFO (first in, first out) approach.

The rest of the meat is in the run method. Here we flag the pipe as open just like before, then continually call getNextOutput (which blocks until it has a new output object) and feed the result to the target pipe. If the pipe gets closed, we allow the thread to continue running until it has completely emptied its queue of pending results.
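To see the whole thing in motion, here is a sketch of driving such a pipeline. The pipe and sink names are illustrative, and the AsyncPipe below is a trimmed-down variant of the one above so the example stands alone; it polls with a timeout rather than blocking indefinitely so the loop can notice close():

```java
import java.io.Closeable;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

interface Piped<I> extends Closeable {
    void send(I input);
}

abstract class AsyncPipe<I, O> extends Thread implements Piped<I> {
    private final Piped<O> target;
    private volatile boolean isOpen = true;
    private final ExecutorService es = Executors.newFixedThreadPool(4);
    private final LinkedBlockingQueue<Future<O>> futures = new LinkedBlockingQueue<>();

    AsyncPipe(Piped<O> target) {
        setDaemon(true);
        this.target = target;
    }

    @Override
    public void send(I input) {
        futures.add(es.submit(use(input)));
    }

    protected abstract Callable<O> use(I i);

    @Override
    public void run() {
        while (isOpen || !futures.isEmpty()) {
            try {
                // Poll with a timeout so the loop can observe close().
                Future<O> f = futures.poll(50, TimeUnit.MILLISECONDS);
                if (f == null) continue;
                target.send(f.get()); // futures are taken in submission order (FIFO)
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        }
        es.shutdown();
    }

    @Override
    public void close() {
        isOpen = false;
    }
}

// Illustrative pipe: upper-cases each string on a worker thread.
class UpperCaseAsyncPipe extends AsyncPipe<String, String> {
    UpperCaseAsyncPipe(Piped<String> target) {
        super(target);
    }

    @Override
    protected Callable<String> use(String s) {
        return () -> s.toUpperCase();
    }
}

// Illustrative sink: collects whatever reaches the end of the pipe.
class ListSink<T> implements Piped<T> {
    final List<T> received = new ArrayList<>();

    @Override
    public void send(T input) {
        received.add(input);
    }

    @Override
    public void close() {
    }
}
```

After start(), each send submits work to the executor; closing the pipe and joining the thread drains any results still in flight before the consumer loop exits.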

So... Who Needs Coroutines?

Even though we managed to create our own coroutine-ish implementation of a pipe without real coroutines, we can't forget a few important facts. Native coroutine implementations are lightweight, performant, stateful, and save the developer a lot of boilerplate code. Coroutines also serve as a common alternative to threads, which is most welcome in languages (like Python) that don't sport the featureful concurrency utilities Java does. That said, our AsyncPipe could easily knock the socks off a synchronous pipeline built on native coroutines for certain types of operations (heavy network and IO tasks, for instance), though it comes with additional overhead too. In short, coroutines rock, but even in their absence we can still come up with creative solutions.
