# brain of mat kelcey

## trending topics in tweets about cheese; part2

May 01, 2010

prototyping in ruby was a great way to prove the concept but my main motivation for this project was to play some more with pig.

the main approach will be

- maintain a relation with one record per token
- fold one hour's worth of new data at a time into the model
- check the entries for the latest hour for any trends

the full version is on github. read on for a line by line walkthrough!

the ruby impl used the simplest approach possible for calculating mean and standard deviation; just keep a record of all the values seen so far and recalculate for each new value.

for our pig version we'll take a fixed space approach. rather than keep *all* the values for
each time series it turns out we can get away with storing just 3...

- num_occurences: the number of values
- mean: the current mean of all values
- mean_sqrs: the current mean of the squares of all values

the idea is that the mean_{n+1} = ( n * mean_{n} + new value ) / ( n + 1 )

and that the standard deviation_{n} can be calculated from just the mean_{n} and the mean of the squares_{n}, as we'll see below
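as a rough sketch of the fixed space idea in plain python (not the pig code itself; the names `fold` and `std_dev` are made up for illustration), each new value updates the three stored numbers and the standard deviation falls out of the two means:

```python
import math

def fold(n, mean, mean_sqrs, value):
    """Fold one new value into (n, mean, mean_sqrs) without keeping history."""
    new_n = n + 1
    new_mean = (n * mean + value) / new_n
    new_mean_sqrs = (n * mean_sqrs + value * value) / new_n
    return new_n, new_mean, new_mean_sqrs

def std_dev(mean, mean_sqrs):
    """Population standard deviation: sqrt(mean of squares - square of mean)."""
    # max() guards against a tiny negative value from floating point rounding
    return math.sqrt(max(mean_sqrs - mean * mean, 0.0))

# start from an empty state and fold in six values one at a time
state = (0, 0.0, 0.0)
for value in [1, 2, 1, 2, 1, 1]:
    state = fold(*state, value)
n, mean, mean_sqrs = state
```

note this gives the population form of the standard deviation (dividing by n), since that is all the two means can provide directly.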

let's say we've already run it 6 times and we're now folding in the 7th chunk of per-hour data

the data up to now shows the following

token 'a' has been seen in all 6 chunks with frequencies [1,2,1,2,1,1]; μ=1.33 ρ=0.51

token 'b' has been seen in 3 chunks with frequencies [1,2,2]; μ=1.66 ρ=0.57

token 'c' has been seen in 3 chunks with frequencies [3,4,2]; μ=3 ρ=1
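these numbers can be double checked with a few lines of python. this is only a sketch (the `summarise` helper is hypothetical), and it prints both flavours of standard deviation: the quoted ρ values line up with the sample form (dividing by n-1, truncated), while the population form (dividing by n) comes out a little smaller.

```python
import statistics

history = {
    "a": [1, 2, 1, 2, 1, 1],
    "b": [1, 2, 2],
    "c": [3, 4, 2],
}

def summarise(freqs):
    """Return (num_occurences, mean, mean_sqrs) for a list of frequencies."""
    n = len(freqs)
    mean = sum(freqs) / n
    mean_sqrs = sum(f * f for f in freqs) / n
    return n, mean, mean_sqrs

for token, freqs in history.items():
    n, mean, mean_sqrs = summarise(freqs)
    sample_sd = statistics.stdev(freqs)       # divides by n - 1
    population_sd = statistics.pstdev(freqs)  # divides by n
    print(token, n, round(mean, 3), round(mean_sqrs, 3),
          round(sample_sd, 3), round(population_sd, 3))
```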

the first thing is to load the existing version of the model, in this case stored in the file 'data/model/006'. it contains everything we need for checking the trending for each token

    > model = load 'data/model/006' as (token:chararray, num_occurences:int, mean:float, mean_sqrs:float);
    > describe model;
    model: {token: chararray, num_occurences: int, mean: float, mean_sqrs: float}
    > dump model;
    (a,6,1.333333F,2.0F)
    (b,3,1.666666F,3.0F)
    (c,3,3.0F,9.666666F)

this tells us we've seen the token 'a' in 6 previous chunks, that on average we saw it 1.3 times per chunk, and that its mean_sqrs (for the standard deviation calculation) is 2

( as a reminder of how we're using these values to calculate a trending score see part 1 )

next we load the new hour's worth of data, in this case contained in 'data/chunks/006'

    > next_chunk = load 'data/chunks/006';
    > dump next_chunk;
    (a b a a)
    (d b d a d)

from the text we want to get the frequency of the tokens and we do this using tokenizer.py which utilises the uber awesome NLTK

    > define tokenizer `python tokenizer.py` cache('data/tokenizer.py#tokenizer.py');
    > tokens = stream next_chunk through tokenizer as (token:chararray);
    > describe tokens;
    tokens: {token: chararray}
    > dump tokens;
    (a)
    (b)
    (a)
    (a)
    (d)
    (b)
    (d)
    (a)
    (d)
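a streaming script like this just needs to read lines and write one token per line. the real tokenizer.py uses NLTK and isn't reproduced here; the following is a stand-in sketch that assumes a plain whitespace split is good enough, and `tokenize_stream` is a made up name.

```python
import io

def tokenize_stream(lines, out):
    """Write one token per output line for every input line."""
    for line in lines:
        # assumption: whitespace split stands in for the NLTK tokenizer
        for token in line.split():
            out.write(token + "\n")

# simulate the two tweets of the example chunk
out = io.StringIO()
tokenize_stream(["a b a a\n", "d b d a d\n"], out)
tokens = out.getvalue().split()
```

when run under pig streaming you would pass `sys.stdin` and `sys.stdout` instead of the in-memory list and buffer.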

calculating the frequencies of the tokens is a simple two step process of first grouping by the key...

    > tokens_grouped = group tokens by token PARALLEL 1;
    > describe tokens_grouped;
    tokens_grouped: {group: chararray, tokens: {token: chararray}}
    > dump tokens_grouped;
    (a,{(a),(a),(a),(a)})
    (b,{(b),(b)})
    (d,{(d),(d),(d)})

...and then generating the key, frequency pairs

    > chunk = foreach tokens_grouped generate group as token, SIZE(tokens) as freq;
    > dump chunk;
    (a,4L)
    (b,2L)
    (d,3L)
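for comparison, the same group-then-count step in plain python collapses into a single `Counter` (a sketch only; the token list is copied from the example above):

```python
from collections import Counter

tokens = ["a", "b", "a", "a", "d", "b", "d", "a", "d"]

# Counter groups equal tokens and counts each group in one pass
chunk = Counter(tokens)
```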

next we join the model with this latest chunk

    > cogrouped = cogroup model by token, chunk by token;
    > describe cogrouped;
    cogrouped: {group: chararray, model: {token: chararray, num_occurences: int, mean: float, mean_sqrs: float}, chunk: {token: chararray, freq: long}}
    > dump cogrouped;
    (a,{(a,6,1.333333F,2.0F)},{(a,4L)})
    (b,{(b,3,1.666666F,3.0F)},{(b,2L)})
    (c,{(c,3,3.0F,9.666666F)},{})
    (d,{},{(d,3L)})

and doing this allows us to break the data into three distinct relations...

- entries where token was just in the model; these continue to the next iteration untouched as there is nothing to update
- entries where token was just in the chunk; these are being seen for the first time and contribute new model entries
- entries where token was in both; these need a trending check and will require the chunk being folded into the model

    > split cogrouped into
        just_model_grped if IsEmpty(chunk),
        just_chunk_grped if IsEmpty(model),
        in_both_grped if not IsEmpty(chunk) and not IsEmpty(model);
    > dump just_model_grped;
    (c,{(c,3,3.0F,9.666666F)},{})
    > dump just_chunk_grped;
    (d,{},{(d,3L)})
    > dump in_both_grped;
    (a,{(a,6,1.333333F,2.0F)},{(a,4L)})
    (b,{(b,3,1.666666F,3.0F)},{(b,2L)})
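the three way split is really just set arithmetic on the token keys. a small python sketch of the same idea, assuming the model and chunk are held as dicts keyed by token (a layout chosen here for illustration, not how pig stores them):

```python
# model: token -> (num_occurences, mean, mean_sqrs)
model = {"a": (6, 4 / 3, 2.0), "b": (3, 5 / 3, 3.0), "c": (3, 3.0, 29 / 3)}
# chunk: token -> frequency in the latest hour
chunk = {"a": 4, "b": 2, "d": 3}

# tokens only in the model, only in the chunk, and in both
just_model = {t: model[t] for t in model.keys() - chunk.keys()}
just_chunk = {t: chunk[t] for t in chunk.keys() - model.keys()}
in_both = {t: (model[t], chunk[t]) for t in model.keys() & chunk.keys()}
```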

each of these can be processed in turn.

firstly, entries where the token was only in the model (ie not in the chunk) pass to the next generation of the model untouched

    > model_n1__just_model = foreach just_model_grped generate flatten(model);
    > dump model_n1__just_model;
    (c,3,3.0F,9.666666F)

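secondly, entries where the token was only in the chunk (ie not in the model) contribute new model entries for the next generation. with a single observation the mean is just the frequency and the mean of squares is the frequency squared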

    > just_chunk_entries = foreach just_chunk_grped generate flatten(chunk);
    > model_n1__just_chunk = foreach just_chunk_entries generate token, 1, freq, freq*freq;
    > dump model_n1__just_chunk;
    (d,1,3L,9L)

finally, and most interestingly, when the token was in both the model and the chunk we need to...

flatten the data out a bit

    > describe in_both_grped;
    in_both_grped: {group: chararray, model: {token: chararray, num_occurences: int, mean: float, mean_sqrs: float}, chunk: {token: chararray, freq: long}}
    > in_both_flat = foreach in_both_grped generate flatten(model), flatten(chunk);
    > describe in_both_flat;
    in_both_flat: {model::token: chararray, model::num_occurences: int, model::mean: float, model::mean_sqrs: float, chunk::token: chararray, chunk::freq: long}
    > dump in_both_flat;
    (a,6,1.333333F,2.0F,a,4L)
    (b,3,1.666666F,3.0F,b,2L)

do a trending check (note the comparison of freq of iter:n is done against mean/sd of iter:n-1)

    > trending = foreach in_both_flat {
        sd_lhs = num_occurences * mean_sqrs;
        sd_rhs = num_occurences * (mean*mean);
        sd = sqrt( (sd_lhs-sd_rhs) / num_occurences );
        fraction_of_sd_over_mean = ( sd==0 ? 0 : (freq-mean)/sd );
        generate model::token as token, fraction_of_sd_over_mean as trending_score;
      }
    > describe trending;
    trending: {token: chararray, trending_score: double}
    > dump trending;
    (a,5.656845419750436)
    (b,0.7071049267408686)

this result tells us that token 'a' is well over what was expected and is seriously trending

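with a frequency of 4 in this hour's chunk it's 5.6 standard deviations over its mean frequency (μ=1.33)

token 'b' isn't really trending

with a frequency of 2 in this hour's chunk it's not even one (0.7) standard deviation over its mean frequency (μ=1.66)

( note these scores use the population standard deviation that the script computes, which is 0.47 for both tokens, rather than the sample values ρ=0.51 and ρ=0.57 quoted earlier )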
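the two scores can be reproduced by hand. here's a python sketch of the same arithmetic as the pig block (the function name `trending_score` is invented for this example, and the stored floats are replaced by exact fractions, so the last few digits differ slightly from the dump):

```python
import math

def trending_score(n, mean, mean_sqrs, freq):
    """How many standard deviations freq sits above the stored mean."""
    variance = (n * mean_sqrs - n * mean * mean) / n
    # max() guards against a tiny negative value from floating point rounding
    sd = math.sqrt(max(variance, 0.0))
    return 0.0 if sd == 0 else (freq - mean) / sd

score_a = trending_score(6, 4 / 3, 2.0, 4)  # token 'a', seen 4 times this hour
score_b = trending_score(3, 5 / 3, 3.0, 2)  # token 'b', seen 2 times this hour
```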

at this stage we can do whatever we want with the trending scores, perhaps save the top 10 off.

    trending_sorted = order trending by trending_score desc PARALLEL 1;
    top_trending = limit trending_sorted 10 PARALLEL 1;
    store top_trending into 'data/trending/006';
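outside of pig the same "order then limit" can be sketched with `heapq.nlargest`, which avoids sorting the whole list when only the top few are wanted (the scores here are rounded copies of the ones dumped earlier):

```python
import heapq

trending = [("b", 0.707105), ("a", 5.656845)]

# keep at most the 10 highest scoring (token, score) pairs, best first
top_trending = heapq.nlargest(10, trending, key=lambda pair: pair[1])
```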

after the trending check we need to fold the chunk into the existing model

    > model_n1__folded = foreach in_both_flat {
        new_total = (mean * num_occurences) + freq;
        new_total_sqrs = (mean_sqrs * num_occurences) + (freq*freq);
        num_occurences = num_occurences + 1;
        mean = new_total / num_occurences;
        mean_sqrs = new_total_sqrs / num_occurences;
        generate model::token, num_occurences, mean, mean_sqrs;
      };
    > dump model_n1__folded;
    (a,7,1.7142855F,4.0F)
    (b,4,1.7499995F,3.25F)

and finally combine with the previous parts we had broken out before to make the new generation of the model!

    > model_n1 = union model_n1__just_model, model_n1__just_chunk, model_n1__folded;
    > store model_n1 into 'data/model/007';
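to tie the three cases together, here's a plain python sketch of one whole model update. it's an illustration rather than a port of the pig script: the dict layout and the name `next_model` are assumptions, and exact fractions are used in place of the stored floats.

```python
def next_model(model, chunk):
    """Return the next generation of the model after folding in one chunk."""
    updated = {}
    for token in model.keys() | chunk.keys():
        if token not in chunk:
            # only in the model: carried over untouched
            updated[token] = model[token]
        elif token not in model:
            # only in the chunk: seed a new entry from a single observation
            freq = chunk[token]
            updated[token] = (1, float(freq), float(freq * freq))
        else:
            # in both: fold the new frequency into the running means
            n, mean, mean_sqrs = model[token]
            freq = chunk[token]
            updated[token] = (
                n + 1,
                (mean * n + freq) / (n + 1),
                (mean_sqrs * n + freq * freq) / (n + 1),
            )
    return updated

model_006 = {"a": (6, 4 / 3, 2.0), "b": (3, 5 / 3, 3.0), "c": (3, 3.0, 29 / 3)}
chunk = {"a": 4, "b": 2, "d": 3}
model_007 = next_model(model_006, chunk)
```

the resulting entries for 'a' and 'b' agree with the folded values dumped above, up to float rounding.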