brain of mat kelcey...
trending topics in tweets about cheese; part 2
May 01, 2010 at 04:54 PM | categories: Uncategorized

prototyping in ruby was a great way to prove the concept but my main motivation for this project was to play some more with pig.
the main approach will be
- maintain a relation with one record per token
- fold 1 hour's worth of new data at a time into the model
- check the entries for the latest hour for any trends
the full version is on github. read on for a line by line walkthrough!
the ruby impl used the simplest approach possible for calculating mean and standard deviation; just keep a record of all the values seen so far and recalculate for each new value.
for our pig version we'll take a fixed space approach. rather than keep all the values for each time series it turns out we can get away with storing just 3...
- num_occurences: the number of values
- mean: the current mean of all values
- mean_sqrs: the current mean of the squares of all values
the idea is that mean_{n+1} = ( n * mean_n + new value ) / ( n + 1 )
and that the standard deviation at step n+1 can be calculated from n, mean_n and mean_sqrs_n, as we'll see below
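to make this concrete, here's a tiny python sketch of the fixed space update (just an illustration, not part of the pig job); folding in the frequencies we'll use for token 'a' below recovers exactly the three numbers that end up stored in the model

import math

# fixed space stats for one token: fold each hourly frequency into a running
# count, mean and mean-of-squares, never keeping the raw values around
num_occurences, mean, mean_sqrs = 0, 0.0, 0.0

for freq in [1, 2, 1, 2, 1, 1]:  # token 'a' frequencies from the example below
    n = num_occurences
    mean = (n * mean + freq) / (n + 1)                    # mean_{n+1}
    mean_sqrs = (n * mean_sqrs + freq * freq) / (n + 1)   # mean of squares_{n+1}
    num_occurences = n + 1

print(num_occurences, mean, mean_sqrs)  # -> 6 1.333... 2.0, the model entry for 'a'
sd = math.sqrt(mean_sqrs - mean ** 2)   # population sd recovered from the two means
print(sd)                               # -> ~0.47, used later for the trending check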
let's say we've already run it 6 times and we're now folding in the 7th chunk of per-hour data
the data up to now shows the following
- token 'a' has been seen in all 6 chunks with frequencies [1,2,1,2,1,1]; μ=1.33 ρ=0.51
- token 'b' has been seen in 3 chunks with frequencies [1,2,2]; μ=1.66 ρ=0.57
- token 'c' has been seen in 3 chunks with frequencies [3,4,2]; μ=3 ρ=1
the first thing is to load the existing version of the model, in this case stored in the file 'data/model/006'; it contains everything we need for checking the trending of each token
> model = load 'data/model/006' as (token:chararray, num_occurences:int, mean:float, mean_sqrs:float);
> describe model;
model: {token: chararray, num_occurences: int, mean: float, mean_sqrs: float}
> dump model;
(a,6,1.333333F,2.0F)
(b,3,1.666666F,3.0F)
(c,3,3.0F,9.666666F)
this tells us we've seen the token 'a' in 6 previous chunks, that on average we saw it 1.3 times per chunk, and that its mean_sqrs (used for the standard deviation calculation) is 2
( as a reminder of how we're using these values to calculate a trending score see part 1 )
next we load the new hour's worth of data, in this case contained in 'data/chunks/006'
> next_chunk = load 'data/chunks/006';
> dump next_chunk;
(a b a a)
(d b d a d)
from the text we want to get the frequency of the tokens, and we do this using tokenizer.py, which utilises the uber awesome NLTK
> define tokenizer `python tokenizer.py` cache('data/tokenizer.py#tokenizer.py');
> tokens = stream next_chunk through tokenizer as (token:chararray);
> describe tokens;
tokens: {token: chararray}
> dump tokens;
(a)
(b)
(a)
(a)
(d)
(b)
(d)
(a)
(d)
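the real tokenizer.py is in the github repo; roughly speaking a pig streaming script just reads input tuples from stdin and writes output tuples to stdout, one per line, so an NLTK based tokenizer can be sketched as something like this (a sketch only, not the actual script)

# rough sketch of a pig streaming tokenizer; the real tokenizer.py lives in the repo
# (assumes nltk and its tokenizer data are available on the task nodes)
import sys
import nltk

for line in sys.stdin:
    # each input line is the text of one tweet; emit one token per output line
    for token in nltk.word_tokenize(line.strip().lower()):
        print(token)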
calculating the frequencies of the tokens is a simple two step process of first grouping by the key...
> tokens_grouped = group tokens by token PARALLEL 1;
> describe tokens_grouped;
tokens_grouped: {group: chararray, tokens: {token: chararray}}
> dump tokens_grouped;
(a,{(a),(a),(a),(a)})
(b,{(b),(b)})
(d,{(d),(d),(d)})
...and then generating the key, frequency pairs
> chunk = foreach tokens_grouped generate group as token, SIZE(tokens) as freq;
> dump chunk;
(a,4L)
(b,2L)
(d,3L)
next we join the model with this latest chunk
> cogrouped = cogroup model by token, chunk by token;
> describe cogrouped;
cogrouped: {group: chararray, model: {token: chararray, num_occurences: int, mean: float, mean_sqrs: float}, chunk: {token: chararray, freq: long}}
> dump cogrouped;
(a,{(a,6,1.333333F,2.0F)},{(a,4L)})
(b,{(b,3,1.666666F,3.0F)},{(b,2L)})
(c,{(c,3,3.0F,9.666666F)},{})
(d,{},{(d,3L)})
and doing this allows us to break the data into three distinct relations...
- entries where the token was just in the model; these continue to the next iteration untouched as there is nothing to update
- entries where the token was just in the chunk; these are being seen for the first time and contribute new model entries
- entries where the token was in both; these need a trending check and will require the chunk to be folded into the model
> split cogrouped into just_model_grped if IsEmpty(chunk), just_chunk_grped if IsEmpty(model), in_both_grped if not IsEmpty(chunk) and not IsEmpty(model);
> dump just_model_grped;
(c,{(c,3,3.0F,9.666666F)},{})
> dump just_chunk_grped;
(d,{},{(d,3L)})
> dump in_both_grped;
(a,{(a,6,1.333333F,2.0F)},{(a,4L)})
(b,{(b,3,1.666666F,3.0F)},{(b,2L)})
each of these can be processed in turn.
firstly, entries where the token was only in the model (ie not in the chunk) pass to the next generation of the model untouched
> model_n1__just_model = foreach just_model_grped generate flatten(model);
> dump model_n1__just_model;
(c,3,3.0F,9.666666F)
secondly, entries where the token was only in the chunk (ie not in the model) contribute new model entries for the next generation
> just_chunk_entries = foreach just_chunk_grped generate flatten(chunk);
> model_n1__just_chunk = foreach just_chunk_entries generate token, 1, freq, freq*freq;
> dump model_n1__just_chunk;
(d,1,3L,9L)
finally, and most interestingly, when the token was in both the model and the chunk we need to...
flatten the data out a bit
> describe in_both_grped;
in_both_grped: {group: chararray, model: {token: chararray, num_occurences: int, mean: float, mean_sqrs: float}, chunk: {token: chararray, freq: long}}
> in_both_flat = foreach in_both_grped generate flatten(model), flatten(chunk);
> describe in_both_flat;
in_both_flat: {model::token: chararray, model::num_occurences: int, model::mean: float, model::mean_sqrs: float, chunk::token: chararray, chunk::freq: long}
> dump in_both_flat;
(a,6,1.333333F,2.0F,a,4L)
(b,3,1.666666F,3.0F,b,2L)
do a trending check (note that the freq from iteration n is compared against the mean/sd from iteration n-1)
> trending = foreach in_both_flat {
    sd_lhs = num_occurences * mean_sqrs;
    sd_rhs = num_occurences * (mean*mean);
    sd = sqrt( (sd_lhs-sd_rhs) / num_occurences );
    fraction_of_sd_over_mean = ( sd==0 ? 0 : (freq-mean)/sd );
    generate model::token as token, fraction_of_sd_over_mean as trending_score;
  };
> describe trending;
trending: {token: chararray, trending_score: double}
> dump trending;
(a,5.656845419750436)
(b,0.7071049267408686)
this result tells us that token 'a' is well over what was expected and is seriously trending; with a frequency of 4 in this hour's chunk it's 5.6 standard deviations (ρ=0.51) over its mean frequency (μ=1.33)
token 'b' isn't really trending; with a frequency of 2 in this hour's chunk it's not even one (0.7) standard deviation (ρ=0.57) over its mean frequency (μ=1.66)
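( these scores are easy to sanity check outside pig; plugging the stored model values straight into the same formula in python gives the same numbers )

import math

def trending_score(mean, mean_sqrs, freq):
    # sd from the stored means, then how many sds this hour's freq sits above the mean
    sd = math.sqrt(mean_sqrs - mean ** 2)
    return 0 if sd == 0 else (freq - mean) / sd

print(trending_score(1.333333, 2.0, 4))  # token 'a' -> ~5.66
print(trending_score(1.666666, 3.0, 2))  # token 'b' -> ~0.71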
at this stage we can do whatever we want with the trending scores, perhaps save the top 10 off.
> trending_sorted = order trending by trending_score desc PARALLEL 1;
> top_trending = limit trending_sorted 10 PARALLEL 1;
> store top_trending into 'data/trending/006';
after the trending check we need to fold the chunk into the existing model
> model_n1__folded = foreach in_both_flat {
    new_total = (mean * num_occurences) + freq;
    new_total_sqrs = (mean_sqrs * num_occurences) + (freq*freq);
    num_occurences = num_occurences + 1;
    mean = new_total / num_occurences;
    mean_sqrs = new_total_sqrs / num_occurences;
    generate model::token, num_occurences, mean, mean_sqrs;
  };
> dump model_n1__folded;
(a,7,1.7142855F,4.0F)
(b,4,1.7499995F,3.25F)
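( as a quick sanity check for token 'a': new_total = (1.333333 * 6) + 4 ≈ 12, so the new mean is 12 / 7 ≈ 1.714, and new_total_sqrs = (2.0 * 6) + 16 = 28, so the new mean_sqrs is 28 / 7 = 4.0; exactly what the dump shows )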
and finally we combine these with the other parts we broke out before to make the new generation of the model!
> model_n1 = union model_n1__just_model, model_n1__just_chunk, model_n1__folded;
> store model_n1 into 'data/model/007';