HDFS: Hadoop Distributed File System

Transcription

HDFS: Hadoop Distributed File System
CIS 612
Sunnie Chung

Introduction
- What is Big Data?
  – Bulk amount
  – Unstructured
- Many applications need to handle huge amounts of data (on the order of 500 TB per day)
- If a regular machine needs to transmit 1 TB of data through 4 channels: 43 minutes. What if it is 500 TB?

What is Hadoop?
- Framework for large-scale data processing
- Inspired by Google's architecture:
  – Google File System (GFS) and MapReduce
- Open-source Apache project
  – Nutch search engine project
  – Apache Incubator
- Written in Java and shell scripts

Hadoop Distributed File System (HDFS)
- Storage unit of Hadoop
- Relies on principles of a Distributed File System
- HDFS has a Master-Slave architecture
- Main components:
  – Name Node: Master
  – Data Node: Slave
- 3 replicas for each block
- Default block size: 128 MB

Hadoop Distributed File System (HDFS)
- Hadoop Distributed File System (HDFS)
  – Runs entirely in user space
  – The file system is dynamically distributed across multiple computers
  – Allows nodes to be added or removed easily
  – Highly scalable in a horizontal fashion
- Hadoop Development Platform
  – Uses a MapReduce model for working with data
  – Users can program in Java, C++, and other languages

Why should I use Hadoop?
- Fault-tolerant hardware is expensive
- Hadoop is designed to run on commodity hardware
- Automatically handles data replication and deals with node failure
- Does all the hard work so you can focus on processing data

HDFS: Key Features
- Highly fault tolerant: automatic failure recovery system
- High aggregate throughput for streaming large files
- Supports replication and locality features
- Designed for systems with very large files (sizes in TB) that are few in number
- Provides streaming access to file system data; especially good for write-once, read-many files (for example, log files)

Hadoop Distributed File System (HDFS)
- Can be built out of commodity hardware; HDFS doesn't need highly expensive storage devices
  – Uses off-the-shelf hardware
- Rapid elasticity
  – Need more capacity? Just assign some more nodes
  – Scalable
  – Can add or remove nodes with little effort or reconfiguration
- Resistant to failure
  – Individual node failure does not disrupt the system

Who uses Hadoop?

What features does Hadoop offer?
- API and implementation for working with MapReduce
- Infrastructure
  – Job configuration and efficient scheduling
  – Web-based monitoring of cluster stats
  – Handles failures in computation and data nodes
  – Distributed File System optimized for huge amounts of data

When should you choose Hadoop?
- Need to process a lot of unstructured data
- Processing needs are easily run in parallel
- Batch jobs are acceptable
- Access to lots of cheap commodity machines

When should you avoid Hadoop?
- Intense calculations with little or no data
- Processing cannot easily run in parallel
- Data is not self-contained
- Need interactive results

Hadoop Examples
- Hadoop would be a good choice for:
  – Indexing log files
  – Sorting vast amounts of data
  – Image analysis
  – Search engine optimization
  – Analytics
- Hadoop would be a poor choice for:
  – Calculating Pi to 1,000,000 digits
  – Calculating Fibonacci sequences
  – A general RDBMS replacement

Hadoop Distributed File System (HDFS)
- How does Hadoop work?
  – Runs on top of multiple commodity systems
  – A Hadoop cluster is composed of nodes: one Master Node and many Slave Nodes
  – Multiple nodes are used for storing data & processing data
  – The system abstracts the underlying hardware to users/software

How HDFS works: Split Data
- Data copied into HDFS is split into blocks
- Typical HDFS block size is 128 MB
  – (vs. 4 KB on UNIX file systems)

How HDFS works: Replication
- Each block is replicated to multiple machines
- This allows for node failure without data loss
[Figure: blocks #1-#3 replicated across DataNode 1, DataNode 2, and DataNode 3]

HDFS Architecture

Hadoop Distributed File System (HDFS)
- HDFS is a multi-node system
  – Name Node (Master): single point of failure
  – Data Node (Slave): failure tolerant (data replication)
- HDFS consists of data blocks
  – Files are divided into data blocks
  – Default size is 64 MB
  – Default replication of blocks is 3
  – Blocks are spread out over Data Nodes

Hadoop Architecture Overview
[Figure: Client connects to the Job Tracker and Name Node; the Job Tracker coordinates Task Trackers, which run alongside Data Nodes]

Hadoop Components: Job Tracker
- Only one Job Tracker per cluster
- Receives job requests submitted by the client
- Schedules and monitors jobs on Task Trackers

Hadoop Components: Name Node
- One active Name Node per cluster
- Manages the file system namespace and metadata
- Single point of failure: good place to spend money on hardware

Name Node
- Master of HDFS
- Maintains and manages data on Data Nodes
- High-reliability machine (can even be RAID)
- Expensive hardware
- Stores NO data; just holds metadata!
- Secondary Name Node:
  – Reads from the RAM of the Name Node and stores it to hard disks periodically
- Active & Passive Name Nodes from Gen2 Hadoop

Hadoop Components: Task Tracker
- There are typically a lot of Task Trackers
- Responsible for executing operations
- Reads blocks of data from Data Nodes

Hadoop Components: Data Node
- There are typically a lot of Data Nodes
- Data Nodes manage data blocks and serve them to clients
- Data is replicated, so failure is not a problem

Data Nodes
- Slaves in HDFS
- Provide data storage
- Deployed on independent machines
- Responsible for serving Read/Write requests from clients
- The data processing is done on Data Nodes

HDFS Architecture

Hadoop Modes of Operation
Hadoop supports three modes of operation:
- Standalone
- Pseudo-Distributed
- Fully-Distributed

HDFS Operation

HDFS Operation
- Client makes a Write request to the Name Node
- Name Node responds with information about the available Data Nodes and where the data is to be written
- Client writes the data to the addressed Data Node (see the client-side sketch below)
- Replicas for all blocks are automatically created by the Data Pipeline
- If the Write fails, the Data Node notifies the Client and gets a new location to write
- If the Write completes successfully, an acknowledgement is given to the Client
- Non-Posted Write by Hadoop
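
As a rough illustration of the client side of this write path, the sketch below uses the standard Hadoop FileSystem API in Java; the path /user/demo/example.txt and the file contents are made up for the example. The Name Node supplies block locations, while the client streams bytes to Data Nodes through the data pipeline.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class HdfsWriteExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();       // picks up core-site.xml / hdfs-site.xml
        FileSystem fs = FileSystem.get(conf);           // client handle to HDFS

        // create() asks the Name Node where to put the blocks; the stream then
        // writes to the first Data Node, which forwards down the replication pipeline
        Path file = new Path("/user/demo/example.txt"); // hypothetical path
        FSDataOutputStream out = fs.create(file);
        out.writeBytes("hello hdfs\n");
        out.close();                                    // write completes once replicas acknowledge

        fs.close();
      }
    }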

HDFS: File Write

HDFS: File Read

Hadoop: Development Platform and Hadoop Stack
- User-written code runs on the system
- The system appears to the user as a single entity
- The user does not need to worry about the distributed system
- Many systems can run on top of Hadoop
  – Allows further abstraction from the system

Hadoop: Hive & HBase
- Hive and HBase are layers on top of Hadoop
[Figure: Hadoop stack with Hive, HBase, and ZooKeeper]
- HBase & Hive are applications
- They provide an interface to data on HDFS
- Other programs or applications may use Hive or HBase as an intermediate layer

Hadoop: Hive
- Hive
  – Data warehousing application
  – SQL-like commands (HiveQL)
  – Not a traditional relational database
  – Scales horizontally with ease
  – Supports massive amounts of data*
* Facebook has more than 15 PB of information stored in it and imports 60 TB each day (as of 2010)

Hadoop: HBase
- HBase
  – No SQL-like language
    - Uses a custom Java API for working with data (see the sketch below)
  – Modeled after Google's BigTable
  – Random read/write operations allowed
  – Multiple concurrent read/write operations allowed
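
A minimal sketch of that custom Java API, assuming the classic HTable-style client of the HBase 0.9x era and a hypothetical table "users" with column family "info"; newer HBase releases replace some of these calls.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Get;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.util.Bytes;

    public class HBaseExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        HTable table = new HTable(conf, "users");        // hypothetical table

        // Random write: one row, one cell
        Put put = new Put(Bytes.toBytes("row1"));
        put.add(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes("Alice"));
        table.put(put);

        // Random read of the same cell
        Get get = new Get(Bytes.toBytes("row1"));
        Result result = table.get(get);
        byte[] name = result.getValue(Bytes.toBytes("info"), Bytes.toBytes("name"));
        System.out.println(Bytes.toString(name));

        table.close();
      }
    }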

Hadoop MapReduce
- Hadoop has its own implementation of MapReduce
- Hadoop 1.0.4
  – API: http://hadoop.apache.org/docs/r1.0.4/api/
  – Tutorial: http://hadoop.apache.org/docs/r1.0.4/mapred_tutorial.html
- Custom serialization data types (illustrated below)
  – Writable/Comparable
  – Text vs String
  – LongWritable vs long
  – IntWritable vs int
  – DoubleWritable vs double
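
To make the Writable-vs-Java-type distinction concrete, here is a small sketch: the Writable wrappers are mutable, reusable containers that Hadoop can serialize efficiently between map and reduce tasks.

    import org.apache.hadoop.io.DoubleWritable;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;

    public class WritableExamples {
      public static void main(String[] args) {
        Text word = new Text("hadoop");                 // instead of String
        IntWritable count = new IntWritable(1);         // instead of int
        LongWritable offset = new LongWritable(0L);     // instead of long
        DoubleWritable score = new DoubleWritable(0.5); // instead of double

        // Writables are mutable: the same object can be reused for many records,
        // avoiding object churn inside tight map/reduce loops
        count.set(count.get() + 1);
        word.set("hdfs");
        System.out.println(word + " " + count + " " + offset + " " + score);
      }
    }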

Structure of a Hadoop Mapper (WordCount)
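
The slide's code image is not reproduced in this transcription; the following is a standard WordCount mapper against the Hadoop 1.0.4 (org.apache.hadoop.mapred) API, in the spirit of what the slide shows.

    import java.io.IOException;
    import java.util.StringTokenizer;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.MapReduceBase;
    import org.apache.hadoop.mapred.Mapper;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reporter;

    public class WordCountMapper extends MapReduceBase
        implements Mapper<LongWritable, Text, Text, IntWritable> {

      private static final IntWritable ONE = new IntWritable(1);
      private final Text word = new Text();

      // Called once per (byte offset, line) pair in the task's InputSplit
      public void map(LongWritable key, Text value,
                      OutputCollector<Text, IntWritable> output, Reporter reporter)
          throws IOException {
        StringTokenizer tokenizer = new StringTokenizer(value.toString());
        while (tokenizer.hasMoreTokens()) {
          word.set(tokenizer.nextToken());
          output.collect(word, ONE);   // emit (word, 1) for every token
        }
      }
    }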

Structure of a Hadoop Reducer (WordCount)
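
Likewise, the transcription does not capture this slide's image; a matching WordCount reducer for the same old-style API would look roughly like this.

    import java.io.IOException;
    import java.util.Iterator;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.MapReduceBase;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reducer;
    import org.apache.hadoop.mapred.Reporter;

    public class WordCountReducer extends MapReduceBase
        implements Reducer<Text, IntWritable, Text, IntWritable> {

      // Called once per key with an iterator over all values grouped under that key
      public void reduce(Text key, Iterator<IntWritable> values,
                         OutputCollector<Text, IntWritable> output, Reporter reporter)
          throws IOException {
        int sum = 0;
        while (values.hasNext()) {
          sum += values.next().get();
        }
        output.collect(key, new IntWritable(sum));   // emit (word, total count)
      }
    }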

Hadoop MapReduce
- Working with Hadoop
  – http://hadoop.apache.org/docs/r1.0.4/commands_manual.html
- A quick overview of Hadoop commands
  – bin/start-all.sh
  – bin/stop-all.sh
  – bin/hadoop fs -put localSourcePath hdfsDestinationPath
  – bin/hadoop fs -get hdfsSourcePath localDestinationPath
  – bin/hadoop fs -rmr folderToDelete
  – bin/hadoop job -kill job_id
- Running a Hadoop MR program
  – bin/hadoop jar jarFileName.jar programToRun parm1 parm2

Useful Application Sites
[1] http://wiki.apache.org/hadoop/EclipsePlugIn
[2] 10gen. MongoDB. http://www.mongodb.org/
[3] Apache. Cassandra. http://cassandra.apache.org/
[4] Apache. Hadoop. http://hadoop.apache.org/
[5] Apache. HBase. http://hbase.apache.org/
[6] Apache. Hive. http://hive.apache.org/
[7] Apache. Pig. http://pig.apache.org/
[8] Apache. ZooKeeper. http://zookeeper.apache.org/

How MapReduce Works in Hadoop

Lifecycle of a MapReduce Job
[Figure: a user program with a Map function and a Reduce function is run as a MapReduce job]

Lifecycle of a MapReduce Job
[Figure: timeline of input splits processed by map waves 1-2, followed by reduce waves 1-2]

Hadoop MR Job Interface: InputFormat
- The Hadoop MapReduce framework spawns one map task for each InputSplit
- InputSplit: the input file is split into InputSplits (logical splits, usually 1 block each, not physically split chunks)
  – InputFormat::getInputSplits()
- The number of maps is usually driven by the total number of blocks (InputSplits) of the input files
  – With a block size of 128 MB, a 10 TB file is configured with about 82,000 maps (see the sketch below)
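
A quick sanity check of that figure, as a sketch (the exact number depends on the configured block size and file layout):

    public class SplitCountEstimate {
      public static void main(String[] args) {
        long inputBytes = 10L * 1024 * 1024 * 1024 * 1024;   // 10 TB of input
        long blockBytes = 128L * 1024 * 1024;                // 128 MB block size
        long mapTasks = inputBytes / blockBytes;             // one map task per block/split
        System.out.println(mapTasks);                        // 81920, i.e. roughly 82,000 maps
      }
    }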

Hadoop MR Job Interface: map()
- The framework then calls map(WritableComparable, Writable, OutputCollector, Reporter) for each key/value pair (line number, line text) in the InputSplit for that task
- Output pairs are collected with calls to OutputCollector.collect(WritableComparable, Writable)

Hadoop MR Job Interface: combiner()
- Optional combiner, via JobConf.setCombinerClass(Class), to perform local aggregation of the intermediate outputs of the mapper (see the driver sketch below)
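
A minimal job driver sketch showing where JobConf.setCombinerClass fits, assuming the hypothetical WordCountMapper and WordCountReducer classes sketched earlier; the reducer doubles as the combiner because word-count aggregation is associative and commutative.

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.FileInputFormat;
    import org.apache.hadoop.mapred.FileOutputFormat;
    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.TextInputFormat;
    import org.apache.hadoop.mapred.TextOutputFormat;

    public class WordCountDriver {
      public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(WordCountDriver.class);
        conf.setJobName("wordcount");

        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);

        conf.setMapperClass(WordCountMapper.class);
        conf.setCombinerClass(WordCountReducer.class);   // local aggregation of map outputs
        conf.setReducerClass(WordCountReducer.class);

        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);

        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        JobClient.runJob(conf);   // submit the job and wait for completion
      }
    }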

Hadoop MR Job Interface: Partitioner()
- Partitioner controls the partitioning of the keys of the intermediate map-outputs (see the sketch below)
- The key (or a subset of the key) is used to derive the partition, typically by a hash function
- The total number of partitions is the same as the number of reducers
- HashPartitioner is the default Partitioner of reduce tasks for the job
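
As an illustration, a hypothetical custom Partitioner against the old (org.apache.hadoop.mapred) API could derive the partition from a subset of the key, here its first character, rather than hashing the whole key as HashPartitioner does.

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.Partitioner;

    // Hypothetical partitioner: uses only the key's first character to pick a partition
    public class FirstCharPartitioner implements Partitioner<Text, IntWritable> {

      @Override
      public void configure(JobConf job) {
        // no configuration needed for this sketch
      }

      @Override
      public int getPartition(Text key, IntWritable value, int numPartitions) {
        String s = key.toString();
        int hash = s.isEmpty() ? 0 : s.charAt(0);
        return (hash & Integer.MAX_VALUE) % numPartitions;  // one partition per reducer
      }
    }

It would be registered in the driver with conf.setPartitionerClass(FirstCharPartitioner.class); without it, the default HashPartitioner hashes the entire key.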

Hadoop MR Job Interface: reducer()
- Reducer has 3 primary phases:
  1. Shuffle
  2. Sort
  3. Reduce

Hadoop MR Job Interface: reducer()
- Shuffle
  – Input to the Reducer is the sorted output of the mappers. In this phase the framework fetches the relevant partition of the output of all the mappers, via HTTP.
- Sort
  – The framework groups Reducer inputs by keys (since different mappers may have output the same key) in this stage.
  – The shuffle and sort phases occur simultaneously; while map-outputs are being fetched they are merged.

Hadoop MR Job Interface: reducer()
- Reduce
  – The framework then calls the reduce(WritableComparable, Iterator, OutputCollector, Reporter) method for each (key, list of values) pair in the grouped inputs.
  – The output of the reduce task is typically written to the FileSystem via OutputCollector.collect(WritableComparable, Writable).

MR Job Parameters
- Map parameters
  – io.sort.mb
- Shuffle/Reduce parameters
  – mapred.job.shuffle.merge.percent (see the configuration sketch below)
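
These Hadoop 1.x-era parameter names can be set per job through the configuration; a sketch with illustrative values only.

    import org.apache.hadoop.mapred.JobConf;

    public class TuningExample {
      public static void main(String[] args) {
        JobConf conf = new JobConf();

        // Map-side sort buffer (MB) used to sort map outputs before spilling to disk
        conf.setInt("io.sort.mb", 200);

        // Number of sorted streams merged at once during map/reduce-side merges
        conf.setInt("io.sort.factor", 100);

        // Fraction of in-memory map outputs that triggers a merge during the shuffle
        conf.setFloat("mapred.job.shuffle.merge.percent", 0.66f);

        System.out.println(conf.get("io.sort.mb"));
      }
    }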

Components in a Hadoop MR Workflow
Next few slides are from: -solving-with-apache-hadoop-pig

Job Submission

Initialization

Scheduling

Execution

Map Task

Partitioning at the end of the Mapper
- The total number of partitions in each mapper's local hard disk is the same as the number of reducers, so B partitions for B reducers.
[Figure: map output is sorted/combined on key in B main-memory buffers (io.sort.mb); a hash function assigns each record to a partition buffer, which is spilled through an output buffer to disk]

Map: Sort in io.sort.mb (in each Partition) & Spill to Disk in Partitions
- A record emitted from a map will be serialized into a buffer, and metadata will be stored into accounting buffers.
- When either the serialization buffer or the metadata exceeds a threshold, the contents of the buffers will be sorted and written to disk in the background while the map continues to output records.
- If either buffer fills completely while the spill is in progress, the map thread will block.
- When the map is finished, any remaining records are written to disk and all on-disk segments are merged into a single file.
- Minimizing the number of spills to disk can decrease map time, but a larger buffer also decreases the memory available to the mapper.

Sort Buffer

Reduce Tasks

Quick Overview of Other Topics
- Dealing with failures
- Hadoop Distributed File System (HDFS)
- Optimizing a MapReduce job

Dealing with Failures and Slow Tasks
- What to do when a task fails?
  – Try again (retries possible because of idempotence)
  – Try again somewhere else
  – Report failure
- What about slow tasks (stragglers)?
  – Run another version of the same task in parallel; take results from the one that finishes first
  – What are the pros and cons of this approach?
- Fault tolerance is of high priority in the MapReduce framework

HDFS Architecture

Lifecycle of a MapReduce Job
[Figure: timeline of input splits, map waves 1-2, and reduce waves 1-2]
How are the number of splits, number of map and reduce tasks, memory allocation to tasks, etc., determined?

Job Configuration Parameters
- 190 parameters in Hadoop
- Set manually, or defaults are used

Hadoop Job Configuration Parameters
[Image source: http://www.jaso.co.kr/265]

Tuning Hadoop Job Conf. Parameters
- Do their settings impact performance?
- What are ways to set these parameters?
  – Defaults: are they good enough?
  – Best practices: the best setting can depend on data, job, and cluster properties
  – Automatic setting

Experimental Setting
- Hadoop cluster on 1 master + 16 workers
- Each node:
  – 2 GHz AMD processor, 1.8 GB RAM, 30 GB local disk
  – Relatively ill-provisioned!
  – Xen VM running Debian Linux
  – Max 4 concurrent maps & 2 reduces
- Maximum map wave size = 16 x 4 = 64
- Maximum reduce wave size = 16 x 2 = 32
- Not all users can run large Hadoop clusters:
  – Can Hadoop be made competitive in the 10-25 node, multi-GB to TB data size range?

Parameters Varied in Experiments

Hadoop 50GB TeraSort
- Varying number of reduce tasks, number of concurrent sorted streams for merging, and fraction of map-side sort buffer devoted to metadata storage

Hadoop 50GB TeraSort
- Varying number of reduce tasks for different values of the fraction of map-side sort buffer devoted to metadata storage (with io.sort.factor = 500)

Hadoop 50GB TeraSort
- Varying number of reduce tasks for different values of io.sort.factor (io.sort.record.percent = 0.05, default)

Hadoop 75GB TeraSort
- 1D projection for io.sort.factor = 500

Automatic Optimization? (Not yet in Hadoop)
[Figure: job timeline with map waves 1-3, shuffle, and reduce waves 1-2, compared with the same job when the number of reduces is increased to 9, giving reduce waves 1-3]
