值去重 (對唯一項(xiàng)計數(shù))
問題陳述: 記錄包含值域F和值域 G,要分別統(tǒng)計相同G值的記錄中不同的F值的數(shù)目 (相當(dāng)于按照 G分組)。
這個問題可以推而廣之應(yīng)用于分面搜索(某些電子商務(wù)網(wǎng)站稱之為Narrow Search)
Record 1: F=1, G={a, b}
Record 2: F=2, G={a, d, e}
Record 3: F=1, G=
Record 4: F=3, G={a, b}
Result:
a -》 3 // F=1, F=2, F=3
b -》 2 // F=1, F=3
d -》 1 // F=2
e -》 1 // F=2
解決方案 I:
第一種方法是分兩個階段來解決這個問題。第一階段在Mapper中使用F和G組成一個復(fù)合值對,然后在Reducer中輸出每個值對,目的是為了保證F值的唯一性。在第二階段,再將值對按照G值來分組計算每組中的條目數(shù)。
第一階段:
class Mapper
method Map(null, record [value f, categories [g1, g2,。..]])
for all category g in [g1, g2,。..]
Emit(record [g, f], count 1)
class Reducer
method Reduce(record [g, f], counts [n1, n2, 。..])
Emit(record [g, f], null )
第二階段:
class Mapper
method Map(record [f, g], null)
Emit(value g, count 1)
class Reducer
method Reduce(value g, counts [n1, n2,。..])
Emit(value g, sum( [n1, n2,。..] ) )
解決方案 II:
第二種方法只需要一次MapReduce 即可實(shí)現(xiàn),但擴(kuò)展性不強(qiáng)。算法很簡單-Mapper 輸出值和分類,在Reducer里為每個值對應(yīng)的分類去重然后給每個所屬的分類計數(shù)加1,最后再在Reducer結(jié)束后將所有計數(shù)加和。這種方法適用于只有有限個分類,而且擁有相同F(xiàn)值的記錄不是很多的情況。例如網(wǎng)絡(luò)日志處理和用戶分類,用戶的總數(shù)很多,但是每個用戶的事件是有限的,以此分類得到的類別也是有限的。值得一提的是在這種模式下可以在數(shù)據(jù)傳輸?shù)絉educer之前使用Combiner來去除分類的重復(fù)值。
class Mapper
method Map(null, record [value f, categories [g1, g2,。..] )
for all category g in [g1, g2,。..]
Emit(value f, category g)
class Reducer
method Initialize
H = new AssociativeArray : category -》 count
method Reduce(value f, categories [g1, g2,。..])
[g1‘, g2’,。.] = ExcludeDuplicates( [g1, g2,。.] )
for all category g in [g1‘, g2’,。..]
H{g} = H{g} + 1
method Close
for all category g in H do
Emit(category g, count H{g})
應(yīng)用:
日志分析,用戶計數(shù)
互相關(guān)
問題陳述:有多個各由若干項(xiàng)構(gòu)成的組,計算項(xiàng)兩兩共同出現(xiàn)于一個組中的次數(shù)。假如項(xiàng)數(shù)是N,那么應(yīng)該計算N*N。
這種情況常見于文本分析(條目是單詞而元組是句子),市場分析(購買了此物的客戶還可能購買什么)。如果N*N小到可以容納于一臺機(jī)器的內(nèi)存,實(shí)現(xiàn)起來就比較簡單了。
配對法
第一種方法是在Mapper中給所有條目配對,然后在Reducer中將同一條目對的計數(shù)加和。但這種做法也有缺點(diǎn):
· 使用 combiners 帶來的的好處有限,因?yàn)楹芸赡芩许?xiàng)對都是唯一的
· 不能有效利用內(nèi)存
class Mapper
method Map(null, items [i1, i2,。..] )
for all item i in [i1, i2,。..]
for all item j in [i1, i2,。..]
Emit(pair [i j], count 1)
class Reducer
method Reduce(pair [i j], counts [c1, c2,。..])
s = sum([c1, c2,。..])
Emit(pair[i j], count s)
Stripes Approach(條方法?不知道這個名字怎么理解)
第二種方法是將數(shù)據(jù)按照pair中的第一項(xiàng)來分組,并維護(hù)一個關(guān)聯(lián)數(shù)組,數(shù)組中存儲的是所有關(guān)聯(lián)項(xiàng)的計數(shù)。The second approach is to group data by the first item in pair and maintain an associative array (“stripe”) where counters for all adjacent items are accumulated. Reducer receives all stripes for leading item i, merges them, and emits the same result as in the Pairs approach.
· 中間結(jié)果的鍵數(shù)量相對較少,因此減少了排序消耗。
· 可以有效利用 combiners。
· 可在內(nèi)存中執(zhí)行,不過如果沒有正確執(zhí)行的話也會帶來問題。
· 實(shí)現(xiàn)起來比較復(fù)雜。
· 一般來說, “stripes” 比 “pairs” 更快
class Mapper
method Map(null, items [i1, i2,。..] )
for all item i in [i1, i2,。..]
H = new AssociativeArray : item -》 counter
for all item j in [i1, i2,。..]
H{j} = H{j} + 1
Emit(item i, stripe H)
class Reducer
method Reduce(item i, stripes [H1, H2,。..])
H = new AssociativeArray : item -》 counter
H = merge-sum( [H1, H2,。..] )
for all item j in H.keys()
Emit(pair [i j], H{j})
應(yīng)用:
文本分析,市場分析
References:
1. Lin J. Dyer C. Hirst G. Data Intensive Processing MapReduce
用MapReduce 表達(dá)關(guān)系模式
在這部分我們會討論一下怎么使用MapReduce來進(jìn)行主要的關(guān)系操作。
篩選(Selection)
class Mapper
method Map(rowkey key, tuple t)
if t satisfies the predicate
Emit(tuple t, null)
投影(Projection)
投影只比篩選稍微復(fù)雜一點(diǎn),在這種情況下我們可以用Reducer來消除可能的重復(fù)值
class Mapper
method Map(rowkey key, tuple t)
tuple g = project(t) // extract required fields to tuple g
Emit(tuple g, null)
class Reducer
method Reduce(tuple t, array n) // n is an array of nulls
Emit(tuple t, null)
合并(Union)
兩個數(shù)據(jù)集中的所有記錄都送入Mapper,在Reducer里消重。
class Mapper
method Map(rowkey key, tuple t)
Emit(tuple t, null)
class Reducer
method Reduce(tuple t, array n) // n is an array of one or two nulls
Emit(tuple t, null)
交集(Intersection)
將兩個數(shù)據(jù)集中需要做交叉的記錄輸入Mapper,Reducer 輸出出現(xiàn)了兩次的記錄。因?yàn)槊織l記錄都有一個主鍵,在每個數(shù)據(jù)集中只會出現(xiàn)一次,所以這樣做是可行的。
差異(Difference)
假設(shè)有兩個數(shù)據(jù)集R和S,我們要找出R與S的差異。Mapper將所有的元組做上標(biāo)記,表明他們來自于R還是S,Reducer只輸出那些存在于R中而不在S中的記錄。
class Mapper
method Map(rowkey key, tuple t)
Emit(tuple t, string t.SetName) // t.SetName is either ‘R’ or ‘S’
class Reducer
method Reduce(tuple t, array n) // array n can be [‘R’], [‘S’], [‘R’ ‘S’], or [‘S’, ‘R’]
if n.size() = 1 and n[1] = ‘R’
Emit(tuple t, null)
分組聚合(GroupBy and Aggregation)
分組聚合可以在如下的一個MapReduce中完成。Mapper抽取數(shù)據(jù)并將之分組聚合,Reducer 中對收到的數(shù)據(jù)再次聚合。典型的聚合應(yīng)用比如求和與最值可以以流的方式進(jìn)行計算,因而不需要同時保有所有的值。但是另外一些情景就必須要兩階段MapReduce,前面提到過的惟一值模式就是一個這種類型的例子。
連接(Joining)
MapperReduce框架可以很好地處理連接,不過在面對不同的數(shù)據(jù)量和處理效率要求的時候還是有一些技巧。在這部分我們會介紹一些基本方法,在后面的參考文檔中還列出了一些關(guān)于這方面的專題文章。
分配后連接 (Reduce端連接,排序-合并連接)
這個算法按照鍵K來連接數(shù)據(jù)集R和L。Mapper 遍歷R和L中的所有元組,以K為鍵輸出每一個標(biāo)記了來自于R還是L的元組,Reducer把同一個K的數(shù)據(jù)分裝入兩個容器(R和L),然后嵌套循環(huán)遍歷兩個容器中的數(shù)據(jù)以得到交集,最后輸出的每一條結(jié)果都包含了R中的數(shù)據(jù)、L中的數(shù)據(jù)和K。這種方法有以下缺點(diǎn):
· Mapper要輸出所有的數(shù)據(jù),即使一些key只會在一個集合中出現(xiàn)。
· Reducer 要在內(nèi)存中保有一個key的所有數(shù)據(jù),如果數(shù)據(jù)量打過了內(nèi)存,那么就要緩存到硬盤上,這就增加了硬盤IO的消耗。
盡管如此,再分配連接方式仍然是最通用的方法,特別是其他優(yōu)化技術(shù)都不適用的時候。
class Mapper
method Map(null, tuple [join_key k, value v1, value v2,。..])
Emit(join_key k, tagged_tuple [set_name tag, values [v1, v2, 。..] ] )
class Reducer
method Reduce(join_key k, tagged_tuples [t1, t2,。..])
H = new AssociativeArray : set_name -》 values
for all tagged_tuple t in [t1, t2,。..] // separate values into 2 arrays
H{t.tag}.add(t.values)
for all values r in H{‘R’} // produce a cross-join of the two arrays
for all values l in H{‘L’}
Emit(null, [k r l] )
復(fù)制鏈接Replicated Join (Mapper端連接, Hash 連接)
在實(shí)際應(yīng)用中,將一個小數(shù)據(jù)集和一個大數(shù)據(jù)集連接是很常見的(如用戶與日志記錄)。假定要連接兩個集合R和L,其中R相對較小,這樣,可以把R分發(fā)給所有的Mapper,每個Mapper都可以載入它并以連接鍵來索引其中的數(shù)據(jù),最常用和有效的索引技術(shù)就是哈希表。之后,Mapper遍歷L,并將其與存儲在哈希表中的R中的相應(yīng)記錄連接,。這種方法非常高效,因?yàn)椴恍枰獙中的數(shù)據(jù)排序,也不需要通過網(wǎng)絡(luò)傳送L中的數(shù)據(jù),但是R必須足夠小到能夠分發(fā)給所有的Mapper。
class Mapper
method Initialize
H = new AssociativeArray : join_key -》 tuple from R
R = loadR()
for all [ join_key k, tuple [r1, r2,。..] ] in R
H{k} = H{k}.append( [r1, r2,。..] )
method Map(join_key k, tuple l)
for all tuple r in H{k}
Emit(null, tuple [k r l] )
評論