Processing time series data with Hadoop Streaming and Ruby

Hi everyone,

This is my second post, and I will talk about how to process time-ordered data with Hadoop Streaming and Wukong (you can learn more about Wukong in my previous post).

To process this type of data easily and efficiently, the solution I used is a feature of MapReduce: the partitioner.

For example, let’s say you have some logs for your website, with the following format:
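
Something along these lines, with tab-separated fields (the field names and values here are just for illustration; any log that carries a page id, a view date, and a user id works the same way):

    page_id     page_view_date    user_id
    /home       2012-05-01        user_42
    /home       2012-05-01        user_17
    /pricing    2012-05-02        user_42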

Then you can easily use the following code to count the number of unique visitors for each page over time:
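
Here is a minimal sketch of what that code could look like. It assumes old-style Wukong (the Wukong::Script runner, a pass-through LineStreamer mapper, and an AccumulatingReducer whose key defaults to the first field of each record) and the illustrative log format above:

    require 'rubygems'
    require 'wukong'
    require 'set'

    module UniqueVisitors
      # The mapper has nothing to do: each log line already carries
      # page_id, page_view_date and user_id, so it is passed through untouched.
      class Mapper < Wukong::Streamer::LineStreamer
        def process line
          yield line
        end
      end

      # Keyed on the first field (page_id). Because Hadoop sorts the stream
      # on (page_id, page_view_date), the dates for a given page arrive in
      # order, so only the visitors of the current date are kept in memory.
      class Reducer < Wukong::Streamer::AccumulatingReducer
        def start!(*args)
          @current_date = nil
          @visitors     = Set.new
          @counts       = []    # [date, unique visitor count] rows for this page
        end

        def accumulate page_id, page_view_date, user_id
          if page_view_date != @current_date
            @counts << [@current_date, @visitors.size] unless @current_date.nil?
            @current_date = page_view_date
            @visitors     = Set.new
          end
          @visitors << user_id
        end

        def finalize
          @counts << [@current_date, @visitors.size] unless @current_date.nil?
          @counts.each { |date, count| yield [key, date, count] }
        end
      end
    end

    Wukong::Script.new(UniqueVisitors::Mapper, UniqueVisitors::Reducer).run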

Let me explain a few things:

  • first of all, the mapper: it’s essentially empty, because a mapper only processes each line independently of the others, so it does not need to do anything special in our case
  • second of all, the reducer: here I am using the accumulating reducer. It’s a built-in streamer from Wukong which lets you declare how you are going to split the input stream by key. Unfortunately, you still need to tell Hadoop how to sort and partition each line, and this is where the partitioner comes in
  • third of all, the jobconf parameters you can see I am passing (there is a concrete example right after this list):
    - stream.num.map.output.key.fields : how many fields are used to sort the data
    - mapred.text.key.partitioner.options : which fields are used to partition the data among reducers.
    You can find the documentation about the KeyFieldBasedPartitioner here.
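
With raw Hadoop Streaming, those settings translate into something like the options below (the jar path and the mapper/reducer/input/output arguments are placeholders): the first two fields of each line form the sort key, while -k1,1 tells the KeyFieldBasedPartitioner to partition on the first field (page_id) only.

    hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-streaming-*.jar \
      -D stream.num.map.output.key.fields=2 \
      -D mapred.text.key.partitioner.options=-k1,1 \
      -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \
      -mapper ... -reducer ... -input ... -output ...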

So as you can see, each reducer will receive all the lines for a given page_id (the partition key is just page_id), but those lines will be sorted on two fields: page_id and page_view_date. Since the data arrives at the reducer already sorted by date, we can compute the unique users for each date and each page.

This code can obviously be enhanced: for example, you could consider a user unique only if they haven’t visited in the last 30 days, or you could use a Bloom filter if you are a big website (meaning a lot of distinct user_ids) and don’t need perfectly accurate results.

I hope this will save some of you some time. If you have any questions, ask them in the comments and I will be glad to answer them as best I can.

Wukong: easy Hadoop in Ruby

During my internship at Mogreet, I got to do some big data work that I have always been passionate about. By the way, Mogreet is an awesome company: a startup in Venice, Los Angeles, California, less than a mile from Venice Beach. What could be better?

Back to Ruby, Hadoop and Wukong. First of all: what is Wukong?

Wukong: Hadoop made so easy a Chimpanzee could run it.

Treat your dataset like a

  • stream of lines when it’s efficient to process by lines
  • stream of field arrays when it’s efficient to deal directly with fields
  • stream of lightweight objects when it’s efficient to deal with objects
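
In old-style Wukong, these modes correspond roughly to the streamer base classes you subclass; here is a quick sketch assuming the classic class names (the field names are made up):

    require 'rubygems'
    require 'wukong'

    # Stream of lines: process receives each raw input line.
    class LineMapper < Wukong::Streamer::LineStreamer
      def process line
        yield line.strip
      end
    end

    # Stream of field arrays: process receives the tab-separated fields.
    class FieldMapper < Wukong::Streamer::RecordStreamer
      def process user_id, page_id, timestamp
        yield [page_id, timestamp]
      end
    end

    # (There is also a StructStreamer for working with lightweight objects.)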

And now the million-dollar question: why use a slow language such as Ruby, instead of Java, to write Hadoop jobs?

  • First of all, if you’re in the same situation as me (that is: you’re in a Ruby shop where nobody knows much about Java), why would you want to introduce more complexity with a new and not-so-simple language?
  • Second of all, Ruby is sexy! I mean, I first tried to write jobs in Java, but there was so much code for even simple maps! Whereas in Ruby, it’s just easy and really fast to write.

Let’s compare the same job written twice, once in Ruby with Wukong and once in Java. The job will do the Hello World of Hadoop: the word count example.

Word count in Java:
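
Here is a sketch along the lines of the classic WordCount from the Hadoop tutorial, using the old org.apache.hadoop.mapred API:

    import java.io.IOException;
    import java.util.*;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.*;
    import org.apache.hadoop.mapred.*;

    public class WordCount {

      // Emits (word, 1) for every token of every input line.
      public static class Map extends MapReduceBase
          implements Mapper<LongWritable, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(LongWritable key, Text value,
                        OutputCollector<Text, IntWritable> output, Reporter reporter)
            throws IOException {
          StringTokenizer tokenizer = new StringTokenizer(value.toString());
          while (tokenizer.hasMoreTokens()) {
            word.set(tokenizer.nextToken());
            output.collect(word, one);
          }
        }
      }

      // Sums the counts emitted for each word.
      public static class Reduce extends MapReduceBase
          implements Reducer<Text, IntWritable, Text, IntWritable> {
        public void reduce(Text key, Iterator<IntWritable> values,
                           OutputCollector<Text, IntWritable> output, Reporter reporter)
            throws IOException {
          int sum = 0;
          while (values.hasNext()) {
            sum += values.next().get();
          }
          output.collect(key, new IntWritable(sum));
        }
      }

      public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(WordCount.class);
        conf.setJobName("wordcount");

        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);

        conf.setMapperClass(Map.class);
        conf.setCombinerClass(Reduce.class);
        conf.setReducerClass(Reduce.class);

        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);

        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        JobClient.runJob(conf);
      }
    }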

Word count in Ruby + Wukong:
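
And here is a sketch in the spirit of Wukong’s own word count example, assuming the LineStreamer/ListReducer classes and the Wukong::Script runner:

    require 'rubygems'
    require 'wukong'

    module WordCount
      # Splits each input line into words and emits a (word, 1) pair per word.
      class Mapper < Wukong::Streamer::LineStreamer
        def process line
          line.downcase.scan(/[a-z']+/).each { |word| yield [word, 1] }
        end
      end

      # Receives every (word, count) pair for a given word and sums the counts.
      class Reducer < Wukong::Streamer::ListReducer
        def finalize
          yield [key, values.map(&:last).map(&:to_i).inject(0, :+)]
        end
      end
    end

    Wukong::Script.new(WordCount::Mapper, WordCount::Reducer).run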

And… that’s it.

But after seeing the awesomeness of this way of writing Hadoop jobs, let’s look at the drawbacks:

  • First of all: Wukong encapsulates Hadoop Streaming (which is the way to write Hadoop jobs in any language), but Wukong is not perfect; for example, it does not support combiners by default. You can find my fork of Wukong on GitHub where I added combiner support, but it’s not backward compatible with the official Wukong release.
  • Second of all: because it’s not Java, it will often be slower than pure Java jobs.
  • Third of all: you will not have all the Hadoop refinements, such as SequenceFiles and compressed input/output.
  • Last of all: because Hadoop Streaming uses stdin and stdout to “talk” to your Ruby script, if you need to work with big data structures you may need up to 3 times their memory size (the structure itself in your script + its string representation in your script + its string representation in Hadoop Streaming). So it might not work for really big data structures.

Next time, I will show how to process time series data with Hadoop and Wukong. My example will be based on calculating unique visitors for a website.