
Unit Testing Scalding Jobs


Scalding is a powerful framework for writing complex data processing applications on Apache Hadoop. It’s concise and expressive – almost to a fault. It’s dangerously easy to pack gobs of subtle business logic into just a few lines of code. If you’re writing real data processing applications and not just ad-hoc reports, unit testing is a must. However, tests can get unwieldy to manage as job complexity grows and the arity of the data increases.

For example, consider this scalding job:

import com.twitter.scalding._
import org.joda.time.DateTime

class ComplicatedJob(args: Args) extends Job(args) {

  val bloatedTsv = Tsv("input",
    ('user_id,
      'timestamp,
      'host,
      'referer,
      'remote_addr,
      'user_agent,
      'cookie,
      'connection,
      'accept_encoding,
      'accept_language)).read

  bloatedTsv
    .map('timestamp -> 'timestamp) { ts: String => toDateTime(ts) }
    .filter('timestamp) { ts: DateTime => ts.isAfter(DateTime.now.minusDays(30)) }
    .map('user_agent -> 'browser) { userAgent: String => toBrowser(userAgent) }
    .map('remote_addr -> 'country) { ip: String => toCountry(ip) }
    .map('country -> 'country) { c: String => if (c == "us") c else "other" }
    .groupBy('browser, 'country) { _.size('count) }
    .groupBy('browser) { _.pivot(('country, 'count) ->('us, 'other)) }
    .write(Tsv("output"))

  def toDateTime(ts: String): DateTime = { ... }

  ...
}

Testing this job end-to-end would be fragile because there is so much going on, and it would be tedious and noisy to build fake data to isolate and highlight edge cases. The pivot operations at the end of the pipeline only deal with browser and country, yet test data with all 10 fields is required, including valid timestamps and user agents, just to get to the pivot logic.

There are a few ways to tackle this, and an approach I like is to use extension methods to break down the logic into smaller chunks of testable code. The result might look something like this:

class ComplicatedJob(args: Args) extends Job(args) {

  ...

  bloatedTsv
    .timestampIsAfter(DateTime.now.minusDays(30))
    .userAgentToBrowser()
    .remoteAddrToCountry()
    .countCountryByBrowser()
    .write(Tsv("output"))
}

Each block of code depends on only a few fields, so it doesn’t require mocking the entire input set.

import cascading.pipe.Pipe
import com.twitter.scalding.Dsl._

object ComplicatedJob {

  implicit class ComplicatedJobRichPipe(pipe: Pipe) {

    // this chunk of code is testable in isolation
    def countCountryByBrowser(): Pipe = {
      pipe
        .map('country -> 'country) { c: String => if (c == "us") c else "other" }
        .groupBy('browser, 'country) { _.size('count) }
        .groupBy('browser) { _.pivot(('country, 'count) ->('us, 'other)) }
    }

    ...
  }

}

In this example only browser and country are required, so setting up test data is reasonably painless and the intent of the test case isn’t lost in a sea of tuples. Granted, this approach requires creating a helper job to set up the input and capture the output for test assertions, but I think it’s a worthwhile trade-off to reveal such a clear test case.

import com.twitter.scalding._
import org.junit.runner.RunWith
import org.scalatest.FunSuite
import org.scalatest.junit.JUnitRunner
import org.scalatest.matchers.ShouldMatchers

import ComplicatedJob._
import ComplicatedJobTests._

@RunWith(classOf[JUnitRunner])
class ComplicatedJobTests extends FunSuite with ShouldMatchers {

  test("should count and pivot rows into columns") {

    val input = List[InputTuple](
      ("firefox", "us"),
      ("chrome", "us"),
      ("safari", "us"),
      ("firefox", "us"),
      ("firefox", "br"),
      ("chrome", "de")
    )

    val expected = Set[OutputTuple](
      ("firefox", 2, 1),
      ("safari", 1, 0),
      ("chrome", 1, 1)
    )

    count(input) { _.toSet should equal(expected) }
  }

}

object ComplicatedJobTests {
  type InputTuple = (String, String)
  type OutputTuple = (String, Int, Int)

  // this is a helper job to set up the inputs and outputs
  // for the chunk of code we're trying to test
  class CountCountryByBrowser(args: Args) extends Job(args) {

    Tsv("input", ('browser, 'country))
      .read
      .countCountryByBrowser() // this is what we're testing
      .project('browser, 'us, 'other)
      .write(Tsv("output"))

  }

  // helper method to run our test job
  def count(input: List[InputTuple])(fn: List[OutputTuple] => Unit) {
    import Dsl._
    JobTest[CountCountryByBrowser]
      .source(Tsv("input", ('browser, 'country)), input)
      .sink[OutputTuple](Tsv("output")) { b => fn(b.toList) }
      .run
      .finish
  }
}

Parsing Json With Defaults in Scala


The json library in Play does this thing where it explodes if the json you’re reading is missing a property. Gson would happily leave the property null, but that’s just not the scala way. In scala, if something is optional, it’s wrapped in an Option. The problem is I’d like to add a property to my data model and I don’t want it to be optional because it’s not.
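
To make the failure concrete, here’s a minimal sketch of what happens when a macro-generated Reads hits json that’s missing a required property. Widget is a made-up case class for illustration, not something from my data model.

import play.api.libs.json._

case class Widget(color: String, shape: String)

object Widget {
  implicit val format: Format[Widget] = Json.format[Widget]
}

// json written before "shape" existed
val oldJson = Json.parse("""{ "color": "blue" }""")

// explodes (well, returns a JsError) because "shape" is required
oldJson.validate[Widget] // JsError: error.path.missing at /shape

// wrapping the field in an Option makes it parse, but now every caller
// has to handle a value that isn't actually optional
case class WidgetV2(color: String, shape: Option[String])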

In Java land, I probably would have added the property and thought about how to deal with migrating old data later, but not in scala. This battle has to be fought right now. That’s what type safe means and it’s why the NullPointerException has all but died in pure scala apps, not unlike the carpal tunnel epidemic.

In my case I’ve got some data in Couchbase and when I add a new property to my data model, I won’t be able to read the old data. What I need to do is transform the json before hydrating the object. Fortunately, this is a snap by using the rich transformation features described in JSON Coast-to-Coast.

My approach was to create a subclass of Reads[A] called ReadsWithDefaults[A] that reads json and uses a transformation to merge default values. It looks like this:

import play.api.libs.json._

case class MyObject(color: String, shape: String)

object MyObject {

    val defaults = MyObject("blue", "square")

    implicit val readsMyObject = new ReadsWithDefaults(defaults)(Json.format[MyObject])

    implicit val writesMyObject = Json.writes[MyObject]
}
import play.api.libs.json._
import play.api.libs.json.Reads._

class ReadsWithDefaults[A](defaults: A)(implicit format: Format[A]) extends Reads[A] {

  // serialize the defaults once, then merge the incoming json on top of them
  // so any missing properties fall back to the default values
  private val mergeWithDefaults = {
    val defaultJson = Json.fromJson[JsObject](Json.toJson(defaults)).get
    __.json.update(of[JsObject] map { o => defaultJson ++ o })
  }

  def reads(json: JsValue) = {
    val jsObject = json.transform(mergeWithDefaults).get
    Json.fromJson[A](jsObject)(format)
  }
}
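
A quick usage sketch: with the defaults above, a document persisted before shape existed now hydrates cleanly instead of blowing up. The json literals here are made up for illustration.

import play.api.libs.json._

// a document saved before the "shape" property was added
Json.parse("""{ "color": "red" }""").validate[MyObject]
// succeeds with MyObject("red", "square") -- "shape" comes from the defaults

// a document that already has the property wins over the defaults
Json.parse("""{ "color": "red", "shape": "circle" }""").validate[MyObject]
// succeeds with MyObject("red", "circle")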

Web Framework Scaffolding Considered Harmful


If you’re a startup and spending time building back office tools to input application data into web forms, you’re doing it wrong. Sure, it’s a pretty simple task to build a few CRUD screens using the hottest new MVC framework, but you can do better.

Integrating with Google spreadsheets to manage application data unlocks powerful possibilities and will take you further than anything you could scaffold up in a comparable amount of time. To be clear, I’m not suggesting you take a runtime dependency on Google docs. I’m talking about using a lightweight business process that takes advantage of the rich collaboration features of Google docs and simply importing the data into your database using the API.

Revision history

Google docs keeps a revision history of what was changed, when and by whom. That’s a pretty cool feature and one you’re not likely to build when there are more pressing customer facing features to work on.

Sometimes release notes aren’t detailed enough when trying to correlate a surprising change in a KPI, so it’s reassuring to have the nitty gritty revision history there when you need to dig a little deeper. It’s just like reviewing source control history when tracking down a bug that was introduced in the latest code release. Your data deserves the same respect.

Concurrent editing

My older brother carries a pocket knife at all times, and when he believes in something, he can’t help but sell everyone else on the idea. He used to tell me I needed to carry a pocket knife so I could cut off my seatbelt when trapped in a burning (or sinking) car. In his defense, GM recalled 240,000 SUVs in 2010 citing “…[the seat belt] may seem to be jammed.”

Naturally, I started carrying a pocket knife too. I figured that along with untrapping myself from a burning car or defending myself at an ATM, it would be handy when I needed to open a box. To my surprise, new opportunities presented themselves and I was using my knife several times a day. I could cut off a hanging thread, scrape bee pollen off my windshield, improve the vent in my coffee lid, remove a scratchy tag from my son’s shirt and yes, open a box.

It turns out I had a similar experience with concurrent editing. Working on the same document at the same time with multiple people is pretty cool, and you’d be surprised how this powerful ability can change the way you work. There’s no way you would justify an extravagant feature like this in your own back office tools and you get it for free with Google docs.

Formulas

Since it’s a spreadsheet, you can use formulas and this is cooler than you might think. Consider a list of products. Chances are the product prices have some kind of relationship that you can capture with a formula and save yourself from tedious and error-prone manual entry.

In reality though, you probably don’t care about the actual product prices as much as you care about your margins or some other derived number. With a spreadsheet, you can keep your formula together with your data. When you change a price, you can see how it affects your bottom line in a calculated field a few cells over. Or work backwards and calculate the price based on a combination of other factors. Or just use a formula as a sanity check that all your numbers add up to the right amount.

Again, this is the kind of feature that invites new ways of working. You might use a formula to find a bug before the data ever hits your app or add a new formula so you don’t get burned twice by the same mistake. A spreadsheet gives you a place to capture knowledge about your data. You could capture this knowledge in a wiki, but the fact that it’s in a Google spreadsheet that’s connected to your app makes it real. It’s the difference between a code comment and a unit test.

Import

A data update is a deployment just like a code deployment and things can go horribly wrong when there’s a mistake in your data. Big companies get this and respond with change advisory boards and more process to protect themselves, but you can do better.

Your data deserves a repeatable one-click deployment process and you shouldn’t settle for anything less. An import is idempotent, which means you can run it multiple times and the end result is always the same. In other words, you can practice in a test environment and iterate until you get it right. Oh, and if you spin off a copy prior to making changes, you get one-click rollback too.
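
To make that concrete, here’s a rough sketch of what such an import might look like. Everything in it is hypothetical (the csv export url, the Product shape, the upsert helper), and the Sheets API gives you richer options than a plain csv export. The point is just that upserting by a natural key is what makes the import safe to run over and over.

import scala.io.Source

object ProductImport {

  case class Product(sku: String, name: String, price: BigDecimal)

  // hypothetical: the spreadsheet published as csv
  val csvUrl = "https://docs.google.com/spreadsheets/d/<doc-id>/export?format=csv"

  def importProducts(): Unit = {
    // naive csv handling, fine for a sketch
    val rows = Source.fromURL(csvUrl).getLines().drop(1) // skip the header row

    val products = rows map { line =>
      val Array(sku, name, price) = line.split(",").map(_.trim)
      Product(sku, name, BigDecimal(price))
    }

    // upsert by sku so running the import twice leaves the data in the same state
    products foreach upsert
  }

  // stand-in for your real persistence layer
  def upsert(product: Product): Unit = ???
}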

Conclusion

One-click deployment and rollback with revision history for your data without developer interaction. That’s a really big deal because the end result is confidence. Confidence means that as an organization, you can make decisions, execute and not look back.

Empower your team (not just developers) with a familiar tool like a spreadsheet and give them the opportunity to impress you. Watch as calculations and charts emerge to add deeper meaning to your data. Watch as collaboration occurs and team members are brought together rather than divided by functional roles. Watch as confidence increases in your ability to deploy changes.

Seriously. Don’t waste your time half-assing back office tools when you can invest a similar amount of effort to import data from Google spreadsheets. It’s a scrappy tool that delivers on the features that accelerate the way you work.

If you can do a half-assed job of anything, you’re a one-eyed man in a kingdom of the blind. – Kurt Vonnegut

Using Scala to Perform a Multi-Key Get on a Couchbase View


Retrieving documents from Couchbase by anything other than the document key requires querying a view, and views are defined by map and reduce functions written in JavaScript. Consider a view that returns data like this:

key         value
---         -----
"key1"      { "term": "red", "count": 2 }
"key2"      { "term": "red", "count": 1 }
"key3"      { "term": "blue", "count": 4 }
...

And here’s a Scala case class to hold the documents retrieved from the view:

case class TermOccurrence(term: String, count: Int)

It’s a common scenario to retrieve multiple documents at once, and the Java driver has a pretty straightforward api for that. The desired keys are simply specified as a json array.

import com.couchbase.client.CouchbaseClient
import com.couchbase.client.protocol.views.Query
import play.api.libs.json.Json
import scala.collection.JavaConverters._
import CouchbaseExtensions._

@Log
def findByTerms(terms: List[String]): List[TermOccurrence] = {

  val keys = Json.stringify(Json.toJson(terms map (_.toLowerCase)))

  val view = client.getView("view1", "by_term")
  val query = new Query()
  query.setIncludeDocs(false)
  query.setKeys(keys)

  val response = client.query(view, query).asScala

  response.toList map (_.as[TermOccurrence])
}

The Java driver deals with strings so it’s up to the client application to handle the json parsing. That was an excellent design decision and makes using the Java driver from Scala pretty painless. I’m using the Play Framework json libraries and an extension method _.as[TermOccurrence] defined on ViewRow to simplify the mapping of the response documents to Scala objects.

import com.couchbase.client.protocol.views.ViewRow
import play.api.libs.json.{Format, Json}

object CouchbaseExtensions {

  implicit class RichViewRow(row: ViewRow) {
    def as[A](implicit format: Format[A]): A = {
      val document = row.getValue
      val modelJsValue = Json.parse(document)
      Json.fromJson[A](modelJsValue).get
    }
  }

}

In order for this extension method to work, it requires an implicit Format[TermOccurrence], which is defined on the TermOccurrence companion object.

object TermOccurrence {
  implicit val formatTermOccurrence = Json.format[TermOccurrence]
}

Hadoop MapReduce Join Optimization With a Bloom Filter


Doing a join in hadoop with Java is painful. A one-liner in Pig Latin can easily explode into hundreds of lines of Java. However, the additional control in Java can yield significant performance gains and simplify complex logic that is difficult to express in Pig Latin.

In my case, the left side of the join contained about 100K records while the right side was closer to 1B. Emitting all join keys from the mapper means that all 1B records from the right side of the join are shuffled, sorted and sent to a reducer. The reducer then ends up discarding most of the join keys that don’t match the left side.

Any best practices guide will tell you to push more work into the mapper. In the case of a join, that means dropping records in the mapper that will end up getting dropped by the reducer anyway. In order to do that, the mapper needs to know if a particular join key exists on the left hand side.

An easy way to accomplish this is to put the smaller dataset into the DistributedCache and then load all the join keys into a HashSet that the mapper can do a lookup against.

for (FileStatus status : fileSystem.globStatus(theSmallSideOfTheJoin)) {
    DistributedCache.addCacheFile(status.getPath().toUri(), job.getConfiguration());
}
@Override
protected void setup(Context context) {
    buildJoinKeyHashMap(context);
}

@Override
protected void map(LongWritable key, Text value, Context context) {

    ...

    if (!joinKeys.contains(joinKey))
        return;

    ...

    context.write(outKey, outVal);
}

This totally works, but it consumes enough memory that I was occasionally getting java.lang.OutOfMemoryError: Java heap space from the mappers. Enter the Bloom filter.

A Bloom filter is a space-efficient probabilistic data structure that is used to test whether an element is a member of a set. False positive matches are possible, but false negatives are not. Elements can be added to the set, but not removed. The more elements that are added to the set, the larger the probability of false positives. -Wikipedia

I hadn’t heard of a Bloom filter before taking Algorithms: Design and Analysis, Part 1. If not for the course, I’m pretty sure I would have skimmed over the innocuous reference while poking around the hadoop documentation. Fortunately, recent exposure made the term jump out at me and I quickly recognized it was exactly what I was looking for.

When I took the course, I thought the Bloom filter was an interesting idea that I wasn’t likely to use anytime soon because I haven’t needed one yet and I’ve been programming professionally for more than a few years. But you don’t know what you don’t know, right? It’s like thinking about buying a car you didn’t notice before and now seeing it everywhere.

Configuration

The documentation is thin, with little more than variable names to glean meaning from.

public BloomFilter(int vectorSize,
                   int nbHash,
                   int hashType)
  • vectorSize – The vector size of this filter.
  • nbHash – The number of hash function to consider.
  • hashType – type of the hashing function (see Hash).

I know what you’re thinking. What could be more helpful than “The vector size of this filter” as a description for vectorSize? Well, the basic idea is there’s a trade-off between space, speed and the probability of a false positive. Here’s how I think about it:

  • vectorSize – The amount of memory used to store hash keys. Larger values are less likely to yield false positives. If the value is too large, you might as well use a HashSet.
  • nbHash – The number of times to hash the key. Larger numbers are less likely to yield false positives at the expense of additional computation effort. Expect diminishing returns on larger values.
  • hashType – type of the hashing function (see Hash). The Hash documentation was reasonable so I’m not going to add anything.

I used trial and error to figure out numbers that were good for my constraints.

import org.apache.hadoop.util.bloom.BloomFilter;
import org.apache.hadoop.util.bloom.Key;
import org.apache.hadoop.util.hash.Hash;
import org.junit.BeforeClass;
import org.junit.Test;

import java.util.UUID;

import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThan;
import static org.junit.Assert.assertThat;

public class BloomFilterTests {
    private static BloomFilter bloomFilter;
    private static String knownKey = newGuid();
    private static int numberOfKeys = 500000;

    @BeforeClass
    public static void before() {
        bloomFilter = new BloomFilter(numberOfKeys * 20, 8, Hash.MURMUR_HASH);
        bloomFilter.add(newKey(knownKey));

        for (int i = 0; i < numberOfKeys; i++)
            bloomFilter.add(newKey(newGuid()));
    }

    @Test
    public void should_contain_known_key() {
        assertThat(hasKey(knownKey), is(true));
    }

    @Test
    public void false_positive_probability_should_be_low() {

        int count = 0;
        for (int i = 0; i < numberOfKeys; i++)
            if (hasKey(newGuid()))
                count++;

        int onePercent = (int) (numberOfKeys * .01);

        assertThat(count, is(lessThan(onePercent)));
    }

    private static String newGuid() {
        return UUID.randomUUID().toString();
    }

    private static Key newKey(String key) {
        return new Key(key.getBytes());
    }

    private boolean hasKey(String key) {
        return bloomFilter.membershipTest(newKey(key));
    }
}
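
If you’d rather have a starting point that isn’t pure trial and error, the textbook Bloom filter sizing formulas relate the number of keys and a target false positive rate to vectorSize and nbHash. This is standard theory, nothing specific to the Hadoop implementation, and bloomFilterParams is just a helper sketched for this post, not part of the Hadoop api.

// standard Bloom filter sizing:
//   m = -n * ln(p) / (ln 2)^2   bits in the filter       (vectorSize)
//   k = (m / n) * ln 2          number of hash functions (nbHash)
def bloomFilterParams(expectedKeys: Int, falsePositiveRate: Double): (Int, Int) = {
  val bits = math.ceil(-expectedKeys * math.log(falsePositiveRate) / math.pow(math.log(2), 2)).toInt
  val hashes = math.max(1, math.round((bits.toDouble / expectedKeys) * math.log(2)).toInt)
  (bits, hashes)
}

// e.g. 100K keys with a 1% false positive target
val (vectorSize, nbHash) = bloomFilterParams(100000, 0.01)
// roughly 958,000 bits and 7 hash functions

Treat those numbers as a first guess and verify them with a membership test like the one above.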

When you have your numbers worked out, simply swap out the HashSet for the BloomFilter and then blog about it.