prototyping in ruby was a great way to prove the concept but my main motivation for this project was to play some more with pig.
the main approach will be
- maintain a relation with one record per ngram we want to monitor for trending
- fold 1 hour's worth of new data at a time into the model
- check the entries for the latest hour for any trends
the full version is on github. read on for a line by line walkthrough
the ruby impl used the simplest approach possible for calculating mean and stddev; maintain a record of all the values seen so far and recalculate for each new value.
for our pig version we’ll take a fixed space approach. rather than keep all the values for each time series it turns out we can get away with storing just 3…
- n: the number of values
- m: the current mean of all values
- ms: the current mean of the squares of all values
the idea is that mean_{n+1} = ( n * mean_n + new value ) / ( n + 1 )
a similar function derives the standard deviation_{n+1} from n, mean_n and the mean of the squares_n
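as a rough sketch of the fixed space idea (in ruby, to match the prototype), the update only ever touches n, m and ms. the `RunningStats` class and its method names are made up for illustration and aren't part of the project; the standard deviation here is the population one, sqrt(mean of squares - square of mean).

```ruby
# fixed space running statistics: keeps only n, m (mean) and ms (mean of squares).
# RunningStats is a hypothetical name used for illustration only.
class RunningStats
  attr_reader :n, :m, :ms

  def initialize(n = 0, m = 0.0, ms = 0.0)
    @n = n
    @m = m
    @ms = ms
  end

  # fold one new value in without storing any history
  def add(value)
    @m  = (@n * @m + value) / (@n + 1).to_f
    @ms = (@n * @ms + value * value) / (@n + 1).to_f
    @n += 1
    self
  end

  # population standard deviation: sqrt(mean of squares - square of mean)
  def stddev
    variance = @ms - @m * @m
    Math.sqrt([variance, 0.0].max) # clamp tiny negative rounding errors to zero
  end
end

stats = RunningStats.new
[1, 2, 1, 4].each { |v| stats.add(v) }
puts "n=#{stats.n} mean=#{stats.m} stddev=#{stats.stddev}"
```

the trade off is that we can never go back and drop or inspect an individual value, but the storage per ngram stays constant no matter how many hours we fold in.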
let’s go over the pig script one command at a time.
we’ll assume we’ve already run it 6 times and we’re now folding in the 7th hour
the first thing is to load the existing version of the model, in this case stored in the file ‘model.006’
it contains everything we need for checking the trending for each ngram
> raw_model = load 'model.006' as (key:chararray, n:int, m:double, ms:double);
> describe raw_model;
raw_model: {key: chararray, n: int, m: double, ms: double}
> dump raw_model;
(a b,6,1.3333333333333333,2.0)
(a a,3,1.3333333333333333,2.0)
(a c,4,1.25,1.75)
(a d,1,2.0,4.0)
(b a,3,1.0,1.0)
(b d,1,2.0,4.0)
(b c,6,1.5,2.5)
(d c,1,2.0,4.0)
(c a,4,1.0,1.0)
(d e,1,1.0,1.0)
(c d,4,2.0,4.0)
(d a,2,1.0,1.0)
next we tag each entry from the loaded model with a zero frequency. we’ll see later how this makes it easier to fold in the new data.
> model = foreach raw_model generate key, n, m, ms, 0 as f;
> describe model;
model: {key: chararray, n: int, m: double, ms: double, f: int}
> dump model;
(a b,6,1.3333333333333333,2.0,0)
(a a,3,1.3333333333333333,2.0,0)
(a c,4,1.25,1.75,0)
(a d,1,2.0,4.0,0)
(b a,3,1.0,1.0,0)
(b d,1,2.0,4.0,0)
(b c,6,1.5,2.5,0)
(d c,1,2.0,4.0,0)
(c a,4,1.0,1.0,0)
(d e,1,1.0,1.0,0)
(c d,4,2.0,4.0,0)
(d a,2,1.0,1.0,0)
now that we’ve loaded the existing version of the model we can load the next hour of data, in this case contained in ‘chunks/006’.
> next_chunk = load 'chunks/006';
> dump next_chunk;
(a b a b)
(c d a b)
(a b c)
(a d d d)
from the text we want to get the frequency of the ngrams.
the breaking apart of each line into its 2-grams is handled by a simple ruby script; ngram.rb
> define ngramer `ngram.rb` ship('ngram.rb');
> ngrams = stream next_chunk through ngramer as (key:chararray);
> describe ngrams;
ngrams: {key: chararray}
> dump ngrams;
(a b)
(b a)
(a b)
(c d)
(d a)
(a b)
(a b)
(b c)
(a d)
(d d)
(d d)
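ngram.rb itself isn't reproduced in this walkthrough, so the following is only a guess at its core: split each line on whitespace and emit every pair of adjacent tokens. the `bigrams` helper is a hypothetical name; a streaming version would read lines from stdin rather than from a hard coded array.

```ruby
# hypothetical stand-in for the core of ngram.rb (the real script is not shown here):
# split a line on whitespace and return each pair of adjacent tokens as one 2-gram
def bigrams(line)
  line.split.each_cons(2).map { |pair| pair.join(' ') }
end

# the four lines from chunks/006 above
chunk = ['a b a b', 'c d a b', 'a b c', 'a d d d']
chunk.each do |line|
  bigrams(line).each { |ngram| puts ngram }
end
```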
calculating the frequencies of the ngrams is a simple two step process of first grouping by the key…
> ngrams_grouped = group ngrams by key;
> describe ngrams_grouped;
ngrams_grouped: {group: chararray, ngrams: {key: chararray}}
> dump ngrams_grouped;
(a b,{(a b),(a b),(a b),(a b)})
(a d,{(a d)})
(b a,{(b a)})
(b c,{(b c)})
(c d,{(c d)})
(d a,{(d a)})
(d d,{(d d),(d d)})
…and then generating the key, frequency pairs
> ngram_freq = foreach ngrams_grouped generate group as key, SIZE(ngrams) as f;
> describe ngram_freq;
ngram_freq: {key: chararray, f: long}
> dump ngram_freq;
(a b,4L)
(a d,1L)
(b a,1L)
(b c,1L)
(c d,1L)
(d a,1L)
(d d,2L)
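for comparison, the same group then count idea looks like this in ruby. it's just a sketch to mirror the two pig steps; `ngram_freq` here is a made up helper name, not something from the project.

```ruby
# group by key, then take the size of each group (mirrors group + SIZE in pig)
def ngram_freq(ngrams)
  grouped = ngrams.group_by { |key| key }
  grouped.map { |key, group| [key, group.size] }.to_h
end

ngrams = ['a b', 'b a', 'a b', 'c d', 'd a', 'a b', 'a b', 'b c', 'a d', 'd d', 'd d']
ngram_freq(ngrams).sort.each { |key, f| puts "#{key}\t#{f}" }
```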
from this we know all the distinct 2grams that are contained in the next chunk we’re analysing
for each of these 2grams one of two things is true;
- either the ngram has been seen before (thus it has an entry in the model)
- this is the first time we’ve seen it, in which case we need to add a new entry to the model
the easiest way i’ve worked out in pig to handle this is to generate a ‘seed’ model just for this chunk and fold it into the real model by unioning the relations
(i’ve been using pig 0.3 to keep in line with the current version of elastic map reduce but it might be easier with the various extra joins that are in later versions of pig)
so first we generate the ‘seed’ relation…
> seed_values = foreach ngram_freq generate key, 0 as n, 0.0 as m, 0.0 as ms, f;
> describe seed_values;
seed_values: {key: chararray, n: int, m: double, ms: double, f: long}
> dump seed_values;
(a b,0,0.0,0.0,4L)
(a d,0,0.0,0.0,1L)
(b a,0,0.0,0.0,1L)
(b c,0,0.0,0.0,1L)
(c d,0,0.0,0.0,1L)
(d a,0,0.0,0.0,1L)
(d d,0,0.0,0.0,2L)
…and fold it in with a 3 step process; unioning with the original model, grouping and collapsing
first the union…
> model_plus_seed = union model, seed_values;
> describe model_plus_seed;
model_plus_seed: {key: chararray, n: int, m: double, ms: double, f: long}
> dump model_plus_seed;
(a b,0,0.0,0.0,4L)
(a b,6,1.3333333333333333,2.0,0L)
(a d,0,0.0,0.0,1L)
(a a,3,1.3333333333333333,2.0,0L)
(b a,0,0.0,0.0,1L)
(a c,4,1.25,1.75,0L)
(b c,0,0.0,0.0,1L)
(a d,1,2.0,4.0,0L)
(c d,0,0.0,0.0,1L)
(b a,3,1.0,1.0,0L)
(d a,0,0.0,0.0,1L)
(b d,1,2.0,4.0,0L)
(d d,0,0.0,0.0,2L)
(b c,6,1.5,2.5,0L)
(d c,1,2.0,4.0,0L)
(c a,4,1.0,1.0,0L)
(d e,1,1.0,1.0,0L)
(c d,4,2.0,4.0,0L)
(d a,2,1.0,1.0,0L)
then the grouping…
> model_plus_seed2 = group model_plus_seed by key;
> describe model_plus_seed2;
model_plus_seed2: {group: chararray, model_plus_seed: {key: chararray, n: int, m: double, ms: double, f: long}}
> dump model_plus_seed2;
(a a,{(a a,3,1.3333333333333333,2.0,0L)})
(a b,{(a b,0,0.0,0.0,4L),(a b,6,1.3333333333333333,2.0,0L)})
(a c,{(a c,4,1.25,1.75,0L)})
(a d,{(a d,0,0.0,0.0,1L),(a d,1,2.0,4.0,0L)})
(b a,{(b a,0,0.0,0.0,1L),(b a,3,1.0,1.0,0L)})
(b c,{(b c,0,0.0,0.0,1L),(b c,6,1.5,2.5,0L)})
(b d,{(b d,1,2.0,4.0,0L)})
(c a,{(c a,4,1.0,1.0,0L)})
(c d,{(c d,0,0.0,0.0,1L),(c d,4,2.0,4.0,0L)})
(d a,{(d a,0,0.0,0.0,1L),(d a,2,1.0,1.0,0L)})
(d c,{(d c,1,2.0,4.0,0L)})
(d d,{(d d,0,0.0,0.0,2L)})
(d e,{(d e,1,1.0,1.0,0L)})
and finally the collapsing using MAX…
> model_n =
foreach model_plus_seed2 generate
group as key,
MAX(model_plus_seed.n) as n,
MAX(model_plus_seed.m) as m,
MAX(model_plus_seed.ms) as ms,
MAX(model_plus_seed.f) as f;
> describe model_n;
model_n: {key: chararray, n: int, m: double, ms: double, f: long}
> dump model_n;
(a a,3,1.3333333333333333,2.0,0L)
(a b,6,1.3333333333333333,2.0,4L)
(a c,4,1.25,1.75,0L)
(a d,1,2.0,4.0,1L)
(b a,3,1.0,1.0,1L)
(b c,6,1.5,2.5,1L)
(b d,1,2.0,4.0,0L)
(c a,4,1.0,1.0,0L)
(c d,4,2.0,4.0,1L)
(d a,2,1.0,1.0,1L)
(d c,1,2.0,4.0,0L)
(d d,0,0.0,0.0,2L)
(d e,1,1.0,1.0,0L)
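the MAX trick works because every column is non negative and each key has at most one model row (with f = 0) and one seed row (with n, m and ms all 0), so the zero placeholders never beat a real value. here's a small ruby sketch of the same union, group and collapse; `fold_seed` and the array layout are assumptions for illustration only.

```ruby
# each row is [key, n, m, ms, f]
# model rows carry f = 0, seed rows carry n = m = ms = 0
def fold_seed(model, seed)
  unioned = model + seed                       # union
  grouped = unioned.group_by { |row| row[0] }  # group by key
  grouped.map do |key, rows|                   # collapse with a column-wise max
    [key] + (1..4).map { |col| rows.map { |row| row[col] }.max }
  end
end

model = [['a b', 6, 1.3333333333333333, 2.0, 0], ['a a', 3, 1.3333333333333333, 2.0, 0]]
seed  = [['a b', 0, 0.0, 0.0, 4], ['d d', 0, 0.0, 0.0, 2]]
fold_seed(model, seed).each { |row| puts row.inspect }
```

a key that only appears in the seed (like ‘d d’) comes out with n = 0, which is exactly what marks it as brand new for the update step.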
at this stage we have the original model woven in with the new data but still need to update the mean and mean of squares for the values from the latest hour.
we can do this by first separating out the values we need to update based on whether the frequency is non zero
(recall non zero frequencies represent ngrams from the latest hour)
> split model_n into to_update if f>0, not_to_update if f==0;
> dump to_update;
(a b,6,1.3333333333333333,2.0,4L)
(a d,1,2.0,4.0,1L)
(b a,3,1.0,1.0,1L)
(b c,6,1.5,2.5,1L)
(c d,4,2.0,4.0,1L)
(d a,2,1.0,1.0,1L)
(d d,0,0.0,0.0,2L)
> dump not_to_update;
(a a,3,1.3333333333333333,2.0,0L)
(a c,4,1.25,1.75,0L)
(b d,1,2.0,4.0,0L)
(c a,4,1.0,1.0,0L)
(d c,1,2.0,4.0,0L)
(d e,1,1.0,1.0,0L)
we can now update the mean and mean of squares based on the new frequency values
> model_n1 =
foreach to_update {
m2 = ((n*m)+f)/(n+1);
ms2 = ((n*ms)+(f*f))/(n+1);
generate key, n+1 as n, m2 as m, ms2 as ms, f;
}
> describe model_n1;
model_n1: {key: chararray, n: int, m: double, ms: double, f: long}
> dump model_n1;
(a b,7,1.7142857142857142,4.0,4L)
(a d,2,1.5,2.5,1L)
(b a,4,1.0,1.0,1L)
(b c,7,1.4285714285714286,2.2857142857142856,1L)
(c d,5,1.8,3.4,1L)
(d a,3,1.0,1.0,1L)
(d d,1,2.0,4.0,2L)
these new rows, along with the rows we didn’t update, can be stored as the model at time n+1 ready for the next hour’s chunk
> to_store = union model_n1, not_to_update;
> store to_store into 'model.007';
> dump to_store;
(a b,7,1.7142857142857142,4.0,4L)
(a a,3,1.3333333333333333,2.0,0L)
(a d,2,1.5,2.5,1L)
(a c,4,1.25,1.75,0L)
(b a,4,1.0,1.0,1L)
(b d,1,2.0,4.0,0L)
(b c,7,1.4285714285714286,2.2857142857142856,1L)
(c a,4,1.0,1.0,0L)
(c d,5,1.8,3.4,1L)
(d c,1,2.0,4.0,0L)
(d a,3,1.0,1.0,1L)
(d e,1,1.0,1.0,0L)
(d d,1,2.0,4.0,2L)
now that we’ve updated the model we can start making the trending check!
first step is to filter out entries that correspond to ngrams we are seeing for the first time
( a new item can’t be trending )
> requiring_trending_check = filter model_n1 by n>1;
> dump requiring_trending_check;
(a b,7,1.7142857142857142,4.0,4L)
(a d,2,1.5,2.5,1L)
(b a,4,1.0,1.0,1L)
(b c,7,1.4285714285714286,2.2857142857142856,1L)
(c d,5,1.8,3.4,1L)
(d a,3,1.0,1.0,1L)
and finally we can make the trending calculation!
we can calculate the minimum trending value, based on mean + twice std dev…
> calc_min_trending =
foreach requiring_trending_check {
sd_lhs = n * ms;
sd_rhs = n * (m*m);
sd = org.apache.pig.piggybank.evaluation.math.SQRT((sd_lhs-sd_rhs)/n);
min_trend_value = m + (2*sd);
generate key, f, m as mean, sd as std_dev,
min_trend_value as min_trend_value,
f / min_trend_value as percent_over_trend;
}
> describe calc_min_trending;
calc_min_trending: {key: chararray, f: long, mean: double, std_dev: double, min_trend_value: double, percent_over_trend: double}
> dump calc_min_trending;
(a b,4L,1.7142857142857142,1.0301575072754257,3.7746007288365657,1.059714732061981)
(a d,1L,1.5,0.5,2.5,0.4)
(b a,1L,1.0,0.0,1.0,1.0)
(b c,1L,1.4285714285714286,0.4948716593053934,2.4183147471822153,0.4135111036167584)
(c d,1L,1.8,0.4,2.6,0.3846153846153848)
(d a,1L,1.0,0.0,1.0,1.0)
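note that (n*ms - n*m*m)/n simplifies to ms - m*m, so n drops out of the standard deviation entirely. a quick ruby sketch of the same check, handy for sanity checking the numbers above; `trend_check` is a hypothetical helper, not part of the pig script.

```ruby
# given m (mean), ms (mean of squares) and the latest frequency f,
# return [std_dev, min_trend_value, f / min_trend_value]
def trend_check(m, ms, f)
  variance = [ms - m * m, 0.0].max # clamp tiny negative rounding errors to zero
  sd = Math.sqrt(variance)
  min_trend_value = m + 2 * sd
  [sd, min_trend_value, f / min_trend_value]
end

# the 'a b' row: m = 12/7, ms = 4.0, f = 4
sd, min_trend_value, ratio = trend_check(12 / 7.0, 4.0, 4)
puts "sd=#{sd} min_trend_value=#{min_trend_value} ratio=#{ratio}"
```

despite the name, percent_over_trend is a plain ratio: anything above 1 means the latest frequency is more than two standard deviations above the mean.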
… and any entries with a frequency over the min trending value are deemed trending!
( for this example it’s only the one )
> trending = filter calc_min_trending by percent_over_trend > 1;
> describe trending;
trending: {key: chararray, f: long, mean: double, std_dev: double, min_trend_value: double, percent_over_trend: double}
> dump trending;
(a b,4L,1.7142857142857142,1.0301575072754257,3.7746007288365657,1.059714732061981)
as a normalisation step i’ve been playing with also factoring in the frequency itself,
haven’t come to a conclusion on whether this is a better metric or not…
> trending2 =
foreach trending {
normalised_trend_value = org.apache.pig.piggybank.evaluation.math.LOG10(f) * percent_over_trend;
generate key, min_trend_value, percent_over_trend, normalised_trend_value as normalised_trend_value;
}
> describe trending2;
trending2: {key: chararray, min_trend_value: double, percent_over_trend: double, normalised_trend_value: double}
> dump trending2;
(a b,3.7746007288365657,1.059714732061981,0.6380118423953504)
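the normalisation is small enough to check by hand. a one line ruby equivalent (with `normalised_trend_value` as a made up helper name) shows how the log of the frequency scales the ratio, so that an ngram seen only a handful of times scores lower than one with the same ratio but a much higher count.

```ruby
# log10 of the latest frequency times the ratio over the minimum trending value
def normalised_trend_value(f, percent_over_trend)
  Math.log10(f) * percent_over_trend
end

# the 'a b' row from the dump above
puts normalised_trend_value(4, 1.059714732061981)
```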
and finally store the top trending values for processing!
> trending_sorted = order trending2 by normalised_trend_value desc;
> top_50 = limit trending_sorted 50;
> store top_50 into 'trending.model.006';