代写辅导接单-Efficient Parallel Set-Similarity Joins Using MapReduce

欢迎使用51辅导,51作业君孵化低价透明的学长辅导平台,服务保持优质,平均费用压低50%以上! 51fudao.top

Efficient Parallel Set-Similarity Joins Using MapReduce

Rares Vernica Michael J. Carey Chen Li

DepartmentofComputer DepartmentofComputer DepartmentofComputer

Science Science Science

UniversityofCalifornia,Irvine UniversityofCalifornia,Irvine UniversityofCalifornia,Irvine

[email protected] [email protected] [email protected]

ABSTRACT person. Asanotherexample,whenminingsocialnetworking

siteswhereusers’preferencesarestoredasbitvectors(where

In this paper we study how to efficiently perform set-simi-

a“1”bit means interest in a certain domain), applications

larityjoinsinparallelusingthepopularMapReduceframe-

want to use the fact that a user with preference bit vector

work. We propose a 3-stage approach for end-to-end set-

“[1,0,0,1,1,0,1,0,0,1]”possibly has similar interests to

similarityjoins. Wetakeasinputasetofrecordsandoutput

a user with preferences“[1,0,0,0,1,0,1,0,1,1]”.

a set of joined records based on a set-similarity condition.

Detectingsuchsimilarpairsischallengingtoday,asthere

We efficiently partition the data across nodes in order to

isanincreasingtrendofapplicationsbeingexpectedtodeal

balancetheworkloadandminimizetheneedforreplication.

with vast amounts of data that usually do not fit in the

Westudybothself-joinandR-Sjoincases,andshowhowto

main memory of one machine. For example, the Google

carefully control the amount of data kept in main memory

N-gram dataset [27] has 1 trillion records; the GeneBank

oneachnode. Wealsoproposesolutionsforthecasewhere,

dataset [11] contains 100 million records and has a size of

even if we use the most fine-grained partitioning, the data

416GB.Applicationswithsuchdatasetsusuallymakeuseof

still does not fit in the main memory of a node. We report

clustersofmachinesandemployparallelalgorithmsinorder

resultsfromextensiveexperimentsonrealdatasets,synthet-

to efficiently deal with this vast amount of data. For data-

icallyincreasedinsize,toevaluatethespeedupandscaleup

intensive applications,the MapReduce [7] paradigm has re-

properties of the proposed algorithms using Hadoop.

centlyreceivedalotofattentionforbeingascalableparallel

shared-nothingdata-processingplatform. Theframeworkis

CategoriesandSubjectDescriptors

able to scale to thousands of nodes [7]. In this paper, we

H.2.4[Database Management]: Systems—query process- useMapReduceastheparalleldata-processingparadigmfor

ing, parallel databases finding similar pairs of records.

Whendealingwithaverylargeamountofdata,detecting

GeneralTerms similarpairsofrecordsbecomesachallengingproblem,even

if a large computational cluster is available. Parallel data-

Algorithms, Performance processing paradigms rely on data partitioning and redis-

tribution for efficient query execution. Partitioning records

1. INTRODUCTION for finding similar pairs of records is challenging for string

There are many applications that require detecting simi- or set-based data as hash-based partitioning using the en-

lar pairs of records where the records contain string or set- tire string or set does not suffice. The contributions of this

based data. A list of possible applications includes: de- paper are as follows:

tecting near duplicate web-pages in web crawling [14], doc- • We describe efficient ways to partition a large dataset

ument clustering[5], plagiarism detection [15], master data acrossnodesinordertobalancetheworkloadandmini-

management, making recommendations to users based on mizetheneedforreplication. Comparedtotheequi-join

theirsimilaritytootherusersinqueryrefinement[22],min- case, the set-similarity joins case requires“partitioning”

inginsocialnetworkingsites[25],andidentifyingcoalitions the data based on set contents.

of click fraudsters in online advertising [20]. For example, • We describe efficient solutions that exploit the MapRe-

in master-data-management applications, a system has to

duce framework. We show how to efficiently deal with

identify that names“John W. Smith”,“Smith, John”, and

problems such as partitioning, replication, and multiple

“John William Smith”arepotentiallyreferringtothesame

inputs by manipulating the keys used to route the data

in the framework.

• We present methods for controlling the amount of data

keptinmemoryduringajoinbyexploitingtheproperties

Permissiontomakedigitalorhardcopiesofallorpartofthisworkfor

personalorclassroomuseisgrantedwithoutfeeprovidedthatcopiesare of the data that needs to be joined.

notmadeordistributedforprofitorcommercialadvantageandthatcopies • We provide algorithms for answering set-similarity self-

bearthisnoticeandthefullcitationonthefirstpage.Tocopyotherwise,to

joinqueriesend-to-end,wherewestartfromrecordscon-

republish,topostonserversortoredistributetolists,requirespriorspecific

taining more than just the join attribute and end with

permissionand/orafee.

SIGMOD’10,June6–11,2010,Indianapolis,Indiana,USA. actual pairs of joined records.

Copyright2010ACM978-1-4503-0032-2/10/06...$10.00.

• We show how our set-similarity self-join algorithms can

be extended to answer set-similarity R-S join queries.

• We present strategies for exceptional situations where,

evenifweusethefinest-granularitypartitioningmethod,

the data that needs to be held in the main memory of

one node is too large to fit.

The rest of the paper is structured as follows. In Sec-

tion 2 we introduce the problem and present the main idea

of our algorithms. In Section 3 we present set-similarity

join algorithms for the self-join case, while in Section 4 we

show how the algorithms can be extended to the R-S join Figure 1: Data flow in a MapReduce computation.

case. Next, in Section 5, we present strategies for handling

the insufficient-memory case. A performance evaluation is

presented in Section 6. Finally, we discuss related work in of the input data. The (key, value) pairs output by each

Section 7 and conclude in Section 8. A longer technical re- mapfunctionarehash-partitionedonthekey. Foreachpar-

port on this work is available in [26]. titionthepairsaresortedbytheirkeyandthensentacross

the cluster in a shuffle phase. At each receiving node, all

thereceivedpartitionsaremergedinasortedorderbytheir

2. PRELIMINARIES

key. All the pair values that share a certain key are passed

In this work we focus on the following set-similarity join to a single reduce call. The output of each reduce function

application: identifyingsimilarrecordsbasedonstringsim- is written to a distributed file in the DFS.

ilarity. Ourresultscanbegeneralizedtootherset-similarity Besidesthemapandreducefunctions,theframeworkalso

join applications. allows the user to provide a combine function that is ex-

Problem statement: Giventwofilesofrecords,R andS, ecuted on the same nodes as mappers right after the map

a set-similarity function, sim, and a similarity threshold τ, functionshavefinished. Thisfunctionactsasalocalreducer,

we define the set-similarity join of R and S on R.a and S.a operatingonthelocal(key, value)pairs. Thisfunctional-

as finding and combining all pairs of records from R and S lows the user to decrease the amount of data sent through

where sim(R.a,S.a)≥τ. the network. The signature of the combine function is:

Wemapstringsintosetsbytokenizingthem. Examplesof

tokensarewordsorq-grams(overlappingsub-stringsoffixed combine (k2,list(v2)) → list(k2,list(v2)).

length). For example, the string“I will call back”can

Finally,theframeworkalsoallowstheusertoprovideinitial-

be tokenized into the word set [I, will, call, back]. In

izationandtear-downfunctionforeachMapReducefunction

ordertomeasurethesimilaritybetweenstrings,weuseaset-

andcustomizehashingandcomparisonfunctionstobeused

similarity function such as Jaccard or Tanimoto coefficient,

cosinecoefficient,etc.1. Forexample,theJaccardsimilarity when partitioning and sorting the keys. From Figure 1 one

