Apache Storm and Clojure

Streaming Compute, Trident, and Marceline

Posted by Derek Troy-West on July 17, 2014 · 12 mins read

Image: Matt Hardy (Unsplash)

Storm is a distributed computation framework. Trident is a higher-level abstraction on top of Storm that provides exactly-once semantics and stateful stream processing.

I did wonder if Storm was in decline since its move to Apache. Compared to other open-source projects I’m familiar with (Netty in particular), it seemed to lack momentum. Happily, a recent release indicates a heartbeat, and at the very least I can now use Clojure 1.5.1 rather than being stuck on 1.4.0.

Storm is implemented partly in Clojure and partly in Java. It includes a Clojure DSL which can be used to define spouts and bolts and wire them into a topology that processes a stream of tuples. Because Storm is distributed, every element of the topology needs to be serializable: the (Nimbus) node that you submit the topology to dices it up and distributes it amongst worker nodes, each running in a separate JVM. This makes creating topologies in Clojure more complicated, because some of the ways you can implement a Java interface in Clojure (reify, defrecord) are not well suited to serialization.

The DSL deals with that complexity in quite a clever way: it uses Java shims that carry metadata about the Clojure callback fns (the namespace and name of each fn) rather than the fns themselves. Take a look at ClojureBolt if you’re interested in understanding it further.
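To make the shim idea concrete, here’s a minimal plain-Clojure sketch with no Storm dependency: rather than serializing a compiled fn, serialize only data that names it (a namespace string and a var name), then load the namespace and resolve the var on the receiving side. It’s a model of the idea, not ClojureBolt’s actual code; `serialize`, `deserialize`, `fn-spec` and `resolve-spec` are hypothetical helpers, and the round trip happens inside one JVM rather than over the wire.

```clojure
(ns shim-sketch
  (:import (java.io ByteArrayInputStream ByteArrayOutputStream
                    ObjectInputStream ObjectOutputStream)))

(defn serialize
  "Java-serialize x to a byte array, as a shim would be before shipping."
  [x]
  (let [baos (ByteArrayOutputStream.)]
    (with-open [oos (ObjectOutputStream. baos)]
      (.writeObject oos x))
    (.toByteArray baos)))

(defn deserialize
  "Read an object back from bytes produced by serialize."
  [^bytes bs]
  (with-open [ois (ObjectInputStream. (ByteArrayInputStream. bs))]
    (.readObject ois)))

(defn fn-spec
  "Hypothetical shim payload: only names travel, never compiled code."
  [ns-name fn-name]
  [ns-name fn-name])

(defn resolve-spec
  "Worker side: load the namespace first, then look the var up by name."
  [[ns-name fn-name]]
  (let [ns-sym (symbol ns-name)]
    (require ns-sym)
    (if-let [v (ns-resolve ns-sym (symbol fn-name))]
      @v
      (throw (IllegalStateException.
               (str "No var " ns-name "/" fn-name))))))

;; Usage: ship the spec, then resolve and call it on the "worker".
(def shipped (serialize (fn-spec "clojure.string" "upper-case")))
((resolve-spec (deserialize shipped)) "storm")
;; => "STORM"
```

Only strings cross the wire, so the receiving JVM needs the namespace’s code on its classpath but never has to deserialize a Clojure fn class.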

There is no Trident DSL packaged with Storm; however, the team at Yieldbot have released one called Marceline, which is similar in implementation to the basic Storm offering.

I found Marceline after blundering through various other options for implementing Trident operations:

Defining operations with Defrecord

The following filter works when testing within a LocalCluster (a single JVM):

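(import 'storm.trident.operation.Filter)

(defn keep? [tuple]
  true) ; placeholder: the original predicate body isn't shown

(defrecord SimpleFilter []
  Filter
  (isKeep [_ tuple] (keep? tuple))
  (prepare [_ conf context])
  (cleanup [_]))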

but fails when you deploy it to a live clustered environment with an error similar to:

java.lang.IllegalStateException: Attempting to call unbound fn: #'keep?

When the AOT’d defrecord class is deserialized in the worker JVM, its enclosing namespace has not been loaded or required, so the keep? function is unbound. Further details are captured in this Clojure bug.
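The unbound-fn error itself can be reproduced in a single JVM: a var can be interned without ever receiving a value, and invoking it throws exactly this IllegalStateException. In this sketch a bare `(def keep?)` stands in for the worker’s view of `keep?`; treating the two as equivalent is an assumption made for illustration, and `call-message` is a hypothetical helper.

```clojure
(ns unbound-sketch)

;; Interned but never given a root value, standing in for #'ns/keep?
;; on a worker whose namespace was never loaded.
(def keep?)

(defn call-message
  "Invoke v with one argument; return the exception message if it throws."
  [v arg]
  (try
    (v arg)
    nil
    (catch IllegalStateException e
      (.getMessage e))))

(bound? #'keep?)
;; => false

(call-message #'keep? {:type "a"})
;; => "Attempting to call unbound fn: #'unbound-sketch/keep?"
```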

Defining operations using Reify

Reify generates classes that can’t be serialized between JVMs. I think this is because they’re effectively anonymous (not explicitly named), so the target JVM has no knowledge of them.

(reify storm.trident.operation.Filter
  (isKeep [_ tuple] (keep? tuple))
  (prepare [_ conf context])
  (cleanup [_]))
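Whatever the precise reason, one concrete difference is easy to check: a reify’d object doesn’t implement java.io.Serializable, whereas a defrecord instance does (presumably why the defrecord version gets as far as the worker before failing). The sketch uses java.lang.Runnable as a stand-in for Trident’s Filter so it runs without Storm on the classpath; `Holder` and `serializable-ok?` are hypothetical names.

```clojure
(ns reify-sketch
  (:import (java.io ByteArrayOutputStream NotSerializableException
                    ObjectOutputStream Serializable)))

;; Stand-in for a Trident operation built with reify.
(def reified (reify Runnable (run [_] nil)))

;; A record implementing the same interface; records are Serializable.
(defrecord Holder [field-name]
  Runnable
  (run [_] nil))

(defn serializable-ok?
  "True if x can be written with plain Java serialization."
  [x]
  (try
    (with-open [oos (ObjectOutputStream. (ByteArrayOutputStream.))]
      (.writeObject oos x))
    true
    (catch NotSerializableException _
      false)))

(instance? Serializable reified)            ;; => false
(instance? Serializable (->Holder "bytes")) ;; => true
(serializable-ok? reified)                  ;; => false
(serializable-ok? (->Holder "bytes"))       ;; => true
```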

Elaborate :gen-class constructs

An example of a configurable filter. It works, but look how ugly it is!

(ns filter-test
  (:import (storm.trident.tuple TridentTuple)
           (storm.trident.operation Filter))
  (:gen-class
    :name test.trident.operation.LineFilter
    :implements [storm.trident.operation.Filter]
    :state type                       ; public final field holding the ctor arg
    :init init
    :constructors {[String] []}))     ; LineFilter(String) calls super()

(defn -init
  "Constructor hook: no superclass args; the type string becomes the state."
  [type]
  [[] type])

(defn -isKeep
  "Determine if a stream should include the tuple"
  [this ^TridentTuple tuple]
  (let [line (.getValueByField tuple "field-name")]
    (= (.type this) (:type line))))

(defn -prepare [_ _ _])
(defn -cleanup [_])

(defn new-filter
  [type]
  (test.trident.operation.LineFilter. type))

It works because it’s basically a complete Java class described in Clojure. May as well just write Java classes.

Using Marceline

(require '[marceline.storm.trident :as t])

(t/deffilter filter-type
             [tuple]
             true)

Marceline provides a number of macros which can be used to define filters, functions, and other Trident operations in a Storm topology.

In the simplest case these macros are very easy to use. I’ll cover a few more complicated cases: providing parameters to an operation, performing work when an operation is initialized, and when and where to use AOT compilation.

Parameters and prepare methods are fairly standard across all the macros in the Marceline DSL. As an example, take a filter that operates on a stream of integers:

Filter even integers

This macro defines a ClojureFilter named filter-even, which can be passed directly to a topology.

(t/deffilter filter-even
             [tuple]
             (even? (t/first tuple)))

A configurable filter

In this case the macro will define a function, when called with a divisor parameter it returns a ClojureFilter object.

;; (filter-by 3) returns a ClojureFilter keeping multiples of 3
(t/deffilter filter-by {:params [divisor]}
             [tuple]
             (= 0 (mod (t/first tuple) divisor)))
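Stripped of Storm, the :params idea is a function that closes over its arguments and returns the operation. Here is a plain-Clojure model of that, assuming a tuple can be treated as a vector (so `first` stands in for `t/first`); `filter-by*` is a hypothetical name, and this is not how Marceline actually implements :params.

```clojure
(ns params-sketch)

(defn filter-by*
  "Hypothetical model of (filter-by divisor): returns a keep? predicate
  that closes over divisor."
  [divisor]
  (fn keep? [tuple]
    (= 0 (mod (first tuple) divisor))))

;; Each call yields an independently configured operation.
(def by-3 (filter-by* 3))
(def by-5 (filter-by* 5))

(filter by-3 [[1] [3] [5] [9] [10]])  ;; => ([3] [9])
(filter by-5 [[1] [3] [5] [9] [10]])  ;; => ([5] [10])
```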

Defining a prepare method for operations

To perform state initialization (I use Component) when the operations are prepared, provide an implementation of prepare like so:

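(t/deffilter filter-type {:params  [divisor]
                          :prepare true}
             [conf context]
             ;; runs when the operation is prepared on the worker
             (prn "preparation done here" divisor)
             ;; the body must return a filter implementation
             (t/filter
               (isKeep [tuple]
                       (= 0 (mod (t/first tuple) divisor)))))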

The body is expected to be the prepare function and it must return a filter implementation. This makes more sense if you look at the details of ClojureFilter. There is a second possibility, where the prepare method is explicitly defined within the filter:

Defining a prepare method for operations, long-form

(t/deffilter filter-type {:params  [divisor]
                          :prepare true}
             [conf context]
             (prn "preparation done here" divisor)
             (t/filter
               ;; explicitly defined prepare (a cleanup could go here too)
               (prepare [conf context]
                        (prn "preparing again!"))
               (isKeep [tuple]
                       (= 0 (mod (t/first tuple) divisor)))))

I’m not sure there’s much difference between the two, other than you could also define a cleanup method via this form if you wanted to.
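Both forms share the same shape: a function of conf and context that does its set-up and then returns the filter. A plain-Clojure model of that two-stage lifecycle follows; `make-filter`, the `prepares` counter and the `:limit` conf key are hypothetical, and the point is only that set-up runs once per prepare rather than once per tuple.

```clojure
(ns prepare-sketch)

;; Counts how many times the prepare stage has run.
(def prepares (atom 0))

(defn make-filter
  "Hypothetical model of a {:params [divisor] :prepare true} filter:
  returns a prepare fn of [conf context], which returns the keep? fn."
  [divisor]
  (fn prepare [conf context]
    (swap! prepares inc)                      ; one-off set-up work
    (let [limit   (get conf :limit 100)
          allowed (set (filter #(zero? (mod % divisor)) (range limit)))]
      ;; per-tuple work only consults state built above
      (fn keep? [tuple]
        (contains? allowed (first tuple))))))

(def keep? ((make-filter 4) {:limit 20} nil))

(map keep? [[0] [3] [8] [16] [40]])  ;; => (true false true true false)
@prepares                            ;; => 1
```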

Marceline and AOT

You must AOT-compile any namespace that uses the DSL to create filters, functions, and so on; otherwise you’ll hit unbound-fn errors similar to the one I describe above for Trident operations defined with defrecord. To be honest I’m not sure why. I raised a ticket, and the friendly Yieldbot team replied that they thought it was to do with reifying interfaces and serialization. However, consider the simplest case, which still fails with an unbound-fn error:

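(require '[marceline.storm.trident :as t])
(import '(storm.trident TridentTopology)
        '(storm.kafka ZkHosts)
        '(storm.kafka.trident TridentKafkaConfig TransactionalTridentKafkaSpout))

(t/deffilter filter-type
             [tuple]
             true)

(defn ->topology []
  (let [topology (TridentTopology.)
        spout    (TransactionalTridentKafkaSpout.
                   (TridentKafkaConfig. (ZkHosts. "zk-connect-url") "a-topic"))]
    (-> (t/new-stream topology "zk-tx-id" spout)
        (t/each ["bytes"] filter-type)
        (t/parallelism-hint 2))
    (.build topology)))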

When deploying to a live cluster, we’ll find:

java.lang.RuntimeException: java.lang.IllegalStateException: Attempting to call unbound fn: #'filter/filter__

Now, that filter__ function, which is unbound, is created by the macro:

(macroexpand '(t/deffilter filter-type [tuple] true))
;; => (abridged)
(do (defn filter-type__
      [conf1862 context1863]
      (marceline.storm.trident/filter (isKeep [tuple] true)))
    (def filter-type
      (marceline.storm.trident/clojure-filter filter-type__ [])))

The macro also defs something called filter-type, which in this case is a ClojureFilter shim that is passed the namespace and function name “filter__”. The Java shim is serialized across the wire and deserialized in the worker, and its prepare method is called when the topology is initialized. That prepare method then calls the “filter__” function, which fails because it is unbound.

That function goes on to reify a Trident Filter, but de/serialization has already happened by that point, and the unbound-fn failure has already occurred. So I’m really not sure what causes this error, though I guess it’s related to this Clojure bug.
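For what it’s worth, here’s one way to produce exactly this message in a single JVM: look a var up by namespace and name with clojure.lang.RT/var when that namespace has never been loaded. RT/var interns the var (creating the namespace object if needed) without loading any code, so the caller gets back a var with no value. Whether Marceline’s shim resolves its fn like this, and why the namespace wouldn’t be loaded on the worker, are assumptions this sketch doesn’t settle; `not.loaded.ns` and `my-filter__` are made-up names.

```clojure
(ns lookup-sketch)

(defn lookup-by-name
  "Resolve a var the way a Java shim might: by strings, without requiring
  the namespace first. RT/var creates the ns and var if they don't exist."
  [^String ns-name ^String fn-name]
  (clojure.lang.RT/var ns-name fn-name))

(def v (lookup-by-name "not.loaded.ns" "my-filter__"))

(bound? v)                   ;; => false
(find-ns 'not.loaded.ns)     ;; non-nil: the ns object exists, but no code was loaded

(try
  (v nil nil)
  (catch IllegalStateException e
    (.getMessage e)))
;; => "Attempting to call unbound fn: #'not.loaded.ns/my-filter__"
```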

It irks me not knowing, so if you do know, please share. If I AOT the namespace everything works fine, but I’d rather understand the problem than avoid it.