ISIT312 Big Data Management
Spring 2021
Assignment 3

All files left on Moodle in the state "Draft (not submitted)" will not be evaluated. Please refer to the submission dropbox on Moodle for the submission due date and time.

This assessment contributes 20% of the total evaluation in the subject.

The deliverable is specified in each task.

All Laboratory and Assignment tasks in this subject must be solved individually, without any cooperation with other students. If you have any doubts or questions, please consult your lecturer or tutor during lab classes or office hours. Plagiarism will result in a FAIL grade being recorded for that assessment task.

The implementation environment is the Ubuntu virtual machine imported from the BigDataVM-2021v2_2.ova file.

Q1. Apache HBase (3 marks)

Consider the following conceptual model:

[Figure: conceptual model]
SUBJECT: code (ID), title
INSTRUCTOR: staff-number (ID), first-name, last-name, email
Relationships: is-the-coordinator-of, teaches

Develop two alternative implementations of the above schema in HBase; in other words, two HBase tables are to be created. Each HBase table must be loaded with at least two instructors and two subjects. The names of column families and column qualifiers must be indicative. You can determine the row keys and cell values.

After loading the data, use a “scan” command to list all rows from each table. You must also explain the key difference between your two implementations.

Deliverable. A file solution1.pdf which includes:
• The Hadoop commands, HBase shell commands and execution output in Zeppelin or Terminal;
• An explanation of the difference between the two implementations.

Q2. Apache Pig (4 marks)

DATA SETS: apat63_99.txt and cite75_99.txt, which are in the “datasets” folder on the Desktop of the VM. The source of the two data sets is http://www.nber.org/patents/

The first file, apat63_99.txt, contains about 3 million records for U.S. patents.
Please refer to the Assignment 1 specification for a description of this data set.

The second file, cite75_99.txt, contains more than 16 million lines of citation records. The first few lines are:

    "CITING","CITED"
    3858241,956203
    3858241,1324234
    3858241,3398406
    3858241,3557384
    3858241,3634889
    3858242,1515701
    3858242,3319261
    …

For example, the second line shows that patent 3858241 cites patent 956203. The file is sorted by the citing patents.

The citation count of a patent is the number of times it is cited by other patents. For example, if a patent is cited by 100 patents in total, the citation count of this patent is 100.

Load the files into HDFS and perform the following operations in Apache Pig:

(1) Find the average number of claims grouped by patents where the grant year is ≥ 1975 and the country is AU.
(2) Find the patent number, grant year and country for the most-cited patent(s), namely the patent(s) with the largest citation count.

Deliverable. A file solution2.pdf which contains your implementation (in Pig Latin), commands and results for the above two operations.

Q3. Apache Spark, Hive and HBase (8 marks)

Consider the conceptual model of a data warehouse whose entities and attributes are listed at the end of this question. The data of this model is stored in the files customer.tbl, order_details.tbl, order.tbl, product.tbl and salesperson.tbl, all of which are available in the “Resources” folder of Assignment 3 on Moodle.

Note that each file has a header describing the meaning of the data in each column. The header is not part of the data. Remove the headers before transferring the files into HDFS.

(1) Load the data in the above five files into five external tables in Hive. (You can make reasonable assumptions on the data types.)
(2) Load the five Hive tables into Spark dataframes and perform the following operations in Spark:
    a. Find the number of orders whose ship-city is London.
    b. Find the number of products that were not ordered in 1996.
    c.
       Find the order value (i.e., unit price multiplied by quantity of products per order) for order IDs 10270 to 10279.
    d. Sort the salespersons in descending order of the total order value of the orders they handled, and find the employee ID, first name and last name of the top three salespersons.
(3) Convert the Spark dataframe for the salesperson data (which is from salesperson.tbl) into an HBase table. Then, in the HBase shell, use the get command to retrieve the salesperson data with the employee ID “1”. (You can make reasonable assumptions on the column families.)

Note. All the above steps need to be performed in the command line interfaces of the related software. Do not use Zeppelin.

Deliverable. A file solution3.pdf which contains all your commands and code in the related command line interfaces.

[Figure: conceptual model of the data warehouse]
SALESPERSON: employee-id (ID), last-name, first-name, title, birth-date, hire-date, notes
CUSTOMER: customer-id (ID), customer-code, company-name, contact-name, contact-title, city, region, postal-code, country, phone, fax
PRODUCT: product-id (ID), product-name, unit-price, units-in-stock, units-on-order, discontinued
ORDER: order-id (ID), order-date, ship-via, ship-city, ship-region, ship-postal-code, ship-country
ORDER-DETAIL: unit-price, quantity, discount

Q4. Spark Streaming (5 marks)

DATASET: A file containing some transaction data in the resources folder for this assignment on Moodle. The transaction data includes the daily retail records of a retailer over one year.

Based on the following sample Scala code, implement a streaming query on the above dataset. The file source of this query is HDFS. Thus, after you download (and unzip) the files, you need to upload them to HDFS.

    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.streaming.Trigger
    // for an older version of Spark, also include:
    // import org.apache.spark.sql.streaming.ProcessingTime

    val retail_data = ...
    val staticDataFrame = spark.read.format("csv")
      .option("header", "true")
      .option("inferSchema", "true")
      .load(retail_data)

    val staticSchema = staticDataFrame.schema

    staticDataFrame.printSchema()
    // root
    // |-- InvoiceNo: string (nullable = true)
    // |-- StockCode: string (nullable = true)
    // |-- Description: string (nullable = true)
    // |-- Quantity: integer (nullable = true)
    // |-- InvoiceDate: timestamp (nullable = true)
    // |-- UnitPrice: double (nullable = true)
    // |-- CustomerID: double (nullable = true)
    // |-- Country: string (nullable = true)

    spark.conf.set("spark.sql.shuffle.partitions", 2)

    val streamingDataFrame = spark.readStream
      .schema(staticSchema)
      .option("maxFilesPerTrigger", 10)
      .format("csv")
      .option("header", "true")
      .load(retail_data)

    streamingDataFrame.isStreaming // true if streaming

    val purchaseQuery = streamingDataFrame /* */

    val query = purchaseQuery
      .writeStream
      .format("console")
      .queryName("customer_purchases")
      .outputMode("complete")
      .trigger(Trigger.ProcessingTime("5 seconds"))
      // for an older version of Spark, use:
      // .trigger(ProcessingTime("5 seconds"))
      .start()

The streaming query performs the following operations:
• It filters out data with a missing value (if any) in the InvoiceNo, UnitPrice, Quantity and CustomerID columns (i.e., rows with a null value in those columns are removed).
• It returns the average purchasing value (= UnitPrice × Quantity / (InvoiceNo)) per customer, sorted in descending order of the average purchasing value. (Note: The return will be updated in the streaming query.)

Based on the provided sample code, implement a Scala script to complete the above operations. Execute the script in the Spark shell by using the :paste command. Report the first 20 rows of the first four return batches.

Deliverable. A file solution4.pdf which includes your Scala code, commands and the output (namely the return batches).

End of specification
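
Illustrative note on Q4. The two operations listed for the streaming query (dropping rows with missing values, then averaging per customer and sorting) can be illustrated on a small in-memory collection. The following is a minimal plain-Scala sketch, not a Spark streaming query and not a model answer. It assumes that a customer's average purchasing value is the total of UnitPrice × Quantity divided by the number of distinct invoices of that customer, which is only one possible reading of the formula in the specification. The names Purchase, AveragePurchaseSketch, averagePerCustomer and sample, and all sample values, are hypothetical.

```scala
// Hypothetical record type; the fields mirror four columns of the printed schema.
// Option models a value that may be missing (null in the CSV data).
final case class Purchase(
  invoiceNo: Option[String],
  quantity: Option[Int],
  unitPrice: Option[Double],
  customerId: Option[Double]
)

object AveragePurchaseSketch {

  // ASSUMPTION: average purchasing value =
  //   sum(UnitPrice * Quantity) / number of distinct invoices, per customer.
  def averagePerCustomer(rows: Seq[Purchase]): Seq[(Double, Double)] = {
    // Step 1: keep only rows in which all four fields are present.
    val complete = rows.collect {
      case Purchase(Some(inv), Some(qty), Some(price), Some(cust)) =>
        (cust, inv, price * qty)
    }

    // Step 2: group by customer, compute the average, sort in descending order.
    complete
      .groupBy { case (cust, _, _) => cust }
      .map { case (cust, items) =>
        val total = items.map { case (_, _, value) => value }.sum
        val invoices = items.map { case (_, inv, _) => inv }.distinct.size
        cust -> total / invoices
      }
      .toSeq
      .sortBy { case (_, avg) => -avg }
  }

  // Made-up sample data, for illustration only.
  val sample: Seq[Purchase] = Seq(
    Purchase(Some("A1"), Some(2), Some(10.0), Some(1.0)),
    Purchase(Some("A1"), Some(1), Some(5.0), Some(1.0)),
    Purchase(Some("A2"), Some(3), Some(5.0), Some(1.0)),
    Purchase(Some("B1"), Some(1), Some(50.0), Some(2.0)),
    Purchase(Some("C1"), Some(4), Some(1.0), None) // missing CustomerID: dropped
  )
}
```

With the sample data, customer 1.0 has a total of 40.0 over two distinct invoices and customer 2.0 has a total of 50.0 over one invoice, so AveragePurchaseSketch.averagePerCustomer(AveragePurchaseSketch.sample) yields (2.0, 50.0) followed by (1.0, 20.0). In the streaming query itself the same logic would have to be expressed with DataFrame operations on streamingDataFrame.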