can notice the similarity between the MapReduceapproach

function for two sets x and y is defined as: jaccard(x,y)=

and query-processingtechniques for parallel DBMS [8, 21].

|x∩y|. Thus,theJaccardsimilaritybetweenstrings“I will

|x∪y|

call back”and“I will call you soon”is 3 =0.5. 2.2 ParallelSet-SimilarityJoins

6

In the remainder of the section, we provide an introduc-

Oneofthemainissueswhenansweringset-similarityjoins

tion to the MapReduce paradigm, present the main idea of

usingtheMapReduceparadigm,istodecidehowdatashould

our parallel set-similarity join algorithms, and provide an

be partitioned and replicated. The main idea of our al-

overviewoffilteringmethodsfordetectingset-similarpairs.

gorithms is the following. The framework hash-partitions

2.1 MapReduce the dataacrossthenetworkbasedonkeys; dataitems with

the same key are grouped together. In our case, the join-

MapReduce [7] is a popular paradigm for data-intensive attribute value cannot be directly used as a partitioning

parallel computation in shared-nothing clusters. Example key. Instead,weuse(possiblymultiple)signaturesgenerated

applications for the MapReduce paradigm include process- from the value as partitioning keys. Signatures are defined

ingcrawleddocuments,Web requestlogs,etc. Inthe open- suchthatsimilarattributevalueshaveatleastonesignature

sourcecommunity,Hadoop[1]isapoplarimplementationof incommon. Possibleexamplesignaturesinclude: thelistof

this paradigm. In MapReduce, data is initially partitioned wordtokensofastringandrangesofsimilarstringlengths.

acrossthe nodesof a clusterandstoredin a distributedfile For instance, the string“I will call back”would have 4

system (DFS). Data is represented as (key, value) pairs. word-based signatures:“I”,“will”,“call”, and“back”.

The computationis expressed using two functions: We divide the processing into three stages:

map (k1,v1) → list(k2,v2); • Stage 1: Computes data statistics in order to generate

reduce (k2,list(v2)) → list(k3,v3). good signatures. The techniques in later stages utilize

these statistics.

Figure 1 shows the data flow in a MapReduce computa- • Stage 2: ExtractstherecordIDs(“RID”)andthe join-

tion. Thecomputationstartswithamapphaseinwhichthe

attributevaluefromeachrecordanddistributestheRID

map functions are applied in parallel on different partitions

andthejoin-attributevaluepairssothatthepairsshar-

1The techniques described in this paper can also be used ing a signature go to at least one common reducer. The

for approximate string search using the edit or Levenshtein reducerscomputethesimilarityofthejoin-attributeval-

distance [13]. ues and output RID pairs of similar records.

• Stage 3: Generates actual pairs of joined records. It unbalanced workload due to token-frequency skew. Each

uses the list of RID pairs from the secondstage and the grouprepresentsasetofcandidatesthatarecrosspairedand

original data to build the pairs of similar records. verified. The third stage uses the list of similar-RID pairs

An alternative to using the second and third stages is to and the original data to generate pairs of similar records.

use one stage in which we let key-value pairs carry com-

3.1 Stage1: TokenOrdering

plete records, instead of projecting records on their RIDs

and join-attribute values. We implemented this alternative We consider two methods for ordering the tokens in the

and noticed a much worse performance, so we do not con- firststage. Bothapproachestakeasinputtheoriginalrecords

sider this option in this paper. andproducealistoftokensthatappearinthejoin-attribute

value ordered increasingly by frequency.

2.3 Set-SimilarityFiltering

3.1.1 BasicTokenOrdering(BTO)

Efficient set-similarity join algorithms rely on effective

filters, which can decrease the number of candidate pairs Ourfirstapproach,calledBasicTokenOrdering(“BTO”),

whosesimilarityneedstobeverified. Inthepastfewyears, relies on two MapReduce phases. The first phase computes

there have been several studies involving a technique called the frequency of each token and the second phase sorts the

prefix filtering [6, 4, 29], which is based on the pigeonhole tokens based on their frequencies. In the first phase, the

principle and works as follows. The tokens of strings are map function gets as input the original records. For each

ordered based on a global token ordering. For each string, record, the function extracts the value of the join attribute

we define its prefix of length n as the first n tokens of the and tokenizes it. Each token produces a (token, 1) pair.

ordered set of tokens. The required length of the prefix de- Tominimizethenetworktrafficbetweenthemapandreduce

pends on the size of the token set, the similarity function, functions, we use a combine function to aggregates the 1’s

andthe similaritythreshold. For example, given the string, output by the map function into partial counts. Figure 2(a)

s,“I will call back”andtheglobaltokenordering{back, showsthedataflowforanexampledataset,self-joinedonan

call, will, I},theprefixoflength2ofsis[back, call]. attributecalled“a”. Inthefigure,fortherecordwithRID1,

Theprefixfilteringprinciplestatesthatsimilarstringsneed thejoin-attributevalueis“A B C”,whichistokenizedas“A”,

toshareat leastone commontoken in their prefixes. Using “B”, and“C”. Subsequently, the reduce function computes

thisprinciple,recordsofonerelationareorganizedbasedon thetotalcountforeachtokenandoutputs(token, count)

the tokens in their prefixes. Then, using the prefix tokens pairs, where“count”is the total frequency for the token.

of the records in the second relation, we can probe the first ThesecondphaseusesMapReducetosortthepairsofto-

relation and generate candidate pairs. The prefix filtering kensandfrequenciesfromthefirstphase. Themapfunction

principle gives a necessary condition for similar records, so swapstheinputkeysandvaluessothattheinputpairsofthe

the generated candidate pairs need to be verified. A good reducefunctionare sortedbasedon their frequencies. This

performancecanbeachievedwhentheglobaltokenordering phaseusesexactlyonereducersothattheresultisatotally

correspondstotheirincreasingtoken-frequencyorder,since ordered list of tokens. The pseudo-code of this algorithm

fewer candidate pairs will be generated. and other algorithms presented is available in [26].

Astate-of-the-artalgorithmintheset-similarityjoinliter-

3.1.2 UsingOnePhasetoOrderTokens(OPTO)

atureisthePPJoin+techniquepresentedin[29]. Itusesthe

prefixfilteralongwithalengthfilter(similarstringsneedto An alternative approach to token ordering is to use one

have similar lengths [3]). It also proposed two other filters: MapReducephase. Thisapproach,calledOne-PhaseToken

apositionalfilterandasuffixfilter. ThePPJoin+technique Ordering(“OPTO”),exploitsthefactthatthelistoftokens

provides a good solution for answering such queries on one couldbemuchsmallerthantheoriginaldatasize. Insteadof

node. One of our approaches is to use PPJoin+ in parallel using MapReduce to sort the tokens, we can explicitly sort

on multiple nodes. the tokens in memory. We use the same map and combine

functionsasinthefirstphaseoftheBTOalgorithm. Similar

toBTOweuseonlyonereducer. Figure2(b)showsthedata

3. SELF-JOINCASE

flow of this approach for our example dataset. The reduce

Inthissectionwepresenttechniquesfortheset-similarity function in OPTO gets as input a list of tokens and their

self-join case. As outlined in the previous section, the solu- partial counts. For each token, the function computes its

tion is divided into three stages. The first stage builds the total count and stores the information locally. When there

globaltokenordering necessarytoapplytheprefix-filter.2 It isnomoreinputforthereducefunction,thereducercallsa

scans the data, computes the frequency of each token, and tear-downfunctiontosortthetokensbasedontheircounts,

sorts the tokens based on frequency. The second stage uses and to output the tokens in an increasing order of their

theprefix-filteringprincipletoproducealistofsimilar-RID counts.

pairs. The algorithm extracts the RID and join-attribute

