Big(ish) Data: Working the Middle Ground Between RAM and a Cluster
We all want fast answers, even as our data sets grow in size. Simple techniques stop working when our data no longer fits in a single process. In this talk I'd like to share a few of the techniques I've learned that stretch what you can do on a single machine. I'll cover taking random samples from large data sets with stream sampling and identifying (probable) duplicates when the set is too large to fit in memory.
NB: I'm in the process of updating this talk.
- data processing techniques for single processes
- stream sampling
- bloom filters
- clojure and distributed data processing
- getting a REPL on your Storm topology
- ? connecting a REPL to Spark
...tell a story...
-
When RAM is not enough
There are a couple of algorithms that I’ve learned that help stretch what can be done in a single process. They help you use a fixed amount of memory to achieve the same outcomes without pulling your entire data set into memory.
These started out as interesting algorithms when I read about them; I didn't know where or when I was going to use them. Knowing about them let me start to see where they could be applied. That's one of the things I love about learning new techniques.
Two of the techniques I'm going to cover in this talk are: taking random samples and detecting duplicates in large data sets. These techniques let you work with data that's too large to fit in memory in a single process, while still staying in a single process.
-
Mission: Probable
A few years ago, my company purchased a large data set and wanted to get a sense for how good the data was. We needed to pull a random sample of the data and verify it. The verification was too manual and expensive to do for all of the data.
The first thing that comes to mind is to iterate over the data and emit a record with a 20k out of 400M probability. This is pretty quick, but it doesn't guarantee that we'll get the exact sample size we're looking for.
Let's see, what else comes to mind?
-
Flip a Coin?
We could iterate through the data set, generate a random number for each element and, if it's less than 20k out of 400M, select that element. This lets us do one pass over the data, though we will be over or under our sample size by some small amount. If we have random access to the elements, we could instead generate random indexes into the 400M and select elements that way, choosing random indexes until we hit our sample size. That’s sampling with replacement, which may result in duplicates, so we'd have to be a little careful not to select the same record more than once. Can we refine the ‘flip a coin’ approach? If you flip a weighted coin for each element, there is a chance you won’t hit your sample size. Was that our last hope? No, there is another.
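The coin-flip approach can be sketched in a few lines of Clojure. This is a minimal illustration, not code from the talk: the function name @coin-flip-sample@ and the scaled-down numbers are assumptions, and the random source is passed in so a run can be repeated.

```clojure
;; Coin-flip (Bernoulli) sampling: keep each element independently with
;; probability p. One pass, constant memory, but the sample size varies.
(defn coin-flip-sample
  "Lazily keeps each element of coll with probability p.
   rnd is a java.util.Random, passed in so runs can be made repeatable."
  [^java.util.Random rnd p coll]
  (filter (fn [_] (< (.nextDouble rnd) p)) coll))

;; Scaled-down stand-in for 20k out of 400M: aim for about 200 of 40,000.
(let [rnd    (java.util.Random. 42)
      sample (coin-flip-sample rnd (/ 200.0 40000) (range 40000))]
  (println "asked for about 200, got" (count sample)))
```

The count that comes back will usually be close to the target but rarely equal to it, which is the weakness described above.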
h1. Running the example code
You need to have Java installed and configured. You'll need two utilities: "bake":https://github.com/kyleburton/bake and "leiningen":https://github.com/technomancy/leiningen. You can get both of these by executing the following commands:
test -d $HOME/bin || mkdir $HOME/bin
test -x $HOME/bin/bake || curl https://raw.githubusercontent.com/kyleburton/bake/master/bake > $HOME/bin/bake
chmod 755 $HOME/bin/bake
test -x $HOME/bin/lein || curl https://raw.githubusercontent.com/technomancy/leiningen/stable/bin/lein > $HOME/bin/lein
chmod 755 $HOME/bin/lein
export PATH=$HOME/bin:$PATH
bake emacs-install-cider
bake emacs-install-ac-cider
bake get-sample-data
More fun:
================================================================================
h1. Beat the DB: Recipes for Large Data Analysis with Clojure
Learn techniques for working with large data sets: those which are too large to fit in RAM, but not so large they require distributed computing. Take random samples and find highly duplicated values - without a database import.
I'll walk you through efficient recipes for: taking random samples in a single pass, finding highly duplicated values in a single pass using a bloom filter, and processing large data files in parallel - without preprocessing. These techniques let you avoid having to import the data sets into your database, and efficiently perform operations that would incur large memory or IO overhead in a database.
Though not specific to Clojure, many of the techniques lead to elegant, easy to understand implementations in Clojure by leveraging its sequence abstraction, immutability and parallelization features.
Come hear about some new gadgets to add to your data munging utility belt!
I recently learned some new tools and techniques for working with large data sets – those which are too large to fit in RAM, but not so large you need distributed computing to work with them. I’ll discuss things like taking random samples and finding duplicated values, as well as other types of basic analysis.
h2. How to Win the Random Sample Turkey Shoot
You have been asked to take a random sample of a few hundred million records in order to estimate some metrics of the data set as a whole.
[ dice and a turkey ]
h3. I Know: the database!
SELECT * FROM some_table ORDER BY RANDOM() LIMIT 1000
[ HUGE red X ]
BZZT! You just wasted time fixing the other guy's data export errors, then you wasted time importing the data into your database only to find that the above query is estimated to take somewhere between two days and the heat death of the universe to execute on your workstation.
[ a clock face and a DB cylinder ]
h3. I'll use the 'GNU Signal!'
[ Bat-signal-like spotlight shining a huge GNU onto clouds? ]
Turns out you can. sort supports a 'random' flag:
sort -R
This is faster than the database...
- uses up disk space on the order of your original data set [wasteful]
- lots of IO spent computing the output
[ unhappy anthropomorphic database ]
h3. "You Shall Not Pass!" (multiple times)
"Selection without replacement"
Stream the data: odds are N/M of picking an element
N is your desired sample set size
M is the size of the population you haven't yet considered
Decrement M after considering each member of the population
Decrement N after you've chosen an element.
Sit back and bask in the Win.
[ relaxed person? the Reddit meme paint drawing / person? ]
One Machine, One Pass over the file, output on the order of your sample set size. The very definition of W.I.N. (Why did I Not think of that before)
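The steps above can be sketched as a single loop. This is a minimal sketch under one stated assumption: the population size @m@ is known before the pass starts (for example from a line count). The name @selection-sample@ is illustrative, not taken from the talk's code.

```clojure
(defn selection-sample
  "Returns exactly n elements of coll, chosen uniformly without replacement.
   m must be the number of elements in coll, known up front (an assumption
   of this sketch). rnd is a java.util.Random."
  [^java.util.Random rnd n m coll]
  (loop [n n, m m, xs (seq coll), picked []]
    (if (or (zero? n) (nil? xs))
      picked
      ;; Pick the current element with probability n/m, where n is the
      ;; number still needed and m the number not yet considered.
      (if (< (.nextDouble rnd) (/ (double n) m))
        (recur (dec n) (dec m) (next xs) (conj picked (first xs)))
        (recur n (dec m) (next xs) picked)))))

;; Scaled-down usage: exactly 20 records out of 400,000.
(count (selection-sample (java.util.Random.) 20 400000 (range 400000)))
;; => 20
```

Because the odds reach 1 whenever the number still needed equals the number left to consider, the loop always returns exactly @n@ elements, and it only holds the sample itself in memory.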
h2. That Was My Hook...
I was advised to tell you something valuable in the first few minutes of the talk by a good friend of mine: Jonny Tran. I hope that counts...
h2. What do I mean by 'Large Data'?
When your data set, or the information you need to track to analyze it, fits into RAM, you can effectively ignore most of these techniques. Finding duplicated values in an array that already fits in memory is simple: you iterate through it keeping a table of the times you've seen each element. At worst (no duplicates), you'll use memory on the order of the size of the data set, plus the storage space it takes to store the count (an integer).
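For data that does fit in memory, the counting table is nearly a one-liner in Clojure. A small sketch, with the helper name @duplicates@ chosen here for illustration:

```clojure
(defn duplicates
  "Returns a map of value -> count for every value seen more than once.
   Holds one map entry per distinct value, so it only suits data that
   fits in memory."
  [coll]
  (into {} (filter (fn [[_ n]] (> n 1))) (frequencies coll)))

(duplicates ["a" "b" "a" "c" "b" "a"])
;; => {"a" 3, "b" 2}
```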
h2. Counting and Analysis
h3. Just Keep a HashMap/Table
[ HUGE red X ]
BZZT!
Remember: too big for RAM, possibly way too big.
h3. Next Utility Belt Gadget: Bloom Filters
h3. What's a Bloom Filter?
h3. Find likely Dupes in 1 pass
Emit anything that's a HIT with the Bloom Filter.
You'll get some false positives.
This is good enough for many of my use cases.
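A minimal sketch of the one-pass "emit the hits" idea follows. It is not the talk's implementation: the filter here is a small hand-rolled one built on @java.util.BitSet@, the bit positions come from salting Clojure's own @hash@, and every function name is an assumption made for illustration. A production filter would size the bit set and the number of hashes from the expected element count.

```clojure
(import 'java.util.BitSet)

(defn make-bloom
  "A Bloom filter with num-bits bits and num-hashes hash functions."
  [num-bits num-hashes]
  {:bits (BitSet. num-bits) :num-bits num-bits :num-hashes num-hashes})

(defn- bit-positions
  "Derives num-hashes bit positions for x by hashing it with different salts."
  [{:keys [num-bits num-hashes]} x]
  (for [salt (range num-hashes)]
    (mod (hash [salt x]) num-bits)))

(defn seen-before?
  "Tests x against the filter, then adds it. Returns true on a HIT, which
   means x was probably seen before; false means it definitely was not."
  [{:keys [^BitSet bits] :as bloom} x]
  (let [positions (doall (bit-positions bloom x))
        hit?      (every? #(.get bits (int %)) positions)]
    (doseq [p positions] (.set bits (int p)))
    hit?))

(defn likely-dupes
  "Lazily emits every element of coll that is a HIT with the filter.
   The filter is mutated as the sequence is consumed."
  [bloom coll]
  (filter #(seen-before? bloom %) coll))

;; Real duplicates are always emitted; false positives may add extras.
(doall (likely-dupes (make-bloom (* 8 1024) 3) ["a" "b" "a" "c" "b"]))
```

Memory use is fixed by @num-bits@ no matter how many elements stream through; the price is that the false-positive rate climbs as the filter fills up.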
h3. Find Actual Dupes / Counts in 1 Pass
Use a HashMap to count the elements that are a hit.
Dump anything that has a count > 1 when you're done.
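The counting step can be sketched with the filter passed in as a function. Everything named here is hypothetical: @hit?@ is any test-and-add function (it returns true when the element was probably seen before and records it either way), and @exact-hit-fn@ is an exact, set-backed stand-in used only so the sketch runs on its own.

```clojure
(defn count-hits
  "Counts, per element, how often the filter reported a HIT.
   Only elements that hit are stored, so the map stays small when
   most of the data is unique."
  [hit? coll]
  (reduce (fn [counts x]
            (if (hit? x)
              (update counts x (fnil inc 0))
              counts))
          {}
          coll))

(defn dump-dupes
  "Keeps the entries whose hit count exceeds 1."
  [counts]
  (into {} (filter (fn [[_ n]] (> n 1))) counts))

;; Stand-in for a Bloom filter: an exact set held in an atom. A real Bloom
;; filter would use far less memory and would occasionally report a hit
;; for an element it has never seen.
(defn exact-hit-fn []
  (let [seen (atom #{})]
    (fn [x]
      (let [hit (contains? @seen x)]
        (swap! seen conj x)
        hit))))

(dump-dupes (count-hits (exact-hit-fn) ["a" "b" "a" "c" "a" "b"]))
;; => {"a" 2}
```

Note that the first sighting of a value is normally a miss, so a hit count is usually one less than the number of occurrences: @"b"@ appears twice above but has a hit count of 1 and is not dumped. A threshold of @> 1@ therefore favors highly duplicated values; if exact pairs matter, lowering the threshold or re-counting the candidates in a second pass are options.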
h3. Bonus: use proxy keys, not just natural keys
You can compute a value on the data before testing it vs the bloom filter.
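As an illustration of a proxy key, the sketch below normalizes case and whitespace before a value would be tested against the filter. The normalization rule and the name @proxy-key@ are assumptions chosen for the example; any function of the record works.

```clojure
(require '[clojure.string :as string])

(defn proxy-key
  "Hypothetical proxy key: trims, lower-cases and collapses whitespace, so
   records that differ only in case or spacing produce the same key."
  [s]
  (-> s
      string/trim
      string/lower-case
      (string/replace #"\s+" " ")))

(proxy-key "  Jane   DOE ")
;; => "jane doe"
```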
h2. Records and Identity
Identity within the file.
Many of the dumps I receive have no persistent identifiers.
Many have no relative identifiers!
Give each record an ID.
[ seq util that adds a counter to each record ]
You can just count up from 1, or if you want to get fancy, use an LFSR.
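Counting up from 1 can be sketched with @map-indexed@. The name @with-ids@ is illustrative and is not the talk's seq util:

```clojure
(defn with-ids
  "Lazily pairs each record with a sequential ID, counting up from 1."
  [records]
  (map-indexed (fn [i rec] [(inc i) rec]) records))

(with-ids ["first line" "second line"])
;; => ([1 "first line"] [2 "second line"])
```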
h2. What's an LFSR?
Linear Feedback Shift Register.
Class of PRNG.
Interesting subset: m-sequences (have maximal period)
Binary Numbering System - but non-sequential (remember: a PRNG)
Fast, easy to serialize, not a hash.
Keeps adjacent records from being close based on ID.
This is not an issue for software / machines, but it may keep humans looking at the data from assuming that IDs which are close have some meaning.
[ seq util that adds an lfsr id to each record ]
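A small sketch of LFSR-based IDs, assuming a 16-bit Galois LFSR with tap mask @0xB400@ (a commonly cited maximal-period configuration). The register width, the seed and the function names are choices made for this example, not the talk's code. A 16-bit register only yields 65,535 distinct IDs, so a real data set would need a wider register with taps known to give a maximal period.

```clojure
(defn lfsr-step
  "One step of a 16-bit Galois LFSR with tap mask 0xB400
   (x^16 + x^14 + x^13 + x^11 + 1)."
  [state]
  (let [shifted (bit-shift-right state 1)]
    (if (odd? state)
      (bit-xor shifted 0xB400)
      shifted)))

(defn lfsr-ids
  "Lazy sequence of IDs starting from seed. The seed must be non-zero:
   zero is a fixed point and would repeat forever."
  [seed]
  (iterate lfsr-step seed))

(defn with-lfsr-ids
  "Lazily pairs each record with the next LFSR value."
  [seed records]
  (map vector (lfsr-ids seed) records))

(with-lfsr-ids 1 ["rec-a" "rec-b" "rec-c"])
;; => ([1 "rec-a"] [46080 "rec-b"] [23040 "rec-c"])
```

Only the current register value needs to be saved in order to resume numbering later, which is what makes the scheme easy to serialize.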
h2. Cheap Indexing Outside the Database / Grouping Records
Random Access
Really means ability to seek within the file, not really random.
Not a Streaming / Sequencing interface, boo
With a few Clojure functions over the indexes we can re-make an ordered sequence of records!
h2. Creating Indexes
Natural keys are easy: just extract and return
indexer.clj supports > 1 key for each record
Proxy keys are easy: just compute the proxy keys for the row and return them (e.g. soundex)
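The shape of such an index can be sketched in memory. This is not the API of indexer.clj, which is not shown here; @build-index@, the sample records and their byte offsets are all hypothetical. The key function returns a collection, so one record can be filed under more than one key.

```clojure
(require '[clojure.string :as string])

(defn build-index
  "Builds a sorted map of key -> [position ...] from [position record] pairs.
   key-fn returns a collection of keys for a record."
  [key-fn positioned-records]
  (reduce (fn [index [pos rec]]
            (reduce (fn [idx k] (update idx k (fnil conj []) pos))
                    index
                    (key-fn rec)))
          (sorted-map)
          positioned-records))

;; Hypothetical records paired with their byte offsets in the file.
(def sample-records
  [[0 "Smith\tJohn"] [11 "Smyth\tJon"] [21 "Jones\tAnn"]])

;; Stand-in proxy key (not soundex): the lower-cased first letter.
(build-index (fn [rec] [(string/lower-case (subs rec 0 1))]) sample-records)
;; => {"j" [21], "s" [0 11]}
```

Looking up a key returns the positions to seek to, and intersecting the position lists from two indexes narrows a lookup to records that match both keys.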
h2. Using Indexes
Interactive Lookups (yes, like a sql select)
Intersecting multiple indexes
h3. Getting Record Groupings
The sequence merge function.
h3. Fuzzy Duplicate Detection
Given a record grouping (based on proxy key), can we estimate similarity?
h2. Concurrency: Keeping your cores busy
h3. Processing a File: lines in parallel
(pmap some-fn (ds/read-lines some-file))
For heavyweight per-line work this can pay off, but it may not. Also, because of how @pmap@ is implemented (see: http://data-sorcery.org/2010/10/23/clojureconj/), you may not get the parallelization you anticipate if you don't keep the threadpool hot / busy. Please see David Edgar Liebke's slide deck for more details on this, but I've seen it happen.
h3. Creating simple work queues:
Break a large file down with GNU split:
split -l 10000 some-large-file.tab working-directory/inp-
Simple to create a work queue:
(pmap some-expensive-operation
(map str (filter #(.isFile %) (.listFiles (java.io.File. "working-directory/")))))
Recombine results:
for f in working-directory/outp-*; do
cat $f >> all-results.tab
done
h3. Do this w/o Leaving Clojure : Processing a file: By Chunks
(byte-partitions-at-line-boundaries "some-huge-file.tab" (* 5 1024 1024))
=> ([start1 end1] [start2 end2] [start3 end3] ...)
These will be 'about' every 5MB through the file. Then, turn each of those into a seq of lines:
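;; dorun forces the lazy pmap so the side effects actually run
(dorun
 (pmap (fn [[start end]]
         ;; each worker reads and processes the lines of one byte range
         (doseq [line (read-lines-from-file-segment "some-huge-file.tab" start end)]
           (some-expensive-thing line)))
       (byte-partitions-at-line-boundaries "some-huge-file.tab" (* 5 1024 1024))))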
You can also do this pretty easily by line count, using the Clojure built-in function @partition@ and @pmap@'ing over the results.
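Chunking by line count might look like the sketch below. It uses @partition-all@ rather than @partition@, because @partition@ silently drops a trailing chunk that is shorter than the chunk size. The name @process-in-chunks@ is illustrative.

```clojure
(defn process-in-chunks
  "Applies f to every line, processing chunks of chunk-size lines in
   parallel. Results come back in the original order."
  [f chunk-size lines]
  (->> (partition-all chunk-size lines)
       ;; mapv is eager, so each chunk is fully processed on its worker thread
       (pmap (fn [chunk] (mapv f chunk)))
       (apply concat)))

(process-in-chunks count 2 ["a" "bb" "ccc" "dddd" "eeeee"])
;; => (1 2 3 4 5)
```

Larger chunks mean fewer, heavier tasks, which helps amortize the coordination cost of @pmap@ when the per-line work is cheap.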
h2. Aside: @pmap@ and When it's worth it.
Concurrency has some overhead.
For cheap operations, this overhead outweighs the benefits.
h1. Special Thanks
Rob DiMarco for helping with the title and abstract for this talk.
Paul Santa Clara for being part of the learning process.
h1. Resources