6 Hadoop Scheduling

Transcription

TI2736-B Big Data Processing
Claudia Hauff
ti2736b-ewi@tudelft.nl

Course module overview: Pattern, Hadoop Ctd., Giraph, ZooKeeper, Spark, Spark

Learning objectives
- Exploit Hadoop's Counters and setup/cleanup efficiently
- Explain how Hadoop addresses the problem of job scheduling
- Explain Hadoop's shuffle & sort phase and use that knowledge to improve your Hadoop code
- Implement strategies for efficient data input

Hadoop Programming Revisited: setup and cleanup

Setup & cleanup
- One MAPPER object for each map task, associated with a sequence of key/value pairs (the “input split”); map() is called for each key/value pair by the execution framework. The programmer “hints” the number of mappers to use.
- One REDUCER object for each reduce task; reduce() is called once per intermediate key. The programmer can set the number of reducers.
- MAPPER/REDUCER are Java objects, which allows side effects: preserving state across multiple inputs, initialising additional resources, emitting (intermediate) key/value pairs in one go.

Setup: useful for one-off operations (opening an SQL connection, loading a dictionary, etc.)
WordCount* - count only valid dictionary terms

public class MyMapper extends
        Mapper<Text, IntWritable, Text, IntWritable> {

    private Set<String> dictionary; // all valid words

    public void setup(Context context) throws IOException {
        dictionary = Sets.newHashSet();
        loadDictionary(); // defined elsewhere, reads file from HDFS
    }

    public void map(Text key, IntWritable val, Context context)
            throws IOException, InterruptedException {
        if (!dictionary.contains(key.toString()))
            return;
        context.write(key, new IntWritable(1));
    }
}

The same code, annotated: setup() is called once in the life cycle of a Mapper object, before any calls to map(); map() is called once for each key/value pair that appears in the input split.

Cleanup (single reducer setting): WordCount** - how many words start with the same letter?

public class MyReducer extends
        Reducer<Text, IntWritable, Text, IntWritable> {

    private Map<Character, Integer> cache;

    public void setup(Context context) throws IOException {
        cache = Maps.newHashMap();
    }

    public void reduce(Text key, Iterable<IntWritable> values,
            Context context) throws IOException, InterruptedException {
        char c = key.toString().charAt(0);
        for (IntWritable iw : values) {
            // add iw to the current value of key c in cache
            cache.merge(c, iw.get(), Integer::sum);
        }
    }

    public void cleanup(Context context) throws IOException,
            InterruptedException {
        for (Character c : cache.keySet()) {
            context.write(new Text(c.toString()), new IntWritable(cache.get(c)));
        }
    }
}

The same code, annotated: setup() is called once in the life cycle of a Reducer object, before any calls to reduce(); reduce() is called once for each key that was assigned to the reducer; cleanup() is called once in the life cycle of a Reducer object, after all calls to reduce().

Hadoop Programming Revisited: Counters

Counter basics
- Gathering data about the data we are analysing, e.g. the number of key/value pairs processed in map, the number of empty/invalid lines.
- Wanted: easy to collect; estimates are viewable during job execution (e.g. to stop a Hadoop job early at too many invalid key/value pairs).
- Why not use log messages instead (write to the error log when an invalid line occurs)? Hadoop's logs are huge, you need to know where to look, and aggregating stats from the logs requires another pass over them.


Counter basics
- Counters: Hadoop's way of aggregating statistics. Counters count (increment).
- Built-in counters maintain metrics of the job: MapReduce counters (e.g. #skipped records by all maps), file system counters (e.g. #bytes read from HDFS), job counters (e.g. #launched map tasks).
- You have already seen them.


Counter basics: example of the built-in counter output.

Map-Reduce Framework
    Map input records       5903
    Map output records      47102
    Combine input records   47102
    Combine output records  8380
    Reduce output records   5934
File System Counters
    FILE: Number of bytes read     118124
    FILE: Number of bytes written  1075029
    HDFS: Number of bytes read     996209
    HDFS: Number of bytes written  59194

Built-in vs. user-defined
- Built-in counters exist for each Hadoop job.
- User-defined counters are maintained by the application they are associated with.
- In the pre-YARN setup, counter values are periodically sent to the TaskTracker and then to the JobTracker for global aggregation; with YARN they are aggregated per job by the ResourceManager.
- Counter values are only definite once the job has completed! Counters may go down if a task fails!
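Because counter values are only definite once the job has completed, the usual place to read them programmatically is the driver, right after waitForCompletion(). A minimal sketch, assuming the user-defined Records enum from the code example on the following slides; the driver class name is hypothetical and the job setup is elided:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;

public class CounterDriver {

    enum Records { WORDS, CHARS; } // user-defined counter group

    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "wordcount-with-counters");
        // ... mapper/reducer/input/output setup as usual ...

        boolean ok = job.waitForCompletion(true); // blocks until the job has completed

        // only now are the counter values definite
        Counter words = job.getCounters().findCounter(Records.WORDS);
        Counter chars = job.getCounters().findCounter(Records.CHARS);
        System.out.println("words=" + words.getValue() + ", chars=" + chars.getValue());
        System.exit(ok ? 0 : 1);
    }
}

job.getCounters() returns the built-in groups (Map-Reduce Framework, File System Counters, Job Counters) as well as any user-defined enum groups.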

Code example: WordCount* - count words and chars

enum Records { WORDS, CHARS; }; // several enums possible: used to group counters

public class WordCount {

    public static class MyMapper extends
            Mapper<LongWritable, Text, Text, IntWritable> {

        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] tokens = value.toString().split(" ");
            for (String s : tokens) {
                context.write(new Text(s), new IntWritable(1));
                context.getCounter(Records.WORDS).increment(1);
                context.getCounter(Records.CHARS).increment(s.length());
            }
        }
    }
}

User-defined counters appear automatically in the final status output.

The same code shown together with the job's final status output: the user-defined counter group Records (with its WORDS and CHARS counters) appears next to the built-in Map-Reduce Framework counters (Map input records 5903, Map output records 47102, Combine input records 47102, Combine output records 8380, Reduce output records 5934).

Code example II

enum Records { MAP_WORDS, REDUCE_WORDS; };

public class WordCount {

    // -- MAPPER (enclosing Mapper subclass omitted for brevity)
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] tokens = value.toString().split(" ");
        for (String s : tokens) {
            context.write(new Text(s), new IntWritable(1));
            context.getCounter(Records.MAP_WORDS).increment(1);
        }
    }

    // -- REDUCER (Combiner is a copy of the Reducer; enclosing Reducer subclass omitted)
    public void reduce(Text key, Iterable<IntWritable> values,
            Context context) throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable value : values)
            sum += value.get();
        context.write(key, new IntWritable(sum));
        context.getCounter(Records.REDUCE_WORDS).increment(sum);
    }
}

Resulting counters: Records: MAP_WORDS 47102, REDUCE_WORDS 47102.

Job Scheduling

Last time: GFS/HDFS, a distributed file system, i.e. a file system that manages the storage across a network of machines. Image source: .google.com/en//archive/gfs-sosp2003.pdf

What about the jobs?
- Hadoop job: a unit of work to be performed, consisting of the input data, the MapReduce program, and configuration information.
- Hadoop divides the input data into fixed-size input splits: one map task per split, one map function call for each record in the split.
- Splits are processed in parallel (if enough DataNodes exist).
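The three ingredients of a job (input data, MapReduce program, configuration information) come together in the driver. A minimal driver sketch; the MyMapper/MyReducer classes and the argument paths are placeholders, not code from the slides:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();      // configuration information
        Job job = Job.getInstance(conf, "word count"); // the unit of work

        job.setJarByClass(WordCountDriver.class);
        job.setMapperClass(MyMapper.class);            // the MapReduce program
        job.setCombinerClass(MyReducer.class);         // combiner as a copy of the reducer
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));   // the input data
        FileOutputFormat.setOutputPath(job, new Path(args[1])); // where the output goes

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}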

MapReduce 1: JobTracker and TaskTracker. Image source: http://lintool.github.io/MapReduceAlgorithms/

MapReduce 1 - Hadoop in practice: Yahoo! (2010)
- HDFS cluster with 3,500 nodes; 40 nodes/rack sharing one IP switch
- 16GB RAM per cluster node, 1-gigabit Ethernet
- 70% of disk space allocated to HDFS; remainder: operating system, data emitted by Mappers (not in HDFS)
- NameNode: up to 64GB RAM
- Total storage: 9.8PB, i.e. 3.3PB net storage (replication: 3)
- 60 million files, 63 million blocks; 54,000 blocks hosted per DataNode
- 1-2 nodes lost per day; time for the cluster to re-replicate lost blocks: 2 minutes

YARN (MapReduce 2)
- The JobTracker/TaskTracker setup becomes a bottleneck in clusters with thousands of nodes.
- As an answer, YARN (Yet Another Resource Negotiator) has been developed.
- YARN splits the JobTracker's tasks (job scheduling and task progress monitoring) into two daemons: the resource manager (RM) and the application master, which negotiates with the RM for cluster resources (each Hadoop job has a dedicated master).

Job scheduling
- Thousands of tasks may make up one job; the number of tasks can exceed the number of tasks that can run concurrently.
- The scheduler maintains a task queue and tracks the progress of running tasks; waiting tasks are assigned nodes as they become available.
- “Move code to data”: the scheduler starts a task on a node that holds the particular block of data needed by the task, if possible.

Job scheduling: FIFO scheduler, priority scheduler, Fair Scheduler, Capacity Scheduler.

Basic schedulers
- Early on: the FIFO scheduler. A job occupies the whole cluster while the rest waits; not feasible in larger clusters.
- Improvement: different job priorities (VERY_HIGH, HIGH, NORMAL, LOW or VERY_LOW); the next job to run is the one with the highest priority.
- No pre-emption: if a low-priority job is occupying the cluster, the high-priority job still has to wait.
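The slides do not show how a priority is attached to a job; a minimal sketch using the standard Job API (the driver class name is hypothetical and the job setup is elided):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobPriority;

public class PriorityDemo {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "high-priority-job");
        // ... mapper/reducer/input/output setup as usual ...

        // ask the scheduler to consider this job before NORMAL/LOW jobs;
        // without pre-emption, already running tasks are not interrupted
        job.setPriority(JobPriority.HIGH);
        // equivalent configuration property: mapreduce.job.priority

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}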

Fair Scheduler I
- Goal: every user receives a fair share of the cluster capacity over time.
- If a single job runs, it uses the entire cluster; as more jobs are submitted, free task slots are given away such that each user receives a “fair share”.
- Short jobs complete in reasonable time, long jobs keep progressing.
- A user who submits more jobs than a second user will not get more cluster resources on average.

Fair Scheduler II
- Jobs are placed in pools; by default there is one pool per user.
- Pre-emption: if a pool has not received its fair share for a period of time, the scheduler will kill tasks in pools running over capacity to give more slots to the pool running under capacity. Task kill ≠ job kill.
- The scheduler needs to keep track of all users and the resources used.

Capacity Scheduler
- The cluster is made up of a number of queues (similar to the Fair Scheduler's pools); each queue has an allocated capacity.
- Within each queue, jobs are scheduled using FIFO with priorities.
- Idea: users (defined via queues) simulate a separate MapReduce cluster with FIFO scheduling for each user.
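How a job ends up in a particular queue is not shown on the slides; a small sketch under the assumption that the cluster administrator has configured a queue (the queue name "analytics" is hypothetical, "default" always exists):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class QueueDemo {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // submit this job to a specific Capacity/Fair Scheduler queue
        conf.set("mapreduce.job.queuename", "analytics");

        Job job = Job.getInstance(conf, "job-in-a-queue");
        // ... mapper/reducer/input/output setup as usual ...
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}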

Speculative execution
- The map phase is only as fast as the slowest MAPPER; the reduce phase is only as fast as the slowest REDUCER. A Hadoop job is therefore sensitive to stragglers (tasks that take unusually long to complete).
- Idea: an identical copy of the task is executed on a second node; the output of whichever node finishes first is used (improvements of up to 40%).
- Can be done for both MAPPER and REDUCER.
- The strategy does not help if the straggler is due to a skewed data distribution.
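Speculative execution is switched on and off per job and per phase through configuration. A sketch using the standard property names; switching it off for reducers, for instance, can make sense when reduce tasks have side effects (this example is illustrative, not taken from the slides):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class SpeculationDemo {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // speculative execution is on by default; control it per phase
        conf.setBoolean("mapreduce.map.speculative", true);     // allow backup map tasks
        conf.setBoolean("mapreduce.reduce.speculative", false); // no backup reduce tasks

        Job job = Job.getInstance(conf, "speculation-demo");
        // ... job setup as usual ...
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}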

Shuffle & Sort

Shuffle & sort phase
- Hadoop guarantee: the input to every reducer is sorted by key.
- Shuffle: the sorting of intermediate key/value pairs and transferring them to the reducers (as input).
- “Shuffle is the heart of MapReduce”: understanding shuffle & sort is vital to recognise job bottlenecks.
- Disclaimer: constantly evolving (again).

A high-level view (diagram): MAP TASK - input split, map(), in-memory buffer, then partition, sort and spill to disk, then merge (disk); the merged map output is copied across the network from the map tasks; REDUCE TASK - sort phase (merge), then reduce().

Map side
- The map task writes its output to an in-memory buffer.
- Once the buffer is full, a background thread spills the content to disk (a spill file).
- The data is partitioned corresponding to the reducers it will be sent to; within a partition, an in-memory sort by key takes place [the combiner runs on the output of the sort].
- After the last map() call, the spill files are merged [the combiner may run again].
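The partitioning step is pluggable: by default a hash partitioner decides which reducer receives a key, and a custom Partitioner can be supplied instead. A small sketch; the class name and the idea of partitioning words by their first letter are illustrative, not from the slides:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// decides, for each intermediate (key, value) pair, which reduce task receives it
public class FirstLetterPartitioner extends Partitioner<Text, IntWritable> {
    @Override
    public int getPartition(Text key, IntWritable value, int numPartitions) {
        if (key.getLength() == 0) {
            return 0;
        }
        char first = Character.toLowerCase(key.toString().charAt(0));
        // map the first letter onto [0, numPartitions); masking guards against negative hash codes
        return (Character.hashCode(first) & Integer.MAX_VALUE) % numPartitions;
    }
}

It is registered in the driver with job.setPartitionerClass(FirstLetterPartitioner.class).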

Reduce side
- A reducer requires the map output for its partition from all map tasks of the cluster.
- The reducer starts copying data as soon as a map task completes (“copy phase”): direct copy into the reducer's memory if the output is small, otherwise copy to disk.
- The in-memory buffer is merged and spilled to disk once it grows too large; the combiner may run again.
- Once all intermediate keys are copied, the “sort phase” begins: a merge of the map outputs, maintaining their sort ordering.
- The REDUCER output is written to HDFS (first block replica to the local disk).

A few more details
- General rule for memory usage (map/reduce vs. shuffle): the shuffle should get as much memory as possible; write map/reduce functions with low memory usage (a single spill would be best).
- What happens to the data written to local disk by the Mapper? It is deleted after successful completion of the job.
- How does the Reducer know where to get the data from? A successful map task informs its task tracker, which informs the job tracker (via heartbeat); the Reducer periodically queries the job tracker for map output hosts until it has retrieved all of the data.
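The memory advice above maps onto a handful of configuration knobs. A sketch with commonly used property names; the concrete values are illustrative, not recommendations from the slides:

import org.apache.hadoop.conf.Configuration;

public class ShuffleTuning {
    public static Configuration tuned() {
        Configuration conf = new Configuration();
        // size (MB) of the map-side in-memory buffer that is sorted and spilled to disk
        conf.setInt("mapreduce.task.io.sort.mb", 256);
        // fill level at which the background spill thread starts writing
        conf.setFloat("mapreduce.map.sort.spill.percent", 0.80f);
        // fraction of the reducer's heap used for buffering copied map output
        conf.setFloat("mapreduce.reduce.shuffle.input.buffer.percent", 0.70f);
        return conf;
    }
}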

Sort phase recap
- Involves all nodes that executed map tasks and will execute reduce tasks.
- A job with m mappers and r reducers involves up to m*r distinct copy operations.
- Reducers can only start calling reduce() after all mappers are finished (key/value guarantee: one key has all its values “attached”); copying can start earlier for intermediate keys.

Data input

Input splits and logical bounds
- One MAPPER object for each map task, associated with a sequence of key/value pairs (the “input split”); map() is called for each key/value pair by the execution framework.

Input split                         record
(part of) a text file               line of text
(range of) database table rows      a single row
(part of) an XML file               XML element
(part of) a video stream            keyframe

Input split

public abstract class InputSplit {

    public abstract long getLength() throws
        IOException, InterruptedException;

    public abstract String[] getLocations() throws
        IOException, InterruptedException;
}

- The split object does not contain the input data, just a reference to the data.
- The storage locations are used by the execution framework to place map tasks close to the data.

Scenario: there are fewer free map slots than input splits. Questions: Given the input splits and their sizes, what are possible strategies of how to pick the next input split to be processed by a map task? Given 3 free map slots and 7 input splits of sizes {10, 20, 30, 100, 200, 300, 400}, which strategy works best?

Input splits and logical bounds: 3 free map slots and 7 input splits of sizes {10, 20, 30, 100, 200, 300, 400}.
- Random selection: worst case scenario.
- Shortest input split first.
- Longest input split first: a greedy approximation of the optimal approach; the strategy used by Hadoop.
(The slide compares the schedules the three strategies produce on the three map slots.)
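To make the comparison concrete, a small stand-alone sketch that simulates greedy list scheduling of the splits above onto 3 slots; the simulation itself is illustrative and not part of the slides:

public class SplitScheduling {

    // assign splits, in the given order, to the currently least-loaded of `slots` map slots;
    // returns the makespan, i.e. the finishing time of the busiest slot
    static int makespan(int[] splits, int slots) {
        int[] load = new int[slots];
        for (int s : splits) {
            int min = 0;
            for (int i = 1; i < slots; i++) {
                if (load[i] < load[min]) min = i;
            }
            load[min] += s; // the split goes to the least-loaded slot
        }
        int max = 0;
        for (int l : load) max = Math.max(max, l);
        return max;
    }

    public static void main(String[] args) {
        int[] longestFirst  = {400, 300, 200, 100, 30, 20, 10};
        int[] shortestFirst = {10, 20, 30, 100, 200, 300, 400};
        System.out.println("longest split first:  " + makespan(longestFirst, 3));  // 400
        System.out.println("shortest split first: " + makespan(shortestFirst, 3)); // 510
    }
}

Picking the longest splits first keeps the slots evenly loaded (everything finishes by 400), whereas picking the shortest splits first leaves the largest split for last and one slot runs until 510.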

Input splits vs. HDFS blocks (TextInputFormat)
- An input split ends at a logical record; it can cross the block boundary, which requires an additional remote read of the missing data.
- The data between two block boundaries has a fixed size and resides on a single DataNode.
(The diagram shows a text file of twelve lines divided into splits whose boundaries do not coincide with the block boundaries.)

Input splits vs. HDFS blocks, continued: an input split ends at a logical record and can cross the block boundary (additional remote read), while the data between two block boundaries has a fixed size and resides on a single DataNode. What about an XML file such as a Wikipedia dump, <wikipedia> <page> ... </page> ... 10GB later ... </page> </wikipedia>? With TextInputFormat the logical record is a line of text, but the natural record here (a <page> element) may span many block boundaries.

HDFS: Compression and Small Files

Splittable compression: space/time tradeoff, i.e. faster (de)compression means less space savings. (The slide shows the table of compression formats and whether each is splittable.) Source: Tom White's Hadoop: The Definitive Guide.

Splittable compression, annotated: some codecs are optimized for speed with less effective compression, others achieve better compression, and some are a middle ground. Space/time tradeoff: faster (de)compression means less space savings. Source: Tom White's Hadoop: The Definitive Guide.

Splittable is an important attribute
- 1GB uncompressed file: stored within 16 blocks on HDFS (block size 64MB); a Hadoop job creates 16 input splits, each processed by one map task.
- 1GB gzip-compressed file: stored within 16 blocks on HDFS, but a Hadoop job cannot create 16 input splits (reading at an arbitrary point does not work); a single map task will process the 16 HDFS blocks.
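Whether an input file will be split can be checked programmatically: Hadoop looks up the codec by file extension and tests whether it implements SplittableCompressionCodec. A small sketch (the input path is hypothetical):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.SplittableCompressionCodec;

public class SplittabilityCheck {
    public static void main(String[] args) {
        CompressionCodecFactory factory = new CompressionCodecFactory(new Configuration());

        Path path = new Path("/data/logs.gz");           // hypothetical input file
        CompressionCodec codec = factory.getCodec(path); // looked up by file extension

        if (codec == null) {
            System.out.println("uncompressed: splittable");
        } else if (codec instanceof SplittableCompressionCodec) {
            System.out.println(codec.getClass().getSimpleName() + ": splittable");
        } else {
            System.out.println(codec.getClass().getSimpleName()
                    + ": not splittable, one map task for the whole file");
        }
    }
}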

Hadoop Archives
- Storing a large number of small files is inefficient (“small” meaning substantially less than the block size of 64MB/128MB), but not all files can be easily converted to blocks (e.g. millions of images).
- Files and blocks occupy namespace, which is limited by the physical memory in the NameNode; small files take up a large portion of the namespace but not of the disk space.
- Rule of thumb: 150 bytes per file/directory/block (1 million files of one block each: 300MB of memory); see the sketch after this list.
- Hadoop Archives (*.har) are a solution.
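The rule of thumb is simple arithmetic; a tiny sketch that reproduces the slide's estimate (the 150-byte constant is the rule of thumb, not an exact measurement):

public class NameNodeMemoryEstimate {

    // rule of thumb: roughly 150 bytes of NameNode memory per file, directory or block object
    static final long BYTES_PER_OBJECT = 150;

    static long estimateBytes(long files, long directories, long blocks) {
        return (files + directories + blocks) * BYTES_PER_OBJECT;
    }

    public static void main(String[] args) {
        // 1 million files of one block each: (1e6 file objects + 1e6 block objects) * 150 bytes
        long bytes = estimateBytes(1_000_000, 0, 1_000_000);
        System.out.printf("~%.0f MB of NameNode memory%n", bytes / 1e6); // ~300 MB
    }
}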

A Web special: WARC
- Web ARChive format: aggregates digital resources in an archive and keeps track of related information.
- Per resource: a text header and arbitrary data.
- An extension of the Internet Archive's ARC format; commonly used to store Web crawls.

A Web special: WARC - an example record.

WARC/0.17
WARC-Type: response
WARC-Target-URI: http://www.archive.org/robots.txt
WARC-Date: 2008-04-30T20:48:25Z
WARC-Payload-Digest: ...
WARC-IP-Address: 207.241.229.39
WARC-Record-ID: urn:uuid:e7c9eff8-f5bc-4aeb-b3d2-9d3df99afb30
Content-Type: application/http; msgtype=response
Content-Length: 782

HTTP/1.1 200 OK
Date: Wed, 30 Apr 2008 20:48:24 GMT
Server: Apache/2.0.54 (Ubuntu) PHP/5.0.5-2ubuntu1.4 mod_ssl/2.0.54 OpenSSL/0.9.7g
Last-Modified: Sat, 02 Feb 2008 19:40:44 GMT
ETag: "47c3-1d3-11134700"
Accept-Ranges: bytes
Content-Length: 467
Connection: close
Content-Type: text/plain; charset=...

# Welcome to the Archive!
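The “text header plus arbitrary data” layout makes a record straightforward to read: header lines up to an empty line, then Content-Length bytes of payload. A simplified, illustrative reader in plain Java; it assumes an uncompressed WARC file and ignores the record separator that follows the payload (production code would normally use a WARC library):

import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class SimpleWarcReader {

    // read one "Name: value" header block, terminated by an empty line
    static Map<String, String> readHeader(DataInputStream in) throws IOException {
        Map<String, String> header = new HashMap<>();
        String line;
        while ((line = readLine(in)) != null && !line.isEmpty()) {
            int colon = line.indexOf(':');
            if (colon > 0) {
                header.put(line.substring(0, colon).trim(), line.substring(colon + 1).trim());
            }
        }
        return header;
    }

    // read a single CRLF/LF-terminated line
    static String readLine(DataInputStream in) throws IOException {
        StringBuilder sb = new StringBuilder();
        int b;
        while ((b = in.read()) != -1 && b != '\n') {
            if (b != '\r') sb.append((char) b);
        }
        return (b == -1 && sb.length() == 0) ? null : sb.toString();
    }

    public static void main(String[] args) throws IOException {
        try (DataInputStream in = new DataInputStream(
                new BufferedInputStream(new FileInputStream(args[0])))) {
            Map<String, String> header = readHeader(in); // e.g. WARC-Type, WARC-Target-URI
            int length = Integer.parseInt(header.getOrDefault("Content-Length", "0"));
            byte[] payload = new byte[length];
            in.readFully(payload);                       // the arbitrary data block
            System.out.println(header.get("WARC-Target-URI") + " -> " + length + " bytes");
            System.out.println(new String(payload, 0, Math.min(length, 200), StandardCharsets.US_ASCII));
        }
    }
}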

Hadoop's SequenceFile format
- Main usage: the intermediate output of Mappers is written in this format.
- A flat file consisting of binary key-value pairs; defines a Reader, a Writer and a Sorter.
- Three types: uncompressed key-value pairs, compressed values (“record compressed”), keys and values compressed (“block compressed”).
- From small files to SequenceFile: (some key, file content).
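A sketch of packing many small files into one SequenceFile, using the file name as the key and the raw bytes as the value; the directory and output paths are hypothetical:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class SmallFilesToSequenceFile {
    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Path inputDir = new Path("/data/small-files"); // hypothetical directory full of small files
        Path output = new Path("/data/packed.seq");    // hypothetical output SequenceFile

        try (SequenceFile.Writer writer = SequenceFile.createWriter(conf,
                SequenceFile.Writer.file(output),
                SequenceFile.Writer.keyClass(Text.class),
                SequenceFile.Writer.valueClass(BytesWritable.class))) {
            for (FileStatus status : fs.listStatus(inputDir)) {
                byte[] content = new byte[(int) status.getLen()];
                try (FSDataInputStream in = fs.open(status.getPath())) {
                    in.readFully(0, content);          // read the whole small file
                }
                // key = file name, value = raw file content
                writer.append(new Text(status.getPath().getName()), new BytesWritable(content));
            }
        }
    }
}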

Hadoop's SequenceFile format (no compression / record compression): a header stores compression details and file meta-data; sync markers are used to synchronise to a record boundary. Each record consists of the record length, the key length, the key, and the value (stored compressed in the record-compressed variant).

Hadoop's SequenceFile format (block compression, allows the most compression): Header, then per block a Sync marker followed by the number of records, the compressed key lengths, the compressed keys, the compressed value lengths, and the compressed values.

HDFS: the rest

HDFS is just one possible implementation of a Hadoop file system; we can also use the local filesystem, among others. (The slide shows the table of Hadoop filesystem implementations from the book.) Source: Tom White's Hadoop: The Definitive Guide.
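The FileSystem abstraction is what makes this pluggable: the same client code runs against HDFS, the local filesystem, or another implementation, selected by the URI scheme. A small sketch; the NameNode address and paths are hypothetical:

import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class FileSystemDemo {
    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();

        // the concrete implementation is chosen from the URI scheme
        FileSystem hdfs = FileSystem.get(URI.create("hdfs://namenode:8020/"), conf); // hypothetical NameNode
        FileSystem local = FileSystem.getLocal(conf);                                // file:// implementation

        // identical API regardless of the implementation behind it
        System.out.println("HDFS root exists:  " + hdfs.exists(new Path("/")));
        System.out.println("local /tmp exists: " + local.exists(new Path("/tmp")));
    }
}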

Summary
- Hadoop Counters, setup/cleanup
- Job scheduling
- Shuffle & sort
- Data input

THE END