value of each record, and replicates and re-partitions the 3.2 Stage2: RID-PairGeneration

recordsbasedontheirprefixtokens. TheMapReduceframe-

The second stage of the join, called“Kernel”, scans the

work groups the RID and join-attribute value pairs based

original input data and extracts the prefix of each record

ontheprefixtokens. Itisworthnotingthatusingtheinfre-

usingthetokenordercomputedbythefirststage. Ingeneral

quent prefix tokens to redistribute the data helps us avoid

the list of unique tokens is much smaller and grows much

2An alternative would be to apply the length filter. We slowerthanthelistofrecords. Wethusassumethatthelist

exploredthisalternativebuttheperformancewasnotgood of tokens fits in memory. Based on the prefix tokens, we

because it suffered from the skewed distribution of string extracttheRIDandthejoin-attributevalueofeachrecord,

lengths. and distribute these record projections to reducers. The

(a) Basic Token Ordering (BTO) (b) One-Phase Token Ordering

(OPTO)

Figure 2: Example data flow of Stage 1. (Token ordering for a self-join on attribute“a”.)

join-attributevaluesthatshareatleastoneprefixtokenare retrieves the original records one by one, and extracts the

verified at a reducer. RID and the join-attribute value for each record. It tok-

Routing Records to Reducers. We first take a look enizes the join attribute and reorders the tokens based on

at two possible ways to generate (key, value) pairs in the their frequencies. Next, the function computes the prefix

map function. (1) Using Individual Tokens: This method length and extracts the prefix tokens. Finally, the function

treats each token as a key. Thus, for each record, we would useseithertheindividualtokensorthegroupedtokensrout-

generate a (key, value) pair for each of its prefix tokens. ingstrategytogeneratetheoutputpairs. Figure3(a)shows

Thus,arecordprojectionisreplicatedasmanytimesasthe the data flow for our example dataset using individual to-

numberofitsprefixtokens. Forexample,iftherecordvalue kens to do the routing. The prefix tokens of each value are

is“A B C D”and the prefix tokens are“A”,“B”, and“C”, we in bold face. The record with RID 1 has prefix tokens“A”

would output three (key, value) pairs, corresponding to and“B”, so its projection is output twice.

the three tokens. In the reducer, as the values get grouped In the reduce function, for each pair of record projec-

byprefixtokens,allthevaluespassedinareducecallshare tions, the reducer applies the additional filters (e.g., length

the same prefix token. filter, positionalfilter, andsuffixfilter)andverifiesthe pair

(2) Using Grouped Tokens: This method maps multiple if it survives. If a pair passes the similarity threshold, the

tokens to one synthetic key, thus can map different tokens reducer outputs RID pairs and their similarity values.

tothesamekey. Foreachrecord,themapfunctiongenerates

one (key, value) pair for each of the groups of the prefix 3.2.2 IndexedKernel(PK)

tokens. In our running example of a record“A B C D”, if AnotherapproachonfindingRIDpairsofsimilarrecords

tokens“A”and“B”belong to one group (denoted by“X”), is to use existing set-similarity join algorithms from the

and token“C”belongs to another group (denoted by“Y”), literature [23, 3, 4, 29]. Here we use the PPJoin+ algo-

weoutputtwo(key, value)pairs,oneforkey“X”andone rithmfrom[29]. WecallthisapproachthePPJoin+Kernel

for key“Y”. Two records that share the same token group (“PK”).

do not necessarily share any prefix token. Continuing our Using this method, the map function is the same as in

running example, for record“E F G”, if its prefix token“E” the BK algorithm. Figure 3(b) shows the data flow for our

belongs to group“Y”, then the records“A B C D”and“E F exampledatasetusinggroupedtokenstodotherouting. In

G”share token group“Y”but do not share any prefix token. the figure, the record with RID 1 has prefix tokens“A”and

So, in the reducer, as the values get groupedby their token “B”,whichbelongtogroups“X”and“Y”,respectively. Inthe

group,notwovaluesshareaprefixtoken. Thismethodcan reducefunction,weusethePPJoin+algorithmtoindexthe

help us have fewer replications of record projections. One data,applyallthefilters,andoutputtheresultingpairs. For

way to define the token groups in order to balance data each input record projection, the function first probes the

across reducers is the following. We use the token ordering index using the join-attribute value. The probe generates a

producedin the first stage, and assign the tokens to groups listofRIDsofrecordsthataresimilartothecurrentrecord.

inaRound-Robinorder. Inthiswaywebalancethesumof The current record is then added to the index as well.

token frequencies across groups. We study the effect of the The PPJoin+ algorithm achieves an optimized memory

number of groups in Section 6. For both routing strategies, footprintbecausetheinputstringsaresortedincreasinglyby

since two records might share more that one prefix token, their lengths [29]. This works in the following way. The in-

the same pair may be verified multiple times at different dexknowsthelowerboundonthelengthoftheunseendata

reducers, thus it could be output multiple times. This is elements. Using this bound and the length filter, PPJoin+

dealt with in the third stage. discards from the index the data elements below the min-

imum length given by the filter. In order to obtain this

3.2.1 BasicKernel(BK)

ordering of data elements, we use a composite MapReduce

In our first approach to finding the RID pairs of simi- keythatalsoincludesthelengthofthejoin-attributevalue.

lar records,called BasicKernel (“BK”), eachreducer uses a Weprovidetheframeworkwithacustompartitioningfunc-

nested loop approach to compute the similarity of the join- tionsothatthepartitioningisdoneonlyonthegroupvalue.

attributevalues. Beforethemapfunctionsbegintheirexecu- In this way, when data is transferred from map to reduce,

tions,aninitializationfunctioniscalledtoloadtheordered it gets partitioned just by group value, and is then locally

tokens produced by the first stage. The map function then sorted on both group and length.

(a)BasicKernel(BK)usingindividualtokensforrout- (b) PPJoin+ Kernel (PK) using grouped tokens for

ing routing

Figure 3: Example data flow of Stage 2. (Kernel for a self-join on attribute“a”.)

3.3 Stage3: RecordJoin the two records, appends their similarity, and outputs the

In the final stage of our algorithm, we use the RID pairs constructed pair. In Figure 4, the output of the second set

generatedinthesecondstagetojointheirrecords. Wepro- of mappers contains two (key, value) pairs with the RID

posetwoapproachesforthisstage. Themainideaistofirst pair(1,21)asthekey,onecontainingrecord1andtheother

fill in the record information for each half of the pair and containing record 21. They are grouped in a reducer that

then use the two halves to build the full record pair. The outputs the pair of records (1,21).

twoapproachesdifferinthewaythelistofRIDpairsispro-

3.3.2 One-PhaseRecordJoin(OPRJ)

vided as input. In the first approach, called Basic Record

Join (“BRJ”), the list of RID pairs is treated as a normal ThesecondapproachtorecordjoinusesonlyoneMapRe-

MapReduceinput,andisprovidedasinputtothemapfunc- duce phase. Instead of sending the RID pairs through the

tions. Inthesecondapproach,calledOne-PhaseRecordJoin MapReduce pipeline to group them with the records in the

(“OPRJ”), the list is broadcast to all the maps and loaded reducephase(aswedointheBRJapproach),webroadcast

beforereadingtheinputdata. DuplicateRIDpairsfromthe andloadtheRIDpairsateachmapfunctionbeforetheinput

previous stage are eliminated in this stage. data is consumed by the function. The map function then

getstheoriginalrecordsasinput. Foreachrecord,thefunc-

tion outputsas many (key, value)pairs as the number of

3.3.1 BasicRecordJoin(BRJ)

RID pairs containing the RID of the current record. The

The Basic Record Join algorithm uses two MapReduce output key is the RID pair. Essentially, the output of the

