程序代写案例-ECS765P

欢迎使用51辅导,51作业君孵化低价透明的学长辅导平台,服务保持优质,平均费用压低50%以上! 51fudao.top
SOLUTIONS AND MARKING SCHEME
January 2019-2020
ECS765P Big Data Processing Duration: 2 hours
YOU ARE NOT PERMITTED TO READ THE CONTENTS OF THIS QUESTION PAPER UN-
TIL INSTRUCTED TO DO SO BY AN INVIGILATOR.
Instructions:
This paper contains FOUR questions. Answer ALL questions
Calculators are permitted in this examination. Please state on your answer book the
name and type of machine used.
COMPLETE ALL ROUGH WORKINGS IN THE ANSWER BOOK AND CROSS THROUGH
ANY WORK WHICH IS NOT TO BE ASSESSED.
IMPORTANT NOTE:
THE ACADEMIC REGULATIONS STATE THAT POSSESSION OF UNAUTHORISED MA-
TERIAL AT ANY TIME WHEN A STUDENT IS UNDER EXAMINATION CONDITIONS IS AN
ASSESSMENT OFFENCE AND CAN LEAD TO EXPULSION FROM QMUL.
PLEASE CHECK NOW TO ENSURE YOU DO NOT HAVE ANY NOTES, MOBILE PHONES
OR UNATHORISED ELECTRONIC DEVICES ON YOUR PERSON. IF YOU HAVE ANY
THEN PLEASE RAISE YOUR HAND AND GIVE THEM TO AN INVIGILATOR IMMEDIATELY.
PLEASE BE AWARE THAT IF YOU ARE FOUND TO HAVE HIDDEN UNAUTHORISED MA-
TERIAL ELSEWHERE, INCLUDING TOILETS AND CLOAKROOMS IT WILL BE TREATED
AS BEING FOUND IN YOUR POSSESSION. UNAUTHORISED MATERIAL FOUND ON
YOUR MOBILE PHONE OR OTHER ELECTRONIC DEVICE WILL BE CONSIDERED THE
SAME AS BEING IN POSSESSION OF PAPER NOTES. MOBILE PHONES CAUSING A
DISRUPTION IS ALSO AN ASSESSMENT OFFENCE.
EXAM PAPERS MUST NOT BE REMOVED FROM THE EXAM ROOM.
Examiners:
Dr Felix Cuadrado and Dr Johan Pauwels
© Queen Mary University of London, 2019-2020
Page 2 ECS765P (2019-2020)
Question 1
You have a dataset from a non-profit organisation listing the full history of member contri-
butions for emergency appeals when e.g. a disaster happens.
For the next emergency appeals campaign, the organization is aiming to send postal
envelopes to the 500 highest contributors up to this point.
The dataset is a collection of rows, each one registering an individual contribution from a
member in the following format:
timestamp;memberId;isMultipleDonor;donationAmount;memberAge
The isMultipleDonor field is a boolean field that will be True if the member has made
more than one donation. donationAmount contains the value of that individual donation,
whereas memberAge includes for how many months they have been a member.
(a) (i) Design a combination of *two* MapReduce programs that computes the top 500
members with highest individual average contributions to these urgent appeal
campaigns. You should only consider members with more than one individual
contribution.
The code flow must be explained, discussing the input and output of each Map
and Reduce function that has been defined (without detailing the implementation).
You may use a diagram to illustrate the overall data flow.
Note: while a single MapReduce solution is technically possible, it would result
in potentially some scalability/performance issues. Single MapReduce solutions
should mention these aspects specifically.
(ii) Write the pseudocode for one of the two MapReduce jobs you designed in the
previous question. State in your solutions any assumptions that are made as
part of the program, as well as the behaviour of any custom function you deem
necessary.
[15 marks — basic]
Solution:
The first program will compute the average contribution of each member from
the dataset that is a multiple times donor 3 . That input will be fed to the second
program that will compute the global average (numerical summarisation) of all
these results 3 . First mapper will emit (memberId, donationAmount) 1 , First
reducer will emit (memberId, averageAmount) 1 Second mapper will emit (None,
(memberId,averageAmount)), 1 Second reducer will emit 500 x (memberId,
ranking, averageAmount) 1
Either of these two MR programs is sufficient for the 5 marks
Turn over
ECS765P (2019-2020) Page 3
def mapper1(self, _, line):\mk{1}
fields = line.split(";")
multipleDonor = fields[2]
if(multipleDonor):\mk{1}
memberId = fields[1]
amount = fields[3]
yield (memberId, amount)\mk{1}
def reducer1(self, memberId, values):\mk{1}
average = computeAverage(values)
yield(memberId, average) \mk{1}
def mapper2(self, memberId, average): \mk{1}
yield(None, (memberId, average)) \mk{1}
def reducer2(self, _, values): \mk{1}
sorted = values.sortByValue()
top500 = sorted.getTop(500)\mk{1}
for item in top500:
yield(item[0], item[1]) \mk{1}
(b) This question is about the Combiner in the context of MapReduce jobs.
(i) Explain who runs the Combiner function and at what point in time Combiner
functions execute during the shuffle and sort stage of MapReduce jobs.
(ii) Discuss whether the *second* job from 1a) would benefit from using a Combiner.
To do so, first list what will change when a Combiner runs, and then explain its
performance impact. If you completed 1a) with a single MapReduce job, then
answer the question referring to that job.
[10 marks — medium]
Solution:
A Combiner is an intermediate Reducer that is executed at each of the Mapper
nodes 2 . The Combiner runs after the Mappers have finished their execution,
and before data is transferred to the Reducers 2 . Combiner runs according to
Hadoop from 0 to multiple times, as it’s optional and idempotent 1 .
The second job can run the same Reducer as a Combiner 2 . This will prevent
memory issues when running the job 1 , and make it complete considerably
faster, as a single Reducer invocation would need to sort every single item that
Turn over
Page 4 ECS765P (2019-2020)
needs to be ranked. 2
Turn over
ECS765P (2019-2020) Page 5
Question 2
(a) You have a dataset of user ’reactions’ from the Instagram social media network. The
dataset contains one entry for each time a user liked a specific post, recording the
following information:
timestamp;userId;storyId;reactionType;
reactionType records the type of emoji users reacted to the message, with potential
types being ”like”, ”dislike”, or ”love”
This dataset is given as input to the following MapReduce job:
def mapper(self, _, line):
fields = line.split(";")
id = fields[1]
reactionType = fields[3]
yield (id, (reactionType,1))
def reducer(self, id, reactions):
reactionCounts = sumReactionsByType()
#sumReactionsByType() sums the partial totals
# for each Type, obtaining a dictionary of
# types and counts for that reaction
top = getMax(reactionCounts)
yield ( id, top.getType())
What will be the outcome of the MapReduce job? Explain the outcome as well as the
nature of information exchanged in the job execution.
[5 marks — basic]
Solution:
The outcome will be the most common type of reaction for each post that has
been posted in the dataset 3 . The mapper sends for each reaction the post id
as a key, and a pair with the type of reaction and one as a pair 1 . The reducer
receives all the reactions for a post, aggregates them together by type, and yields
the most common one 1 .
(b) Suppose the program presented in 2a) will be executed on a dataset of 5 billion
(5,000,000,000) reactions, collected over 3*365 days of data. In total there are
50,000,000 stories that have been reacted to. The total size of the dataset is 4*128
Gigabytes. The cluster has 5000 worker nodes (all of them idle), and HDFS is
configured with a block size of 128MB.
Using that information, provide a reasoned answer to the following questions. State
Turn over
Page 6 ECS765P (2019-2020)
any assumptions you feel necessary when presenting your answer.
Note: you can consider in your computations either 1kB = 1000 bytes, or 1 kB = 1024
bytes.
(i) How many physical nodes will be involved during the execution of the Map and
Reduce tasks of the job? How many input splits are processed at each physical
node?
(ii) In each single worker node, how many times does the Map method run?
(iii) How many times will the Reduce method be invoked at each reducer node?
[10 marks — medium]
Solution:
There is one data split per HDFS block: 4*128GB/128MB = 4000 splits that need
to be processed. The ApplicationMaster will attempt to assign one to a node,
taking 4000 nodes for Map (ideally local nodes with the data already stored) 2 .
As there is enough capacity, one split will be processed by each involved physical
node 2 . The number of reduce nodes is user defined, by default 2 1
If we assume balanced size of all splits in number of lines, there will be 5 billion
lines/4000 mappers = 1.25 million lines per Mapper 3 .
Students can either use nreducers, or give it a value. The number of unique
keys in this case is the 50 million stories. Number of reduce invocations is 50
million/nred (25m with the default 2) 2 .
(c) Define the concept of data skew in distributed data processing. For the program
defined in 2a) and the data and environment figures presented in 2b), discuss whether
data skew would cause a noticeable effect while running the provided code. If that
will be the case, mention (no need to fully detail) one potential change to the job that
would greatly improve the skew effect.
[10 marks — advanced]
Solution: Data skew affects MapReduce jobs by having an unbalanced distri-
bution of values between the Reduce keys, hence unbalancing the distributed
load. 3 In this particular case, the dataset being of social interactions, data skew
will be a notable effect, with some posts being vastly more popular than others.
This will result in potentially heavy data skew. 3 A simple way to palliate this skew
would be to add a Combiner to the mappers that aggregates together the counts
of reactiontypes for the same post id. 2 As long as posts are emitted by the same
mapper, this approach will be effective. 2
Turn over
ECS765P (2019-2020) Page 7
Question 3
(a) (i) Give one example of a Spark element-wise operation. Describe how would the
Reduce step of an equivalent Hadoop job look like.
(ii) Based on your previous answer, discuss the parallelisation potential of this
element-wise operation as a function of the size of the input data size. De-
tail in your explanation what nodes will be active during the computation, as well
as the data transferred over the network.
[9 marks]
Solution: i) Example: map, flatMap, filter. The Reduce step is not needed, i.e
element-wise operations are Map-only jobs 3 . ii) An element-wise or map-only
operation is embarrassingly parallel, which means that scaling up to a larger
cluster is trivial 2 . All nodes that contain a partition of the data will be used in the
computation 2 . Since no shuffling of the data is needed, no transfers over the
network are needed 2 .
Marking scheme: i) Three marks: basic i)) Six marks: basic-medium
(b) You are given a dataset with the historical results of the top-tier English Football
League per season. It is in a CSV (comma separated value) format and consists of
the following fields:
Season ,Team, Wins , Draws , Losses , GoalsFor , GoalsAgainst
An example of this data is as follows:
1888−1889, Preston North End,18 ,4 ,0 ,74 ,15
1888−1889,Aston V i l l a ,12 ,5 ,5 ,61 ,41
1888−1889,Wolverhampton Wanderers ,12 ,4 ,6 ,51 ,37
1888−1889, Blackburn Rovers ,10 ,6 ,6 ,66 ,45
1888−1889, Bol ton Wanderers ,10 ,2 ,10 ,63 ,59
Suppose you execute the following Python Spark job on the dataset:
1l i n e s = sc . t e x t F i l e ( ’ hdfs : / / inputPath . csv ’ )
2f i l t e r e d 1 = l i n e s . f i l t e r ( lambda l : len ( l . s p l i t ( ” , ” ) ) == 7)
3rows = f i l t e r e d 1 .map( lambda l i n e : l i n e . s p l i t ( ” , ” ) )
4f i l t e r e d 2 = rows . f i l t e r ( lambda r : r [ 2 ] == r [ 4 ] )
5mapped = f i l t e r e d 2 .map( lambda r : ( r [ 1 ] , 1 ) )
6r e s u l t = mapped . reduceByKey ( lambda a , b : a+b )
7r e s u l t . saveAsTextFi le ( ’ hdfs : / / outputPath ’ )
We provide for reference the following information about the Python and Spark-specific
functions appearing in the program:
Turn over
Page 8 ECS765P (2019-2020)
filter is a Spark transformation that for each input element will either generate the
same element as output if the condition expressed in the function is true, or no element
at all if it is false.
map is a Spark transformation that for each input element generates one output
element, whose value is dictated by the function.
reduceByKey is a Spark transformation that takes as input an RDD of pairs of
elements, first groups all the pairs, putting together the ones with the same key, and
generates one pair as a result of the group. The key of the result is the common key,
and the value is the result of reducing the group of values into a single one by applying
the function.
min is a Spark action that returns to the driver the minimum item as specified by the
optional key function.
saveAsTextFile is a Spark action that saves the RDD to the provided HDFS path.
len is the Python function that returns the length of a string, or list.
split is the Python function that splits a string into a list of strings, using the provided
argument as separator.
int is the Python function that converts the string representation of a number into its
numerical value.
(i) Explain what the final result of the code will be. Discuss what each line in the
code does to end up at this result. It may be easier to reference each of these
lines by their number or variable names.
(ii) Detail the changes you need to make to the above code in order to find what
season had the least number of draws in history. The output needs to be a string
of the form “The least number of draws (N) happened in season YYYY-YYYY”. You
may explain these changes in your own words, or provide the code in pseudocode
or Python.
[16 marks]
Solution:
i) The code returns the number of seasons each team had as many wins as
losses 2 . Line 1 creates an RDD by reading a text file from HDFS 1 . Line 2
filters the data for lines which do not have the correct format/have missing fields 1 .
Line 3 splits CSV lines into separate values 1 . Line 4 keeps only the seasons
when a team had as many wins as losses 1 . Line 5 creates key-value pairs of
the team name and a value of 1 of the output of the previous line 1 . Line 6 sums
the values of 1 per team name to count the number of times each time has had
as many wins as losses in a season 1 . Line 7 is an action that writes the result
to HDFS 1 .
ii) Something along the lines of:
Turn over
ECS765P (2019-2020) Page 9
1l i n e s = sc . t e x t F i l e ( ’ hdfs : / / inputPath . csv ’ )
2wel l formed = l i n e s . f i l t e r ( lambda l : len ( l . s p l i t ( ” , ” ) ) == 7)
3rows = wel l formed .map( lambda l : l . s p l i t ( ” , ” ) )
4season draws = rows .map( lambda r : ( r [ 0 ] , i n t ( r [ 3 ] ) ) )
5draws per season = season draws . reduceByKey ( lambda x , y : x+y )
6r e s u l t = draws per season . min ( key=lambda x : x [ 1 ] )
7answer = ’ The l e a s t number o f draws ({} ) happened i n
8season {} ’ . format ( r e s u l t [ 1 ] , r e s u l t [ 0 ] )
Marking scheme: i) Nine marks: basic-medium. ii) Seven marks: medium-
advanced. Line 4 2 , line 5 2 , line 6 2 , line 7–8 1 . Similar code will be given
equivalent marks as long as they implement analogous functions.
Turn over
Page 10 ECS765P (2019-2020)
Question 4
(a) This question is about different types of stream processing.
(i) Components of a streaming algorithm are either stateless or stateful. Explain
what these terms mean in this context and give an example computation for each.
(ii) Micro-batching and pure stream processing both run on unbounded data. Explain
how these two processing models differ. Discuss in what situation each would be
more appropriate, by making reference to latency and throughput considerations.
[12 marks — medium]
Solution:
i) A stateless computation is one where the function will be run on each tuple 1
without the influence or ’state’ maintained from any other 1 , an example of this
would be a filter 1 . Stateful operations may maintain a state 1 based on tuples
previously seen 1 , for example an aggregate count for keys 1 .
ii) Analysis ran in a pure stream processing fashion will operate on every tuple
as they arrive 1 , leading to low latency of results 1 . Micro-batching on the
other-hand may wait x number of seconds before triggering 1 the operation and
executing on all ingested tuples, often leading to higher throughput, but obviously
slower return of the result 1 . Pure streaming may be more appropriate when
there is some real-time requirement or alerts 1 , whilst micro-batching may be
more appropriate for windowing 1 .
(b) This question is about the scalability of streaming systems.
(i) Streaming systems often consist of data sources and computational units; Spouts
and Bolts respectively in Apache Storm. Describe two things that may happen to
these components when a system is scaled up from a single node to a cluster of
machines for higher throughput.
[6 marks — basic]
Solution:
i) The first thing that may happen to the bolts and spouts is that they are distributed
among the nodes within the cluster instead of being on the same machine 3 . The
second thing to occur would be replication of the bolts to allow their tasks to be
parallelised 3 .
(c) Graphs contain a set of vertices (entities within the data) alongside a set of edges
(relationships between entities). Parallel Breadth First Search is a popular algorithm
for resolving the shortest path between two entities within a graph.
Explain how the Pregel model (think like a vertex) can be used to implement graph
algorithms like Parallel Breadth First Search in a distributed environment such as
Turn over
ECS765P (2019-2020) Page 11
Spark GraphX. Give one example use case of finding the shortest path between
entities.
[7 marks — advanced]
Solution:
i) In the Pregel model computation is carried out in supersteps 1 . Each vertex
gets a function to run every superstep, within this they may ingest messages,
calculate some internal state, send message’s to neighbours and vote to halt 2 .
Each vertex may execute in parallel and may, therefore, be distributed across any
number of machines 2 . An example algorithm for shortest path is routing traffic
between two nodes in a computer network 2 .
End of questions

欢迎咨询51作业君
51作业君

Email:51zuoyejun

@gmail.com

添加客服微信: abby12468