Efficient Parallel Set-Similarity Joins Using MapReduce
Rares Vernica, Michael J. Carey, Chen Li
Department of Computer Science
University of California, Irvine
ABSTRACT
In this paper we study how to efficiently perform set-similarity joins in parallel using the popular MapReduce framework. We propose a 3-stage approach for end-to-end set-similarity joins. We take as input a set of records and output a set of joined records based on a set-similarity condition. We efficiently partition the data across nodes in order to balance the workload and minimize the need for replication. We study both self-join and R-S join cases, and show how to carefully control the amount of data kept in main memory on each node. We also propose solutions for the case where, even if we use the most fine-grained partitioning, the data still does not fit in the main memory of a node. We report results from extensive experiments on real datasets, synthetically increased in size, to evaluate the speedup and scaleup properties of the proposed algorithms using Hadoop.

Categories and Subject Descriptors
H.2.4 [Database Management]: Systems - query processing, parallel databases

General Terms
Algorithms, Performance

Permission to make digital or hard copies of all or part of this work for personal or classroom use is granted without fee provided that copies are not made or distributed for profit or commercial advantage and that copies bear this notice and the full citation on the first page. To copy otherwise, to republish, to post on servers or to redistribute to lists, requires prior specific permission and/or a fee.
SIGMOD’10, June 6–11, 2010, Indianapolis, Indiana, USA.
Copyright 2010 ACM 978-1-4503-0032-2/10/06 ...$10.00.

1. INTRODUCTION
There are many applications that require detecting similar pairs of records where the records contain string or set-based data. A list of possible applications includes: detecting near-duplicate web pages in web crawling [14], document clustering [5], plagiarism detection [15], master data management, making recommendations to users based on their similarity to other users in query refinement [22], mining in social networking sites [25], and identifying coalitions of click fraudsters in online advertising [20]. For example, in master-data-management applications, a system has to identify that the names “John W. Smith”, “Smith, John”, and “John William Smith” potentially refer to the same person. As another example, when mining social networking sites where users’ preferences are stored as bit vectors (where a “1” bit means interest in a certain domain), applications want to use the fact that a user with preference bit vector “[1,0,0,1,1,0,1,0,0,1]” possibly has similar interests to a user with preferences “[1,0,0,0,1,0,1,0,1,1]”.

Detecting such similar pairs is challenging today, as there is an increasing trend of applications being expected to deal with vast amounts of data that usually do not fit in the main memory of one machine. For example, the Google N-gram dataset [27] has 1 trillion records; the GeneBank dataset [11] contains 100 million records and has a size of 416GB. Applications with such datasets usually make use of clusters of machines and employ parallel algorithms in order to efficiently deal with this vast amount of data. For data-intensive applications, the MapReduce [7] paradigm has recently received a lot of attention for being a scalable parallel shared-nothing data-processing platform. The framework is able to scale to thousands of nodes [7]. In this paper, we use MapReduce as the parallel data-processing paradigm for finding similar pairs of records.

When dealing with a very large amount of data, detecting similar pairs of records becomes a challenging problem, even if a large computational cluster is available. Parallel data-processing paradigms rely on data partitioning and redistribution for efficient query execution. Partitioning records for finding similar pairs is challenging for string or set-based data, because hash-based partitioning on the entire string or set does not suffice: two similar but non-identical values will generally hash to different partitions. The contributions of this paper are as follows:
• We describe efficient ways to partition a large dataset across nodes in order to balance the workload and minimize the need for replication. Compared to the equi-join case, the set-similarity join case requires “partitioning” the data based on set contents.
• We describe efficient solutions that exploit the MapReduce framework. We show how to efficiently deal with problems such as partitioning, replication, and multiple inputs by manipulating the keys used to route the data in the framework.
• We present methods for controlling the amount of data kept in memory during a join by exploiting the properties of the data that needs to be joined.
• We provide algorithms for answering set-similarity self-join queries end-to-end, where we start from records containing more than just the join attribute and end with actual pairs of joined records.
• We show how our set-similarity self-join algorithms can be extended to answer set-similarity R-S join queries.
• We present strategies for exceptional situations where, even if we use the finest-granularity partitioning method, the data that needs to be held in the main memory of one node is too large to fit.
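The bit-vector example earlier in this introduction can be made concrete. Treat each vector as the set of positions that hold a 1, then compare the two sets with the Jaccard coefficient, one of the set-similarity functions introduced in Section 2. The Python sketch below is only an illustration. The helper names are hypothetical, and splitting words on whitespace is an assumption.

```python
def jaccard(x: set, y: set) -> float:
    """Jaccard coefficient |x ∩ y| / |x ∪ y|; 0.0 when both sets are empty."""
    union = x | y
    return len(x & y) / len(union) if union else 0.0


def bits_to_set(bits: list[int]) -> set[int]:
    """Positions of the 1 bits, so a preference bit vector becomes an ordinary set."""
    return {i for i, bit in enumerate(bits) if bit == 1}


def word_tokens(s: str) -> set[str]:
    """Word tokenization by whitespace (an assumption; q-grams would also work)."""
    return set(s.split())


u1 = bits_to_set([1, 0, 0, 1, 1, 0, 1, 0, 0, 1])
u2 = bits_to_set([1, 0, 0, 0, 1, 0, 1, 0, 1, 1])
print(round(jaccard(u1, u2), 3))  # 4 shared positions / 6 in the union -> 0.667
print(jaccard(word_tokens("I will call back"),
              word_tokens("I will call you soon")))  # 3 / 6 -> 0.5
```

The two users share interests in 4 of the 6 domains that either of them likes, which is why they count as similar for a threshold of, say, 0.6.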
The rest of the paper is structured as follows. In Section 2 we introduce the problem and present the main idea of our algorithms. In Section 3 we present set-similarity join algorithms for the self-join case, while in Section 4 we show how the algorithms can be extended to the R-S join case. Next, in Section 5, we present strategies for handling the insufficient-memory case. A performance evaluation is presented in Section 6. Finally, we discuss related work in Section 7 and conclude in Section 8. A longer technical report on this work is available in [26].

2. PRELIMINARIES
In this work we focus on the following set-similarity join application: identifying similar records based on string similarity. Our results can be generalized to other set-similarity join applications.

Problem statement: Given two files of records, R and S, a set-similarity function, sim, and a similarity threshold τ, we define the set-similarity join of R and S on R.a and S.a as finding and combining all pairs of records from R and S where sim(R.a, S.a) ≥ τ.

We map strings into sets by tokenizing them. Examples of tokens are words or q-grams (overlapping substrings of fixed length). For example, the string “I will call back” can be tokenized into the word set [I, will, call, back]. In order to measure the similarity between strings, we use a set-similarity function such as the Jaccard or Tanimoto coefficient, the cosine coefficient, etc.¹ For example, the Jaccard similarity function for two sets x and y is defined as

$$\mathrm{jaccard}(x, y) = \frac{|x \cap y|}{|x \cup y|}.$$

Thus, the Jaccard similarity between the strings “I will call back” and “I will call you soon” is $\frac{3}{6} = 0.5$. The two word sets share the 3 tokens I, will, and call, and their union has 6 tokens.

¹ The techniques described in this paper can also be used for approximate string search using the edit or Levenshtein distance [13].

In the remainder of the section, we provide an introduction to the MapReduce paradigm, present the main idea of our parallel set-similarity join algorithms, and provide an overview of filtering methods for detecting set-similar pairs.

2.1 MapReduce
MapReduce [7] is a popular paradigm for data-intensive parallel computation in shared-nothing clusters. Example applications for the MapReduce paradigm include processing crawled documents, Web request logs, etc. In the open-source community, Hadoop [1] is a popular implementation of this paradigm. In MapReduce, data is initially partitioned across the nodes of a cluster and stored in a distributed file system (DFS). Data is represented as (key, value) pairs. The computation is expressed using two functions:

$$\mathit{map}\ (k_1, v_1) \rightarrow \mathit{list}(k_2, v_2);$$
$$\mathit{reduce}\ (k_2, \mathit{list}(v_2)) \rightarrow \mathit{list}(k_3, v_3).$$

Figure 1 shows the data flow in a MapReduce computation. The computation starts with a map phase in which the map functions are applied in parallel on different partitions of the input data. The (key, value) pairs output by each map function are hash-partitioned on the key. For each partition, the pairs are sorted by their key and then sent across the cluster in a shuffle phase. At each receiving node, all the received partitions are merged in sorted order by their key. All the pair values that share a certain key are passed to a single reduce call. The output of each reduce function is written to a distributed file in the DFS.

Figure 1: Data flow in a MapReduce computation.

Besides the map and reduce functions, the framework also allows the user to provide a combine function that is executed on the same nodes as the mappers, right after the map functions have finished. This function acts as a local reducer that operates on the local (key, value) pairs. It lets the user decrease the amount of data sent through the network. The signature of the combine function is:

$$\mathit{combine}\ (k_2, \mathit{list}(v_2)) \rightarrow \mathit{list}(k_2, \mathit{list}(v_2)).$$

Finally, the framework also allows the user to provide initialization and tear-down functions for each MapReduce function. The user can also customize the hashing and comparison functions used when partitioning and sorting the keys. From Figure 1 one can notice the similarity between the MapReduce approach and query-processing techniques for parallel DBMS [8, 21].

2.2 Parallel Set-Similarity Joins
One of the main issues when answering set-similarity joins using the MapReduce paradigm is to decide how data should be partitioned and replicated. The main idea of our algorithms is the following. The framework hash-partitions the data across the network based on keys, and data items with the same key are grouped together. In our case, the join-attribute value cannot be directly used as a partitioning key. Instead, we use (possibly multiple) signatures generated from the value as partitioning keys. Signatures are defined such that similar attribute values have at least one signature in common. Possible example signatures include the list of word tokens of a string and ranges of similar string lengths. For instance, the string “I will call back” would have 4 word-based signatures: “I”, “will”, “call”, and “back”.

We divide the processing into three stages:
• Stage 1: Computes data statistics in order to generate good signatures. The techniques in later stages utilize these statistics.
• Stage 2: Extracts the record IDs (“RID”) and the join-attribute value from each record. It distributes the RID and join-attribute value pairs so that the pairs sharing a signature go to at least one common reducer. The reducers compute the similarity of the join-attribute values and output RID pairs of similar records.
• Stage 3: Generates actual pairs of joined records. It uses the list of RID pairs from the second stage and the original data to build the pairs of similar records.

An alternative to the second and third stages is a single stage in which the key-value pairs carry complete records, instead of projecting records on their RIDs and join-attribute values. We implemented this alternative and observed much worse performance, so we do not consider this option in this paper.

2.3 Set-Similarity Filtering
Efficient set-similarity join algorithms rely on effective filters, which can decrease the number of candidate pairs whose similarity needs to be verified. In the past few years, there have been several studies involving a technique called prefix filtering [6, 4, 29]. The technique is based on the pigeonhole principle and works as follows. The tokens of strings are ordered based on a global token ordering. For each string, we define its prefix of length n as the first n tokens of the ordered set of tokens. The required length of the prefix depends on the size of the token set, the similarity function, and the similarity threshold. For example, given the string s = “I will call back” and the global token ordering {back, call, will, I}, the prefix of length 2 of s is [back, call]. The prefix filtering principle states that similar strings need to share at least one common token in their prefixes. Using this principle, records of one relation are organized based on the tokens in their prefixes. Then, using the prefix tokens of the records in the second relation, we can probe the first relation and generate candidate pairs. The prefix filtering principle gives a necessary condition for similar records, so the generated candidate pairs need to be verified. Good performance is achieved when the global token ordering follows increasing token frequency. The prefixes then consist of rare tokens, which few records share, so fewer candidate pairs are generated.

A state-of-the-art algorithm in the set-similarity join literature is the PPJoin+ technique presented in [29]. It uses the prefix filter along with a length filter (similar strings need to have similar lengths [3]). It also proposes two other filters: a positional filter and a suffix filter. The PPJoin+ technique provides a good solution for answering such queries on one node. One of our approaches is to use PPJoin+ in parallel on multiple nodes.

3. SELF-JOIN CASE
In this section we present techniques for the set-similarity self-join case. As outlined in the previous section, the solution is divided into three stages. The first stage builds the global token ordering necessary to apply the prefix filter.² It scans the data, computes the frequency of each token, and sorts the tokens based on frequency. The second stage uses the prefix-filtering principle to produce a list of similar-RID pairs. The algorithm extracts the RID and join-attribute value of each record, and replicates and re-partitions the records based on their prefix tokens. The MapReduce framework groups the RID and join-attribute value pairs based on the prefix tokens. Using the infrequent prefix tokens to redistribute the data helps us avoid an unbalanced workload due to token-frequency skew. Each group represents a set of candidates that are cross-paired and verified. The third stage uses the list of similar-RID pairs and the original data to generate pairs of similar records.

² An alternative would be to apply the length filter. We explored this alternative, but the performance was not good because it suffered from the skewed distribution of string lengths.

3.1 Stage 1: Token Ordering
We consider two methods for ordering the tokens in the first stage. Both approaches take as input the original records and produce a list of the tokens that appear in the join-attribute value, ordered increasingly by frequency.

3.1.1 Basic Token Ordering (BTO)
Our first approach, called Basic Token Ordering (“BTO”), relies on two MapReduce phases. The first phase computes the frequency of each token, and the second phase sorts the tokens based on their frequencies. In the first phase, the map function gets as input the original records. For each record, the function extracts the value of the join attribute and tokenizes it. Each token produces a (token, 1) pair. To minimize the network traffic between the map and reduce functions, we use a combine function to aggregate the 1’s output by the map function into partial counts. Figure 2(a) shows the data flow for an example dataset, self-joined on an attribute called “a”. In the figure, the record with RID 1 has the join-attribute value “A B C”, which is tokenized as “A”, “B”, and “C”. Subsequently, the reduce function computes the total count for each token and outputs (token, count) pairs, where “count” is the total frequency for the token.

The second phase uses MapReduce to sort the pairs of tokens and frequencies from the first phase. The map function swaps the input keys and values so that the input pairs of the reduce function are sorted based on their frequencies. This phase uses exactly one reducer so that the result is a totally ordered list of tokens. The pseudo-code of this algorithm and of the other algorithms presented is available in [26].

3.1.2 Using One Phase to Order Tokens (OPTO)
An alternative approach to token ordering is to use one MapReduce phase. This approach, called One-Phase Token Ordering (“OPTO”), exploits the fact that the list of tokens could be much smaller than the original data. Instead of using MapReduce to sort the tokens, we can explicitly sort the tokens in memory. We use the same map and combine functions as in the first phase of the BTO algorithm. As in BTO, we use only one reducer. Figure 2(b) shows the data flow of this approach for our example dataset. The reduce function in OPTO gets as input a list of tokens and their partial counts. For each token, the function computes its total count and stores the information locally. When there is no more input for the reduce function, the reducer calls a tear-down function. That function sorts the tokens based on their counts and outputs them in increasing order of count.

Figure 2: Example data flow of Stage 1 (token ordering for a self-join on attribute “a”): (a) Basic Token Ordering (BTO); (b) One-Phase Token Ordering (OPTO).

3.2 Stage 2: RID-Pair Generation
The second stage of the join, called “Kernel”, scans the original input data and extracts the prefix of each record using the token order computed by the first stage. In general, the list of unique tokens is much smaller than the list of records and grows much more slowly. We thus assume that the list of tokens fits in memory. Based on the prefix tokens, we extract the RID and the join-attribute value of each record, and distribute these record projections to reducers. The
join-attribute values that share at least one prefix token are verified at a reducer.

Routing Records to Reducers. We first take a look at two possible ways to generate (key, value) pairs in the map function.

(1) Using Individual Tokens: This method treats each token as a key. For each record, we generate a (key, value) pair for each of its prefix tokens, so a record projection is replicated as many times as the number of its prefix tokens. For example, if the record value is “A B C D” and the prefix tokens are “A”, “B”, and “C”, we would output three (key, value) pairs, corresponding to the three tokens. In the reducer, as the values get grouped by prefix token, all the values passed in a reduce call share the same prefix token.

(2) Using Grouped Tokens: This method maps multiple tokens to one synthetic key, so different tokens can map to the same key. For each record, the map function generates one (key, value) pair for each of the groups of the prefix tokens. In our running example of a record “A B C D”, suppose tokens “A” and “B” belong to one group (denoted by “X”) and token “C” belongs to another group (denoted by “Y”). We then output two (key, value) pairs, one for key “X” and one for key “Y”. Two records that share the same token group do not necessarily share any prefix token. Continuing our running example, suppose the prefix token “E” of record “E F G” belongs to group “Y”. Then the records “A B C D” and “E F G” share token group “Y” but do not share any prefix token. So, in the reducer, as the values get grouped by their token group, two values in the same reduce call do not necessarily share a prefix token. This method can help us have fewer replications of record projections. One way to define the token groups in order to balance data across reducers is the following. We use the token ordering produced in the first stage, and assign the tokens to groups in round-robin order. In this way we balance the sum of token frequencies across groups. We study the effect of the number of groups in Section 6. For both routing strategies, two records might share more than one prefix token. The same pair may then be verified multiple times at different reducers and output multiple times. This is dealt with in the third stage.

3.2.1 Basic Kernel (BK)
In our first approach to finding the RID pairs of similar records, called Basic Kernel (“BK”), each reducer uses a nested-loop approach to compute the similarity of the join-attribute values. Before the map functions begin executing, an initialization function is called to load the ordered tokens produced by the first stage. The map function then retrieves the original records one by one, and extracts the RID and the join-attribute value for each record. It tokenizes the join attribute and reorders the tokens based on their frequencies. Next, the function computes the prefix length and extracts the prefix tokens. Finally, the function uses either the individual-tokens or the grouped-tokens routing strategy to generate the output pairs. Figure 3(a) shows the data flow for our example dataset using individual tokens to do the routing. The prefix tokens of each value are shown in bold face in the figure. The record with RID 1 has prefix tokens “A” and “B”, so its projection is output twice.

In the reduce function, the reducer first applies the additional filters (e.g., length filter, positional filter, and suffix filter) to each pair of record projections. If the pair survives, the reducer verifies it. If the pair passes the similarity threshold, the reducer outputs the RID pair and its similarity value.

3.2.2 Indexed Kernel (PK)
Another approach to finding RID pairs of similar records is to use existing set-similarity join algorithms from the literature [23, 3, 4, 29]. Here we use the PPJoin+ algorithm from [29]. We call this approach the PPJoin+ Kernel (“PK”).

Using this method, the map function is the same as in the BK algorithm. Figure 3(b) shows the data flow for our example dataset using grouped tokens to do the routing. In the figure, the record with RID 1 has prefix tokens “A” and “B”, which belong to groups “X” and “Y”, respectively. In the reduce function, we use the PPJoin+ algorithm to index the data, apply all the filters, and output the resulting pairs. For each input record projection, the function first probes the index using the join-attribute value. The probe generates a list of RIDs of records that are similar to the current record. The current record is then added to the index as well.

The PPJoin+ algorithm achieves an optimized memory footprint because the input strings are sorted increasingly by their lengths [29]. This works in the following way. The index knows the lower bound on the length of the unseen data elements. Using this bound and the length filter, PPJoin+ discards from the index the data elements below the minimum length given by the filter. In order to obtain this ordering of data elements, we use a composite MapReduce key that also includes the length of the join-attribute value. We provide the framework with a custom partitioning function so that the partitioning is done only on the group value. In this way, when data is transferred from map to reduce, it gets partitioned just by group value, and is then locally sorted on both group and length.
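The map side of Stage 2 can be simulated in a single Python process, together with an in-memory token ordering in the spirit of OPTO (Section 3.1.2). This is not Hadoop code, and the function names are hypothetical. The sketch also makes several assumptions that the paper does not state:

- tokens are whitespace-separated words, and each token is counted once per record;
- ties between equally frequent tokens are broken alphabetically;
- the prefix length is the Jaccard value |x| − ⌈τ·|x|⌉ + 1 used in the prefix-filtering literature.

`kernel_map` emits the composite (group, length) key used by PK. `partition` looks only at the group.

```python
from collections import Counter
from math import ceil


def token_order(records: dict[int, str]) -> dict[str, int]:
    """Stage 1, OPTO-style: rank tokens by increasing frequency, sorted in memory.

    Each token is counted once per record and ties are broken alphabetically
    (both are assumptions of this sketch).
    """
    counts = Counter(tok for value in records.values() for tok in set(value.split()))
    ranked = sorted(counts, key=lambda tok: (counts[tok], tok))
    return {tok: rank for rank, tok in enumerate(ranked)}


def prefix_length(size: int, tau: float) -> int:
    """Jaccard prefix length |x| - ceil(tau * |x|) + 1; the epsilon absorbs float error."""
    return size - ceil(tau * size - 1e-9) + 1


def round_robin_groups(order: dict[str, int], num_groups: int) -> dict[str, int]:
    """Deal tokens to groups round-robin in frequency order to spread token frequencies."""
    return {tok: rank % num_groups for tok, rank in order.items()}


def kernel_map(rid: int, value: str, order: dict[str, int], groups: dict[str, int],
               tau: float) -> list[tuple[tuple[int, int], tuple[int, list[str]]]]:
    """Emit ((group, length), (rid, ordered tokens)) once per distinct prefix group."""
    tokens = sorted(set(value.split()), key=lambda tok: order[tok])
    prefix = tokens[:prefix_length(len(tokens), tau)]
    return [((g, len(tokens)), (rid, tokens)) for g in sorted({groups[t] for t in prefix})]


def partition(key: tuple[int, int], num_reducers: int) -> int:
    """Custom partitioner: route on the group only; the length is used just for sorting."""
    return key[0] % num_reducers


records = {1: "A B C", 2: "A B D", 3: "B C D E"}
order = token_order(records)  # {'E': 0, 'A': 1, 'C': 2, 'D': 3, 'B': 4}
groups = round_robin_groups(order, 2)
print(kernel_map(1, records[1], order, groups, tau=0.5))
```

Setting `num_groups` to at least the number of distinct tokens gives every token its own group. Grouped-token routing then reduces to the individual-tokens strategy.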
Figure 3: Example data flow of Stage 2 (kernel for a self-join on attribute “a”): (a) Basic Kernel (BK) using individual tokens for routing; (b) PPJoin+ Kernel (PK) using grouped tokens for routing.
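The reduce side of Stage 2 can be sketched in Python as well. The function below combines the BK nested loop with the length-based eviction that lets PK keep a small index. It is not PPJoin+: it has no inverted index and no positional or suffix filter. It assumes that each value is a (rid, tokens) projection and that the values of one reduce call arrive sorted by token count, which is what the composite (group, length) key provides. `length_pruned_reduce` is a hypothetical name.

```python
from collections import deque
from math import ceil


def jaccard(x: set, y: set) -> float:
    union = x | y
    return len(x & y) / len(union) if union else 0.0


def length_pruned_reduce(values, tau: float) -> list[tuple[int, int, float]]:
    """Verify every pair in one reduce group whose values arrive sorted by length.

    values: iterable of (rid, tokens) projections in non-decreasing token count.
    Returns (smaller rid, larger rid, similarity) for pairs with Jaccard >= tau.
    """
    window: deque = deque()  # (rid, token set), kept in non-decreasing length order
    results = []
    for rid, tokens in values:
        cur = set(tokens)
        # Length filter: for |x| <= |y|, jaccard(x, y) <= |x| / |y|, so a partner of
        # this or any later (longer) record needs at least ceil(tau * |y|) tokens.
        min_len = ceil(tau * len(cur) - 1e-9)
        while window and len(window[0][1]) < min_len:
            window.popleft()  # evicted for good: too short for all remaining records
        for other_rid, other in window:  # BK-style nested-loop verification
            sim = jaccard(cur, other)
            if sim >= tau:
                results.append((min(rid, other_rid), max(rid, other_rid), sim))
        window.append((rid, cur))
    return results


group = [(1, ["A", "C", "B"]), (2, ["A", "D", "B"]), (3, ["E", "C", "D", "B"])]
print(length_pruned_reduce(group, tau=0.5))  # [(1, 2, 0.5)]
```

Grouped tokens do not guarantee a shared prefix token, so the sketch verifies every pair that survives the length filter. The same RID pair can still be produced by several reducers, and Stage 3 removes these duplicates.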
3.3 Stage 3: Record Join
In the final stage of our algorithm, we use the RID pairs generated in the second stage to join their records. We propose two approaches for this stage. The main idea is to first fill in the record information for each half of the pair and then use the two halves to build the full record pair. The two approaches differ in the way the list of RID pairs is provided as input. In the first approach, called Basic Record Join (“BRJ”), the list of RID pairs is treated as a normal MapReduce input and is provided as input to the map functions. In the second approach, called One-Phase Record Join (“OPRJ”), the list is broadcast to all the maps and loaded before reading the input data. Duplicate RID pairs from the previous stage are eliminated in this stage.

3.3.1 Basic Record Join (BRJ)
The Basic Record Join algorithm uses two MapReduce phases. In the first phase, the algorithm fills in the record information for each half of each pair. In the second phase, it brings together the half-filled pairs. The map function in the first phase gets as input both the set of original records and the RID pairs from the second stage. (The function can differentiate between the two types of inputs by looking at the input file name.) For each original record, the function outputs a (RID, record) pair. For each RID pair, it outputs two (key, value) pairs. The first pair uses the first RID as its key, while the second pair uses the second RID as its key. Both pairs have the entire RID pair and its similarity as their value. Figure 4 shows the data flow for our example dataset. In the figure, the first two mappers take records as their input, while the third mapper takes RID pairs as its input. (Mappers do not span across files.) For the RID pair (2,11), the mapper outputs two pairs, one with key 2 and one with key 11.

For each RID key, the reduce function of the first phase receives a list of values containing exactly one record and the RID pairs that contain that RID. For each RID pair, the function outputs a (key, value) pair, where the key is the RID pair and the value is the record itself and the similarity of the RID pair. Continuing our example in Figure 4, for key 2, the first reducer gets the record with RID 2 and one RID pair (2,11), and outputs one (key, value) pair with the RID pair (2,11) as the key.

The second phase uses an identity map that directly outputs its input. The reduce function therefore gets as input, for each key (which is a RID pair), a list of values containing exactly two elements. Each element consists of a record and a common similarity value. The reducer forms a pair of the two records, appends their similarity, and outputs the constructed pair. In Figure 4, the output of the second set of mappers contains two (key, value) pairs with the RID pair (1,21) as the key, one containing record 1 and the other containing record 21. They are grouped in a reducer that outputs the pair of records (1,21).

3.3.2 One-Phase Record Join (OPRJ)
The second approach to record join uses only one MapReduce phase. The BRJ approach sends the RID pairs through the MapReduce pipeline to group them with the records in the reduce phase. OPRJ instead broadcasts the RID pairs and loads them at each map function before the function consumes the input data. The map function then gets the original records as input. For each record, the function outputs as many (key, value) pairs as the number of RID pairs containing the RID of the current record. The output key is the RID pair. Essentially, the output of the map function is the same as the output of the reduce function in the first phase of the BRJ algorithm. The idea of joining the data in the mappers was also used in [10] for the case of equi-joins. The reduce function is the same as the reduce function in the second phase of the BRJ algorithm. Figure 5 shows the data flow for our example dataset. In the figure, the first mapper gets as input the record with RID 1 and outputs one (key, value) pair, where the key is the RID pair (1,21) and the value is record 1. The third mapper outputs a pair with the same key, whose value is record 21. The two pairs get grouped in the second reducer, where the pair of records (1,21) is output.

4. R-S JOIN CASE
In Section 3 we described how to compute set-similarity self-joins using the MapReduce framework. In this section we present our solutions for the set-similarity R-S join case. We highlight the differences between the two cases and discuss an optimization for carefully controlling memory usage in the second stage.

The main differences between the two join cases are in the second and third stages, where we have records from two datasets as the input. Dealing with the binary join operator is challenging in MapReduce, as the framework was designed to accept only a single input stream. As discussed in [10, 21], in order to differentiate between two different input streams in MapReduce, we extend the key of the (key, value) pairs so that it includes a relation tag for each record. We also
modify the partitioning function so that partitioning is done on the part of the key that does not include the relation name. (However, the sorting is still done on the full key.)

Figure 4: Example data flow of Stage 3 using Basic Record Join (BRJ) for a self-join case. “a1” and “a2” correspond to the original attribute “a”, while “b1” and “b2” correspond to attribute “b”.

Figure 5: Example data flow of Stage 3 using One-Phase Record Join (OPRJ) for a self-join case. “a1” and “a2” correspond to the original attribute “a”, while “b1” and “b2” correspond to attribute “b”.

We now explain the three stages of an R-S join.

Stage 1: Token Ordering. In the first stage, we use the same algorithms as in the self-join case, but only on the relation with fewer records, say R. In the second stage, when tokenizing the other relation, S, we discard the tokens that do not appear in the token list, since they cannot generate candidate pairs with R records.
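One-Phase Record Join (Section 3.3.2) can be simulated in a single Python process. One dictionary stands in for the broadcast RID-pair list and another for the shuffle. The RID pairs (1, 21) and (2, 11) mirror the examples discussed for Figures 4 and 5; the record payloads and similarity values are made up. The sketch drops duplicate RID pairs while loading the broadcast list. That is one possible way, assumed here, to implement the duplicate elimination that the paper assigns to Stage 3. `oprj` is a hypothetical name.

```python
from collections import defaultdict


def oprj(records, similar):
    """Simulate One-Phase Record Join (self-join) in a single process.

    records: iterable of (rid, record); similar: iterable of (rid1, rid2, sim),
    possibly with duplicates. Returns (record, record, sim) ordered by RID pair.
    """
    # Broadcast side: every mapper would load this whole list before reading input.
    sim_of = {}
    pairs_of_rid = defaultdict(list)
    for a, b, s in similar:
        pair = (min(a, b), max(a, b))
        if pair not in sim_of:  # duplicate RID pairs from Stage 2 are dropped here
            sim_of[pair] = s
            pairs_of_rid[a].append(pair)
            pairs_of_rid[b].append(pair)

    # Map: emit (RID pair, (rid, record)) once per RID pair containing this RID.
    shuffled = defaultdict(list)
    for rid, record in records:
        for pair in pairs_of_rid.get(rid, []):
            shuffled[pair].append((rid, record))

    # Reduce: the two halves of each RID pair meet; emit the joined pair.
    joined = []
    for pair in sorted(shuffled):
        halves = sorted(shuffled[pair])
        if len(halves) == 2:
            joined.append((halves[0][1], halves[1][1], sim_of[pair]))
    return joined


rows = [(1, "r1"), (2, "r2"), (11, "r11"), (21, "r21")]
pairs = [(2, 11, 0.8), (1, 21, 0.6), (21, 1, 0.6)]  # (1, 21) appears twice
print(oprj(rows, pairs))  # [('r1', 'r21', 0.6), ('r2', 'r11', 0.8)]
```

The first loop shows the trade-off against BRJ. OPRJ saves a MapReduce phase, but every mapper must hold the complete RID-pair list in memory.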
Stage 2: Basic Kernel. First, the mappers tag the record projections with their relation name. Thus, the reducers receive a list of record projections grouped by relation. In the reduce function, we then store the records from the first relation (as they arrive first), and stream the records from the second relation (as they arrive later). We verify each record in the second relation against all the records in the first relation.

Stage 2: Indexed Kernel. We use the same mappers as for the Basic Kernel. The reducers index the record projections of the first relation and probe the index for the record projections of the second relation.

As in the self-join case, we can improve the memory footprint of the reduce function by having the data sorted increasingly by length. PPJoin+ only considered this improvement for self-joins. For R-S joins, the challenge is to make sure that we stream all the record projections from R that might join with a particular S record before we stream that S record. Specifically, given the length of a set, we can define a lower bound and an upper bound on the lengths of the sets that might join with it [3]. Before we stream a particular record projection from S, we need to have seen all the record projections from R with a length smaller than or equal to the upper-bound length of the record from S. We force this arrival order by extending the keys with a length class assigned in the following way. For records from S, the length class is their actual length. For records from R, the length class is the lower-bound length corresponding to their length. Figure 6 shows an example of the order in which records will arrive at the reducer, assuming that for each length l, the lower bound is l−1 and the upper bound is l+1. In the figure, the records from R with length 5 get length class 4. They are streamed to the reducer before the records from S with lengths in the range [4, 6].

Figure 6: Example of the order in which records need to arrive at the reducer in the PK kernel of the R-S join case, assuming that for each length l, the lower bound is l−1 and the upper bound is l+1.
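The arrival order enforced by length classes can be checked with a small Python sketch that models a single reduce group. By default it uses the bounds l − 1 and l + 1 from the Figure 6 example. It assumes that when length classes tie, the R tag sorts before the S tag. The Figure 6 example needs this rule, because R records of length 5 share class 4 with S records of length 4, but the text does not state it. All names are hypothetical.

```python
from typing import Callable

R_TAG, S_TAG = 0, 1  # assumption: on equal length classes, R sorts before S


def length_class(length: int, tag: int, lower: Callable[[int], int]) -> int:
    """S projections keep their length; R projections use their length's lower bound."""
    return lower(length) if tag == R_TAG else length


def arrival_order(projs, lower=lambda n: n - 1):
    """Sort (rid, tag, length) projections of one group by (length class, tag, rid).

    In Hadoop terms this is the full composite key used for sorting; the partitioner
    would look only at the group part of the key and ignore the relation tag.
    """
    return sorted(projs, key=lambda p: (length_class(p[2], p[1], lower), p[1], p[0]))


def r_before_s(order, upper=lambda n: n + 1) -> bool:
    """True if each R projection that may join an S projection arrives before it."""
    for i, (_, tag, s_len) in enumerate(order):
        if tag == S_TAG and any(t == R_TAG and r_len <= upper(s_len)
                                for _, t, r_len in order[i + 1:]):
            return False
    return True


projs = [(1, S_TAG, 4), (2, S_TAG, 6), (3, R_TAG, 5), (4, R_TAG, 7), (5, S_TAG, 5)]
order = arrival_order(projs)
print([rid for rid, _, _ in order])  # [3, 1, 5, 4, 2]
print(r_before_s(order))             # True
```

Sorting by actual length instead breaks the invariant for this input. The S projection of length 4 would then arrive before the R projection of length 5 that it may join.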
Stage 3: Record Join. For the BRJ algorithm, the mappers first tag their outputs with the relation name. Then, the reducers get a record and its corresponding RID pairs, grouped by relation, and output half-filled pairs tagged with the relation name. Finally, the second-phase reducers use the relation name to build record pairs having the record from R first and the record from S second. In the OPRJ algorithm, for each input record from R, the mappers output as many (key, value) pairs as the number of RID pairs containing the record’s RID in the R column (and similarly for S records). For each pair, the key is the RID pair plus the relation name. The reducers proceed as do the second-phase reducers of the BRJ algorithm.

5. HANDLING INSUFFICIENT MEMORY
As we saw in Section 3.2, reducers in the second stage receive as input a list of record projections to be verified. In the BK approach, the entire list of projections needs to fit in memory. (For the R-S join case, only the projections of one relation must fit.) In the PK approach, because we are exploiting the length filter, only the fragment corresponding to a certain length range needs to fit in memory. (For the R-S join case, only the fragment belonging to one relation must fit.) We already decreased the amount of memory needed by grouping the records on the infrequent prefix tokens. Moreover, we can exploit the length filter even in the BK algorithm by using it as a secondary record-routing criterion. In this way, records are routed on token-length-based keys. The additional routing criterion partitions the data even further, decreasing the amount of data that needs to fit in memory. This technique can be generalized, and additional filters can be appended to the routing criteria. In this section we present two extensions of our algorithms for the case where there are no more filters to be used but the data still does not fit in memory. The challenge is how to compute the cross product of a list of elements in MapReduce. We sub-partition the data so that each block fits in memory, and we propose two approaches for processing the blocks. First we look at how the two methods work in the self-join case, and then we discuss the differences for the R-S join case.

Figure 7: Data flow in the reducer for two block processing approaches: (a) map-based block processing; (b) reduce-based block processing.

Map-Based Block Processing. In this approach, the map function replicates the blocks and interleaves them in […] After that, the next two blocks, B and C, are read from the input stream and joined with A. Finally, A is discarded from memory and the process continues for blocks B and C. To achieve the interleaving and replication of the blocks, the map function does the following. For each (key, value) output pair, the function determines the pair’s block, and outputs the pair as many times as the block needs to be replicated. Every copy is output with a different composite key that includes its position in the stream. After sorting, the pairs are therefore in the right blocks and the blocks are in the right order.

Reduce-Based Block Processing. In this approach, the map function sends each block exactly once. However, the reduce function needs to store all the blocks except the first one on its local disk, and reload them later from disk for joining. Figure 7(b) shows an example of how blocks are processed in the reducer for the same three blocks A, B, and C. In the first step, block A is loaded into memory and self-joined. After that, the next two blocks, B and C, are read from the input stream, joined with A, and also stored on the local disk. In the second step, A is discarded from memory, and block B is read from disk and self-joined. Then, block C is read from disk and joined with B. The process ends with reading C from disk and self-joining it.

Handling R-S Joins. In the R-S join case, the reduce function needs to deal with a partition from R that does not fit in memory while it streams a partition coming from S. We only need to sub-partition the R partition. The reduce function loads one block from R into memory and streams the entire S partition against it. In the map-based block processing approach, the blocks from R are interleaved with multiple copies of the S partition. In the reduce-based block processing approach all the R blocks (except the first one)
the order they will be processed by the reducer. For each andtheentireS partitionarestoredandreadfromthelocal
block sent by the map function, the reducer either loads the disk later.
blockinmemoryorstreamstheblockagainstablockalready
loaded in memory. Figure 7(a) shows an example of how
6. EXPERIMENTALEVALUATION
blocksareprocessedinthereducer,inwhichthedataissub-
partitionedintothreeblocksA,B,andC. Inthefirststep, In this section we describe the performance evaluation of
the first block, A, is loaded into memory and self-joined. the proposed algorithms. To understand the performance
of parallel algorithms we need to measure absolute running time as well as relative speedup and scaleup [8].

We ran experiments on a 10-node IBM x3650 cluster. Each node had one Intel Xeon processor E5520 2.26GHz with four cores, 12GB of RAM, and four 300GB hard disks. Thus the cluster consists of 40 cores and 40 disks. We used an extra node for running the master daemons to manage the Hadoop jobs and the Hadoop distributed file system. On each node, we installed the Ubuntu 9.04, 64-bit, server edition operating system, Java 1.6 with a 64-bit server JVM, and Hadoop 0.20.1. To maximize parallelism and minimize running time, we made the following changes to the default Hadoop configuration: we set the block size of the distributed file system to 128MB, allocated 1GB of virtual memory to each daemon and 2.5GB of virtual memory to each map/reduce task, ran four map and four reduce tasks in parallel on each node, set the replication factor to 1, and disabled the speculative task execution feature.

We used the following two datasets and increased their sizes as needed:
• DBLP³. It had approximately 1.2M publications. We preprocessed the original XML file by removing the tags, and output one line per publication that contained a unique integer (RID), a title, a list of authors, and the rest of the content (publication date, publication journal or conference, and publication medium). The average length of a record was 259 bytes. A copy of the entire dataset was around 300MB before we increased its size. It is worth noting that we did not clean the records before running our algorithms, i.e., we did not remove punctuation or change letter case. We did the cleaning inside our algorithms.
• CITESEERX⁴. It had about 1.3M publications. We preprocessed the original XML file in the same way as for the DBLP dataset. Each publication included an abstract and URLs to its references. The average length of a record was 1374 bytes, and the size of one copy of the entire dataset is around 1.8GB.

³ http://dblp.uni-trier.de/xml/dblp.xml.gz
⁴ http://citeseerx.ist.psu.edu/about/metadata

Increasing Dataset Sizes. To evaluate our parallel set-similarity join algorithms on large datasets, we increased each dataset while maintaining its set-similarity join properties. We maintained a roughly constant token dictionary, and wanted the cardinality of join results to increase linearly with the increase of the dataset. Increasing the data size by duplicating its original records would only preserve the token-dictionary size, but would blow up the size of the join result. To achieve the goal, we increased the size of each dataset by generating new records as follows. We first computed the frequencies of the tokens appearing in the title and the list of authors in the original dataset, and sorted the tokens in increasing order of frequency. For each record in the original dataset, we created a new record by replacing each token in the title or the list of authors with the token after it in the token order. For example, if the token order is (A, B, C, D, E, F) and the original record is "B A C E", then the new record is "C B D F". We evaluated the cardinality of the join result after increasing the dataset in this manner, and noticed that it indeed increased linearly with the dataset size.

We increased the size of each dataset 5 to 25 times. We refer to the increased datasets as "DBLP×n" or "CITESEERX×n", where n ∈ [5,25] represents the increase factor. For example, "DBLP×5" represents the DBLP dataset increased five times.

Before starting each experiment, we balanced its input datasets across the ten nodes in HDFS and the four hard drives of each node in the following way. We formatted the distributed file system before each experiment. We created an identity MapReduce job with as many reducers running in parallel as the number of hard disks in the cluster. We exploited the fact that reducers write their output data to the local node, and also the fact that Hadoop chooses the disk to write the data to in round-robin order.

For all the experiments, we tokenized the data by word. We used the concatenation of the paper title and the list of authors as the join attribute, and used the Jaccard similarity function with a similarity threshold of 0.80. The 0.80 threshold is usually the lower bound on the similarity threshold used in the literature [3, 6, 29], and higher similarity thresholds decreased the running time. A more complete set of figures for the experiments is contained in [26]. The source code is available at http://asterix.ics.uci.edu/.

6.1 Self-Join Performance

We did a self-join on the DBLP×n datasets, where n ∈ [5,25]. Figure 8 shows the total running time of the three stages on the 10-node cluster for different dataset sizes (represented by the factor n). The running time consisted of the times of the three stages: token ordering, kernel, and record join. For each dataset size, we used three combinations of the approaches of the stages. For example, "1-BTO" means we use BTO in the first stage. The second stage is the most expensive step, and its time increased the fastest with the increase of the dataset size. The best algorithm is BTO-PK-OPRJ, i.e., with BTO for the first stage, PK for the second stage, and OPRJ for the third stage. This combination could self-join 25 times the original DBLP dataset in around 650 seconds.

6.1.1 Self-Join Speedup

To evaluate the speedup of the approaches, we fixed the dataset size and varied the cluster size. Figure 9 shows the running time for self-joining the DBLP×10 dataset on clusters of 2 to 10 nodes. We used the same three combinations for the three stages. For each approach, we also show its ideal speedup curve (with a thin black line). For instance, if the cluster has twice as many nodes and the data size does not change, the approach should be twice as fast. In Figure 10 we show the same numbers, but plotted on a "relative scale". That is, for each cluster size, we plot the ratio between the running time for the smallest cluster size and the running time for the current cluster size. For example, for the 10-node cluster, we plot the ratio between the running time on the 2-node cluster and the running time on the 10-node cluster. We can see that all three combinations have similar speedup curves, but none of them speeds up linearly. In all the settings the BTO-PK-OPRJ combination is the fastest (Figure 9). In the following experiments we looked at the speedup characteristics of each of the three stages in order to understand the overall speedup. Table 1 shows the running time for each stage in the self-join.

Stage 1: Token Ordering. We can see that the OPTO approach was the fastest for the settings of 2 nodes and 4
nodes. For the settings of 8 nodes and 10 nodes, the BTO approach became the fastest. Their limited speedup was due to two main reasons. (1) As the number of nodes increased, the amount of input data fed to each combiner decreased, so more data was sent through the network and more data had to be merged and reduced. (A similar pattern was observed in [10] for the case of general aggregations.) (2) The final token ordering was produced by only one reducer, and this step's cost remained constant as the number of nodes increased. The speedup of the OPTO approach was even worse because, as the number of nodes increased, the extra data that was sent through the network had to be aggregated at only one reducer. Because BTO was the fastest for settings of 8 nodes and 10 nodes, and it sped up better than OPTO, we only considered BTO for the end-to-end combinations.

Stage 2: Kernel. For the PK approach, an important factor affecting the running time is the number of token groups. We evaluated the running time for different numbers of groups. We observed that the best performance was achieved when there was one group per token. The reason was that the reduce function could benefit from the grouping conducted "for free" by the MapReduce framework. If groups had more than one token, the framework spent the same amount of time on grouping, but the reducer benefited less. Both approaches had an almost perfect speedup. Moreover, in all the settings, the PK approach was the fastest.

Stage 3: Record Join. The OPRJ approach was always faster than the BRJ approach. The main reason for the poor speedup of the BRJ approach was skew in the RID pairs that join, which affected the workload balance. For analysis purposes, we computed the frequency of each RID appearing in at least one RID pair. On average, an RID appeared in 3.74 RID pairs, with a standard deviation of 14.85 and a maximum of 187. Additionally, we counted how many records were processed by each reduce instance. The minimum number of records processed in the 10-node case was 81,662 and the maximum was 90,560, with an average of 87,166.55 and a standard deviation of 2,519.30. No matter how many nodes we added to the cluster, a single RID could not be processed by more than one reduce instance, and all the reducers had to wait for the slowest one to finish. The speedup of the OPRJ approach was limited because, in the OPRJ approach, the list of RID pairs that joined was broadcast to all the maps, where it had to be loaded into memory and indexed. The elapsed time required for this remained constant as the number of nodes increased. Additional information about the total amount of data sent between map and reduce for each stage is included in [26].

                         # Nodes
Stage  Alg.     2 nodes   4 nodes   8 nodes  10 nodes
1      BTO       191.98    125.51     91.85     84.02
       OPTO      175.39    115.36     94.82     92.80
2      BK        753.39    371.08    198.70    164.57
       PK        682.51    330.47    178.88    145.01
3      BRJ       255.35    162.53    107.28    101.54
       OPRJ       97.11     74.32     58.35     58.11
Table 1: Running time (seconds) of each stage for self-joining the DBLP×10 dataset on different cluster sizes.

[Figure 8: Running time for self-joining DBLP×n datasets (where n ∈ [5,25]) on a 10-node cluster. Time (seconds) vs. dataset size (times the original); series 1-BTO, 2-BK, 2-PK, 3-BRJ, 3-OPRJ.]

[Figure 9: Running time for self-joining the DBLP×10 dataset on different cluster sizes. Time (seconds) vs. # nodes (2 to 10); series BTO-BK-BRJ, BTO-PK-BRJ, BTO-PK-OPRJ, Ideal.]

[Figure 10: Relative running time for self-joining the DBLP×10 dataset on different cluster sizes. Speedup = Old Time / New Time vs. # nodes (2 to 10); series BTO-BK-BRJ, BTO-PK-BRJ, BTO-PK-OPRJ, Ideal.]

6.1.2 Self-Join Scaleup

[Figure 11: Running time for self-joining the DBLP×n dataset (where n ∈ [5,25]) increased proportionally with the increase of the cluster size. Time (seconds) vs. # nodes and dataset size (times 2.5 × original); series BTO-BK-BRJ, BTO-PK-BRJ, BTO-PK-OPRJ, Ideal.]

To evaluate the scaleup of the proposed approaches, we increased the dataset size and the cluster size together by the same factor. A perfect scaleup would be achieved if the running time remained constant. Figure 11 shows the running time for self-joining the DBLP dataset, increased
from 5 to 25 times, on a cluster with 2 to 10 nodes, respectively. We can see that the fastest combined algorithm was BTO-PK-OPRJ. We can also see that all three combinations scaled up well. BTO-PK-BRJ had the best scaleup. In the following, we look at the scaleup characteristics of each stage. Table 2 shows the running time (in seconds) for each of the self-join stages.

                 # Nodes / Dataset Size
Stage  Alg.        2/×5     4/×10     8/×20    10/×25
1      BTO       124.05    125.51    127.73    128.84
       OPTO      107.21    115.36    136.75    149.40
2      BK        328.26    371.08    470.84    522.88
       PK        311.19    330.47    375.72    401.03
3      BRJ       156.33    162.53    166.66    168.08
       OPRJ       60.61     74.32    102.44    117.15
Table 2: Running time (seconds) of each stage for self-joining the DBLP×n dataset (n ∈ [5,25]) increased proportionally with the increase of the cluster size.

Stage 1: Token Ordering. We can see in Table 2 that the BTO approach scaled up almost perfectly, while the OPTO approach did not scale up as well. Moreover, the OPTO approach became more expensive than the BTO approach as the number of nodes increased. The OPTO approach did not scale up as well because it used only one reduce instance to aggregate the token counts (instead of using multiple reduce functions as in the BTO case). Both approaches used a single reduce to sort the tokens by frequency. As the data increased, the time needed to finish the one reduce function increased.

Stage 2: Kernel. We can see that the PK approach was always faster and scaled up better than the BK approach. To understand why the BK approach did not scale up well, let us look at the complexity of the reducers. The reduce function was called for each prefix token. For each token, the reduce function received a list of record projections and had to verify the self-cross-product of this list. Moreover, a reducer processed a certain number of tokens. Thus, if the length of the record projections list is $n$ and the number of tokens that each reducer has to process is $m$, the complexity of each reducer is $O(m \cdot n^2)$. As the dataset increases, the number of unique tokens remains constant, but the number of records having a particular prefix token increases by the same factor as the dataset size. Thus, when the dataset is increased $t$ times, the length of the record projections list increases $t$ times. Moreover, as the number of nodes increases $t$ times, the number of tokens that each reducer has to process decreases $t$ times. Thus, the complexity of each reducer becomes $O\left(\frac{m}{t} \cdot (n \cdot t)^2\right) = O(t \cdot m \cdot n^2)$. Although the running time of the reduce function grew proportionally with the dataset size, the scaleup of the BK approach was still acceptable because the map function scaled up well. In the case of PK, the quadratic increase when the data increased linearly was alleviated because an index was used to decide which pairs are verified.

Stage 3: Record Join. We can see that the BRJ approach had an almost perfect scaleup, while the OPRJ approach did not scale up well. For our 10-node cluster, the OPRJ approach was faster than the BRJ approach, but OPRJ could become slower as the number of nodes and the data size increase. The OPRJ approach did not scale up well because the list of RID pairs that needed to be loaded and indexed by each map function increased linearly with the size of the dataset.

6.1.3 Self-Join Summary

We have the following observations:
• For the first stage, BTO was the best choice.
• For the second stage, PK was the best choice.
• For the third stage, the best choice depends on the amount of data and the size of the cluster. In our experiments, OPRJ was somewhat faster, but the cost of loading the similar-RID pairs into memory stayed constant as the cluster size increased, and this cost increased as the data size increased. For these reasons, we recommend BRJ as a good alternative.
• The three combinations had similar speedups, but the best scaleup was achieved by BTO-PK-BRJ.
• Our algorithms distributed the data well in the first and the second stages. For the third stage, the algorithms were affected by the fact that some records produced more join results than others, and the amount of work to be done was not well balanced across nodes. This skew heavily depends on the characteristics of the data, and we plan to study this issue in future work.

6.2 R-S Join Performance

To evaluate the performance of the algorithms for the R-S join case, we did a join between the DBLP and the CITESEERX datasets. We increased both datasets at the same time by a factor between 5 and 25. Figure 12 shows the running time for the join on a 10-node cluster. We used the same combinations for each stage as in the self-join case. Moreover, the first stage was identical to the first stage of the self-join case, as this stage was run on only one of the datasets, in this case DBLP. The running time for the second stage (kernel) increased the fastest compared with the other stages, but for the 5 and 10 dataset-increase factors, the third stage (record join) became the most expensive. The main reason for this behavior, compared to the self-join case, was that this stage had to scan two datasets instead of one, and the record length of the CITESEERX dataset was much larger than the record length of the DBLP dataset. For the 25 dataset-increase factor, the OPRJ approach ran out of memory when it loaded the list of RID pairs, making BRJ the only option.

6.2.1 R-S Join Speedup

As in the self-join case, we evaluated the speedup of the algorithms by keeping the dataset size constant and increasing the number of nodes in the cluster. Figure 13 shows the running times for the same three combinations of approaches. We can see that the BTO-PK-OPRJ combination was initially the fastest, but for the 10-node cluster it became slightly slower than the BTO-BK-BRJ and BTO-PK-BRJ combinations. Moreover, BTO-BK-BRJ and BTO-PK-BRJ sped up better than BTO-PK-OPRJ.

To better understand the speedup behavior, we looked at each individual stage. The first-stage performance was identical to that of the first stage in the self-join case. For the second stage, we noticed a similar (almost perfect) speedup as in the self-join case. Regarding the third stage, we noticed
that the OPRJ approach was initially the fastest (for the 2- and 4-node cases), but it eventually became slower than the BRJ approach. Additionally, the BRJ approach sped up better than the OPRJ approach. The poor performance of the OPRJ approach was due to the fact that all the map instances had to load the list of RID pairs that join.

[Figure 12: Running time for joining the DBLP×n and the CITESEERX×n datasets (where n ∈ [5,25]) on a 10-node cluster. Time (seconds) vs. dataset size (times the original); series 1-BTO, 2-BK, 2-PK, 3-BRJ, 3-OPRJ.]

[Figure 13: Running time for joining the DBLP×10 and the CITESEERX×10 datasets on different cluster sizes. Time (seconds) vs. # nodes (2 to 10); series BTO-BK-BRJ, BTO-PK-BRJ, BTO-PK-OPRJ, Ideal.]

[Figure 14: Running time for joining the DBLP×n and the CITESEERX×n datasets (where n ∈ [5,25]) increased proportionally with the cluster size. Time (seconds) vs. # nodes and dataset size (times 2.5 × original); series BTO-BK-BRJ, BTO-PK-BRJ, BTO-PK-OPRJ, Ideal.]

6.2.2 R-S Join Scaleup

We also evaluated the scaleup of the R-S join approaches. The evaluation was similar to the one done in the self-join case. In Figure 14 we plot the running time of the three combinations as we increased the dataset size and the cluster size by the same factor. We can see that BTO-BK-BRJ and BTO-PK-BRJ scaled up well. The BTO-PK-BRJ combination scaled up the best. BTO-PK-OPRJ ran out of memory in the third stage for the case where the datasets were increased 8 times the original size. The third stage ran out of memory when it tried to load into memory the list of RID pairs that join. Before running out of memory, though, BTO-PK-OPRJ was the fastest.

To better understand the behavior of our approaches, we again analyzed the scaleup of each individual stage. The first-stage performance was identical to its counterpart in the self-join case. Additionally, the second stage had a scaleup similar to its counterpart in the self-join case. Regarding the third stage, we observed that the BRJ approach scaled up well. We also observed that, even before running out of memory, the OPRJ approach did not scale up well, but in the cases where it did not run out of memory, it was faster than the BRJ approach.

6.2.3 R-S Join Summary

We have the following observations:
• The recommendations for the best choice from the self-join case also hold for the R-S join case.
• The third stage of the join became a significant part of the execution due to the increased amount of data.
• The three algorithm combinations preserved their speedup and scaleup characteristics from the self-join case.
• We also observed the same data distribution characteristics as in the self-join case.
• For both self-join and R-S join cases, we recommend BTO-PK-BRJ as a robust and scalable method.

7. RELATED WORK

Set-similarity joins on a single machine have been widely studied in the literature [23, 6, 3, 4, 29]. Inverted-list-based algorithms for finding pairs of strings that share a certain number of tokens in common have been proposed in [23]. Later work has proposed various filters that help decrease the number of pairs that need to be verified. The prefix filter has been proposed in [6]. The length filter has been studied in [3, 4]. Two other filters, namely the positional filter and the suffix filter, were proposed in [29]. In particular, for edit distance, two more filters based on mismatch have been proposed in [28]. Instead of directly using the tokens in the strings, the approach in [3] generates a set of signatures based on the tokens in the string and relies on the fact that similar strings need to have a common signature. A different way of formulating the set-similarity join problem is to return partial answers by using the idea of locality sensitive hashing [12]. It is worth noting that most of this work deals with values already projected on the similarity attribute and produces only the list of RIDs that join. Computing such a list is the goal of our second stage, and most of these algorithms could successfully replace PPJoin+ in our second stage. To the best of our knowledge, there is no previous work on parallel set-similarity joins.

Parallel join algorithms for large datasets have been widely studied since the early 1980s (e.g., [19, 24]). Moreover, the ideas presented in Section 5 bear resemblance to the bucket-size-tuning ideas presented in [18]. Data partitioning and replication techniques have been studied in [9] for the problem of numeric band joins.

The MapReduce paradigm was initially presented in [7]. Since then, it has gained a lot of attention in academia [30, 21, 10] and industry [2, 16]. In [30] the authors proposed extending the interface with a new function called "merge" in order to facilitate joins. A comparison of the MapReduce paradigm with parallel DBMSs has been done in [21]. Higher-level languages on top of MapReduce have been proposed in [2, 16, 10]. All these languages could benefit from the addition of a set-similarity join operator based on the
techniques proposed here. In the context of JAQL [16], a tutorial on fuzzy joins was presented in [17].

8. CONCLUSIONS

In this paper we studied the problem of answering set-similarity join queries in parallel using the MapReduce framework. We proposed a three-stage approach and explored several solutions for each stage. We showed how to partition the data across nodes in order to balance the workload and minimize the need for replication. We discussed ways to efficiently deal with partitioning, replication, and multiple inputs by exploiting the characteristics of the MapReduce framework. We also described how to control the amount of data that needs to be kept in memory during the join by exploiting the data properties. We studied both self-joins and R-S joins end-to-end, starting from complete records and producing complete record pairs. Moreover, we discussed strategies for dealing with extreme situations where, even after the data is partitioned to the finest granularity, the amount of data that needs to be in the main memory of one node is too large to fit. We implemented the proposed algorithms in Hadoop and analyzed their performance characteristics on real datasets (synthetically increased).

Acknowledgments: We would like to thank Vuk Ercegovac for a helpful discussion that inspired the stage variants in Sections 3.1.2 and 3.3.2. This study is supported by NSF IIS awards 0844574 and 0910989, as well as a grant from the UC Discovery program and a donation from eBay.

9. REFERENCES

[1] Apache Hadoop. http://hadoop.apache.org.
[2] Apache Hive. http://hadoop.apache.org/hive.
[3] A. Arasu, V. Ganti, and R. Kaushik. Efficient exact set-similarity joins. In VLDB, pages 918–929, 2006.
[4] R. J. Bayardo, Y. Ma, and R. Srikant. Scaling up all pairs similarity search. In WWW, pages 131–140, 2007.
[5] A. Z. Broder, S. C. Glassman, M. S. Manasse, and G. Zweig. Syntactic clustering of the web. Computer Networks, 29(8-13):1157–1166, 1997.
[6] S. Chaudhuri, V. Ganti, and R. Kaushik. A primitive operator for similarity joins in data cleaning. In ICDE, page 5, 2006.
[7] J. Dean and S. Ghemawat. MapReduce: simplified data processing on large clusters. Commun. ACM, 51(1):107–113, 2008.
[8] D. J. DeWitt and J. Gray. Parallel database systems: The future of high performance database systems. Commun. ACM, 35(6):85–98, 1992.
[9] D. J. DeWitt, J. F. Naughton, and D. A. Schneider. An evaluation of non-equijoin algorithms. In VLDB, pages 443–452, 1991.
[10] A. Gates, O. Natkovich, S. Chopra, P. Kamath, S. Narayanam, C. Olston, B. Reed, S. Srinivasan, and U. Srivastava. Building a high-level dataflow system on top of MapReduce: the Pig experience. PVLDB, 2(2):1414–1425, 2009.
[11] Genbank. http://www.ncbi.nlm.nih.gov/Genbank.
[12] A. Gionis, P. Indyk, and R. Motwani. Similarity search in high dimensions via hashing. In VLDB, pages 518–529, 1999.
[13] L. Gravano, P. G. Ipeirotis, H. V. Jagadish, N. Koudas, S. Muthukrishnan, and D. Srivastava. Approximate string joins in a database (almost) for free. In VLDB, pages 491–500, 2001.
[14] M. R. Henzinger. Finding near-duplicate web pages: a large-scale evaluation of algorithms. In SIGIR, pages 284–291, 2006.
[15] T. C. Hoad and J. Zobel. Methods for identifying versioned and plagiarized documents. JASIST, 54(3):203–215, 2003.
[16] Jaql. http://www.jaql.org.
[17] Jaql - Fuzzy join tutorial. http://code.google.com/p/jaql/wiki/fuzzyJoinTutorial.
[18] M. Kitsuregawa and Y. Ogawa. Bucket spreading parallel hash: A new, robust, parallel hash join method for data skew in the super database computer (sdc). In VLDB, pages 210–221, 1990.
[19] M. Kitsuregawa, H. Tanaka, and T. Moto-Oka. Application of hash to data base machine and its architecture. New Generation Comput., 1(1):63–74, 1983.
[20] A. Metwally, D. Agrawal, and A. E. Abbadi. Detectives: detecting coalition hit inflation attacks in advertising networks streams. In WWW, pages 241–250, 2007.
[21] A. Pavlo, E. Paulson, A. Rasin, D. J. Abadi, D. J. DeWitt, S. Madden, and M. Stonebraker. A comparison of approaches to large-scale data analysis. In SIGMOD Conference, pages 165–178, 2009.
[22] M. Sahami and T. D. Heilman. A web-based kernel function for measuring the similarity of short text snippets. In WWW, pages 377–386, 2006.
[23] S. Sarawagi and A. Kirpal. Efficient set joins on similarity predicates. In SIGMOD Conference, pages 743–754, 2004.
[24] D. A. Schneider and D. J. DeWitt. A performance evaluation of four parallel join algorithms in a shared-nothing multiprocessor environment. In SIGMOD Conference, pages 110–121, 1989.
[25] E. Spertus, M. Sahami, and O. Buyukkokten. Evaluating similarity measures: a large-scale study in the orkut social network. In KDD, pages 678–684, 2005.
[26] R. Vernica, M. Carey, and C. Li. Efficient parallel set-similarity joins using MapReduce. Technical report, Department of Computer Science, UC Irvine, March 2010. http://asterix.ics.uci.edu.
[27] Web 1t 5-gram version 1. http://www.ldc.upenn.edu/Catalog/CatalogEntry.jsp?catalogId=LDC2006T13.
[28] C. Xiao, W. Wang, and X. Lin. Ed-join: An efficient algorithm for similarity joins with edit distance constraints. In VLDB, 2008.
[29] C. Xiao, W. Wang, X. Lin, and J. X. Yu. Efficient similarity joins for near duplicate detection. In WWW, pages 131–140, 2008.
[30] H. Yang, A. Dasdan, R.-L. Hsiao, and D. S. P. Jr. Map-Reduce-Merge: simplified relational data processing on large clusters. In SIGMOD Conference, pages 1029–1040, 2007.
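As an illustration of the map-based block processing described in Section 5, the following Python sketch simulates one reducer's partition of a self-join in a single process. It is a minimal sketch under stated assumptions, not the Hadoop implementation. Records are hypothetical (RID, token set) tuples, and blocks are assigned by a caller-supplied index. Verification is a plain Jaccard test. The composite key (i, j) is an illustrative encoding of "block j, processed while block i is in memory". The helper names `map_replicate` and `reduce_blocks` are hypothetical. Sorting on this key reproduces the order of Figure 7(a): load A and self-join it, then stream B and C against A, then continue with B and C.

```python
from itertools import groupby
from typing import FrozenSet, Iterator, List, Tuple

# Hypothetical record layout for this sketch: (RID, set of tokens).
Record = Tuple[int, FrozenSet[str]]
Key = Tuple[int, int]  # (in-memory block i, loaded/streamed block j)


def jaccard(a: FrozenSet[str], b: FrozenSet[str]) -> float:
    union = a | b
    return len(a & b) / len(union) if union else 1.0


def map_replicate(record: Record, block: int) -> Iterator[Tuple[Key, Record]]:
    # Block j is loaded once (i == j) and streamed against every earlier
    # block i < j, so each of its records is emitted j + 1 times.
    for i in range(block + 1):
        yield (i, block), record


def reduce_blocks(pairs: List[Tuple[Key, Record]], threshold: float) -> List[Tuple[int, int]]:
    results: List[Tuple[int, int]] = []
    # Sorting on the composite key yields, for each i, block i first
    # (j == i) followed by blocks i+1, i+2, ... to stream against it.
    ordered = sorted(pairs, key=lambda kv: kv[0])
    for _, group in groupby(ordered, key=lambda kv: kv[0][0]):
        in_memory: List[Record] = []  # block i; discarded after this group
        for (i, j), rec in group:
            for other in in_memory:
                if jaccard(rec[1], other[1]) >= threshold:
                    results.append(tuple(sorted((rec[0], other[0]))))
            if j == i:
                in_memory.append(rec)  # still loading (and self-joining) block i
    return results


# Usage: five records, two records per block -> blocks 0, 0, 1, 1, 2.
records: List[Record] = [
    (1, frozenset({"a", "b", "c"})),
    (2, frozenset({"a", "b", "c", "d"})),
    (3, frozenset({"x", "y"})),
    (4, frozenset({"x", "y", "z"})),
    (5, frozenset({"a", "b", "d"})),
]
pairs = [kv for n, rec in enumerate(records) for kv in map_replicate(rec, n // 2)]
print(sorted(reduce_blocks(pairs, 0.6)))  # [(1, 2), (2, 5), (3, 4)]
```

The replication cost is visible in `pairs`: the last of k blocks is shipped k times, which is the price map-based processing pays for never touching the reducer's local disk.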
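For comparison, the reduce-based block processing of Section 5 can be sketched as follows. In this variant each block arrives at the reducer exactly once, and every block except the first is written to local disk and reloaded later, following the steps of Figure 7(b). This is again a single-process sketch with assumptions. Pickle files in a temporary directory stand in for the reducer's local disk. The (block id, record) stream is assumed to be already sorted by block id. The names `reduce_based` and `probe` are hypothetical.

```python
import os
import pickle
import tempfile
from itertools import groupby
from typing import FrozenSet, Iterable, List, Tuple

Record = Tuple[int, FrozenSet[str]]  # hypothetical (RID, token set) layout


def jaccard(a: FrozenSet[str], b: FrozenSet[str]) -> float:
    union = a | b
    return len(a & b) / len(union) if union else 1.0


def reduce_based(stream: Iterable[Tuple[int, Record]], threshold: float) -> List[Tuple[int, int]]:
    """`stream` holds (block id, record) pairs sorted by block id."""
    results: List[Tuple[int, int]] = []

    def probe(rec: Record, loaded: List[Record]) -> None:
        # Verify one record against every record currently held in memory.
        for other in loaded:
            if jaccard(rec[1], other[1]) >= threshold:
                results.append(tuple(sorted((rec[0], other[0]))))

    with tempfile.TemporaryDirectory() as local_disk:
        first: List[Record] = []
        spilled: List[str] = []  # files for all blocks except the first
        for n, (_, group) in enumerate(groupby(stream, key=lambda kv: kv[0])):
            block = [rec for _, rec in group]
            for rec in block:
                probe(rec, first)  # n == 0: self-join; n > 0: join with first block
                if n == 0:
                    first.append(rec)
            if n > 0:
                path = os.path.join(local_disk, f"block{n}.pkl")
                with open(path, "wb") as f:
                    pickle.dump(block, f)
                spilled.append(path)
        # The first block is no longer needed; process the spilled blocks in order.
        for idx, path in enumerate(spilled):
            loaded: List[Record] = []
            with open(path, "rb") as f:
                for rec in pickle.load(f):  # reload the block and self-join it
                    probe(rec, loaded)
                    loaded.append(rec)
            for later in spilled[idx + 1:]:  # stream each later block against it
                with open(later, "rb") as f:
                    for rec in pickle.load(f):
                        probe(rec, loaded)
    return results


records: List[Record] = [
    (1, frozenset({"a", "b", "c"})),
    (2, frozenset({"a", "b", "c", "d"})),
    (3, frozenset({"x", "y"})),
    (4, frozenset({"x", "y", "z"})),
    (5, frozenset({"a", "b", "d"})),
]
stream = [(n // 2, rec) for n, rec in enumerate(records)]  # already sorted by block id
print(sorted(reduce_based(stream, 0.6)))  # [(1, 2), (2, 5), (3, 4)]
```

The trade-off against the map-based variant is that the shuffle carries each record once, but the reducer pays for local-disk writes and repeated rereads of the later blocks.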
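The dataset-increase technique of Section 6 ("Increasing Dataset Sizes") replaces each token with the token after it in increasing-frequency order. A small Python sketch of that idea follows, with several assumptions. A record is represented only by its join-attribute tokens (title and authors). Ties in frequency are broken alphabetically. The last token wraps around to the first, a case the text does not address. The hypothetical helper `increase_dataset` shifts the k-th generated copy by k positions. Only the single-step replacement is taken from the text; the rest is illustrative.

```python
from collections import Counter
from typing import Dict, List, Sequence


def token_order(records: Sequence[Sequence[str]]) -> List[str]:
    """Tokens sorted by increasing frequency (alphabetical tie-break is an assumption)."""
    freq = Counter(tok for rec in records for tok in rec)
    return sorted(freq, key=lambda tok: (freq[tok], tok))


def shift_tokens(record: Sequence[str], order: Sequence[str], shift: int = 1) -> List[str]:
    """Replace every token by the token `shift` positions later in `order`.
    Wrapping around at the end of the order is an assumption of this sketch."""
    position: Dict[str, int] = {tok: i for i, tok in enumerate(order)}
    return [order[(position[tok] + shift) % len(order)] for tok in record]


def increase_dataset(records: List[List[str]], factor: int) -> List[List[str]]:
    """Hypothetical helper: the k-th generated copy shifts every token by k,
    so the token dictionary stays the same while the data grows `factor` times."""
    order = token_order(records)
    grown = [list(rec) for rec in records]
    for k in range(1, factor):
        grown.extend(shift_tokens(rec, order, k) for rec in records)
    return grown


# The example from the text: order (A, B, C, D, E, F), record "B A C E".
print(" ".join(shift_tokens("B A C E".split(), list("ABCDEF"))))  # C B D F
```

Because every generated token already exists in the dictionary, this keeps the dictionary size constant, which is the property the text asks for. Whether the join cardinality grows linearly is a property of the data that the authors verified empirically; the sketch does not guarantee it.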
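The "relative scale" of Figure 10 (speedup = running time on the smallest cluster divided by running time on the current cluster) can also be computed per stage from the Table 1 numbers. The short Python sketch below does this; the dictionary `TABLE1` and the helper names are hypothetical, and the values are copied from Table 1. The ideal curve assumes linear speedup relative to the 2-node cluster. Under these assumptions, the kernel stage (BK, PK) lands close to the ideal factor of 5 at 10 nodes, while the record-join stage (BRJ, OPRJ) stays well below it, in line with the discussion of Stages 2 and 3.

```python
from typing import Dict

# Running times (seconds) copied from Table 1: DBLP×10 self-join, keyed by node count.
TABLE1: Dict[str, Dict[int, float]] = {
    "BTO": {2: 191.98, 4: 125.51, 8: 91.85, 10: 84.02},
    "OPTO": {2: 175.39, 4: 115.36, 8: 94.82, 10: 92.80},
    "BK": {2: 753.39, 4: 371.08, 8: 198.70, 10: 164.57},
    "PK": {2: 682.51, 4: 330.47, 8: 178.88, 10: 145.01},
    "BRJ": {2: 255.35, 4: 162.53, 8: 107.28, 10: 101.54},
    "OPRJ": {2: 97.11, 4: 74.32, 8: 58.35, 10: 58.11},
}


def relative_speedup(times: Dict[int, float]) -> Dict[int, float]:
    """Speedup = time on the smallest cluster / time on the current cluster."""
    base = times[min(times)]
    return {nodes: base / t for nodes, t in sorted(times.items())}


def ideal_speedup(times: Dict[int, float]) -> Dict[int, float]:
    """Linear speedup: twice the nodes should give half the running time."""
    smallest = min(times)
    return {nodes: nodes / smallest for nodes in sorted(times)}


for alg, times in TABLE1.items():
    measured, ideal = relative_speedup(times), ideal_speedup(times)
    print(alg, f"{measured[10]:.2f}x measured vs {ideal[10]:.1f}x ideal at 10 nodes")
```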