phases. In the first phase, the algorithm fills in the record map function is the same as the output of the reduce func-

information for each half of each pair. In the second phase,

tion in the first phase of the BRJ algorithm. The idea of

it brings together the half-filled pairs. The map function in

joiningthedatainthemapperswasalsousedin[10]forthe

thefirstphasegets asinputboththe set of originalrecords case of equi-joins. The reduce function is the same as the

andtheRIDpairsfromthesecondstage. (Thefunctioncan reduce function in the second phase of the BRJ algorithm.

differentiate between the two types of inputs by looking at

Figure 5 shows the data flow for our example dataset. In

the input file name.) For each original record, the function

the figure, the first mapper gets as input the record with

outputsa(RID, record)pair. ForeachRIDpair,itoutputs RID1andoutputsone(key, value)pair,wherethekeyis

two(key, value)pairs. ThefirstpairusesthefirstRIDas

theRIDpair(1,21)andthevalueisrecord1. Ontheother

itskey,whilethesecondpairusesthesecondRIDasitskey.

hand, the third mapper outputs a pair with the same key,

Both pairs have the entire RID pair and their similarity as

and the value is the record 21. The two pairs get grouped

their value. Figure 4 shows the data flow for our example

in the second reducer, where the pair of records (1,21) is

dataset. Inthefigure,thefirsttwomapperstakerecordsas

output.

their input, while the third mapper takes RID pairs as its

input. (Mappersdonotspanacrossfiles.) FortheRIDpair

4. R-SJOINCASE

(2,11), the mapper outputs two pairs, one with key 2 and

one with key 11. In Section 3 we described how to compute set-similarity

Thereducefunctionofthefirstphasethenreceivesalist self-joins using the MapReduce framework. In this section

ofvaluescontainingexactlyonerecordandotherRIDpairs. wepresentoursolutionsfortheset-similarityR-Sjoinscase.

For each RID pair, the function outputs a (key, value) We highlightthe differences between the two cases and dis-

pair, where the key is the RID pair, and the value is the cussanoptimizationforcarefullycontrollingmemoryusage

record itself and the similarity of the RID pair. Continuing in the second stage.

ourexampleinFigure4,forkey2,thefirstreducergetsthe Themaindifferencesbetweenthetwojoincasesareinthe

record with RID 2 and one RID pair (2,11), and outputs secondandthethirdstageswherewehaverecordsfromtwo

one(key, value)pairwiththeRIDpair(2,11)asthekey. datasetsastheinput. Dealingwiththebinaryjoinoperator

Thesecondphaseusesanidentitymapthatdirectlyout- ischallenginginMapReduce,astheframeworkwasdesigned

putsitsinput. Thereducefunctionthereforegetsasinput, toonlyacceptasingleinputstream. Asdiscussedin[10,21],

for each key (which is a RID pair), a list of values contain- inordertodifferentiatebetweentwodifferentinputstreams

ingexactlytwoelements. Eachelementconsistsofarecord inMapReduce,weextendthekeyofthe(key, value)pairs

andacommonsimilarityvalue. Thereducerformsapairof so that it includes a relation tag for each record. We also

Figure 4: Example data flow of Stage 3 using Basic Record Join (BRJ) for a self-join case. “a1” and “a2”

correspond to the original attribute“a”, while“b1”and“b2”correspond to attribute“b”.

Figure 5: Example data flow of Stage 3 using One-Phase Record Join (OPRJ) for a self-join case. “a1”and

“a2”correspond to the original attribute“a”while,“b1”and“b2”correspond to attribute“b”.

modifythepartitioningfunctionsothatpartitioningisdone

on the part of the key that does not include the relation

name. (However, the sorting is still done on the full key.)

We now explain the three stages of an R-S join.

Stage 1: Token Ordering. In the first stage, we use

the same algorithms as in the self-join case, only on the

relationwithfewerrecords,sayR. Inthesecondstage,when

tokenizing the other relation, S, we discard the tokens that

do not appear in the token list, since they cannot generate

candidate pairs with R records.

Stage 2: Basic Kernel. First, the mappers tag the

record projections with their relation name. Thus, the re- Figure 6: Example of the order in which records

ducers receive a list of record projections grouped by re- need to arrive at the reducer in the PK kernel of

lation. In the reduce function, we then store the records the R-S join case, assuming that for each length, l,

from the first relation (as they arrive first), and stream the the lower-boundis l−1 and the upper-boundis l+1.

records from the second relation (as they arrive later). For

each record in the second relation, we verify it against all

the records in the first relation.

creasingbytheirlengths. PPJoin+onlyconsideredthisim-

Stage2: IndexedKernel. Weusethesamemappersas

provementforself-joins. ForR-Sjoins,thechallengeisthat

fortheBasicKernel. Thereducersindextherecordprojec-

weneedtomakesurethatwefirststreamalltherecordpro-

tionsofthefirstrelationandprobetheindexfortherecord

jections from R that might join with a particular S record

projections of the second relation.

before we stream this record. Specifically, given the length

Asintheself-joincase,wecanimprovethememoryfoot-

of a set, we can define a lower-bound and an upper-bound

print of the reduce function by having the data sorted in-

on the lengths of the sets that might join with it [3]. Be-

fore we stream a particular record projection from S, we

need to have seen all the record projections from R with a

lengthsmallerthanorequaltotheupper-bound lengthofthe

recordfromS. We forcethisarrivalorderby extendingthe

keys with a length class assigned in the following way. For

records from S, the length class is their actual length. For

records from R, the length class is the lower-bound length

correspondingtotheirlength. Figure6showsanexampleof

theorderinwhichrecordswillarriveatthereducer,assum-

ing that for each length l, the lower-bound is l−1 and the

upper-boundisl+1. Inthefigure,therecordsfromRwith

length 5 get length class 4 and are streamed to the reducer

before those records from S with lengths between [4,6].

Stage3: RecordJoin. FortheBRJalgorithmthemap-

pers first tag their outputs with the relation name. Then,

thereducersgetarecordandtheircorrespondingRIDpairs (a) Map-based block (b) Reduce-based block

groupedbyrelationandoutputhalf-filledpairstaggedwith processing processing

the relation name. Finally, the second-phase reducers use

the relation name to build record pairs having the record

Figure 7: Data flow in the reducer for two block

form R first and the record form S second. In the OPRJ

processing approaches.

algorithm, for each input record from R, the mappers out-

putasmany(key,value)pairsasthenumberofRIDpairs

containingtherecord’sRIDintheRcolumn(andsimilarfor

After that, the next two blocks, B and C, are read from

S records). For each pair, the key is the RID pair plus the

theinputstreamandjoinedwithA. Finally,Aisdiscarded

relationname. Thereducersproceedasdothesecond-phase

frommemoryandtheprocesscontinuesforblocksB andC.

reducers for the BRJ algorithm.

In order to achieve the interleaving and replications of the

blocks, the map functiondoes the following. For each (key,

5. HANDLINGINSUFFICIENTMEMORY

value)outputpair,thefunctiondeterminesthepair’sblock,

AswesawinSection3.2,reducersinthesecondstagere- andoutputsthepairasmanytimesastheblockneedstobe

ceive as input a list of record projections to be verified. In replicated. Every copy is output with a different composite

the BK approach, the entire list of projections needs to fit key, which includes its position in the stream, so that after

in memory. (For the R-S join case, only the projections of sortingthepairs,theyareintherightblocksandtheblocks

one relation must fit.) In the PK approach, because we are are in the right order.

exploitingthelengthfilter,onlythefragmentcorresponding Reduce-Based Block Processing. In this approach,

