brain of mat kelcey...
trending topics in tweets about cheese; part2
May 01, 2010 at 04:54 PM | categories: Uncategorized

prototyping in ruby was a great way to prove the concept but my main motivation for this project was to play some more with pig.
the main approach will be
- maintain a relation with one record per token
- fold 1 hour's worth of new data at a time into the model
- check the entries for the latest hour for any trends
the full version is on github. read on for a line by line walkthrough!
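(just to set the scene, the hour-by-hour loop implied by the list above could be driven by something as simple as the python sketch below. this isn't code from the repo; 'trending.pig' and its parameter names are hypothetical stand-ins for a parameterised version of the script walked through in this post.)

import subprocess

# hypothetical driver: fold each hourly chunk into the previous model in turn.
# assumes a parameterised pig script; the real script in the repo may differ.
for i in range(6, 24):
    prev, curr = "%03d" % i, "%03d" % (i + 1)
    subprocess.check_call([
        "pig",
        "-param", "model_in=data/model/%s" % prev,
        "-param", "chunk=data/chunks/%s" % prev,
        "-param", "trending_out=data/trending/%s" % prev,
        "-param", "model_out=data/model/%s" % curr,
        "trending.pig",
    ])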
the ruby impl used the simplest approach possible for calculating mean and standard deviation; just keep a record of all the values seen so far and recalculate for each new value.
for our pig version we'll take a fixed space approach. rather than keep all the values for each time series it turns out we can get away with storing just 3...
- num_occurences: the number of values
- mean: the current mean of all values
- mean_sqrs: the current mean of the squares of all values
the idea is that mean(n+1) = ( n * mean(n) + new value ) / ( n + 1 )
and that the standard deviation at step n+1 can be calculated from n, mean(n) and the mean of the squares mean_sqrs(n), as we'll see below
let's say we've already run it 6 times and we're now folding in the 7th chunk of per-hour data
the data up to now shows the following:
- token 'a' has been seen in all 6 chunks with frequencies [1,2,1,2,1,1]; μ=1.33 σ=0.51
- token 'b' has been seen in 3 chunks with frequencies [1,2,2]; μ=1.66 σ=0.57
- token 'c' has been seen in 3 chunks with frequencies [3,4,2]; μ=3 σ=1
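(as a quick sanity check on the fixed space idea, here's a small python sketch, separate from the pig script, that folds token 'a's frequencies in one at a time and shows the running triple gives back exactly the model values we'll load below. note it recovers the population standard deviation, ie dividing by n, which is what the pig code later does; for token 'a' that comes out around 0.47, slightly smaller than the sample value of 0.51 quoted above.)

import math

# running fixed-space stats for one token: (num_occurences, mean, mean_sqrs)
def fold(stats, freq):
    n, mean, mean_sqrs = stats
    return (n + 1,
            (mean * n + freq) / (n + 1.0),
            (mean_sqrs * n + freq * freq) / (n + 1.0))

def sd(stats):
    # population standard deviation recovered from the two running means
    _, mean, mean_sqrs = stats
    return math.sqrt(mean_sqrs - mean * mean)

stats = (0, 0.0, 0.0)
for freq in [1, 2, 1, 2, 1, 1]:   # token 'a' from the example above
    stats = fold(stats, freq)

print(stats)      # (6, 1.333..., 2.0) -> matches the model entry for 'a'
print(sd(stats))  # ~0.47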
the first thing is to load the existing version of the model, in this case stored in the file 'data/model/006'; it contains everything we need for checking the trending of each token
> model = load 'data/model/006' as (token:chararray, num_occurences:int, mean:float, mean_sqrs:float);
> describe model;
model: {token: chararray,
num_occurences: int,
mean: float,
mean_sqrs: float}
> dump model;
(a,6,1.333333F,2.0F)
(b,3,1.666666F,3.0F)
(c,3,3.0F,9.666666F)
this tells us we've seen the token 'a' in 6 previous chunks, that on average it appeared 1.3 times per chunk, and that its mean_sqrs (used for the standard deviation calculation) is 2
( as a reminder of how we're using these values to calculate a trending score see part 1 )
next we load the new hour's worth of data, in this case contained in 'data/chunks/006'
> next_chunk = load 'data/chunks/006';
> dump next_chunk;
(a b a a)
(d b d a d)
from the text we want to get the frequency of the tokens and we do this using tokenizer.py which utilises the uber awesome NLTK
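(as an aside, the real tokenizer.py is in the repo; a pig streaming tokenizer just has to read tuples on stdin and write one token per line, so it might look roughly like this sketch. the lowercasing here is a guess, not necessarily what the real script does.)

import sys
import nltk

# read each tweet from pig's stream operator (one tuple per line on stdin)
# and emit one token per output line
for line in sys.stdin:
    for token in nltk.word_tokenize(line.strip()):
        print(token.lower())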
> define tokenizer `python tokenizer.py` cache('data/tokenizer.py#tokenizer.py');
> tokens = stream next_chunk through tokenizer as (token:chararray);
> describe tokens;
tokens: {token: chararray}
> dump tokens;
(a)
(b)
(a)
(a)
(d)
(b)
(d)
(a)
(d)
calculating the frequencies of the tokens is a simple two step process of first grouping by the key...
> tokens_grouped = group tokens by token PARALLEL 1;
> describe tokens_grouped;
tokens_grouped: {group: chararray,
tokens: {token: chararray}}
> dump tokens_grouped;
(a,{(a),(a),(a),(a)})
(b,{(b),(b)})
(d,{(d),(d),(d)})
...and then generating the key, frequency pairs
> chunk = foreach tokens_grouped generate group as token, SIZE(tokens) as freq;
> dump chunk;
(a,4L)
(b,2L)
(d,3L)
next we join the model with this latest chunk
> cogrouped = cogroup model by token, chunk by token;
> describe cogrouped;
cogrouped: {group: chararray,
model: {token: chararray,
num_occurences: int,
mean: float,
mean_sqrs: float},
chunk: {token: chararray,
freq: long}}
> dump cogrouped;
(a,{(a,6,1.333333F,2.0F)},{(a,4L)})
(b,{(b,3,1.666666F,3.0F)},{(b,2L)})
(c,{(c,3,3.0F,9.666666F)},{})
(d,{},{(d,3L)})
and doing this allows us to break the data into three distinct relations...
- entries where the token was just in the model; these continue to the next iteration untouched as there is nothing to update
- entries where the token was just in the chunk; these are being seen for the first time and contribute new model entries
- entries where the token was in both; these need a trending check and require the chunk to be folded into the model
> split cogrouped into
just_model_grped if IsEmpty(chunk),
just_chunk_grped if IsEmpty(model),
in_both_grped if not IsEmpty(chunk) and not IsEmpty(model);
> dump just_model_grped;
(c,{(c,3,3.0F,9.666666F)},{})
> dump just_chunk_grped;
(d,{},{(d,3L)})
> dump in_both_grped;
(a,{(a,6,1.333333F,2.0F)},{(a,4L)})
(b,{(b,3,1.666666F,3.0F)},{(b,2L)})
each of these can be processed in turn.
firstly, entries where the token was only in the model (ie not in the chunk) pass to the next generation of the model untouched
> model_n1__just_model = foreach just_model_grped generate flatten(model);
> dump model_n1__just_model;
(c,3,3.0F,9.666666F)
secondly, entries where the token was only in the chunk (ie not in the model) contribute new model entries for the next generation
> just_chunk_entries = foreach just_chunk_grped generate flatten(chunk);
> model_n1__just_chunk = foreach just_chunk_entries generate token, 1, freq, freq*freq;
> dump model_n1__just_chunk;
(d,1,3L,9L)
finally, and most interestingly, when the token was in both the model and the chunk we need to...
flatten the data out a bit
> describe in_both_grped;
in_both_grped: {group: chararray,
model: {token: chararray,
num_occurences: int,
mean: float,
mean_sqrs: float},
chunk: {token: chararray,
freq: long}}
> in_both_flat = foreach in_both_grped generate flatten(model), flatten(chunk);
> describe in_both_flat;
in_both_flat: {model::token: chararray,
model::num_occurences: int,
model::mean: float,
model::mean_sqrs: float,
chunk::token: chararray,
chunk::freq: long}
> dump in_both_flat;
(a,6,1.333333F,2.0F,a,4L)
(b,3,1.666666F,3.0F,b,2L)
do a trending check (note that the freq from iteration n is compared against the mean/sd from iteration n-1)
> trending = foreach in_both_flat {
sd_lhs = num_occurences * mean_sqrs;
sd_rhs = num_occurences * (mean*mean);
sd = sqrt( (sd_lhs-sd_rhs) / num_occurences );
fraction_of_sd_over_mean = ( sd==0 ? 0 : (freq-mean)/sd );
generate model::token as token, fraction_of_sd_over_mean as trending_score;
};
> describe trending;
trending: {token: chararray,
trending_score: double}
> dump trending;
(a,5.656845419750436)
(b,0.7071049267408686)
this result tells us that token 'a' is well over what was expected and is seriously trending; with a frequency of 4 in this hour's chunk it's 5.6 standard deviations (σ=0.51) over its mean frequency (μ=1.33)
token 'b' isn't really trending; with a frequency of 2 in this hour's chunk it's not even one (0.7) standard deviation (σ=0.57) over its mean frequency (μ=1.66)
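(as a sanity check of that first score, here's the same arithmetic in python. note the pig block above divides by num_occurences, ie it uses the population standard deviation, which for token 'a' works out to roughly 0.47 rather than the 0.51 sample value quoted earlier, hence the slightly higher score.)

import math

n, mean, mean_sqrs, freq = 6, 1.333333, 2.0, 4         # token 'a' from the dumps above

sd = math.sqrt((n * mean_sqrs - n * mean * mean) / n)  # population sd, ~0.4714
print((freq - mean) / sd)                              # ~5.657, matching the dump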
at this stage we can do whatever we want with the trending scores, perhaps save the top 10 off.
> trending_sorted = order trending by trending_score desc PARALLEL 1;
> top_trending = limit trending_sorted 10 PARALLEL 1;
> store top_trending into 'data/trending/006';
after the trending check we need to fold the chunk into the existing model
> model_n1__folded = foreach in_both_flat {
new_total = (mean * num_occurences) + freq;
new_total_sqrs = (mean_sqrs * num_occurences) + (freq*freq);
num_occurences = num_occurences + 1;
mean = new_total / num_occurences;
mean_sqrs = new_total_sqrs / num_occurences;
generate model::token, num_occurences, mean, mean_sqrs;
};
> dump model_n1__folded;
(a,7,1.7142855F,4.0F)
(b,4,1.7499995F,3.25F)
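(again as a quick check, folding token 'a's new frequency of 4 into its old stats by hand gives the same numbers as the dump)

n, mean, mean_sqrs, freq = 6, 1.333333, 2.0, 4   # token 'a' again

new_total      = mean * n + freq                 # ~12.0
new_total_sqrs = mean_sqrs * n + freq * freq     # 28.0
n = n + 1
print(n, new_total / n, new_total_sqrs / n)      # 7, ~1.714, 4.0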
and finally combine with the other parts we broke out above to make the new generation of the model!
> model_n1 = union model_n1__just_model,
model_n1__just_chunk,
model_n1__folded;
> store model_n1 into 'data/model/007';