Thursday, May 29, 2008

Including other jars in your hadoop job jar

I learned that you can include other jars in your hadoop job jar by placing them in a lib/ directory inside your job jar. Very nice and convenient!

Now, if I can only make the DistributedCache work. Right now, it just isn't.

Monday, May 26, 2008

Record Linkage Papers

I have created a site where I am putting up links to record linkage papers. I hope to include comments on them soon. If you have a paper you'd like put on the site, just leave me a comment.

Saturday, May 24, 2008

Min-Hash signature for bigrams

I don't believe there is a decent description of how to create a min-hash signature for bigrams on the web, so I'm going to try and provide one. Of course, my description will probably be flawed, but I hope that it will be better than what is out there currently.

First, what is a min-hash signature?

The idea is that, given two records, you can provide signatures such that the similarity of the signatures is approximately equal to the similarity of the records.

Here is a slide show based around using min-hash signatures to provide keys for indexing into a database. Here is the relevant paper.

So, basically it is a way of generating a fuzzy key such that if the two keys match there is a high probability that the two records will match.

We're going to examine a way of doing this using bigrams.

Let's assume we have the local part of an email address. In our base file, we have the following entries:

tgibbs
tanton_gibbs
bbarker
tkieth
bspears

If we look at the bigrams generated for each local part, we have the following:
tgibbs = { tg, gi, ib, bb, bs }
tanton_gibbs = {ta, an, nt, to, on, n_, _g, gi, ib, bb, bs}
bbarker = {bb, ba, ar, rk, ke, er}
tkieth = {tk, ki, ie, et, th}
bspears = {bs, sp, pe, ea, ar, rs}
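
As a minimal sketch, the bigrams of a local part can be generated by sliding a two-character window over the string. The class and method names here (Bigrams, of) are my own, chosen for illustration only.

```java
import java.util.ArrayList;
import java.util.List;

final class Bigrams {
    // Returns every pair of adjacent characters in s, in order of appearance.
    static List<String> of(String s) {
        List<String> out = new ArrayList<>();
        for (int i = 0; i + 1 < s.length(); i++) {
            out.add(s.substring(i, i + 2));
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(Bigrams.of("tgibbs")); // [tg, gi, ib, bb, bs]
    }
}
```

A string shorter than two characters simply yields no bigrams.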

If we order all the bigrams alphabetically, then we have the following universe of bigrams: {an, ba, bb, bs, ... th}

Now, a min hash signature requires an input f that tells how many hash functions to use. Let's set f to 3. That means we'll hash each record in 3 different ways.

To generate a hash function, we randomly permute the bigram universe.

So, our first permutation may look like:

{tk, ib, an, pe, ... rs}

our second permutation may look like:

{rk, gi, bs, th, ... pe}

and our third permutation may look like:

{bb, ba, er, ke, ... an}

Now, the next thing we need is a similarity threshold, t. Let's assume t is 0.8.
So, for each record, we will use 80% of the bigrams to produce the hash.

So, if our input record is tgbbis, then we have the following bigrams to choose from: {tg, gb, bb, bi, is}. We would choose 3 or 4 bigrams (probably both) to produce the hash. But which 3 or 4 do we choose?

For each permutation of our bigram universe that we produced above, we will pick the bigrams that appear the earliest. So, for permutation 1, we might end up choosing {gb, bi, is}. For permutation 2, we might end up with {bi, tg, bb}. For permutation 3, we might have {bb, is, gb}. We would do the same thing for combinations of 4 bigrams.

Now, we can create a string from the bigrams {gbbiis, bitgbb, bbisgb}. These strings become our prospecting keys. We perform the same key generation routine on the base records. Then we can join our input keys to our base record keys to find our match candidates.
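
Here is a rough sketch of that selection step, under a few assumptions of mine: a permutation is passed in as an ordered list of bigrams, k is the number of bigrams to keep (3 or 4 in the example), and the permutation in main is hypothetical, picked only so that the result matches the first key above. The names MinHashKey, bigrams and key are illustrative, not from any library.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class MinHashKey {
    // Bigrams of s: each pair of adjacent characters.
    static Set<String> bigrams(String s) {
        Set<String> out = new HashSet<>();
        for (int i = 0; i + 1 < s.length(); i++) {
            out.add(s.substring(i, i + 2));
        }
        return out;
    }

    // Walks the permutation in order and keeps the first k bigrams that
    // also occur in the record, then concatenates them into one key.
    static String key(String record, List<String> permutation, int k) {
        Set<String> present = bigrams(record);
        StringBuilder key = new StringBuilder();
        int taken = 0;
        for (String bigram : permutation) {
            if (taken == k) {
                break;
            }
            if (present.contains(bigram)) {
                key.append(bigram);
                taken++;
            }
        }
        return key.toString();
    }

    public static void main(String[] args) {
        // Hypothetical permutation (an assumption, not the one elided in the text).
        List<String> perm1 = Arrays.asList("gb", "an", "bi", "is", "tg", "bb");
        System.out.println(key("tgbbis", perm1, 3)); // gbbiis
    }
}
```

In practice each permutation would come from shuffling the whole bigram universe once and reusing that same order for both the input records and the base records. Otherwise the keys on the two sides would not be comparable.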

If we want a more approximate min-hash signature, we could sort the bigrams before creating the key so that we would have the keys {bigbis, bbbitg, bbgbis}. Any duplicate keys that sorting produces could be thrown away. This has the effect of handling more transpositions in the input at the cost of bringing back more candidates.
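
The sorted variant can be sketched like this, assuming the bigrams for each permutation have already been chosen. SortedKeys and sortedKey are names I made up for the illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

final class SortedKeys {
    // Sorts the chosen bigrams alphabetically before concatenating them.
    static String sortedKey(List<String> chosen) {
        List<String> copy = new ArrayList<>(chosen);
        Collections.sort(copy);
        return String.join("", copy);
    }

    public static void main(String[] args) {
        // The three bigram selections from the example in the text.
        List<List<String>> selections = Arrays.asList(
            Arrays.asList("gb", "bi", "is"),
            Arrays.asList("bi", "tg", "bb"),
            Arrays.asList("bb", "is", "gb"));
        // A LinkedHashSet drops duplicate keys while keeping first-seen order.
        Set<String> keys = new LinkedHashSet<>();
        for (List<String> selection : selections) {
            keys.add(sortedKey(selection));
        }
        System.out.println(keys); // [bigbis, bbbitg, bbgbis]
    }
}
```

Because the order of the chosen bigrams no longer matters, two selections that differ only in order, such as {gb, bi, is} and {is, gb, bi}, collapse to the same key.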

Hopefully this illuminates min-hash signatures a bit more so that the references above make sense.

Friday, May 23, 2008

pig.properties

If you are going to set up pig, you need to be aware of the pig.properties file.

It holds settings such as the cluster setting, which is important to set correctly, and whether to run Pig locally or remotely.

See this thread on the Pig mailing list for how to set the cluster setting:

Thursday, May 22, 2008

Pig redux

I rewrote my original pig script to use a nested foreach block.

It now looks like:

A = load 'myInputFile' using PigStorage(',') as (seqid, cl, local, domain, fn, ln, zip, email);
B = foreach A generate flatten(SortChars(local)) as sorted_local, cl;
C = GROUP B by sorted_local PARALLEL 10;
D = FOREACH C {
CLS = B.cl;
DCLS = DISTINCT CLS;
GENERATE group, COUNT(DCLS) as clCount;
}
E = FILTER D by clCount > 1;
F = FOREACH E GENERATE group;
store F into 'myDir' using PigStorage();

A, B, and C are the same as before. However, we now push the distinct into the FOREACH loop for D. This allows Pig to use a DistinctBag to dedup the CLs for a sorted_local part. Before, the DISTINCT had to use a separate map-reduce step; however, now it can do it all in only one map-reduce job. This effectively halves the time the script takes.

For more on how things get executed, you can use the explain option. For instance, explain F; will show how F gets executed. More information can also be found on the pig wiki.

Pig continued

Now that I have the sorted local parts of an email address, I want to know which emails are identified by those sorted local parts. So, I know that bbgist has more than one CL associated with it. Now, I want to know that tgibbs and tgbbis both go to bbgist. Also, I only want those emails with more than one CL, so I don't want tgobbs because it is (let's pretend) only associated with one CL.

Here is the pig script to do that:

register /home/tgibbs/pig/pig/myPigFunctions.jar
A = load 'myFile' using PigStorage(',') as (seqid, cl, local, domain, fn, ln, zip, email);
B = load 'mySortedLocalFile' using PigStorage() as (sorted_local);
C = foreach A generate local;
D = DISTINCT C PARALLEL 10;
E = FOREACH D generate local, flatten(SortChars(local)) as sorted_local;
F = JOIN E by sorted_local, B by sorted_local PARALLEL 10;
G = FOREACH F generate local;
store G into '/user/tgibbs/eproducts-local-merged-out' using PigStorage();

On line 1, once again, I register my SortChars function that is in my jar file.
Line 2 loads the original email file.
Line 3 loads the file of sorted local parts that I created last blog entry.
Line 4 reduces the original email file down to just the local part.
Line 5 (the D line) gets rid of any duplicate local parts, since I don't care that tgibbs appears 10 times and it reduces the amount of data I have to deal with later.
Line 6 (the E line) generates the sorted version of the local part and keeps the original local part as well.
Line 7 (F) joins the two files by the sorted local portion. Its schema looks something like (E::local, sorted_local, sorted_local).
As an example, it would have (tgibbs, bbgist, bbgist) and (tgbbis, bbgist, bbgist).
Line 8 (G) just gets the local part, which is what I want to store and the last line stores it.
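
To sanity-check the logic on a small sample without a cluster, the same join can be sketched in plain Java. This is only an in-memory approximation of the script: LocalJoinSketch and matchingLocals are names I invented, and the TreeSet both dedups (like the DISTINCT) and gives a stable output order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

final class LocalJoinSketch {
    // Plain-Java stand-in for the SortChars function.
    static String sortChars(String s) {
        char[] chars = s.toCharArray();
        Arrays.sort(chars);
        return new String(chars);
    }

    // Distinct local parts whose sorted form appears in sortedLocals.
    static List<String> matchingLocals(List<String> locals, Set<String> sortedLocals) {
        List<String> out = new ArrayList<>();
        for (String local : new TreeSet<>(locals)) {
            if (sortedLocals.contains(sortChars(local))) {
                out.add(local);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> locals = Arrays.asList("tgibbs", "tgbbis", "tgobbs", "tgibbs");
        Set<String> sortedLocals = new HashSet<>(Arrays.asList("bbgist"));
        System.out.println(matchingLocals(locals, sortedLocals)); // [tgbbis, tgibbs]
    }
}
```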

Next up, I plan to try out the explain command, which should display the execution plan.

Mahout

Just a quick post to point out Mahout. Mahout is a machine learning library built on top of Hadoop. They are still fairly pre-alpha, but they already have some interesting algorithms developed. I plan on trying out their canopy clustering algorithm relatively soon.

On a second note, I've gotten Pig up and running and have successfully run some large jobs. Woot!

Here is a pig script to count the number of AbiliTec Consumer Links associated with the sorted local part of an email address.

In other words, if there were two emails tgibbs@blah.com and tgbbis@blah.com, the sorted local part of both would be bbgist. If they were each associated with a different consumer link, then the following pig script would output bbgist. Otherwise, it would not.

register /home/tgibbs/pig/pig/myPigFunctions.jar
A = LOAD '/user/tgibbs/eproducts' USING PigStorage(',') AS (seqid, cl, local, domain, fn, ln, zip, email);
B = FOREACH A GENERATE FLATTEN(SortChars(local)) as sorted_local, cl;
BDIST = DISTINCT B PARALLEL 10;
C = GROUP BDIST BY sorted_local PARALLEL 10;
D = FILTER C BY COUNT(BDIST.cl) > 1;
E = FOREACH D GENERATE group;
STORE E INTO '/user/tgibbs/eproducts-sorted-local-out' USING PigStorage();

The register line loads a custom jar file so that I can call my custom functions. More on that later.

The next line, the assignment of A, just reads a comma delimited file into A. It also associates a name with each of the comma delimited fields. So, field 0 is seqid, field 1 is cl, etc...

The third line loops through each record in A (that's the FOREACH) and sorts the characters in the local part. I wrote the SortChars class and will post that at the end of this entry. The FLATTEN is needed because SortChars returns a tuple. Since I only have one element going in, there is only one element coming out, and I want to treat that one element as an atomic data item instead of as a tuple. The 'as sorted_local' portion renames the data item.

The type of B is now a tuple (sorted_local, cl).

The next line eliminates duplicates. So, if two records have the same sorted local part AND cl, then we can safely erase the duplicate because it will not affect our final count. In fact, we'll need to eliminate it for the rest of our logic to work. The PARALLEL keyword raises the number of reducers. This means we'll end up with 10 output files instead of 1, but that's ok because we'll process them all later, anyway.

In the end BDIST has the same type as B (sorted_local, cl).

The line that aliases C groups BDIST by its sorted_local part. This basically creates one record for each distinct sorted_local part. The value is a data bag that contains every BDIST record with that sorted local part, which means one record per distinct cl.

So, C now has the type (group, BDIST:{sorted_local, cl}).

The D line gets rid of any groups that have only one cl. D has the same type as C.

The E line gets only the groups, ignoring the actual values. And the store line writes those groups out. So, now I have one group for every sorted local part that has more than one CL. How cool is that!
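
For anyone who wants to check the logic on a handful of records, here is a plain-Java, in-memory sketch of what the script computes. It is not how Pig executes it. The class and method names, and the CL values in main, are made up for illustration.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

final class SortedLocalCounts {
    // Plain-Java stand-in for the SortChars function.
    static String sortChars(String s) {
        char[] chars = s.toCharArray();
        Arrays.sort(chars);
        return new String(chars);
    }

    // Each record is {local, cl}. Returns the sorted local parts that are
    // associated with more than one distinct cl.
    static Set<String> sortedLocalsWithMultipleCls(List<String[]> records) {
        // Collecting cls into a Set plays the role of DISTINCT plus GROUP.
        Map<String, Set<String>> clsBySortedLocal = new TreeMap<>();
        for (String[] record : records) {
            clsBySortedLocal
                .computeIfAbsent(sortChars(record[0]), k -> new HashSet<>())
                .add(record[1]);
        }
        Set<String> out = new TreeSet<>();
        for (Map.Entry<String, Set<String>> entry : clsBySortedLocal.entrySet()) {
            if (entry.getValue().size() > 1) { // the FILTER step
                out.add(entry.getKey());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String[]> records = Arrays.asList(
            new String[] {"tgibbs", "CL1"},
            new String[] {"tgbbis", "CL2"},
            new String[] {"tgobbs", "CL3"},
            new String[] {"tgobbs", "CL3"});
        System.out.println(sortedLocalsWithMultipleCls(records)); // [bbgist]
    }
}
```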

Here is my PigFunction for sorting the characters:

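import org.apache.pig.data.Tuple;
import org.apache.pig.data.DataBag;
import org.apache.pig.EvalFunc;
import java.util.Arrays;
import java.io.IOException;

// Pig eval function that sorts the characters of its first argument.
public class SortChars extends EvalFunc<Tuple> {
    @Override
    public void exec(Tuple input, Tuple output) throws IOException {
        // Read the first field of the input tuple as a string.
        String str = input.getAtomField(0).strval();
        // Sorting the raw bytes is fine for plain ASCII local parts.
        byte[] bytes = str.getBytes();
        Arrays.sort(bytes);
        str = new String(bytes);
        // Hand the sorted string back as a one-element tuple.
        Tuple newOut = new Tuple(str);
        output.copyFrom(newOut);
    }
}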

Wednesday, May 21, 2008

Pig

Pig is Yahoo!'s data flow language that is designed to run atop hadoop. I've spent a few hours today getting it set up and running. One thing I would like to point out is that you can't run the pig script in the bin directory (or at least you can't if you want it to connect to the hadoop cluster).

I had to manually run:
java -cp pig.jar:$HADOOPSITECONFIG org.apache.pig.Main

Also, if you dump a variable, it has to run the map phases to get to it. I thought it would just tell the schema, but no....

Hadoop Streaming

Hadoop Streaming appears to be a way to write quick hadoop jobs. I've recently been playing with it and have finally gotten it to work for me.

The main parameter that I had to add was -jobconf stream.shipped.hadoopstreaming=$HADOOP_HOME/contrib/streaming

It was somehow getting set to /tmp which was causing everything in my /tmp directory to get added to the job jar it generates.

Another good thing to keep in mind is the -verbose flag. It can help figure out what is going on under the hood.

Tuesday, May 20, 2008

Multi-User Hadoop

I've been setting up hadoop on a few (6) boxes and have been attempting to make it work for multiple users. It is not as easy as it sounds because the docs are a bit spread out.

Nevertheless, if everyone is in the same group, then you need to set your default group to represent that.

<property>
<name>dfs.permissions.supergroup</name>
<value>groupname</value>
</property>

You'll also need to change the group of any files you've already created.
hadoop dfs -chgrp -R groupname /

Next, to allow multiple users to run mapreduce jobs, you'll need to set the mapreduce system directory to a place that is accessible to ALL the boxes. I'm using an nfs mount, but you could use hdfs just as easily.

<property>
<name>mapred.system.dir</name>
<value>/mnt/myNFSMount/hadoop/mapred/system</value>
</property>

Make sure that directory exists and is writable by the group mentioned above (or at least that all your mapreduce users can write to it).

At this point, that is all that I know needs to be changed. I have two people (including myself) using our hadoop cluster. So, I'll let you know as we run into more problems.

Saturday, May 03, 2008

Linear Algebra

A great undergraduate linear algebra class from MIT's Open Courseware program can be found here.

Other MIT Open Courseware courses with heavy video/audio content can be found here.