toacertainlengthrangeneedstofitinmemory. (FortheR- the map function sends each block exactly once. On the

Sjoincase,onlythefragmentbelongingtoonlyonerelation otherhand,thereducefunctionneedstostorealltheblocks

must fit.) It is worth noting that we already decreased the except the first one on its local disk, and reload the blocks

amountofmemoryneededbygroupingtherecordsonthein- laterfromthediskforjoining. Figure7(b)showsanexample

frequent prefix tokens. Moreover, we can exploit the length ofhowblocksareprocessedinthereducerforthesamethree

filter even in the BK algorithm, by using the length filter blocksA,B,andC. Inthefirststep,blockAisloadedinto

asasecondaryrecord-routingcriterion. Inthisway,records memory and self-joined. After that, the next two blocks, B

areroutedontoken-length-basedkeys. Theadditionalrout- and C, are read from the input stream and joined with A

ingcriterionpartitionsthedataevenfurther,decreasingthe and also stored on the local disk. In the second step, A is

amountofdatathatneedstofitinmemory. Thistechnique discarded from memory and block B is read from disk and

can be generalized and additional filters can be appended self-joined. Then, block C is read from the disk and joined

to the routing criteria. In this section we present two ex- with B. The process ends with reading C from disk and

tensions of our algorithms for the case where there are just self-joining it.

no more filters to be used but the data still does not fit in Handling R-S Joins. In the R-S join case the reduce

memory. The challenge is how to compute the cross prod- functionneedstodealwithapartitionfromRthatdoesnot

uct of a list of elements in MapReduce. We sub-partition fit in memory, while it streams a partition coming from S.

thedatasothateachblockfitsinmemoryandproposetwo We only need to sub-partitionthe R partition. The reduce

approachesforprocessingtheblocks. Firstwelookhowthe function loads one block from R into memory and streams

twomethodsworkintheself-joincaseandthendiscussthe the entire S partition against it. In the map-based block

differences for the R-S join case. processingapproach,theblocksfromRareinterleavedwith

Map-Based Block Processing. In this approach, the multiplecopiesoftheSpartition. Inthereduce-basedblock

map function replicates the blocks and interleaves them in processing approach all the R blocks (except the first one)

the order they will be processed by the reducer. For each andtheentireS partitionarestoredandreadfromthelocal

block sent by the map function, the reducer either loads the disk later.

blockinmemoryorstreamstheblockagainstablockalready

loaded in memory. Figure 7(a) shows an example of how

6. EXPERIMENTALEVALUATION

blocksareprocessedinthereducer,inwhichthedataissub-

partitionedintothreeblocksA,B,andC. Inthefirststep, In this section we describe the performance evaluation of

the first block, A, is loaded into memory and self-joined. the proposed algorithms. To understand the performance

of parallel algorithms we need to measure absolute running fertotheincreaseddatasetsas“DBLP×n”or“CITESEERX

time as well as relative speedup and scaleup [8]. ×n”, where n ∈ [5,25] and represents the increase factor.

We ran experiments on a 10-node IBM x3650 cluster. For example,“DBLP×5”represents the DBLP dataset in-

Each node had one Intel Xeon processor E5520 2.26GHz creased five times.

withfourcores,12GBofRAM,andfour300GBharddisks. Before starting each experiment we balanced its input

Thus the cluster consists of 40 cores and 40 disks. We used datasets across the ten nodes in HDFS and the four hard

an extra node for running the master daemons to manage drives of each node in the following way. We formatted the

the Hadoop jobs and the Hadoop distributed file system. distributed file system before each experiment. We created

On each node, we installed the Ubuntu 9.04, 64-bit, server an identity MapReduce job with as many reducers running

editionoperatingsystem,Java1.6witha64-bitserverJVM, in parallel as the number of hard disks in the cluster. We

andHadoop0.20.1. Inordertomaximizetheparallelismand exploited the fact that reducers write their output data to

minimize the running time, we made the following changes the local node and also the fact that Hadoop chooses the

to the default Hadoop configuration: we set the block size disk to write the data using a Round-Robinorder.

of the distributed file system to 128MB, allocated 1GB of For all the experiments, we tokenized the data by word.

virtualmemorytoeachdaemonand2.5GBofvirtualmem- We usedtheconcatenationof thepapertitle andthe listof

orytoeachmap/reducetask,ranfourmapandfourreduce authorsasthejoinattribute,andusedtheJaccardsimilarity

tasks in parallel on each node, set the replication factor to functionwithasimilaritythresholdof0.80. The0.80thresh-

1, and disabled the speculative task execution feature. old is usually the lower bound on the similarity threshold

We used the following two datasets and increased their usedintheliterature[3,6,29],andhighersimilaritythresh-

sizes as needed: olds decreased the running time. A more complete set of

• DBLP3 It had approximately 1.2M publications. We figures for the experiments is contained in [26]. The source

preprocessedtheoriginalXMLfilebyremovingthetags, code is available at http://asterix.ics.uci.edu/.

and output one line per publication that contained a

6.1 Self-Join Performance

unique integer (RID), a title, a list of authors, and the

restofthecontent(publicationdate,publicationjournal We did a self-join on the DBLP×n datasets, where n ∈

or conference, and publication medium). The average [5,25]. Figure 8 shows the total running time of the three

length of a record was 259 bytes. A copy of the entire stagesonthe10-nodeclusterfordifferentdatasetsizes(rep-

dataset has around 300MB before we increased its size. resented by the factor n). The running time consisted of

Itisworthnotingthatwedidnotcleantherecordsbefore the times of the three stages: token ordering, kernel, and

running our algorithms, i.e., we did not remove punctu- record join. For each dataset size, we used three combina-

ations or change the letter cases. We did the cleaning tionsoftheapproachesofthestages. Forexample,“1-BTO”

inside our algorithms. means we use BTO in the first stage. The second stage is

• CITESEERX4 It had about 1.3M publications. We the most expensive step, and its time increased the fastest

with the increase of the dataset size. The best algorithm is

preprocessed the original XML file in the same way as

BTO-PK-OPRJ, i.e., with BTO for the first stage, PK for

for the DBLP dataset. Each publication included an

the second stage, and OPRJ for the third stage. This com-

abstractandURLstoitsreferences. Theaveragelength

bination could self-join 25 times the original DBLP dataset

of a record was 1374 bytes, and the size of one copy of

in around 650 seconds.

the entire dataset is around 1.8GB.

Increasing Dataset Sizes. Toevaluateourparallelset- 6.1.1 Self-JoinSpeedup

similarity join algorithms on large datasets, we increased

In order to evaluate the speedup of the approaches, we

each dataset while maintaining its set-similarity join prop-

fixed the dataset size and varied the cluster size. Figure 9

erties. We maintained a roughly constant token dictionary,

showstherunningtimeforself-joiningtheDBLP×10dataset

andwantedthecardinalityofjoinresultstoincreaselinearly

on clusters of 2 to 10 nodes. We used the same three com-

withtheincreaseofthedataset. Increasingthedatasizeby

binations for the three stages. For each approach, we also

duplicatingitsoriginalrecordswouldonlypreservethetoken

show its ideal speedup curve (with a thin black line). For

dictionary-size,butwouldblowupthesizeofthejoinresult.

instance,iftheclusterhastwiceasmanynodesandthedata

Toachievethegoal,weincreasedthesizeofeachdatasetby

size does not change, the approach should be twice as fast.

generating new records as follows. We first computed the

In Figure 10 we show the same numbers, but plotted on a

frequencies of the tokens appearing in the title and the list

“relative scale”. That is, for each cluster size, we plot the

of authors in the original dataset, and sorted the tokens in

ratio between the running time for the smallest cluster size

their increasingorder of frequencies. For each record in the

and the running time of the current cluster size. For exam-

original dataset, we created a new record by replacing each

ple, for the 10-node cluster, we plot the ratio between the

