brain of mat kelcey

trending topics in tweets about cheese; part2

May 01, 2010

prototyping in ruby was a great way to prove the concept but my main motivation for this project was to play some more with pig.

the main approach will be

  1. maintain a relation with one record per token
  2. fold 1 hour's worth of new data at a time into the model
  3. check the entries for the latest hour for any trends

the full version is on github. read on for a line by line walkthrough!

the ruby impl used the simplest approach possible for calculating mean and standard deviation; just keep a record of all the values seen so far and recalculate for each new value.

for our pig version we'll take a fixed space approach. rather than keep all the values for each time series it turns out we can get away with storing just 3...

  1. num_occurences: the number of values
  2. mean: the current mean of all values
  3. mean_sqrs: the current mean of the squares of all values

the idea is that mean_{n+1} = ( n * mean_n + new value ) / ( n + 1 )

and that the standard deviation_{n+1} can be calculated from n, mean_n and the mean of the squares_n as we'll see below
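
as a rough sketch of the fixed space idea in plain python (not the pig script itself; the names `fold` and `sd` are made up for illustration), the three stored values can be updated one value at a time and the standard deviation recovered from them whenever it's needed. this assumes the population form of the standard deviation, sqrt(mean_sqrs - mean*mean).

```python
import math

def fold(n, mean, mean_sqrs, value):
    """fold one new value into the running (n, mean, mean_sqrs) triple."""
    new_n = n + 1
    new_mean = (n * mean + value) / new_n
    new_mean_sqrs = (n * mean_sqrs + value * value) / new_n
    return new_n, new_mean, new_mean_sqrs

def sd(mean, mean_sqrs):
    """population standard deviation from the stored values alone."""
    # max() guards against a tiny negative variance caused by float rounding
    return math.sqrt(max(mean_sqrs - mean * mean, 0.0))

# fold six example values in one at a time, starting from an empty state
state = (0, 0.0, 0.0)
for value in [1, 2, 1, 2, 1, 1]:
    state = fold(*state, value)

n, mean, mean_sqrs = state
print(n, round(mean, 2), round(mean_sqrs, 2), round(sd(mean, mean_sqrs), 2))
# 6 1.33 2.0 0.47
```

no matter how many values have been folded in, the state stays at three numbers per token.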

let's say we've already run it 6 times and we're now folding in the 7th chunk of per-hour data

the data up to now shows the following
token 'a' has been seen in all 6 chunks with frequencies [1,2,1,2,1,1]; μ=1.33 ρ=0.51
token 'b' has been seen in 3 chunks with frequencies [1,2,2]; μ=1.66 ρ=0.57
token 'c' has been seen in 3 chunks with frequencies [3,4,2]; μ=3 ρ=1
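
these numbers can be sanity checked with python's statistics module. a small sketch, where the `history` dict is just the example data above; the quoted ρ values appear to line up with the sample standard deviation (`statistics.stdev`), truncated to two places.

```python
import statistics

# per-token frequencies from the example above
history = {
    'a': [1, 2, 1, 2, 1, 1],
    'b': [1, 2, 2],
    'c': [3, 4, 2],
}

for token, freqs in history.items():
    mu = statistics.mean(freqs)
    rho = statistics.stdev(freqs)  # sample standard deviation (n-1 denominator)
    print(token, f"{mu:.3f}", f"{rho:.3f}")
# a 1.333 0.516
# b 1.667 0.577
# c 3.000 1.000
```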

the first thing is to load the existing version of the model, in this case stored in the file 'data/model/006'. it contains everything we need for checking the trending for each token

> model = load 'data/model/006' as (token:chararray, num_occurences:int, mean:float, mean_sqrs:float);
> describe model;
model: {token: chararray, 
        num_occurences: int, 
        mean: float, 
        mean_sqrs: float}
> dump model;

the row for token 'a' tells us we've seen it in 6 previous chunks, that on average it appeared 1.33 times per chunk, and that its mean_sqrs (for the standard deviation calculation) is 2

( as a reminder of how we're using these values to calculate a trending score see part 1 )

next we load the new hour's worth of data, in this case contained in 'data/chunks/006'

> next_chunk = load 'data/chunks/006';
> dump next_chunk;
(a b a a)
(d b d a d)

from the text we want to get the frequency of the tokens and we do this by streaming the chunk through a python tokenizer script which utilises the uber awesome NLTK

> define tokenizer `python` cache('data/');
> tokens = stream next_chunk through tokenizer as (token:chararray);
> describe tokens;   
tokens: {token: chararray}
> dump tokens;
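
pig's streaming contract is simple: each record is written to the script's stdin as a line and each line the script writes to stdout becomes an output record. a hypothetical stand-in for the tokenizer, assuming plain lowercasing and whitespace splitting instead of NLTK (the real script isn't shown here, and `tokenize` / `stream` are made up names):

```python
import io

def tokenize(line):
    """split a line of text into lowercase tokens (whitespace only; an assumption)."""
    return line.lower().split()

def stream(lines, out):
    """write one token per line so each becomes a (token:chararray) record."""
    for line in lines:
        for token in tokenize(line):
            out.write(token + "\n")

# a real streaming script would call stream(sys.stdin, sys.stdout);
# here the two example lines from the chunk are fed in directly
out = io.StringIO()
stream(["a b a a\n", "d b d a d\n"], out)
print(out.getvalue().split())
# ['a', 'b', 'a', 'a', 'd', 'b', 'd', 'a', 'd']
```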

calculating the frequencies of the tokens is a simple two step process of first grouping by the key...

> tokens_grouped = group tokens by token PARALLEL 1;
> describe tokens_grouped;
tokens_grouped: {group: chararray,
                 tokens: {token: chararray}}
> dump tokens_grouped;

...and then generating the key, frequency pairs

> chunk = foreach tokens_grouped generate group as token, SIZE(tokens) as freq;
> dump chunk;
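
the group then count pattern is the same thing a `Counter` does in python. a tiny sketch using the tokens from the two example lines (the variable names simply mirror the pig relations):

```python
from collections import Counter

# tokens from the example chunk: (a b a a) and (d b d a d)
tokens = ['a', 'b', 'a', 'a', 'd', 'b', 'd', 'a', 'd']

# token -> freq, the equivalent of (group, SIZE(tokens))
chunk = Counter(tokens)
print(sorted(chunk.items()))
# [('a', 4), ('b', 2), ('d', 3)]
```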

next we join the model with this latest chunk

> cogrouped = cogroup model by token, chunk by token;
> describe cogrouped;
cogrouped: {group: chararray,
            model: {token: chararray,
                    num_occurences: int,
                    mean: float,
                    mean_sqrs: float},
            chunk: {token: chararray,
                    freq: long}}
> dump cogrouped;

and doing this allows us to break the data into three distinct relations...

  1. entries where token was just in the model; these continue to the next iteration untouched as there is nothing to update
  2. entries where token was just in the chunk; these are being seen for the first time and contribute new model entries
  3. entries where token was in both; these need a trending check and will require the chunk being folded into the model

> split cogrouped into
        just_model_grped if IsEmpty(chunk),
        just_chunk_grped if IsEmpty(model),
        in_both_grped    if not IsEmpty(chunk) and not IsEmpty(model);
> dump just_model_grped;
> dump just_chunk_grped;
> dump in_both_grped;
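
the three way split is really just set arithmetic on the keys. a sketch in python, assuming the model and chunk are held as dicts keyed by token; the model values here are derived from the example frequencies earlier in the post, not copied from a real dump:

```python
# token -> (num_occurences, mean, mean_sqrs), derived from the example frequencies
model = {
    'a': (6, 8 / 6, 2.0),
    'b': (3, 5 / 3, 3.0),
    'c': (3, 3.0, 29 / 3),
}
# token -> freq for the new chunk
chunk = {'a': 4, 'b': 2, 'd': 3}

just_model = sorted(model.keys() - chunk.keys())   # IsEmpty(chunk)
just_chunk = sorted(chunk.keys() - model.keys())   # IsEmpty(model)
in_both = sorted(model.keys() & chunk.keys())      # neither is empty
print(just_model, just_chunk, in_both)
# ['c'] ['d'] ['a', 'b']
```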

each of these can be processed in turn.

firstly entries where the token was only in the model (ie not in the chunk) pass to the next generation of the model untouched

> model_n1__just_model = foreach just_model_grped generate flatten(model);
> dump model_n1__just_model;

secondly entries where the token was only in the chunk (ie not in the model) contribute new model entries for the next generation

> just_chunk_entries = foreach just_chunk_grped generate flatten(chunk);
> model_n1__just_chunk = foreach just_chunk_entries generate token, 1, freq, freq*freq;
> dump model_n1__just_chunk;

finally, and most interestingly, when the token was in both the model and the chunk we need to...

flatten the data out a bit

> describe in_both_grped;
in_both_grped: {group: chararray,
                model: {token: chararray,
                        num_occurences: int,
                        mean: float,
                        mean_sqrs: float},
                chunk: {token: chararray,
                        freq: long}}
> in_both_flat = foreach in_both_grped generate flatten(model), flatten(chunk);
> describe in_both_flat;
in_both_flat: {model::token: chararray,
               model::num_occurences: int,
               model::mean: float,
               model::mean_sqrs: float,
               chunk::token: chararray,
               chunk::freq: long}
> dump in_both_flat;

do a trending check (note the comparison of freq of iter:n is done against mean/sd of iter:n-1)

> trending = foreach in_both_flat {
               sd_lhs = num_occurences * mean_sqrs;
               sd_rhs = num_occurences * (mean*mean);
               sd = sqrt( (sd_lhs-sd_rhs) / num_occurences ); 
               fraction_of_sd_over_mean = ( sd==0 ? 0 : (freq-mean)/sd );
               generate model::token as token, fraction_of_sd_over_mean as trending_score;
             };
> describe trending;
trending: {token: chararray,
           trending_score: double}
> dump trending;

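this result tells us that token 'a' is well over what was expected and is seriously trending
with a frequency of 4 in this hour's chunk it's 5.6 times the standard deviation (ρ=0.51) over its mean frequency (μ=1.33)

token 'b' isn't really trending
with a frequency of 2 in this hour's chunk it's not even one (0.7) standard deviation (ρ=0.57) over its mean frequency (μ=1.66)

( note the sd computed in the script is the population form, sqrt(mean_sqrs - mean*mean), which is roughly 0.47 for both tokens; that is a little lower than the ρ values quoted, which is why the scores come out at 5.6 and 0.7 )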
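
the same arithmetic can be checked in a few lines of python. a sketch only: `trending_score` is a made up name and the inputs are the model values derived from the example frequencies, but the steps follow the pig block above.

```python
import math

def trending_score(n, mean, mean_sqrs, freq):
    """how many standard deviations freq sits above the mean of the previous n chunks."""
    # same arithmetic as the pig block: sd from n, mean and mean_sqrs
    variance = (n * mean_sqrs - n * mean * mean) / n
    sd = math.sqrt(max(variance, 0.0))  # clamp float rounding below zero
    return 0.0 if sd == 0 else (freq - mean) / sd

print(round(trending_score(6, 8 / 6, 2.0, 4), 2))  # token 'a': 5.66
print(round(trending_score(3, 5 / 3, 3.0, 2), 2))  # token 'b': 0.71
```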

at this stage we can do whatever we want with the trending scores, perhaps save the top 10 off.

> trending_sorted = order trending by trending_score desc PARALLEL 1;
> top_trending = limit trending_sorted 10 PARALLEL 1;
> store top_trending into 'data/trending/006';

after the trending check we need to fold the chunk into the existing model

> model_n1__folded = foreach in_both_flat {
                       new_total = (mean * num_occurences) + freq;
                       new_total_sqrs = (mean_sqrs * num_occurences) + (freq*freq);
                       num_occurences = num_occurences + 1;
                       mean = new_total / num_occurences;
                       mean_sqrs = new_total_sqrs / num_occurences;
                       generate model::token, num_occurences, mean, mean_sqrs;
                     };
> dump model_n1__folded;

and finally combine with the previous parts we had broken out before to make the new generation of the model!

> model_n1 = union model_n1__just_model, model_n1__just_chunk, model_n1__folded;
> store model_n1 into 'data/model/007';
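
to see the whole iteration in one place, here's a sketch of the same three cases in plain python. it assumes the model and chunk fit in dicts (which the pig version doesn't need), `next_model` is a made up name, and the starting model is derived from the example frequencies rather than taken from a real dump.

```python
def next_model(model, chunk):
    """build generation n+1 of the model from generation n and one chunk."""
    new_model = {}
    for token in model.keys() | chunk.keys():
        if token not in chunk:
            # only in the model: carried over untouched
            new_model[token] = model[token]
        elif token not in model:
            # only in the chunk: a brand new entry
            freq = chunk[token]
            new_model[token] = (1, float(freq), float(freq * freq))
        else:
            # in both: fold the new frequency into the running values
            n, mean, mean_sqrs = model[token]
            freq = chunk[token]
            new_model[token] = (
                n + 1,
                (mean * n + freq) / (n + 1),
                (mean_sqrs * n + freq * freq) / (n + 1),
            )
    return new_model

model = {'a': (6, 8 / 6, 2.0), 'b': (3, 5 / 3, 3.0), 'c': (3, 3.0, 29 / 3)}
chunk = {'a': 4, 'b': 2, 'd': 3}

result = next_model(model, chunk)
for token, (n, mean, mean_sqrs) in sorted(result.items()):
    print(token, n, round(mean, 2), round(mean_sqrs, 2))
# a 7 1.71 4.0
# b 4 1.75 3.25
# c 3 3.0 9.67
# d 1 3.0 9.0
```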