RxJava — The worst of all worlds

2017-08-31 in java

RxJava, short for “Reactive Java”, the Java implementation of the “ReactiveX” specification, is sometimes used to facilitate asynchronous or concurrent programming in Java, particularly in code from Netflix, its original implementor. It enjoys some popularity in the realm of server backend programming, and there appears to be surprisingly little criticism of it.

“ReactiveX” is an extension of the observer pattern and perhaps works well in realms where that pattern is actually used, such as GUIs. But nobody in their right mind writes GUIs in Java anymore outside a single mobile platform. RxJava is instead purported to be a solution to asynchronous tasks on the server, something for which the observer model is an extremely poor fit.

RxJava appears to draw people in on the basis of pretty diagrams that make it look easy to work with and vacuous conceptions of universality. In reality, outside of a very, very narrow space where it fits well, it brings far more harm than good.

I want to emphasise that the below is specifically a critique of observed usage of RxJava for certain server tasks, particularly asynchronous Jersey-based HTTP APIs. Some points may apply to usage of other ReactiveX implementations, but I have no experience with those. This is not a criticism of the design or implementation of RxJava itself.

Universality is meaningless

One occasionally sees someone promote one pattern or another with the fact that you can “do anything with X”. This isn’t unique to ReactiveX; it’s easy to find these statements about object-orientation, functional programming languages, over-engineered XML interpreters, webscale databases, and the latest way to fix all your business’s problems by just restructuring your meetings.

And they’re not wrong. But the fact that you can do anything with a certain methodology is not a high bar. I don’t mean this in the Turing-completeness “you can also do anything in Befunge” sense — the universality does at least need to be usable. But the number of building blocks needed to achieve usable universality in a system is quite small when building atop an existing language.

I am sad to admit that I know from experience that it takes around a day to code a sufficient fraction of Lisp primitives to somewhat easily code arbitrary things in a wonky Supplier-based concurrency model. RxJava is not special in this regard; universality says nothing about its quality, usefulness, or, most importantly, applicability.

RxJava appears to be two abstractions in one, but in reality is neither

The central type of RxJava is Observable<T>, where T is the type of item the observable “emits”. One can “subscribe” to an observable to receive items it emits after the subscription occurs, as well as notifications for when the observable has terminated or failed, should either of those conditions occur. The library provides an extremely large suite of combinators that can be applied to an observable, such as map, which takes a function and uses that function to transform every item emitted by the observable. This is all well and good for the observer pattern. It falls apart because people try to use it for other things.

The quintessential example I’ve had to work with is an HTTP client based on this pattern, where you might have a method like

    public Observable<HttpResponse> execute(HttpRequest request);

This function is clearly trying to use Observable<T> as a sort of Future<T> — i.e., it will eventually produce exactly one value or fail. There are many problems with this construction.

There is nothing preventing the returned observable from terminating without emitting any responses, or from emitting multiple responses, or even from hanging forever. Looking at the function name, we can easily see that it “should” emit exactly one by knowledge of how HTTP works, but things become much blurrier with more generic types. What should you expect of Observable<String>? Is it a single value or a streaming sequence of values? Contrast this to Rust’s futures, which distinguish between Future and Stream.
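To make the ambiguity concrete, here is a minimal sketch using only the JDK. The names emitAnyNumber and SingleVersusMany are invented for this illustration and stand in for an observer-style callback; none of it is RxJava API. It shows that a callback signature cannot stop a source from emitting zero, one, or many items, whereas CompletableFuture accepts exactly one completion.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

class SingleVersusMany {
    // Hypothetical observer-style source: the signature says nothing about
    // how many times the listener will be called.
    static void emitAnyNumber(int count, Consumer<String> listener) {
        for (int i = 0; i < count; i++) {
            listener.accept("item-" + i);
        }
    }

    // Collects whatever the observer-style source decides to emit.
    static List<String> collect(int count) {
        List<String> seen = new ArrayList<>();
        emitAnyNumber(count, seen::add);
        return seen;
    }

    // A CompletableFuture can be completed only once; the second attempt is
    // rejected and the first value sticks.
    static String completeTwice() {
        CompletableFuture<String> future = new CompletableFuture<>();
        boolean first = future.complete("first");
        boolean second = future.complete("second");
        return first + "," + second + "," + future.join();
    }

    public static void main(String[] args) {
        System.out.println(collect(0));      // []
        System.out.println(collect(3));      // [item-0, item-1, item-2]
        System.out.println(completeTwice()); // true,false,first
    }
}
```

Note that the future can still hang forever if nobody completes it; the type only rules out the "zero values then done" and "several values" cases.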

This leads to the conclusion that Observable conflates “future value” and “asynchronous stream of values”. RxJava actually has a Single<T> type that expresses an observable-like which emits exactly one value, but it is used astonishingly rarely. The failure to correctly encode the difference between a single value and a stream in preference for treating everything as a stream is extremely frustrating.

However, this conclusion is incorrect, for in reality an Observable is neither a value nor a stream. It’s an event bus.

The distinction between an event bus and a stream is subtle and leads to many misconceptions about the semantics of Observable-based code. Say you call the above execute function. When is the HTTP request actually issued? Is it issued? This is critically important since this could be, for example, a side-effecting service call. In a Future-based (the Java kind, not Rust) API, one would expect that the request would start immediately, and barring an incredibly pointless implementation of Future.get(), would be guaranteed to start immediately.

Not only is the Observable-based API not guaranteed to initiate the request immediately, it cannot start immediately! Since Observable is an event bus, if it started immediately, there would be the possibility that the request would complete before anyone had subscribed, and then the HttpResponse would be emitted with no subscribers; i.e., it would disappear into the void and the Observable would appear to be empty. Instead, the request must not be initiated until a subscriber subscribes to the observable. As a result, determining when and even if the request is executed is an extremely non-local question that cannot be determined by simply observing that the code was executed.

This means that subscribing to an Observable in this type of system can be extremely side-effecting, standing in stark contrast to the usual observer pattern, where observers are considered passive and the only reason to react to a subscription is to optimise away work if nobody is subscribed.
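The difference in when work starts can be sketched with JDK types alone. ColdSource below is a hypothetical stand-in I made up for an observable of this kind, not RxJava's actual class, and the "HTTP request" is just a counter increment. The sketch assumes, as I understand such observables to behave, that each subscription triggers the work again.

```java
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;

class ColdVersusEager {
    // Hypothetical stand-in: it only remembers how to do the work and runs
    // it once per subscriber.
    static final class ColdSource<T> {
        private final Supplier<T> work;
        ColdSource(Supplier<T> work) { this.work = work; }
        void subscribe(Consumer<T> subscriber) { subscriber.accept(work.get()); }
    }

    // Number of "requests" issued after creation, first and second subscription.
    static int[] coldRequestCounts() {
        AtomicInteger requests = new AtomicInteger();
        ColdSource<String> response = new ColdSource<>(() -> {
            requests.incrementAndGet();   // pretend side-effecting HTTP call
            return "200 OK";
        });
        int afterCreate = requests.get();  // nothing has happened yet
        response.subscribe(r -> { });
        int afterFirst = requests.get();
        response.subscribe(r -> { });      // second subscriber, second request
        int afterSecond = requests.get();
        return new int[] { afterCreate, afterFirst, afterSecond };
    }

    // With a future, the call that creates it also starts the work.
    static boolean eagerStartsWithoutConsumer() {
        CountDownLatch started = new CountDownLatch(1);
        CompletableFuture.supplyAsync(() -> {
            started.countDown();
            return "200 OK";
        });
        try {
            // Nobody calls get() or join(), yet the task still runs.
            return started.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(coldRequestCounts())); // [0, 1, 2]
        System.out.println(eagerStartsWithoutConsumer());         // true
    }
}
```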

Another consequence of being an event bus is that streams are in general unordered. The largest impact is found in the flatMap combinator, which applies each emitted value to a function which itself returns a new Observable. The method’s similarity to the method of the same name found on java.util.stream.Stream and other constructs lends itself to the expectation that the inputs are processed in a sequential manner, or at the very least that results are emitted in an order corresponding to the inputs. This is in fact false; flatMap returns values in whatever order is most convenient. In some cases, this happens to correspond to input order, masking the problem.

(There is a concatMapEager combinator which does preserve order, but that doesn’t help the smell that one is using the wrong abstraction.)
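The ordering difference is easy to reproduce without RxJava. The following sketch uses two manually completed CompletableFutures as stand-ins for the inner observables; the "merge" and "ordered" collection strategies are my own simplified models of the two behaviours, not the library's implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class MergeOrderDemo {
    // Merge-style: record each result as soon as it arrives.
    static List<String> mergedOrder() {
        CompletableFuture<String> slow = new CompletableFuture<>();
        CompletableFuture<String> fast = new CompletableFuture<>();
        List<String> merged = new ArrayList<>();
        // Input order is slow, then fast.
        slow.thenAccept(merged::add);
        fast.thenAccept(merged::add);
        fast.complete("fast");   // the second input happens to finish first
        slow.complete("slow");
        return merged;
    }

    // Order-preserving: read the results in input order, waiting as needed.
    static List<String> inputOrder() {
        CompletableFuture<String> slow = new CompletableFuture<>();
        CompletableFuture<String> fast = new CompletableFuture<>();
        fast.complete("fast");
        slow.complete("slow");
        return Arrays.asList(slow.join(), fast.join());
    }

    public static void main(String[] args) {
        System.out.println(mergedOrder()); // [fast, slow]
        System.out.println(inputOrder());  // [slow, fast]
    }
}
```

If the two completions had happened in input order, both methods would return the same list, which is exactly how the problem gets masked.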

RxJava combines the worst of the synchronous and asynchronous worlds

“Synchronous programming” refers to the traditional one-thing-at-a-time style procedure. If a program tries to do something but isn’t able to do it immediately, the program blocks until it is able to do it. During the waiting time, no useful work occurs. In order to do multiple things at once in a synchronous environment, one must spin up extra operating-system-level threads. A process that needs to handle many concurrent requests must therefore create hundreds or even thousands of threads, each of which spends much of its life idling.

The alternative, “asynchronous programming”, entails detecting the fact that an operation would block, and in that case saving the current state off somewhere and doing something else until that operation can be performed without blocking. This allows a single thread to handle many simultaneous requests.

A handful of programming environments, such as Go and Erlang, do this automatically by providing their own threading abstraction independent of operating system threads and able to do a light context switch if one of the virtual threads would block. For these environments, no effort is required for asynchronous programming nor are any language features lost.

Environments without built-in support require building solutions within them. The traditional approach is to define explicit state machines which act as that “saved state” when a task is suspended. This works, but can be tedious or difficult to implement. Other solutions include asynchronous callbacks as used in Node.js or the polling model provided by Rust futures.
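For readers who have not written one, an explicit state machine of the kind mentioned above might look like the following sketch. RequestTask, its states, and the "header then body" protocol are all hypothetical; the only point is that everything a blocked thread would keep on its stack has to live in fields instead.

```java
import java.util.ArrayList;
import java.util.List;

class StateMachineDemo {
    enum State { AWAIT_HEADER, AWAIT_BODY, DONE }

    // Hypothetical request handler written as an explicit state machine.
    static final class RequestTask {
        private State state = State.AWAIT_HEADER;
        private String header;
        private String result;

        // Called by the event loop whenever input for this task is ready.
        void onInput(String data) {
            switch (state) {
                case AWAIT_HEADER:
                    header = data;
                    state = State.AWAIT_BODY;
                    break;
                case AWAIT_BODY:
                    result = header + ":" + data;
                    state = State.DONE;
                    break;
                case DONE:
                    throw new IllegalStateException("input after completion");
            }
        }

        boolean isDone() { return state == State.DONE; }
        String result() { return result; }
    }

    // One thread interleaves two tasks; neither ever blocks the other.
    static List<String> interleave() {
        RequestTask a = new RequestTask();
        RequestTask b = new RequestTask();
        a.onInput("GET");      // a is now suspended, waiting for its body
        b.onInput("POST");     // b makes progress in the meantime
        b.onInput("/orders");
        a.onInput("/users");
        List<String> out = new ArrayList<>();
        out.add(a.result());
        out.add(b.result());
        return out;
    }

    public static void main(String[] args) {
        System.out.println(interleave()); // [GET:/users, POST:/orders]
    }
}
```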

Writing asynchronous code, even with these nicer abstractions, in a natively synchronous environment has its downsides. Because the language runtime is not aware of the logical connections between parts of the code, error backtraces generally do not correspond to the logical flow of the program. Breaking what are logically individual functions into smaller parts breaks language features relying on lexical structure, such as try-catch exception handling, try-with-resources, and certain closures. A bug in the handling of an event can cause a chain of actions to be lost, resulting in a section of code hanging forever without any visibly stalled actions.
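A small JDK-only sketch of how the lexical features break, using CompletableFuture callbacks in place of RxJava combinators. Connection is a hypothetical resource class invented for the example, and ioReady simulates an I/O operation that finishes after the enclosing block has already exited.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class LexicalScopeDemo {
    // Hypothetical resource that records whether it has been closed.
    static final class Connection implements AutoCloseable {
        private boolean closed;
        boolean isClosed() { return closed; }
        @Override public void close() { closed = true; }
    }

    // try-with-resources closes the connection when the block exits, which
    // is before the deferred callback gets to use it.
    static boolean callbackSeesClosedResource() {
        CompletableFuture<Void> ioReady = new CompletableFuture<>();
        CompletableFuture<Boolean> sawClosed;
        try (Connection conn = new Connection()) {
            sawClosed = ioReady.thenApply(ignored -> conn.isClosed());
        }
        ioReady.complete(null);   // the "I/O" finishes after the block exited
        return sawClosed.join();
    }

    // The catch block only covers registering the callback, not running it.
    static String whereTheExceptionSurfaces() {
        CompletableFuture<Void> ioReady = new CompletableFuture<>();
        CompletableFuture<String> result;
        try {
            result = ioReady.thenApply(ignored -> {
                throw new IllegalStateException("boom");
            });
        } catch (IllegalStateException e) {
            return "caught by try-catch";
        }
        ioReady.complete(null);
        try {
            return result.join();
        } catch (CompletionException e) {
            return "surfaced via the future: " + e.getCause().getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(callbackSeesClosedResource()); // true
        System.out.println(whereTheExceptionSurfaces());  // surfaced via the future: boom
    }
}
```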

RxJava inherits all these problems. Its stack traces are particularly infamous, often containing hundreds of lines of intra-Observable method calls but not a single frame belonging to the application, making it impossible to determine where in the logical flow an error actually occurred. Despite how bad this sounds, it is not in and of itself the big problem here.

The big problem is that the Java ecosystem is full of synchronous code. Virtually all database drivers are synchronous-only. A number of synchronous libraries masquerade as async; particularly egregious is a certain HTTP client which can return an Observable, but then proceeds to execute the HTTP request not only synchronously, but on the calling thread by default.

Blocking on a reactor thread (i.e., a thread running the asynchronous state machine) is catastrophically bad, as reactor thread pools are typically tiny and, depending on the implementation, tasks may be pinned to threads, so a single blocker will block all tasks that share the same thread. To mitigate this, blocking tasks are outsourced to thread pools; the asynchronous worker threads watch for the task completing on the thread pool before continuing the respective task.
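The mitigation looks roughly like this when written with plain JDK classes. blockingQuery is a hypothetical stand-in for a JDBC call, and the pool size of 4 and the thread name are arbitrary choices for the sketch.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class BlockingOffloadDemo {
    // Hypothetical blocking call, standing in for a JDBC query.
    static String blockingQuery() {
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "row from " + Thread.currentThread().getName();
    }

    // Runs the blocking call on a dedicated pool so the calling ("reactor")
    // thread is not the one that sleeps.
    static String runOffloaded() {
        ExecutorService blockingPool = Executors.newFixedThreadPool(4, runnable -> {
            Thread t = new Thread(runnable, "blocking-pool");
            t.setDaemon(true);
            return t;
        });
        try {
            CompletableFuture<String> row =
                    CompletableFuture.supplyAsync(BlockingOffloadDemo::blockingQuery, blockingPool);
            // A real reactor thread would attach a callback and move on;
            // join() is used here only so the demo can return the result.
            return row.join();
        } finally {
            blockingPool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runOffloaded()); // row from blocking-pool
    }
}
```

Every in-flight blocking call occupies one pool thread for its whole duration, which is where the sizing problem comes from.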

Thread pools work, but take us a step back towards the key problem of synchronous code we were trying to avoid: thread sprawl. How bad does it get? Worse than synchronous code. Say an application’s request handling is largely database-bound. That database’s Java driver probably uses JDBC, so it’s synchronous and database queries will need to run in a thread pool. In order for database operations to not pile up behind each other, that thread pool needs to have a size equal to the maximum concurrent requests you expect.

But wait, “thread pool of size equal to the maximum concurrent requests” is exactly how the request thread pool for synchronous code is set up! In addition, you also have a few reactor threads, plus another thread pool from some synchronous library masquerading as asynchronous, plus overhead for constantly synchronising and transferring things across threads. If running in a Jetty/Jersey environment, the icing on the cake is that you even still have the thread pool used for synchronous requests. The end result is that there are even more threads than the straightforward synchronous solution would have produced.

Going async only makes sense if you can go all-in. Vanishingly few server-side Java applications meet this requirement. In the majority that do not, using RxJava for asynchrony brings all the pain of asynchronous code and all the problems of synchronous code.

RxJava combines the worst of the serial and concurrent worlds

The phrase “you don’t know or care about X” is sometimes found in descriptions of simplifying abstractions. It makes sense for things like sorting routines or database query optimisation. But apparently someone decided that setting X to “whether your code is run on one or multiple threads” was a good idea.

Quite a few asynchronous frameworks are single-threaded. Node.js and various Python frameworks are often mentioned because their language runtimes don’t even permit running live code concurrently. But this also extends to runtimes with full threading support, such as Rust futures. Restricting asynchrony to a single thread (more specifically, pinning each task to a single thread) has the advantage that you don’t need to think about thread safety unless you explicitly outsource work to a separate thread.

RxJava is not one of those systems. What thread a particular callback is executed upon is almost entirely at the whim of the upstream observable. Two separate callbacks could easily be executed on separate threads. This alone is bad; in theory, an Observable is supposed to consistently execute its callbacks on one thread, but apparently not even that holds, because the class provides a combinator to “fix” observables broken this way by outsourcing running the callbacks to yet another thread pool.

In other words, code downstream of an Observable needs to be defensively thread-safe. Determining whether this is strictly necessary requires either explicitly triggering a rescheduling onto a different thread pool, or understanding how the source observable(s) and all combinators in between handle scheduling.
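The same "whim of the upstream" effect can be observed with CompletableFuture, which I use here as a JDK stand-in rather than as a claim about RxJava's internals. The identical callback runs on a different thread depending only on how and when the upstream value was produced. The behaviour shown is what current JDKs do in practice; the documentation only promises that the callback "may" run on the completing thread.

```java
import java.util.concurrent.CompletableFuture;

class CallbackThreadDemo {
    // Callback registered before completion: it runs on whichever thread
    // completes the upstream future.
    static String completedLaterBy(String threadName) {
        CompletableFuture<String> upstream = new CompletableFuture<>();
        CompletableFuture<String> ranOn =
                upstream.thenApply(value -> Thread.currentThread().getName());
        Thread completer = new Thread(() -> upstream.complete("value"), threadName);
        completer.start();
        return ranOn.join();
    }

    // Same callback, but the upstream is already complete: it runs on the
    // thread that registers it.
    static boolean alreadyCompleteRunsOnCaller() {
        CompletableFuture<String> upstream = CompletableFuture.completedFuture("value");
        String ranOn = upstream.thenApply(value -> Thread.currentThread().getName()).join();
        return ranOn.equals(Thread.currentThread().getName());
    }

    public static void main(String[] args) {
        System.out.println(completedLaterBy("io-thread"));  // io-thread
        System.out.println(alreadyCompleteRunsOnCaller());  // true
    }
}
```

Nothing at the call site of thenApply reveals which case applies, so the callback body has to be written as if either could happen.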

The only benefit of dealing with this forced thread-safety is the fact that the code works. There’s no appreciable speed-up from (potentially) running the tiny inter-operation microtasks concurrently, because genuinely compute-heavy code normally gets outsourced to thread pools in large chunks explicitly.

In other words, any multi-threaded RxJava code-base pays the maintenance and understandability costs of thread-safety while getting compute performance that is at most equal to single-threaded execution.

RxJava combines the worst of the procedural and functional worlds

Functional paradigms are often seen as being easier to work with in the long run as they encode computation in a way that does not depend on understanding execution order.

Indeed, Haskell is entirely built on this premise: there are no side-effects, so execution order is not only ill-defined, it is unobservable. Interacting with the outside world does entail side-effects; Haskell tackles this problem by using monads to encode a procedural-style description of those side-effects, which execute in a well-defined order.

In contrast, RxJava is built around expressing computations in procedural Java code. Interacting with the outside world is tackled by using combinators to encode a functional-style description of those side-effects, which execute in an ill-defined, but observable, order.

Puns aside, the functional-style combinators of RxJava aren’t really a problem, and in similar contexts would be welcome. The real problem goes back to the event-bus model. An operation which produces an output cannot start until it has been subscribed to. This means that understanding the order of execution, if it is even deterministic, requires tracing through the functional combinators to determine what order they will subscribe to each other.

For operations that need not happen in a particular order, or where one inherently depends on the value output from a prior one, this isn’t an issue. It becomes an issue when multiple operations must be executed in sequence but do not directly depend on each other. This is generally solved by various awkward hacks, such as threading a meaningless value through multiple layers of flatMap, which isn’t really “meant” to be used that way.
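The shape of that hack, sketched with CompletableFuture.thenCompose standing in for flatMap. The two operations, writeAudit and sendNotification, are hypothetical and deliberately share no data; the only thing connecting them is the ignored value.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class SequencingHackDemo {
    // Two hypothetical independent operations that must still run in order.
    static CompletableFuture<Void> writeAudit(List<String> log) {
        return CompletableFuture.runAsync(() -> log.add("audit"));
    }

    static CompletableFuture<Void> sendNotification(List<String> log) {
        return CompletableFuture.runAsync(() -> log.add("notify"));
    }

    static List<String> runInSequence() {
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        // The value passed to the lambda carries no information; it exists
        // only to delay starting the second operation until the first is done.
        writeAudit(log)
                .thenCompose(ignored -> sendNotification(log))
                .join();
        return new ArrayList<>(log);
    }

    public static void main(String[] args) {
        System.out.println(runInSequence()); // [audit, notify]
    }
}
```

Written as two plain statements in synchronous code, the ordering would be obvious from reading top to bottom; here it has to be inferred from the combinator chain.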

Essentially, one is forced to factor code into a functional style using the combinators while at the same time needing to think about procedural-style execution order.

You probably don’t need async

Not only is the set of server-side Java applications which can benefit from async code rather small, the subset where this benefit is material is far smaller. Modern operating systems are really good at managing large numbers of threads. The JVM was built on the premise that this was the way people were going to do things.

Your application needs to handle a few hundred requests per second? Unless each request entails a dozen network requests and nothing else, make it synchronous and you can still run it on a toaster. Too much for your toaster? Ten toasters is probably still cheaper than the increased developer time to write and maintain async code, even with any perceived simplifications RxJava brings.

The cheapness of server hardware versus programmer time comes up in a lot of discussions of trade-offs, and is one of the central reasons why, for example, people write Ruby on Rails apps instead of coding everything in C. But for some reason, a lot of RxJava proponents act as if it’s free. It’s not. But unlike a lot of other async solutions, its cost comes more during maintenance than during initial implementation, making it an easier trap to fall into.

On the other side of the spectrum, applications that really, really need async (as in the “cloning nginx” level of needing async) are also a poor fit for using RxJava to manage it, as performance requirements dictate having fine control over scheduling, which RxJava can provide but which is extremely easy to get wrong, especially once back-pressure comes into the picture.

Somewhere between these two extremes, there is a place, a certain limbo, where RxJava functions well. But even in that place, I’m not sure it’s worth it. Were RxJava a system of two separate “future value” and “asynchronous stream” abstractions like so many people try to pretend it is, probably. However, that’s not what it is, and limbo applications often have other, far less painful opportunities for optimisations which make a bigger impact than async.