
Z5222745 Dongkun Ren

Question 1

(a) Data flow in MapReduce

Input file:  AAAAA aaaaa AAAAA aaaaa BBBBB AAAAA
Output file: AAAAA 3
             aaaaa 2
             BBBBB 1

Step 1 (Map): the map process reads the input line by line; for each word it emits the pair (word, 1) as writables:
    AAAAA 1, aaaaa 1, AAAAA 1, aaaaa 1, BBBBB 1, AAAAA 1

Step 2 (Shuffle): the emitted pairs are shuffled and sorted by key:
    aaaaa 1, aaaaa 1, AAAAA 1, AAAAA 1, AAAAA 1, BBBBB 1

Step 3 (Grouping by reducer): values are grouped under each key:
    aaaaa -> [1, 1]
    AAAAA -> [1, 1, 1]
    BBBBB -> [1]

Step 4 (Reduce): the reduce process sums the values for each key:
    aaaaa 2
    AAAAA 3
    BBBBB 1

In short, the data flow works as shown above and can be summarised in five phases: Input, Map, Shuffle & Sort, Reduce, Output.

(b) Data flow in Spark

Spark's workflow differs from the MapReduce workflow:
1. Spark uses the RDD or DataFrame data structures.
2. Spark offers many API functions, such as flatMap and map, which do not rely on the system's fixed Map and Reduce mechanism.
3. Spark evaluates transformations lazily: the functions are only executed when an action produces output.
4. Spark uses collect (an action) to gather the distributed data back to the driver.
5. Spark integrates with the HDFS ecosystem.

Question 2

(a) Time complexity: O(n * L^2) for n lines with at most L tokens each, since every ordered pair of tokens within a line is emitted.

class Mapper
    method Map(docid, text):
        for each line in text:
            for each token1 in line:
                for each token2 in line:
                    Emit((token1, token2), 1)

class Reducer
    method Setup():
        queue <- new PriorityQueue()
    method Reduce((token1, token2), counts: Iterable<Int>):
        sum <- sum of counts
        put ((token1, token2), sum) into queue, keeping only the top-k entries
    method Cleanup():
        for each of the top-k entries in queue:
            context.write((token1, token2), sum)

Question 3

(a)

val docs = sc.
parallelize(List(
  (1, "hello scala creates world"),
  (2, "this scala program"),
  (3, "creates a pair RDD"),
  (4, "in spark")))

// my code starts here
val InvList = docs
  .flatMap(x => x._2.split(" ").map(y => (y, x._1)))
  .groupByKey()
  .map(x => (x._1, x._2.toList.sorted.distinct))
  .sortByKey()
// my code ends here

InvList.saveAsTextFile("question_a")

(b)

val pairs = List((1, 2), (3, 4), (3, 5))
// fill your code here, and store the result in a DataFrame resMinMax
import spark.implicits._
import org.apache.spark.sql.functions._

val pairDF = pairs.toDF("key", "value")
val resMinMax = pairDF
  .groupBy("key")
  .agg(min(col("value")).alias("minValue"), max(col("value")).alias("maxValue"))
  .withColumn("diff", col("maxValue") - col("minValue"))
  .filter(col("diff") > 100)
  .sort(col("key").desc)

resMinMax.write.format("csv").save("hdfs://...")

(c)

import org.apache.spark.graphx._

val sc = spark.sparkContext
val pairs = sc.parallelize(List((1, 2, 5), (1, 4, 6), (3, 4, 2), (3, 6, 8)))
val s = "1"
val D = 10
val sourceNode = s.toInt
val edgeList = pairs.map(x => Edge(x._1.toLong, x._2.toLong, x._3.toDouble))
val graph = Graph.fromEdges[Int, Double](edgeList, 0)

// fill your code here, and store the nodes in a variable NeighborNodes
// Reverse every edge, so that shortest paths from the source on the
// reversed graph correspond to paths into the source on the original graph.
val reverseEdgeList = pairs.map(x => Edge(x._2.toLong, x._1.toLong, x._3.toDouble))
val vRDD = reverseEdgeList
  .flatMap(edge => Array(edge.srcId, edge.dstId))
  .distinct
  .map(id => (id, Double.PositiveInfinity))
val reverseGraph1 = Graph(vRDD, reverseEdgeList)
val reverseGraph = reverseGraph1.mapVertices((vId, vVal) => if (vId == sourceNode) 0.0 else vVal)

// Pregel components
val vprog = (vId: VertexId, vVal: Double, message: Double) =>
  if (message < vVal) message else vVal
val sendMessage = (triplet: EdgeTriplet[Double, Double]) => {
  val newDist = triplet.srcAttr + triplet.attr
  if (newDist >= triplet.dstAttr) Iterator.empty
  else Iterator((triplet.dstId, newDist)) // send the improved distance to the destination
}
val mergeMessages = (message1: Double, message2: Double) =>
  if (message1 < message2) message1 else message2

val shortestPathGraph = reverseGraph.pregel(
  Double.PositiveInfinity, maxIterations = 500, EdgeDirection.Out)(
  vprog, sendMessage, mergeMessages)

val NeighborNodes = shortestPathGraph.vertices
  .filter { case (vId, vVal) => vVal < D }
  .map(_._1.toLong)
  .collect()
  .sorted
println(NeighborNodes.mkString(", "))

Question 4

(a) k = 3 shingles:
bac, cba, acb, aab, aaa, bcb, baa, abc, aac

(b) Jaccard similarities:
sim(D1, D2) = 0.25
sim(D1, D3) = 0.25
sim(D2, D3) = 3/7 ≈ 0.4286

(c) MinHash signature rows:
h1 = [0, 0, 0, 1, 1, 0, 0, 2, 1]
h2 = [1, 0, 1, 0, 2, 0, 0, 1, 0]
h3 = [2, 0, 0, 0, 0, 1, 1, 1, 0]

Estimated similarities (fraction of matching positions):
sim(1, 2) = 3/9
sim(1, 3) = 2/9
sim(2, 3) = 4/9

Question 5

(a)

(i) Initial bit array (m = 7 bits):
index: 0 1 2 3 4 5 6
bits:  0 0 0 0 0 0 0

After word "hi" (h: 7, i: 8):
h1(hi) = (7 + 8) mod 7 = 1
h2(hi) = 2 mod 7 = 2
bits:  0 1 1 0 0 0 0

After word "big" (b: 1, i: 8, g: 6):
h1(big) = (1 + 8 + 6) mod 7 = 1
h2(big) = 3
bits:  0 1 1 1 0 0 0

After word "data" (d: 3, a: 0, t: 19):
h1(data) = (3 + 0 + 19 + 0) mod 7 = 1
h2(data) = 4
bits:  0 1 1 1 1 0 0

(ii) For "spark" (s: 18, p: 15, a: 0, r: 17, k: 10):
h1(spark) = (18 + 15 + 0 + 17 + 10) mod 7 = 60 mod 7 = 4
h2(spark) = 5 mod 7 = 5
H(spark) = {4, 5}, and bit 5 is 0, so "spark" is not contained in S.

(iii) According to the lectures, the false positive probability of a Bloom filter is

p ≈ (1 - e^(-kn/m))^k

where in this case k = 2 hash functions, n = 3 inserted words, and m = 7 bits (the bit array has 7 positions and three words were inserted). Hence the output is

p ≈ (1 - e^(-6/7))^2 ≈ 0.575^2 ≈ 0.33

(b) Count-min sketch with 5 buckets (B0..B4) and 3 hash rows (H1..H3).

Initial:
     B0 B1 B2 B3 B4
H1:   0  0  0  0  0
H2:   0  0  0  0  0
H3:   0  0  0  0  0

After word "big": H1 = 6 mod 5 = 1, H2 = 3 mod 5 = 3, H3 = 1 mod 5 = 1
     B0 B1 B2 B3 B4
H1:   0  1  0  0  0
H2:   0  0  0  1  0
H3:   0  1  0  0  0

After word "data" (twice): H = (3, 4, 3)
     B0 B1 B2 B3 B4
H1:   0  1  0  2  0
H2:   0  0  0  1  2
H3:   0  1  0  2  0

After word "set": H = (1, 3, 3)
     B0 B1 B2 B3 B4
H1:   0  2  0  2  0
H2:   0  0  0  2  2
H3:   0  1  0  3  0

After word "data": H = (3, 4, 3)
     B0 B1 B2 B3 B4
H1:   0  2  0  3  0
H2:   0  0  0  2  3
H3:   0  1  0  4  0

After word "analytics": H = (3, 4, 0)
     B0 B1 B2 B3 B4
H1:   0  2  0  4  0
H2:   0  0  0  2  4
H3:   1  1  0  4  0

Count of "data": min(H1[3], H2[4], H3[3]) = min(4, 4, 4) = 4.

Question 6

(i) User-user collaborative filtering.
(ii) Item-item collaborative filtering.
(iii) RMSE (root mean square error).
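The four MapReduce steps in Question 1(a) can be replayed outside Hadoop. The sketch below is plain Python, not Hadoop code: it mirrors the map, shuffle/sort, group, and reduce phases on the example input.

```python
from itertools import groupby

def map_phase(lines):
    # Step 1 (Map): emit (word, 1) for every word on every line
    return [(word, 1) for line in lines for word in line.split()]

def reduce_phase(pairs):
    # Steps 2-4: shuffle & sort by key, group, then sum each group's values
    pairs = sorted(pairs)
    return {key: sum(v for _, v in group)
            for key, group in groupby(pairs, key=lambda kv: kv[0])}

counts = reduce_phase(map_phase(["AAAAA aaaaa AAAAA aaaaa BBBBB AAAAA"]))
print(counts)  # {'AAAAA': 3, 'BBBBB': 1, 'aaaaa': 2}
```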
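The pair-counting mapper and top-k reducer of Question 2 can be prototyped in a single process; the sample lines and k value below are made-up illustration inputs, with `Counter` standing in for the shuffle-and-sum and `heapq.nlargest` for the priority queue.

```python
import heapq
from collections import Counter

def count_pairs(lines):
    # Mapper + shuffle: count every ordered token pair within each line
    counts = Counter()
    for line in lines:
        tokens = line.split()
        for t1 in tokens:
            for t2 in tokens:
                counts[(t1, t2)] += 1
    return counts

def top_k(counts, k):
    # Cleanup: keep only the k most frequent pairs (the priority queue's role)
    return heapq.nlargest(k, counts.items(), key=lambda kv: kv[1])

counts = count_pairs(["a b a", "a b"])
print(top_k(counts, 1))  # [(('a', 'a'), 5)]
```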
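The inverted-list logic of Question 3(a), flatMap to (word, docId) pairs, group by word, dedupe, and sort, can be sanity-checked without a Spark cluster; this Python sketch uses the same four documents.

```python
from collections import defaultdict

docs = [(1, "hello scala creates world"), (2, "this scala program"),
        (3, "creates a pair RDD"), (4, "in spark")]

# flatMap to (word, docId), then groupByKey
postings = defaultdict(list)
for doc_id, text in docs:
    for word in text.split():
        postings[word].append(doc_id)

# distinct + sort inside each posting list, then sort by word
inv_list = sorted((w, sorted(set(ids))) for w, ids in postings.items())
print(inv_list)
```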
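The Pregel computation in Question 3(c) is a shortest-path search from the source over reversed edges. A small Bellman-Ford sketch in plain Python reproduces the same idea on the given edge list; note that node 1 has no incoming edges in the original graph, so only the source itself lies within distance D = 10.

```python
INF = float("inf")

def nodes_within(edges, source, max_dist):
    # Reverse every edge, then run Bellman-Ford from the source on the
    # reversed graph (the same idea the Pregel version implements)
    rev = [(dst, src, w) for src, dst, w in edges]
    nodes = {n for e in rev for n in e[:2]}
    dist = {n: INF for n in nodes}
    dist[source] = 0
    for _ in range(len(nodes) - 1):  # relax all edges |V|-1 times
        for u, v, w in rev:
            if dist[u] + w < dist[v]:
                dist[v] = dist[u] + w
    return sorted(n for n, d in dist.items() if d < max_dist)

edges = [(1, 2, 5), (1, 4, 6), (3, 4, 2), (3, 6, 8)]
print(nodes_within(edges, 1, 10))  # [1]
```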
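The MinHash estimates in Question 4(c) are simply the fraction of positions where two signature rows agree; a short function makes the 3/9, 2/9, and 4/9 results checkable.

```python
from fractions import Fraction

def sig_sim(a, b):
    # Estimated Jaccard similarity: fraction of matching signature entries
    return Fraction(sum(x == y for x, y in zip(a, b)), len(a))

h1 = [0, 0, 0, 1, 1, 0, 0, 2, 1]
h2 = [1, 0, 1, 0, 2, 0, 0, 1, 0]
h3 = [2, 0, 0, 0, 0, 1, 1, 1, 0]
print(sig_sim(h1, h2), sig_sim(h1, h3), sig_sim(h2, h3))  # 1/3 2/9 4/9
```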
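The Bloom-filter steps of Question 5(a) can be replayed programmatically. Here h1 is the letter-position-sum mod 7 from the worked example; the h2 values are taken per word from the answer (their formula is not stated there), and the false-positive estimate uses the standard (1 - e^(-kn/m))^k approximation.

```python
import math

M = 7  # number of bits
h1 = lambda w: sum(ord(c) - ord("a") for c in w) % M
# h2 values as given in the worked answer (formula not stated there)
h2 = {"hi": 2, "big": 3, "data": 4, "spark": 5}

bits = [0] * M
for word in ["hi", "big", "data"]:
    bits[h1(word)] = 1
    bits[h2[word]] = 1
print(bits)  # [0, 1, 1, 1, 1, 0, 0]

# Membership test for "spark": both bits must be set
in_set = bits[h1("spark")] == 1 and bits[h2["spark"]] == 1
print(in_set)  # False: bit 5 is 0

# False-positive probability with k = 2 hashes, m = 7 bits, n = 3 inserts
fp = (1 - math.exp(-2 * 3 / M)) ** 2
print(round(fp, 2))  # 0.33
```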
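The count-min updates in Question 5(b) can likewise be replayed. The per-word (H1, H2, H3) bucket triples are copied directly from the worked answer, since the underlying hash functions are not specified; the point estimate is the minimum counter across the three rows.

```python
# Hash bucket triples (H1, H2, H3) as given in the worked answer
H = {"big": (1, 3, 1), "data": (3, 4, 3), "set": (1, 3, 3), "analytics": (3, 4, 0)}

sketch = [[0] * 5 for _ in range(3)]  # 3 hash rows x 5 buckets

def insert(word):
    for row, bucket in enumerate(H[word]):
        sketch[row][bucket] += 1

def estimate(word):
    # Count-min point query: minimum counter over the word's buckets
    return min(sketch[row][bucket] for row, bucket in enumerate(H[word]))

for word in ["big", "data", "data", "set", "data", "analytics"]:
    insert(word)
print(estimate("data"))  # 4 (true count is 3; collisions inflate the estimate)
```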
