brain of mat kelcey


e11.1 from bash scripts to hadoop

October 18, 2009

let's rewrite v1 using hadoop tooling, code is on github

we'll run hadoop in non distributed standalone mode. in this mode everything runs in a single jvm so it's nice and simple to dev against.

step 1: extract the location strings from the json stream

in v1 it was

bzcat sample.bz2 | ./extract_locations.pl > locations

using the awesome hadoop streaming interface it's not too different. this interface allows you to specify any app as the mapper or reducer. the main difference is that it works on directories, not just files.

for the mapper we'll use exactly the same script as before, extract_locations.pl. since this job has no reduce component we use an "identity" script, ie cat, as the reduce phase.

mkdir json_stream
bzcat sample.bz2 | gzip - > json_stream/input.gz
# hadoop supports gzip out of the box but not bzip2 :(
export HADOOP_STREAMING_JAR=$HADOOP_HOME/contrib/streaming/hadoop-*-streaming.jar
hadoop jar $HADOOP_STREAMING_JAR \
  -mapper ./extract_locations.pl -reducer /bin/cat \
  -input json_stream -output locations

this gives us the locations in a single file locations/part-00000
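
the perl script isn't shown here, but a streaming mapper is just a program that reads records on stdin and writes results to stdout. here is a rough sketch of the idea in ruby. extract_location and run_mapper are made-up names, and the user.location field is an assumption about the tweet json, not something taken from extract_locations.pl.

```ruby
require 'json'

# hypothetical stand-in for extract_locations.pl: returns the user's
# location string from one line of the json stream, or nil when the line
# is not parseable or has no location.
# ASSUMPTION: each line is a json object with a user.location field.
def extract_location(line)
  tweet = JSON.parse(line)
  return nil unless tweet.is_a?(Hash) && tweet['user'].is_a?(Hash)
  location = tweet['user']['location']
  return nil unless location.is_a?(String)
  location = location.strip
  location.empty? ? nil : location
rescue JSON::ParserError
  nil
end

# streaming mapper loop: one json record per input line,
# one location string per output line.
def run_mapper(input, output)
  input.each_line do |line|
    location = extract_location(line)
    output.puts(location) if location
  end
end
```

wired up as a mapper the script would end with run_mapper($stdin, $stdout). note that JSON.parse decodes escapes such as \u00dc, so the output of this sketch would not be byte for byte identical to a script that copies the raw escaped text.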

step 2: extract iphone and ut lat long strings

the second step is another text munging problem where we extract just the lat longs for the iPhone and UT tagged locations

ie for strings of the form

iPhone: 21.320328,-157.877579
\u00dcT: 41.727877,-91.626323

we want to extract

21.320328 -157.877579
41.727877 -91.626323

since this is just text manipulation we'll use streaming again

in v1 it was

cat locations | ./extract_lat_longs_from_locations.rb iphone > locations.iphone
cat locations | ./extract_lat_longs_from_locations.rb ut > locations.ut

for hadoop streaming it's

hadoop jar $HADOOP_STREAMING_JAR \
  -mapper './extract_lat_longs_from_locations.rb iphone' -reducer /bin/cat \
  -input locations -output locations.iphone
hadoop jar $HADOOP_STREAMING_JAR \
  -mapper './extract_lat_longs_from_locations.rb ut' -reducer /bin/cat \
  -input locations -output locations.ut
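
for reference, the extraction itself can be sketched in a few lines of ruby. this is a guess at what extract_lat_longs_from_locations.rb does, not its actual source: the names TAG_PATTERNS, NUMBER and extract_lat_long are hypothetical, and the sketch accepts the ut tag both json-escaped (as in the example above) and already decoded.

```ruby
# a signed decimal number such as -157.877579
NUMBER = /-?\d+(?:\.\d+)?/

# ASSUMPTION: the command line argument (iphone / ut) selects one of these tags.
TAG_PATTERNS = {
  'iphone' => /iPhone/,
  # literal backslash-u escape, or the decoded U-umlaut followed by T
  'ut'     => Regexp.union('\u00dcT', "\u00dcT")
}.freeze

# returns "lat long" for a location string carrying the given tag,
# or nil when the string has a different shape.
def extract_lat_long(location, tag)
  pattern = TAG_PATTERNS.fetch(tag)
  match = location.match(/\A\s*#{pattern}:\s*(#{NUMBER})\s*,\s*(#{NUMBER})\s*\z/)
  match && "#{match[1]} #{match[2]}"
end
```

a mapper built on this would call extract_lat_long(line.chomp, ARGV[0]) for each line of stdin and print the non nil results.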

step 3: convert from lat long to mercator coordinates and aggregate into buckets for the heat map

in v1 it was

cat locations.{ut,iphone} | ./lat_long_to_merc.rb | ./bucket.rb | sort | uniq -c

this converts three { lat, long } tuples

35.670086 139.740766
-23.492420 -46.846916
35.657570 139.744858

into two { frequency, left-offset, top-offset } tuples

1 0.36 0.45
2 0.88 0.28

the first two parts, converting to mercator (lat_long_to_merc.rb) and the bucketing (bucket.rb), i'll combine into one script.

hadoop jar $HADOOP_STREAMING_JAR \
  -mapper ./lat_long_to_merc_and_bucket.rb -reducer /bin/cat \
  -input locations.iphone -input locations.ut -output x_y_points
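
as a sketch of what such a combined mapper might look like in ruby: the function names are hypothetical, and it assumes a standard spherical mercator projection over the full world square with buckets truncated to two decimal places. the vertical extent of the original map isn't given, so the top offsets from this sketch won't necessarily match the ones shown above.

```ruby
# latitude at which the square mercator map is cut off
MAX_LAT = 85.05112878
# ASSUMPTION: 100 buckets per axis, ie offsets truncated to 2 decimal places
BUCKETS = 100

# project lat/long in degrees onto the unit square:
# x grows to the east from the left edge, y grows to the south from the top edge.
def to_mercator(lat, long)
  lat = [[lat, -MAX_LAT].max, MAX_LAT].min
  x = (long + 180.0) / 360.0
  lat_rad = lat * Math::PI / 180.0
  y = 0.5 - Math.log(Math.tan(Math::PI / 4 + lat_rad / 2)) / (2 * Math::PI)
  [x, y]
end

# truncate a 0..1 offset to its bucket, clamping the edges into range
def bucket(value)
  index = (value * BUCKETS).floor
  index = [[index, 0].max, BUCKETS - 1].min
  format('%.2f', index.to_f / BUCKETS)
end

# one "lat long" input line becomes one "left-offset top-offset" output line
def map_line(line)
  lat, long = line.split.map { |field| Float(field) }
  x, y = to_mercator(lat, long)
  "#{bucket(x)} #{bucket(y)}"
end
```

the mapper proper would just print map_line(line) for every line on stdin.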

the sort and uniq -c that aggregate the data in v1 correspond to the shuffle and reduce stages of hadoop.
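
to make that correspondence concrete, here is a minimal ruby sketch of a streaming reducer that behaves like uniq -c. it relies on the shuffle delivering the mapper output sorted by key, so identical lines arrive next to each other. count_runs and run_reducer are made-up names, and the rstrip is a precaution in case streaming appends a tab to lines that have a key but no value.

```ruby
# count runs of identical adjacent lines, like `uniq -c`.
# only correct when the input is already sorted (which the shuffle provides).
def count_runs(sorted_lines)
  counts = []
  sorted_lines.each do |line|
    key = line.rstrip
    if counts.last && counts.last[0] == key
      counts.last[1] += 1
    else
      counts << [key, 1]
    end
  end
  counts
end

# streaming reducer loop: emits "frequency key" for each distinct key
def run_reducer(input, output)
  count_runs(input.each_line).each do |key, count|
    output.puts("#{count} #{key}")
  end
end
```

fed the sorted lines 0.36 0.45, 0.88 0.28, 0.88 0.28 it emits 1 0.36 0.45 and 2 0.88 0.28, the same shape as the v1 output.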

we could use the aggregate functionality of the streaming interface but i'm trying to learn more pig so we'll use that instead. pig is a platform whose scripting language, pig latin, gets translated into map reduce jobs. my main motivation for using it has been that it's great at doing joins, something i've found to be a big pain to represent in plain map reduce jobs.

( note we didn't do the conversion to mercator and bucketing in pig, the arithmetic operations provided are a bit lacking. )

enter a pig shell running in standalone (ie non hadoop distributed) mode

bash> pig -x local

load the points

grunt> pts = load 'x_y_points/part-00000' as (x:float, y:float);
grunt> describe pts;
pts: {x: float,y: float}
grunt> dump pts;
(0.06F,0.32F)
(0.15F,0.27F)
(0.16F,0.27F)
...

group them together

grunt> buckets = group pts by (x,y);
grunt> describe buckets;
buckets: {group: (x: float,y: float),pts: {x: float,y: float}}
grunt> dump buckets;
((0.06F,0.32F),{(0.06F,0.32F)})
((0.15F,0.27F),{(0.15F,0.27F)})
((0.16F,0.27F),{(0.16F,0.27F),(0.16F,0.27F),(0.16F,0.27F),(0.16F,0.27F)})
...

from the groups emit the size of each bucket, this corresponds to the frequency

grunt> freq = foreach buckets { generate group, SIZE(pts) as size; };
grunt> describe freq;
freq: {group: (x: float,y: float),size: long}
grunt> dump freq;
((0.06F,0.32F),1L)
((0.15F,0.27F),1L)
((0.16F,0.27F),4L)
...

and based on the sizes we can evaluate the min and max frequencies which we'll use in the colour coding of the heat map

grunt> freqs = group freq all;
grunt> describe freqs;
freqs: {group: chararray,freq: {group: (x: float,y: float),size: long}}
grunt> dump freqs;
(all,{((0.06F,0.32F),1L),((0.15F,0.27F),1L), ... })
grunt> store freq into 'freqs';

grunt> min_max = foreach freqs { generate MAX(freq.size) as max, MIN(freq.size) as min; };
grunt> describe min_max;
min_max: {max: long,min: long}
grunt> dump min_max;
(7L,1L)
grunt> store min_max into 'min_max';

bash> cat freqs
(0.06,0.32)   1
(0.15,0.27)   1
(0.16,0.27)   4

these can all be run as one command

bash> pig -x local -f freqs.pig

we just need our final conversion to a javascript snippet to jam into a map page

bash> cat freqs | ./as_draw_square.rb 1 7

win!

to make things a little different let's use a bigger sample of 475e3 tweets from oct 13 07:00 to 20:00. this results in 10e3 iphone locations (7e3 unique) and 22e3 ut locations (15e3 unique)

lat longs are bucketed into only 478 pixels for the map

here's one plot with the raw numbers; highest freq is 9e3 in jakarta

raw frequencies

scaling down by log 10 gives a smoother map
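
the scaling code isn't shown, so this is only one plausible way to do it: a ruby sketch that maps a bucket frequency to a colour intensity between 0.0 and 1.0 using the min and max frequencies, once linearly and once on a log10 scale. raw_intensity and log10_intensity are hypothetical names, not functions from as_draw_square.rb.

```ruby
# linear scaling: intensity proportional to the raw frequency
def raw_intensity(freq, min, max)
  return 1.0 if max == min
  (freq - min).to_f / (max - min)
end

# log10 scaling: intensity proportional to the order of magnitude,
# so one huge bucket no longer flattens all the others.
# frequencies are counts, so freq and min are assumed to be >= 1.
def log10_intensity(freq, min, max)
  return 1.0 if max == min
  (Math.log10(freq) - Math.log10(min)) / (Math.log10(max) - Math.log10(min))
end
```

with a min of 1 and a max of 9e3, a bucket holding 90 points gets a raw intensity of about 0.01 (nearly invisible) but a log10 intensity of about 0.49.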

log10 frequencies

and here is a comparison of iphone vs ut. without knowing what ut is i can see it's not big in northern europe or japan but it's popular in indonesia.

iphones

ut

next steps, animating based on the hour of the day
