# brain of mat kelcey

## trending topics in tweets about cheese; part2

May 01, 2010 | View Comments

prototyping in ruby was a great way to prove the concept but my main motivation for this project was to play some more with pig.

the main approach will be

1. maintain a relation with one record per token
2. fold 1 hours worth of new data at a time into the model
3. check the entries for the latest hour for any trends

the full version is on github. read on for a line by line walkthrough!

the ruby impl used the simplest approach possible for calculating mean and standard deviation; just keep a record of all the values seen so far and recalculate for each new value.

for our pig version we'll take a fixed space approach. rather than keep all the values for each time series it turns out we can get away with storing just 3...

1. num_occurences: the number of values
2. mean: the current mean of all values
3. mean_sqrs: the current mean of the squares of all values

the idea is that the meann+1 = ( n * meann + new value ) / n+1

and that the standard deviationn+1 can be calculated from n, the meann and the mean of the squaresn as we'll see below

let's say we've already run it 6 times and we're now folding in the 7th chunk of per-hour data

the data up to now shows the following
token 'a' has been seen in all 6 chunks with frequencies [1,2,1,2,1,1]; μ=1.33 ρ=0.51
token 'b' has been seen in 3 chunks with frequencies [1,2,2]; μ=1.66 ρ=0.57
token 'c' has been seen in 3 chunks with frequencies [3,4,2]; μ=3 ρ=1

the first thing is to load the existing version of the model, in this case stored in the file 'data/model/006' it contains everything we need for checking the trending for each token

> model = load 'data/model/006' as (token:chararray, num_occurences:int, mean:float, mean_sqrs:float);
> describe model;
model: {token: chararray,
num_occurences: int,
mean: float,
mean_sqrs: float}
> dump model;
(a,6,1.333333F,2.0F)
(b,3,1.666666F,3.0F)
(c,3,3.0F,9.666666F)


this tells us we've seen the token 'a' in 6 previous chunks, the average time we saw it was 1.3 times per chunk and the mean_sqrs (for the standard deviation calculation) is 2

