CMSC 491 Hadoop-Based Distributed Computing, Spring 2016

Transcription

CMSC 491
Hadoop-Based Distributed Computing
Spring 2016
Adam Shook

Objectives
- Explain why and how Hadoop has become the foundation for virtually all modern data architectures
- Explain the architecture and design of HDFS and MapReduce

Agenda!
- What is it?
- Why should I care?
- Example Usage
- Hadoop Core
  – HDFS
  – MapReduce
- Ecosystem, maybe

WHAT IS HADOOP?

You tell me

Distributed Systems
- "Collection of independent computers that appears to its users as a single coherent system"*
- Take a bunch of Ethernet cable, wire together machines with a hefty amount of CPU, RAM, and storage
- Figure out how to build software to parallelize storage/computation/etc. across our cluster
  – While tolerating failures and all kinds of other garbage
  – "The hard part"
* Andrew S. Tanenbaum

Properties of Distributed Systems
- Reliability
- Availability
- Scalability
- Efficiency

Reliability
- Can the system deliver services in the face of several component failures?

Availability
- How much latency is imposed on the system when a failure occurs?

# of Nines | Availability % | Per year       | Per month     | Per week
1          | 90%            | 36.5 days      | 72 hours      | 16.8 hours
2          | 99%            | 3.65 days      | 7.20 hours    | 1.68 hours
3          | 99.9%          | 8.76 hours     | 43.8 minutes  | 10.1 minutes
4          | 99.99%         | 52.56 minutes  | 4.38 minutes  | 1.01 minutes
5          | 99.999%        | 5.26 minutes   | 25.9 seconds  | 6.05 seconds
6          | 99.9999%       | 31.5 seconds   | 2.59 seconds  | 604.8 ms
7          | 99.99999%      | 3.15 seconds   | 262.97 ms     | 60.48 ms
8          | 99.999999%     | 315.569 ms     | 26.297 ms     | 6.048 ms
9          | 99.9999999%    | 31.5569 ms     | 2.6297 ms     | 0.6048 ms
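These figures follow directly from downtime = (1 − availability) × period. For three nines, for example: 0.001 × 365 days × 24 hours/day ≈ 8.76 hours of downtime per year.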

Scalability
- Can the system scale to support a growing number of tasks?

Efficiency
- How efficient is the system, in terms of latency and throughput?

WHY SHOULD I CARE?

The Short Answer is...

The V's of Big Data!
- Volume
- Velocity
- Variety
- Veracity
- Value
  – New and Improved!

Data Sources
- Social Media
- Web Logs
- Video Networks
- Sensors
- Transactions (banking, etc.)
- E-mail, Text Messaging
- Paper Documents

Value in all of this!
- Fraud Detection
- Predictive Models
- Recommendations
- Analyzing Threats
- Market Analysis
- Others!

How do we extract value?
- Monolithic Computing!
- Build bigger, faster computers!
- Limited solution
  – Expensive and does not scale as data volume increases

Enter Distributed Computing
- Processing is distributed across many computers
- Distribute the workload to powerful compute nodes with some separate storage

New Class of Challenges
- Does not scale gracefully
- Hardware failure is common
- Sorting, combining, and analyzing data spread across thousands of machines is not easy

RDBMS is still alive!
- SQL is still very relevant, with many complex business rules mapping to a relational model
- But there is much more that can be done by leaving relational behind and looking at all of your data
- NoSQL can expand what you have, but it has its disadvantages

NoSQL Downsides
- Increased middle-tier complexity
- Constraints on query capability
- No standard query semantics
- Complex to set up and maintain

An Ideal Cluster
- Linear horizontal scalability
- Analytics run in isolation
- Simple API with multiple language support
- Available in spite of hardware failure

Enter Hadoop
- Hits these major requirements
- Two core pieces
  – Distributed File System (HDFS)
  – Flexible analytic framework (MapReduce)
- Many ecosystem components to expand on what core Hadoop offers

Scalability
- Near-linear horizontal scalability
- Clusters are built on commodity hardware
- Component failure is an expectation

Data Access
- Moving data from storage to a processor is expensive
- Store data and process the data on the same machines
- Process data intelligently by being local

Disk Performance
- Disk technology has made significant advancements
- Take advantage of multiple disks in parallel
  – 1 disk, 3 TB of data, 300 MB/s: 2.5 hours to read
  – 1,000 disks, same data: 10 seconds to read
- Distribution of data and co-location of processing makes this a reality
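As a quick sanity check on these numbers: one disk scans 3 TB at 300 MB/s in 3,000,000 MB ÷ 300 MB/s = 10,000 s, roughly 2.8 hours (close to the 2.5 hours quoted above). Striped evenly across 1,000 disks, each disk holds only 3 GB, so 3,000 MB ÷ 300 MB/s = 10 s.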

Complex Processing Code
- Hadoop framework abstracts the complex distributed computing environment
  – No synchronization code
  – No networking code
  – No I/O code
- MapReduce developer focuses on the analysis
  – Job runs the same on one node or 4,000 nodes!

Fault Tolerance
- Failure is inevitable, and it is planned for
- Component failure is automatically detected and seamlessly handled
- System continues to operate as expected with minimal degradation

Hadoop History
- Spun off of Nutch, an open source web search engine
- Google whitepapers
  – GFS
  – MapReduce
- Nutch re-architected to birth Hadoop
- Hadoop is very mainstream today

Hadoop Versions
- Hadoop 2.7.1 is the latest release
- Two Java APIs, referred to as the old and new APIs
- Two task management systems
  – MapReduce v1 – JobTracker, TaskTracker
  – MapReduce v2 – a YARN application; MR runs in containers in the YARN framework

Common Use Cases
- Log Processing
- Image Identification
- Extract Transform Load (ETL)
- Recommendation Engines
- Time-Series Storage and Processing
- Building Search Indexes
- Long-Term Archive
- Audit Logging

Non-Use Cases
- Data processing handled by one large server
- ACID Transactions

LinkedIn Architecture

LinkedIn Applications

HDFS

Hadoop Distributed File System
- Inspired by Google File System (GFS)
- High performance file system for storing data
- Relatively simple centralized management
- Fault tolerance through data replication
- Optimized for MapReduce processing
  – Exposing data locality
- Linearly scalable
- Written in Java
- APIs in all the useful languages

Hadoop Distributed File System
- Use of commodity hardware
- Files are write once, read many
- Leverages large streaming reads vs. random reads
- Favors high throughput vs. low latency
- Modest number of huge files

HDFS Architecture
- Split large files into blocks
- Distribute and replicate blocks to nodes
- Two key services
  – Master NameNode
  – Many DataNodes
- Backup/Checkpoint NameNode for HA
- User interacts with it like a UNIX file system

NameNode
- Single master service for HDFS
- Was a single point of failure
- Stores file-to-block-to-location mappings in a namespace
- All transactions are logged to disk
- Can recover based on checkpoints of the namespace and transaction logs

NameNode Memory
- HDFS prefers fewer, larger files as a result of the metadata
- Consider 1 GB of data with a 64 MB block size
  – Stored as one file: Name: 1 item; Blocks: 16 × 3 = 48 items; Total items: 49
  – Stored as 1,024 1 MB files: Names: 1,024 items; Blocks: 1,024 × 3 = 3,072 items; Total items: 4,096
- Each item is around 200 bytes
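At roughly 200 bytes per item, that works out to 49 × 200 ≈ 9.8 KB of NameNode memory for the single large file versus 4,096 × 200 ≈ 800 KB for the small files: an ~80× difference for the same 1 GB of data, which is why HDFS favors a modest number of huge files.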

Checkpoint Node (Secondary NN)
- Performs checkpoints of the namespace and logs
- Not a hot backup!
- HDFS 2.0 introduced NameNode HA
  – Active and a Standby NameNode service coordinated via ZooKeeper

DataNode
- Stores blocks on local disk
- Sends frequent heartbeats to NameNode
- Sends block reports to NameNode
- Clients connect to DataNodes for I/O

How HDFS Works - Writes
1. Client contacts NameNode to write data
2. NameNode says write it to these DataNodes
3. Client sequentially writes blocks to the DataNodes

How HDFS Works - Writes
- DataNodes replicate data blocks, orchestrated by the NameNode

How HDFS Works - Reads
1. Client contacts NameNode to read data
2. NameNode says you can find it here
3. Client sequentially reads blocks from the DataNodes

How HDFS Works - Failure
- Client connects to another node serving that block

HDFS Blocks
- Default block size was 64 MB, now 128 MB
  – Configurable (see the sketch below)
  – Larger sizes are common, say 256 MB or 512 MB
- Default replication is three
  – Also configurable, though rarely changed
- Blocks are stored as files on the DataNode's local file system
  – Cannot associate any block with its true file
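A minimal sketch of overriding these defaults from a Java client; dfs.blocksize and dfs.replication are the standard HDFS property names, everything else here is illustrative:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

public class BlockSettings {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Files created through this client get 256 MB blocks...
        conf.setLong("dfs.blocksize", 256L * 1024 * 1024);
        // ...and three replicas (the default, set here for completeness)
        conf.setInt("dfs.replication", 3);
        FileSystem fs = FileSystem.get(conf);
        System.out.println("Connected to " + fs.getUri());
    }
}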

Replication Strategy
- First copy is written to the same node as the client
  – If the client is not part of the cluster, the first block goes to a random node
- Second copy is written to a node on a different rack
- Third copy is written to a different node on the same rack as the second copy

Data Locality
- Key in achieving performance
- MapReduce tasks run as close to data as possible
- Some terms
  – Local
  – On-Rack
  – Off-Rack

Data Corruption
- Use of checksums to ensure block integrity
- Checksum is calculated on read and compared against the one recorded when the block was written
  – Fast to calculate and space-efficient
- If the checksums differ, the client reads the block from another DataNode
  – Corrupted block will be deleted and re-replicated from a non-corrupt copy
- DataNode periodically runs a background thread to do this checksum process
  – For those files that aren't read very often
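A toy sketch of the idea, not HDFS's actual code path (HDFS checksums each small chunk of a block, 512 bytes by default, rather than whole blocks):

import java.util.zip.CRC32;

public class ChecksumDemo {
    static long checksum(byte[] data) {
        CRC32 crc = new CRC32();
        crc.update(data);
        return crc.getValue();
    }

    public static void main(String[] args) {
        byte[] block = "some block data".getBytes();
        long storedAtWrite = checksum(block);   // computed when the block is written
        block[0] ^= 1;                          // simulate bit rot on disk
        long computedAtRead = checksum(block);  // recomputed on read
        if (storedAtWrite != computedAtRead) {
            System.out.println("Corruption detected: read from another replica");
        }
    }
}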

Fault Tolerance
- If no heartbeat is received from a DataNode within a configurable time window, it is considered lost
  – 10 minute default
- NameNode will:
  – Determine which blocks were on the lost node
  – Locate other DataNodes with valid copies
  – Instruct DataNodes with copies to replicate blocks to other DataNodes

Interacting with HDFS
- Primary interfaces are CLI and Java API (a Java example follows below)
- Web UI for read-only access
- WebHDFS provides RESTful read/write
- HttpFS also exists
- snakebite is a Python library from Spotify
- FUSE allows HDFS to be mounted on a standard file system
  – Typically used so legacy apps can use HDFS data
- 'hdfs dfs -help' will show CLI usage
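A minimal sketch of reading a file through the Java API; the path below is hypothetical, and the NameNode address comes from the standard core-site.xml configuration:

import java.io.BufferedReader;
import java.io.InputStreamReader;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsCat {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Path file = new Path("/user/shook/example.txt"); // hypothetical path
        // fs.open() returns a stream; the NameNode resolves block locations,
        // and the client reads the bytes directly from DataNodes
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(fs.open(file)))) {
            String line;
            while ((line = reader.readLine()) != null) {
                System.out.println(line);
            }
        }
    }
}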

HADOOP MAPREDUCE

MapReduce!
- A programming model for processing data
- Contains two phases!
- Map
  – Perform a map function on key/value pairs
- Reduce
  – Perform a reduce function on key/value groups
- Groups are created by sorting map output
- Operations on key/value pairs open the door for very parallelizable algorithms

Hadoop MapReduce
- Automatic parallelization and distribution of tasks
- Framework handles scheduling tasks and retrying failed tasks
- Developer can code many pieces of the puzzle
- Framework handles a Shuffle and Sort phase between map and reduce tasks
- Developer need only focus on the task at hand, rather than how to manage where data comes from and where it goes

MRv1 and MRv2
- Both manage compute resources, jobs, and tasks
- Job API is the same
- MRv1 is proven in production
  – JobTracker / TaskTrackers
- MRv2 is a new application on YARN
  – YARN is a generic platform for developing distributed applications

MRv1 - JobTracker
- Monitors job and task progress
- Issues task attempts to TaskTrackers
- Re-tries failed task attempts
  – Four failed attempts of the same task = one failed job
- Default scheduler is FIFO order
  – CapacityScheduler or FairScheduler is preferred
- Single point of failure for MapReduce

MRv1 – TaskTrackers
- Runs on same node as DataNodes
- Sends heartbeats and task reports to JT
- Configurable number of map and reduce slots
- Runs map and reduce task attempts in a separate JVM

How MapReduce Works
1. Client submits job to JobTracker
2. JobTracker submits tasks to TaskTrackers
3. Job output is written to DataNodes w/ replication
4. JobTracker reports metrics

How MapReduce Works - Failure
- JobTracker assigns task to a different node

Map Input:
  Map Task 0: (0, "hadoop is fun")
  Map Task 1: (52, "I love hadoop")
  Map Task 2: (104, "Pig is more fun")

Map Output:
  Map Task 0: ("hadoop", 1), ("is", 1), ("fun", 1)
  Map Task 1: ("I", 1), ("love", 1), ("hadoop", 1)
  Map Task 2: ("Pig", 1), ("is", 1), ("more", 1), ("fun", 1)

SHUFFLE AND SORT

Reducer Input Groups:
  Reduce Task 0: ("fun", {1,1}), ("hadoop", {1,1}), ("love", {1}), ("I", {1})
  Reduce Task 1: ("is", {1,1}), ("more", {1}), ("Pig", {1})

Reducer Output:
  Reduce Task 0: ("fun", 2), ("hadoop", 2), ("love", 1), ("I", 1)
  Reduce Task 1: ("is", 2), ("more", 1), ("Pig", 1)

Example -- Word Count
- Count the number of times each word is used in a body of text
- Uses TextInputFormat and TextOutputFormat

map(byte_offset, line):
    foreach word in line:
        emit(word, 1)

reduce(word, counts):
    sum = 0
    foreach count in counts:
        sum += count
    emit(word, sum)

Mapper Code

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordMapper
        extends Mapper<LongWritable, Text, Text, IntWritable> {

    private final static IntWritable ONE = new IntWritable(1);
    private Text word = new Text();

    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Tokenize the line and emit (word, 1) for every token
        String line = value.toString();
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
            word.set(tokenizer.nextToken());
            context.write(word, ONE);
        }
    }
}

Shuffle and Sort
1. Each mapper outputs to a single partitioned file (partitions P0-P3, one per reducer)
2. Reducers copy their parts from every mapper
3. Each reducer merges its partitions

Reducer Code

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class IntSumReducer
        extends Reducer<Text, IntWritable, Text, IntWritable> {

    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // Sum all the counts for this word and emit (word, total)
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        context.write(key, new IntWritable(sum));
    }
}
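The slides show only the mapper and reducer; to run them you also need a driver that wires them into a job. A minimal sketch using the standard new-API Job class, assuming the WordMapper and IntSumReducer classes above, with input and output paths taken from the command line:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "word count");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(WordMapper.class);
        job.setCombinerClass(IntSumReducer.class); // optional local aggregation
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}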

Resources, Wrap-up, etc.
- http://hadoop.apache.org
- Supportive community
  – Hadoop-DC
  – Data Science MD
  – Baltimore Hadoop Users Group
- Plenty of resources available to learn more
  – Books
  – Email lists
  – Blogs

That Ecosystem I Mentioned
Greenplum, Tajo, HAWQ, Parquet, Mahout, Oozie, Storm, ZooKeeper, Spark, Cassandra, Kafka, Crunch, Azkaban, Knox, Tez, Ranger, and others

References
- Hadoop: The Definitive Guide, Chapter 16.2
- http://www.slideshare.net/s shah/the-bigdata-ecosystem-at-linkedin-23512853
- .../2013/01/hadoop ecosystem full2.png
- Google Images