token in the title or the list of authors with the token after

runningtimeonthe2-nodeclusterandtherunningtimeon

itinthetokenorder. Forexample,ifthetokenorderis(A,

the10-nodecluster. Wecanseethatallthreecombinations

B, C, D, E, F) and the original record is“B A C E”, then

havesimilarspeedupcurves,butnoneofthemspeeduplin-

thenewrecordis“C B D F.”Weevaluatedthecardinalityof

early. In all the settings the BTO-PK-OPRJ combination

the join result after increasing the dataset in this manner,

is the fastest (Figure 9). In the following experiments we

andnoticeditindeedincreasedlinearlywiththeincreaseof

looked at the speedup characteristics for each of the three

the dataset size.

stages in order to understand the overall speedup. Table 1

Weincreasedthesizeofeachdataset5to25times. Were-

shows the running time for each stage in the self-join.

3http://dblp.uni-trier.de/xml/dblp.xml.gz Stage 1: Token Ordering. WecanseethattheOPTO

4http://citeseerx.ist.psu.edu/about/metadata approach was the fastest for the settings of 2 nodes and 4

800

600

400

200

0

5 10 25

Dataset Size (times the original)

)sdnoces(

emiT

1-BTO 1200

2-BK

3-BRJ 1000

2-PK

3-OPRJ 800

600

400

200

0

2 3 4 5 6 7 8 9 10

# Nodes

Figure 8: Running time for self-

joiningDBLP×ndatasets(where

n∈[5,25]) on a 10-node cluster.

)sdnoces(

emiT

5

BTO-BK-BRJ

BTO-PK-BRJ 4

BTO-PK-OPRJ

Ideal

3

2

1

2 3 4 5 6 7 8 9 10

# Nodes

Figure 9: Running time for self-

joining the DBLP×10 dataset on

different cluster sizes.

emiT

weN

/

emiT

dlO

=

pudeepS

BTO-BK-BRJ

BTO-PK-BRJ

BTO-PK-OPRJ

Ideal

Figure10: Relativerunningtime

for self-joining the DBLP×10

datasetondifferentclustersizes.

Stage Alg. # Nodes pairs that join, which affected the workload balance. For

2 4 8 10 analysis purposes, we computed the frequency of each RID

1 BTO 191.98 125.51 91.85 84.02 appearinginatleastoneRIDpair. OntheaverageanRID

OPTO 175.39 115.36 94.82 92.80 appeared on 3.74 RID pairs, with a standard deviation of

2 BK 753.39 371.08 198.70 164.57 14.85andamaximumof187. Additionally,wecountedhow

PK 682.51 330.47 178.88 145.01 many recordswere processedby each reduceinstance. The

3 BRJ 255.35 162.53 107.28 101.54 minimum number of recordsprocessedin the 10-nodescase

OPRJ 97.11 74.32 58.35 58.11 was81,662andthemaximumwas90,560,withanaverageof

87,166.55 and a standard deviation of 2,519.30. No matter

Table 1: Running time (seconds) of each stage for howmanynodesweaddedtothecluster,asingleRIDcould

self-joining the DBLP×10 dataset on different clus- notbeprocessedbymorethanonereduceinstance,andall

ter sizes the reducers had to wait for the slowest one to finish.

The speedup of the OPRJ approach was limited because

in the OPRJ approach, the list of RID pairs that joined

was broadcast to all the maps where they must be loaded

nodes. For the settings of 8 nodes and 10 nodes, the BTO

in memory and indexed. The elapsed time required for this

approachbecamethefastest. Theirlimitedspeedupwasdue

remained constant as the number of nodes increased. Ad-

totwomainreasons. (1)Asthenumberofnodesincreased,

ditional information about the total amount of data sent

theamountofinputdatafedtoeachcombinerdecreased. As

between map and reduce for each stage is included in [26].

thenumberofnodesincreased,moredatawassentthrough

the network and more data got merged and reduced. (A 6.1.2 Self-JoinScaleup

similar pattern was observed in [10] for the case of general

aggregations.) (2)Thefinaltokenorderingwasproducedby

only one reducer, and this step’s cost remained constant as 1200

the number of nodes increased. The speedup of the OPTO

1000 approach was even worse since as the number of nodes in-

creased, the extra data that was sent through the network 800

had to be aggregated at only one reducer. Because BTO

600

was the fastest for settings of 8 nodes and 10 nodes, and it

spedupbetterthanOPTO,weonlyconsideredBTOforthe 400

end-to-end combinations. 200

Stage 2: Kernel. For the PK approach, an important

0

factor affecting the running time is the number of token

2 3 4 5 6 7 8 9 10

groups. We evaluated the running time for different num- # Nodes and Dataset Size

bersofgroups. Weobservedthatthebestperformancewas (times 2.5 x original)

achieved when there was one group per token. The reason

was that the reducefunctioncouldbenefitfrom the group-

ing conducted“for free”by the MapReduce framework. If

groupshadmorethanonetoken,theframeworkspendsthe

same amount of time on grouping, but the reducer benefits

less. Bothapproacheshadanalmostperfectspeedup. More-

over, in all the settings, the PK approachwas the fastest.

Stage3: RecordJoin. TheOPRJapproachwasalways

fasterthantheBRJapproach. Themainreasonforthepoor

speedup of the BRJ approach was due to skew in the RID

)sdnoces(

emiT

BTO-BK-BRJ

BTO-PK-BRJ

BTO-PK-OPRJ

Ideal

Figure 11: Running time for self-joining the

DBLP×n dataset (where n ∈ [5,25]) increased pro-

portionally with the increase of the cluster size.

Inordertoevaluatethescaleupoftheproposedapproaches

we increased the dataset size and the cluster size together

by the same factor. A perfect scaleup could be achieved if

the running time remained constant. Figure 11 shows the

running time for self-joining the DBLP dataset, increased

from 5 to 25 times, on a cluster with 2 to 10 nodes, respec- size increased. The OPRJ approach did not scale up well

tively. We can see that the fastest combined algorithm was since the list of RID pairs that needed to be loaded and in-

BTO-PK-OPRJ. We can also see that all three combina- dexed by each map function increased linearly with the size

tions scaled up well. BTO-PK-BRJ had the best scaleup. of the dataset.

In the following, we look at the scaleup characteristics of

6.1.3 Self-JoinSummary

eachstage. Table2showstherunningtime(inseconds)for

each of the self-join stages. We have the following observations:

• For the first stage, BTO was the best choice.

Stage Alg. # Nodes/DatasetSize

• For the second stage, PK was the best choice.

2/x5 4/x10 8/x20 10/x25

• Forthethirdstage,thebestchoicedependsontheamount

1 BTO 124.05 125.51 127.73 128.84

of data and the size of the cluster. In our experiments,

OPTO 107.21 115.36 136.75 149.40

OPRJ was somewhat faster, but the cost of loading the

2 BK 328.26 371.08 470.84 522.88

similar-RID pairs in memory was constant as the the

PK 311.19 330.47 375.72 401.03

clustersizeincreased,andthecostincreasedasthedata

3 BRJ 156.33 162.53 166.66 168.08

sizeincreased. Forthesereasons,werecommendBRJas

OPRJ 60.61 74.32 102.44 117.15

a good alternative.

Table 2: Running time (seconds) of each stage for • The three combinations had similar speedups, but the

self-joining the DBLP×n dataset (n ∈ [5,25]) in- best scaleup was achieved by BTO-PK-BRJ.

creased proportionally with the increase of the clus- • Ouralgorithmsdistributedthedatawellinthefirstand

ter size. the second stages. For the third stage, the algorithms

were affected by the fact that some records produced

Stage 1: Token Ordering. We can see in Table 2 morejoinresultsthatothers,andtheamountofworkto

that the BTO approach scaled up almost perfectly, while be done was not well balanced across nodes. This skew

theOPTOapproachdidnotscaleupaswell. Moreover,the heavily depends on the characteristics of the data and

OPTOapproachbecamemoreexpensivethantheBTOap- we plan to study this issue in future work.

proach as the number of nodes increased. The reason why

6.2 R-SJoin Performance

theOPTOapproachdidnotscaleupaswellwasbecauseit

usedonlyonereduceinstancetoaggregatethetokencounts To evaluate the performance of the algorithms for the R-

(instead of using multiple reduce functions as in the BTO S-joincase,wedidajoinbetweentheDBLPandtheCITE-

case). Both approachesused a single reduceto sort the to- SEERX datasets. We increased both datasets at the same

kens by frequency. As the data increased, the time needed time by a factor between 5 and 25. Figure 12 shows the

to finish the one reduce function increased. runningtimeforthejoinona10-nodecluster. Weusedthe

Stage2: Kernel. WecanseethatthePKapproachwas same combinations for each stage as in the self-join case.

always faster and scaled up better than the BK approach. Moreover, the first stage was identical to the first stage of

To understand why the BK approachdid not scale up well, the self-join case, as this stage was run on only one of the

let us take a look at the complexity of the reducers. The datasets, in this case, DBLP. The running time for the sec-

reduce function was called for each prefix token. For each ond stage (kernel) increased the fastest compared with the

token, the reduce function received a list of record projec- other stages, but for the 5 and 10 dataset-increase factors,

tions, and had to verify the self-cross-product of this list. the third stage (record join) became the most expensive.

Moreover, a reducer processed a certain number of tokens. Themainreasonforthisbehavior,comparedtotheself-join

Thus,ifthelengthoftherecordprojectionslistisnandthe case,wasthatthisstagehadtoscantwodatasetsinsteadof

numberoftokensthateachreducerhastoprocessism,the one,andtherecordlengthoftheCITESEERXdatasetwas

complexity of each reducer is O(m·n2). As the dataset in- much larger than the record length of the DBLP dataset.

creases,thenumberofuniquetokensremainsconstant,but For the 25 dataset-increase factor, the OPRJ approach ran

the number of records having a particular prefix token in- outofmemorywhenitloadedthelistofRIDpairs,making

creased by the same factor as the dataset size. Thus, when BRJ the only option.

thedatasetisincreasedttimesthelengthoftherecordpro-

6.2.1 R-SJoinSpeedup

jections list increases t times. Moreover, as the number of

nodes increases t times, the number of tokens that each re- Asintheself-joincase,weevaluatedthespeedupoftheal-

ducerhastoprocessdecreasesttimes. Thus,thecomplexity gorithmsbykeepingthedatasetsizeconstantandincreasing

ofeachreducerbecomesO`m/t·(n·t)2 ´=O(t·m·n2). De- thenumberofnodesinthecluster. Figure13showstherun-

spite the fact the reduce function had a running time that ning times for the same three combinations of approaches.

grew proportional with the dataset size, the scaleup of the We can see that the BTO-PK-OPRJ combination was ini-

BK approach was still acceptable because the map function tially the fastest, but for the 10-node cluster, it became

scaled up well. In the case of PK, the quadratic increase slightly slower than the BTO-BK-BRJ and BTO-PK-BRJ

when the data linearly increased was alleviated because an combinations. Moreover,BTO-BK-BRJandBTO-PK-BRJ

index was used to decide which pairs are verified. sped up better than BTO-PK-OPRJ.

Stage 3: Record Join. We can see that the BRJ Tobetterunderstandthespeedupbehavior,welookedat

approach had an almost perfect scaleup, while the OPRJ eachindividualstage. Thefirststageperformancewasiden-

approach did not scale up well. For our 10-node cluster, tical to the first stage in the self-join case. For the second

theOPRJapproachwasfasterthantheBRJapproach,but stage we noticed a similar speedup (almost perfect) as for

OPRJcouldbecomeslowerasthenumberofnodesanddata the self-join case. Regarding the third stage, we noticed

2400

2000

1600

1200

800

400

0

5 10 25

Dataset Size (times the original)

)sdnoces(

emiT

4000

1-BTO

2-BK

3-BRJ 3000

2-PK

3-OPRJ

2000

1000

0

2 3 4 5 6 7 8 9 10

# Nodes

Figure 12: Running time for

joining the DBLP×n and the

CITESEERX×n datasets (where

n∈[5,25]) on a 10-node cluster.

)sdnoces(

emiT

3500

BTO-BK-BRJ 3000

BTO-PK-BRJ

BTO-PK-OPRJ 2500

Ideal

2000

1500

1000

500

0

2 3 4 5 6 7 8 9 10

# Nodes and Dataset Size

(times 2.5 x original)

Figure 13: Running time for

joining the DBLP×10 and the

CITESEERX×10 datasets on

different cluster sizes.

)sdnoces(

emiT

BTO-BK-BRJ

BTO-PK-BRJ

BTO-PK-OPRJ

Ideal

Figure 14: Running time for

joining the DBLP×n and the

CITESEERX×n datasets (where

n ∈ [5,25]) increased proportion-

ally with the cluster size.

that the OPRJ approach was initially the fastest (for the • For both self-join and R-S join cases, we recommend

2 and 4 node case), but it eventually became slower than BTO-PK-BRJ as a robust and scalable method.

the BRJ approach. Additionally, the BRJ approach sped

up better than the OPRJ approach. The poor performance

7. RELATEDWORK

oftheOPRJapproachwasduetothefactthatallthemap

instances had to load the list of RID pairs that join. Set-similarity joins on a single machine have been widely

studiedintheliterature[23,6,3,4,29]. Inverted-list-based

6.2.2 R-SJoinScaleup algorithms for finding pairs of strings that share a certain

number of tokens in common have been proposed in [23].

WealsoevaluatedthescaleupoftheR-Sjoinapproaches.

Later work has proposed various filters that help decrease

The evaluation was similar to the one done in the self-join

the number of pairs that need to be verified. The prefix

case. In Figure 14 we plot the running time of three com-

filter has been proposed in [6]. The length filter has been

binations for the three stages as we increased the dataset

studied in [3, 4]. Two other filters, namely the positional

size and the cluster size by the same factor. We can see

filter and the suffix filter, were proposed in [29]. In partic-

that BTO-BK-BRJ and BTO-PK-BRJ scaled up well. The

ular, for edit distance, two more filters based on mismatch

BTO-PK-BRJ combination scaled up the best. BTO-PK-

have been proposed in [28]. Instead of directly using the

OPRJ ran out of memory in the third stage for the case

tokens in the strings, the approach in [3] generates a set of

where the datasets were increased 8 times the original size.

signaturesbasedonthetokensinthestringandreliesonthe

The third stage ran out of memory when it tried to load in

fact that similar strings need to have a common signature.

memory the list of RID pairs that join. Before running out

Adifferentwayofformulatingset-similarityjoinproblemis

of memory, though, BTO-PK-OPRJ was the fastest.

toreturnpartialanswers,byusingtheideaoflocalitysensi-

To better understand the behavior of our approaches, we

tivehashing[12]. Itisworthnotingmostofworkdealswith

again analyzed the scaleup of each individual stage. The

valuesalreadyprojectedonthesimilarityattributeandpro-

first stage performance was identical with its counter-part

ducesonlythelistofRIDsthatjoin. Computingsuchalist

in the self-join case. Additionally, the second stage had a

is the goal of our second stage and most algorithms could

similar scaleup performance as its counterpart in the self-

successfully replace PPJoin+ in our second stage. To the

join case. Regarding the third stage, we observed that the

bestofourknowledge,thereisnopreviousworkonparallel

BRJ approach scaled up well. We also observed that even

set-similarity joins.

before running out of memory, the OPRJ approach did not

Paralleljoinalgorithmsforlargedatasetshadbeenwidely

scale up well, but for the case where it did not run our of

studied since the early 1980’s (e.g., [19, 24]). Moreover,

memory, it was faster than the BRJ approach.

the ideas presented in Section 5 bear resemblance with the

bucket-size-tuning ideas presented in [18]. Data partition

6.2.3 R-SJoinSummary

and replication techniques have been studied in [9] for the

We have the following observations: problem of numeric band joins.

• The recommendations for the best choice from the self- The MapReduce paradigm was initially presented in [7].

join case also hold for the R-S join case. Since then, it has gained a lot of attention in academia [30,

21, 10] and industry [2, 16]. In [30] the authors proposed

• The third stage of the join became a significant part of

extending the interface with a new function called“merge”

the execution due to the increased amount of data.

in order to facilitate joins. A comparison of the MapRe-

• Thethreealgorithmcombinationspreservedtheirspeedup duce paradigm with parallel DBMS has been done in [21].

and scaleup characteristicsas for the self-join case. Higher-levellanguagesontopofMapReducehavebeenpro-

• We also observed the same data distribution character- posed in [2, 16, 10]. All these languages could benefit from

istics as for the self-join case. the addition of a set-similarity join operator based on the

techniques proposed here. In the context of JAQL [16] a [13] L. Gravano, P. G. Ipeirotis, H. V. Jagadish,

tutorial on fuzzy joins was presented in [17]. N. Koudas, S. Muthukrishnan,and D. Srivastava.

Approximate string joins in a database (almost) for

8. CONCLUSIONS free. In VLDB, pages 491–500, 2001.

[14] M. R. Henzinger. Finding near-duplicateweb pages: a

In this paper we studied the problem of answering set-

large-scale evaluation of algorithms. In SIGIR, pages

similarityjoinqueriesinparallelusingtheMapReduceframe-

284–291, 2006.

work. Weproposedathree-stageapproachandexploredsev-

[15] T. C. Hoad and J. Zobel. Methods for identifying

eral solutions for each stage. We showed how to partition

versioned and plagiarized documents. JASIST,

the data across nodes in order to balance the workload and

54(3):203–215,2003.

minimize the need for replication. We discussed ways to

[16] Jaql. http://www.jaql.org.

efficiently deal with partitioning, replication, and multiple

inputs by exploiting the characteristics of the MapReduce [17] Jaql - Fuzzy join tutorial. http://code.google.com/

framework. We also described how to control the amount p/jaql/wiki/fuzzyJoinTutorial.

of data that needs to be kept in memory during join by ex- [18] M. Kitsuregawa and Y. Ogawa. Bucket spreading

ploitingthedataproperties. Westudiedbothself-joinsand parallel hash: A new, robust, parallel hash join

R-Sjoins,end-to-end,bystartingfromcompleterecordsand method for data skew in the super database computer

producing complete record pairs. Moreover, we discussed (sdc). In VLDB, pages 210–221, 1990.

strategies for dealing with extreme situations where, even [19] M. Kitsuregawa, H. Tanaka, and T. Moto-Oka.

after the data is partitioned to the finest granularity, the Application of hash to data base machine and its

amount of data that needs to be in the main memory of architecture. New Generation Comput., 1(1):63–74,

onenodeistoolargetofit. Givenourproposedalgorithms, 1983.

we implemented them in Hadoop and analyzed their per- [20] A. Metwally, D. Agrawal, and A. E. Abbadi.

formance characteristics on real datasets (synthetically in- Detectives: detecting coalition hit inflation attacks in

creased). advertising networks streams. In WWW, pages

Acknowledgments: We would like to thank Vuk Ercego- 241–250, 2007.

vac for a helpful discussion that inspired the stage variants [21] A. Pavlo, E. Paulson, A. Rasin, D. J. Abadi, D. J.

in Section 3.1.2 and 3.3.2. This study is supported by NSF DeWitt, S. Madden, and M. Stonebraker. A

IISawards0844574and0910989,aswellasagrantfromthe comparison of approachesto large-scale data analysis.

UC Discovery program and a donation from eBay. In SIGMOD Conference, pages 165–178, 2009.

[22] M. Sahami and T. D. Heilman. A web-based kernel

9. REFERENCES function for measuring the similarity of short text

snippets. In WWW, pages 377–386, 2006.

[1] Apache Hadoop. http://hadoop.apache.org.

[23] S. Sarawagi and A. Kirpal. Efficient set joins on

[2] Apache Hive. http://hadoop.apache.org/hive.

similarity predicates. In SIGMOD Conference, pages

[3] A. Arasu, V. Ganti, and R. Kaushik. Efficient exact

743–754, 2004.

set-similarity joins. In VLDB, pages 918–929, 2006.

[24] D. A. Schneider and D. J. DeWitt. A performance

[4] R. J. Bayardo, Y. Ma, and R. Srikant. Scaling up all

evaluation of four parallel join algorithms in a

pairssimilaritysearch.InWWW,pages131–140,2007.

shared-nothingmultiprocessor environment. In

[5] A. Z. Broder, S. C. Glassman, M. S. Manasse, and SIGMOD Conference, pages 110–121, 1989.

G. Zweig. Syntactic clustering of the web. Computer

[25] E. Spertus, M. Sahami, and O. Buyukkokten.

Networks, 29(8-13):1157–1166,1997.

Evaluating similarity measures: a large-scale study in

[6] S. Chaudhuri,V. Ganti, and R. Kaushik. A primitive the orkut social network. In KDD, pages 678–684,

operatorforsimilarityjoinsindatacleaning.InICDE, 2005.

page 5, 2006.

[26] R. Vernica, M. Carey, and C. Li. Efficient parallel

[7] J. Dean and S. Ghemawat. MapReduce: simplified set-similarity joins using MapReduce. Technical

data processing on large clusters. Commun. ACM, report, Department of Computer Science, UC Irvine,

51(1):107–113,2008. March 2010. http://asterix.ics.uci.edu.

[8] D. J. DeWitt and J. Gray. Parallel database systems: [27] Web 1t 5-gram version 1. http://www.ldc.upenn.

The future of high performance database systems. edu/Catalog/CatalogEntry.jsp?

Commun. ACM, 35(6):85–98,1992. catalogId=LDC2006T13.

[9] D. J. DeWitt, J. F. Naughton,and D. A. Schneider. [28] C. Xiao, W. Wang, and X. Lin. Ed-join: An efficient

An evaluation of non-equijoin algorithms. In VLDB, algorithm for similarity joins with edit distance

pages 443–452, 1991. constraints. In VLDB, 2008.

[10] A. Gates, O. Natkovich, S. Chopra, P. Kamath, [29] C. Xiao, W. Wang, X. Lin, and J. X. Yu. Efficient

S. Narayanam, C. Olston, B. Reed, S. Srinivasan, and similarityjoinsfornearduplicatedetection.InWWW,

U. Srivastava. Building a highlevel dataflow system on pages 131–140, 2008.

top of MapReduce: the Pig experience. PVLDB,

[30] H. Yang, A. Dasdan, R.-L. Hsiao, and D. S. P. Jr.

2(2):1414–1425,2009.

Map-Reduce-Merge: simplified relational data

[11] Genbank. http://www.ncbi.nlm.nih.gov/Genbank. processing on large clusters. In SIGMOD Conference,

[12] A. Gionis, P. Indyk, and R. Motwani. Similarity pages 1029–1040,2007.

search in high dimensions via hashing. In VLDB,

pages 518–529, 1999.

51作业君

Email:51zuoyejun

@gmail.com

添加客服微信: Fudaojun0228