( as a reminder of how we're using these values to calculate a trending score see part 1 )

next we load the new hour's worth of data, in this case contained in 'data/chunks/006'

> next_chunk = load 'data/chunks/006';
> dump next_chunk;
(a b a a)
(d b d a d)


from the text we want to get the frequency of the tokens and we do this using tokenizer.py which utilises the uber awesome NLTK

> define tokenizer python tokenizer.py cache('data/tokenizer.py#tokenizer.py');
> tokens = stream next_chunk through tokenizer as (token:chararray);
> describe tokens;
tokens: {token: chararray}
> dump tokens;
(a)
(b)
(a)
(a)
(d)
(b)
(d)
(a)
(d)


calculating the frequencies of the tokens is a simple two step process of first grouping by the key...

> tokens_grouped = group tokens by token PARALLEL 1;
> describe tokens_grouped;
tokens_grouped: {group: chararray,
tokens: {token: chararray}}
> dump tokens_grouped;
(a,{(a),(a),(a),(a)})
(b,{(b),(b)})
(d,{(d),(d),(d)})


...and then generating the key, frequency pairs

> chunk = foreach tokens_grouped generate group as token, SIZE(tokens) as freq;
> dump chunk;
(a,4L)
(b,2L)
(d,3L)


next we join the model with this latest chunk

> cogrouped = cogroup model by token, chunk by token;
> describe cogrouped;
cogrouped: {group: chararray,
model: {token: chararray,
num_occurences: int,
mean: float,
mean_sqrs: float},
chunk: {token: chararray,
freq: long}}
> dump cogrouped;
(a,{(a,6,1.333333F,2.0F)},{(a,4L)})
(b,{(b,3,1.666666F,3.0F)},{(b,2L)})
(c,{(c,3,3.0F,9.666666F)},{})
(d,{},{(d,3L)})


and doing this allows us to break the data into three distinct relations...

1. entries where token was just in the model; these continue to the next iteration untouched as there is nothing to update
2. entries where token was just in the chunk; these are being seen for the first time and contribute new model entries
3. entries where token was in both; these need a trending check and will require the chunk being folded into the model
> split cogrouped into
just_model_grped if IsEmpty(chunk),
just_chunk_grped if IsEmpty(model),
in_both_grped    if not IsEmpty(chunk) and not IsEmpty(model);
> dump just_model_grped;
(c,{(c,3,3.0F,9.666666F)},{})
> dump just_chunk_grped;
(d,{},{(d,3L)})
> dump in_both_grped;
(a,{(a,6,1.333333F,2.0F)},{(a,4L)})
(b,{(b,3,1.666666F,3.0F)},{(b,2L)})


each of these can be processed in turn.

firstly entries where the token was only the model (ie not in the chunk) pass to next generation of model untouched

> model_n1__just_model = foreach just_model_grped generate flatten(model);
> dump model_n1__just_model;
(c,3,3.0F,9.666666F)


secondly entries where the token was only in the chunk (ie not in the model) contribute new model entries for the next generation

> just_chunk_entries = foreach just_chunk_grped generate flatten(chunk);
> model_n1__just_chunk = foreach just_chunk_entries generate token, 1, freq, freq*freq;
> dump model_n1__just_chunk;
(d,1,3L,9L)


finally, and the most interestingly, when the token was in both the model and the chunk we need to....

flatten the data out a bit

> describe in_both_grped;
in_both_grped: {group: chararray,
model: {token: chararray,
num_occurences: int,
mean: float,
mean_sqrs: float},
chunk: {token: chararray,
freq: long}}
> in_both_flat = foreach in_both_grped generate flatten(model), flatten(chunk);
> describe in_both_flat;
in_both_flat: {model::token: chararray,
model::num_occurences: int,
model::mean: float,
model::mean_sqrs: float,
chunk::token: chararray,
chunk::freq: long}
> dump in_both_flat;
(a,6,1.333333F,2.0F,a,4L)
(b,3,1.666666F,3.0F,b,2L)


do a trending check (note the comparison of freq of iter:n is done against mean/sd of iter:n-1)

> trending = foreach in_both_flat {
sd_lhs = num_occurences * mean_sqrs;
sd_rhs = num_occurences * (mean*mean);
sd = sqrt( (sd_lhs-sd_rhs) / num_occurences );
fraction_of_sd_over_mean = ( sd==0 ? 0 : (freq-mean)/sd );
generate model::token as token, fraction_of_sd_over_mean as trending_score;
}
> describe trending;
trending: {token: chararray,
trending_score: double}
> dump trending;
(a,5.656845419750436)
(b,0.7071049267408686)


this result tells us that token 'a' is well over what was expected and is seriously trending
with a frequency of 4 in this hour's chunk it's 5.6 times the standard deviation (ρ=0.51) over it's mean frequency (μ=1.33)

token 'b' isn't really trending
with a frequency of 2 in this hour's chunk it's not even one (0.7) standard deviation (ρ=0.57) over it's mean frequency (μ=1.66)

at this stage we can do whatever we want with the trending scores, perhaps save the top 10 off.

trending_sorted = order trending by trending_score desc PARALLEL 1;
top_trending = limit trending_sorted 10 PARALLEL 1;
store top_trending into 'data/trending/006';


after the trending check we need to fold the chunk into the existing model

> model_n1__folded = foreach in_both_flat {
new_total = (mean * num_occurences) + freq;
new_total_sqrs = (mean_sqrs * num_occurences) + (freq*freq);
num_occurences = num_occurences + 1;
mean = new_total / num_occurences;
mean_sqrs = new_total_sqrs / num_occurences;
generate model::token, num_occurences, mean, mean_sqrs;
};
> dump model_n1__folded;
(a,7,1.7142855F,4.0F)
(b,4,1.7499995F,3.25F)


and finally combine with the previous parts we had broken out before to make the new generation of the model!

> model_n1 = union model_n1__just_model,
model_n1__just_chunk,
model_n1__folded;
> store model_n1 into 'data/model/007';