On a second note, I've gotten Pig up and running and have successfully run some large jobs. Woot!
Here is a Pig script that finds every sorted local part of an email address that is associated with more than one AbiliTec Consumer Link.
In other words, if there were two emails tgibbs@blah.com and tgbbis@blah.com, the sorted local part of both would be bbgist. If they were each associated with a different consumer link, then the following pig script would output bbgist. Otherwise, it would not.
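To make "sorted local part" concrete, here is a minimal standalone Java sketch. It is not part of the Pig job: the class and method names (SortedLocalPart, sortedLocalPart) are made up for illustration, and it sorts chars rather than the bytes that the UDF later in this entry sorts.

```java
import java.util.Arrays;

// Hypothetical helper for illustration only; not used by the Pig script.
final class SortedLocalPart {

    // Returns the characters before the '@' in sorted order.
    // If there is no '@', the whole string is treated as the local part.
    static String sortedLocalPart(String email) {
        int at = email.indexOf('@');
        String local = (at >= 0) ? email.substring(0, at) : email;
        char[] chars = local.toCharArray();
        Arrays.sort(chars);
        return new String(chars);
    }

    public static void main(String[] args) {
        System.out.println(sortedLocalPart("tgibbs@blah.com")); // prints bbgist
        System.out.println(sortedLocalPart("tgbbis@blah.com")); // prints bbgist
    }
}
```

Both addresses collapse to the same key, which is what lets the script group them together.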
register /home/tgibbs/pig/pig/myPigFunctions.jar
A = LOAD '/user/tgibbs/eproducts' USING PigStorage(',') AS (seqid, cl, local, domain, fn, ln, zip, email);
B = FOREACH A GENERATE FLATTEN(SortChars(local)) as sorted_local, cl;
BDIST = DISTINCT B PARALLEL 10;
C = GROUP BDIST BY sorted_local PARALLEL 10;
D = FILTER C BY COUNT(BDIST.cl) > 1;
E = FOREACH D GENERATE group;
STORE E INTO '/user/tgibbs/eproducts-sorted-local-out' USING PigStorage();
The register line loads a custom jar file so that I can call my custom functions. More on that later.
The next line, the assignment of A, just reads a comma-delimited file into A. It also associates a name with each of the comma-delimited fields. So, field 0 is seqid, field 1 is cl, and so on.
The third line loops through each record in A (that's the FOREACH) and sorts the characters in the local part. I wrote the SortChars class and will post it at the end of this entry. The FLATTEN is needed because SortChars returns a tuple. Since only one element goes in, only one element comes out, and I want to treat that element as an atomic data item instead of as a tuple. The 'as sorted_local' portion names the data item.
The type of B is now a tuple (sorted_local, cl).
The next line eliminates duplicates. If two records have the same sorted local part AND cl, we can safely drop the duplicate because it describes the same pairing. In fact, we need to drop it: otherwise the COUNT further down would count the same cl twice. The PARALLEL keyword raises the number of reduce tasks to 10. This means we'll end up with 10 output files instead of 1, but that's ok because we'll process them all later, anyway.
In the end BDIST has the same type as B (sorted_local, cl).
The line that aliases C groups BDIST by its sorted_local part. This creates one record for each distinct sorted_local part. The value is a data bag that contains every BDIST record sharing that sorted local part.
So, C now has the type (group, BDIST:{sorted_local, cl}).
The D line gets rid of any groups that have only one cl. D has the same type as C.
The E line gets only the groups, ignoring the actual values. And the store line writes those groups out. So, now I have one group for every sorted local part that has more than one CL. How cool is that!
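For anyone who wants to check the logic without a cluster, the DISTINCT, GROUP, FILTER and GENERATE steps can be sketched in plain Java. This is only an in-memory stand-in under my own assumptions, not how Pig executes the job: the class name LocalPartPipeline, the method multiLinkLocals, and the sample rows are all hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical in-memory stand-in for the Pig job; names are illustrative only.
final class LocalPartPipeline {

    // Each row is {local, cl}. Returns the sorted local parts tied to more than one cl.
    static List<String> multiLinkLocals(List<String[]> rows) {
        // The map key plays the role of GROUP ... BY sorted_local.
        // The set of cl values plays the role of DISTINCT on (sorted_local, cl).
        Map<String, Set<String>> clsBySortedLocal = new TreeMap<>();
        for (String[] row : rows) {
            char[] chars = row[0].toCharArray();
            Arrays.sort(chars);
            String sortedLocal = new String(chars);
            clsBySortedLocal.computeIfAbsent(sortedLocal, k -> new TreeSet<>()).add(row[1]);
        }
        // Equivalent of FILTER ... BY COUNT(BDIST.cl) > 1 followed by GENERATE group.
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, Set<String>> entry : clsBySortedLocal.entrySet()) {
            if (entry.getValue().size() > 1) {
                result.add(entry.getKey());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String[]> rows = Arrays.asList(
            new String[] {"tgibbs", "CL1"},
            new String[] {"tgbbis", "CL2"},
            new String[] {"tgibbs", "CL1"},  // exact duplicate, collapsed by the set
            new String[] {"alice", "CL3"},
            new String[] {"celia", "CL3"});  // same sorted local part, same cl
        System.out.println(multiLinkLocals(rows)); // prints [bbgist]
    }
}
```

The "aceil" group (alice and celia) is dropped because both rows point at the same cl, which is the case the DISTINCT step exists to handle.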
Here is my PigFunction for sorting the characters:
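import org.apache.pig.data.Tuple;
import org.apache.pig.EvalFunc;
import java.util.Arrays;
import java.io.IOException;

// The type parameter <Tuple> matches the Tuple output argument of exec.
public class SortChars extends EvalFunc<Tuple> {
    @Override
    public void exec(Tuple input, Tuple output) throws IOException {
        // Read the first (and only) field of the input tuple as a string.
        String str = input.getAtomField(0).strval();
        // Sorting raw bytes only orders characters correctly for
        // single-byte (ASCII) local parts.
        byte[] bytes = str.getBytes();
        Arrays.sort(bytes);
        str = new String(bytes);
        // Wrap the sorted string in a one-field tuple and hand it back.
        Tuple newOut = new Tuple(str);
        output.copyFrom(newOut);
    }
}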