SIGMETRICS Tutorial: MapReduce - Google Research

Transcription

Tutorials, June 19th 2009
MapReduce: The Programming Model and Practice
Jerry Zhao, Technical Lead
Jelena Pjesivac-Grbovic, Software Engineer

Yet another MapReduce tutorial?
Some tutorials you might have seen:
- Introduction to MapReduce Programming Model
- Hadoop Map/Reduce Programming Tutorial
- and more.
What makes this one different:
- Some complex "realistic" MapReduce examples
- Brief discussion of trade-offs between alternatives
- Google MapReduce implementation internals, tuning tips
About this presentation:
- Edited collaboratively on Google Docs for domains

Tutorial Overview
- MapReduce programming model
  - Brief intro to MapReduce
  - Use of MapReduce inside Google
  - MapReduce programming examples
  - MapReduce, similar and alternatives
- Implementation of Google MapReduce
  - Dealing with failures
  - Performance & scalability
  - Usability

What is MapReduce?
A programming model for large-scale distributed data processing:
- Simple, elegant concept
- Restricted, yet powerful programming construct
- Building block for other parallel programming tools
- Extensible for different applications
Also an implementation of a system to execute such programs:
- Take advantage of parallelism
- Tolerate failures and jitters
- Hide messy internals from users
- Provide tuning knobs for different applications

Programming Model
Inspired by map and reduce in functional programming languages, such as LISP from the 1960s, but not equivalent.
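As a loose illustration of that functional-programming lineage, here is a tiny word-count sketch using Python's built-in map and functools.reduce; it is only an analogy, not the MapReduce API itself:

  # Illustrative analogy only: functional map/reduce applied to word counting.
  from functools import reduce
  from collections import Counter

  documents = ["the quick brown fox", "the lazy dog", "the end"]

  # "Map": turn each document into a list of (word, 1) pairs.
  mapped = map(lambda doc: [(word, 1) for word in doc.split()], documents)

  # "Reduce": fold all per-document pairs into one combined count.
  def merge(acc, pairs):
      for word, count in pairs:
          acc[word] += count
      return acc

  word_counts = reduce(merge, mapped, Counter())
  print(word_counts["the"])  # -> 3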

MapReduce Execution Overview

Tutorial OverviewMapReduce programming modelBrief intro to MapReduceUse of MapReduce inside GoogleMapReduce programming examplesMapReduce, similar and alternativesImplementation of Google MapReduceDealing with failuresPerformance & scalabilityUsability

Use of MapReduce inside Google
Stats by month, from "MapReduce: simplified data processing on large clusters":

                               Aug.'04    Mar.'06    Sep.'07
  Number of jobs                29,000    171,000  2,217,000
  Avg. completion time (secs)      634        874        395
  Machine years used               217      2,002     11,081
  Map input data (TB)            3,288     52,254    403,152
  Map output data (TB)             758      6,743     34,774
  Reduce output data (TB)          193      2,970     14,018
  Avg. machines per job            157        268        394

MapReduce inside Google
Googlers' hammer for 80% of our data crunching:
- Large-scale web search indexing
- Clustering problems for Google News
- Producing reports for popular queries, e.g. Google Trends
- Processing of satellite imagery data
- Language model processing for statistical machine translation
- Large-scale machine learning problems
- Just a plain tool to reliably spawn a large number of tasks, e.g. parallel data backup and restore
The other 20%? e.g. Pregel

Use of MR in System Health Monitoring
- Monitoring service talks to every server frequently
- Collect:
  - Health signals
  - Activity information
  - Configuration data
- Store time-series data forever
- Parallel analysis of repository data (MapReduce/Sawzall)

Investigating System Health Issues
Case study:
- Higher DRAM error rates observed in a new GMail cluster
- Similar servers running GMail elsewhere were not affected
  - Same version of the software, kernel, firmware, etc.
- Bad DRAM was the initial suspect, but that same DRAM model was fairly healthy elsewhere
- Actual problem: a bad motherboard batch
  - Poor electrical margin in some memory bus signals
  - GMail got more than its fair share of the bad batch
  - Analysis of this batch allocated to other services confirmed the theory
Analysis was possible by having all relevant data in one place and the processing power to digest it.
MapReduce is part of the infrastructure.

Tutorial Overview
- MapReduce programming model
  - Brief intro to MapReduce
  - Use of MapReduce inside Google
  - MapReduce programming examples
  - MapReduce, similar and alternatives
- Implementation of Google MapReduce
  - Dealing with failures
  - Performance & scalability
  - Usability

Application Examples
- Word count and frequency in a large set of documents
  - Power of sorted keys and values
  - Combiners for map output
- Computing average income in a city for a given year
  - Using customized readers to
    - Optimize MapReduce
    - Mimic rudimentary DBMS functionality
- Overlaying satellite images
  - Handling various input formats using protocol buffers

Word Count Example
Input: a large number of text documents
Task: compute the word count across all the documents
Solution:
- Mapper: for every word in a document, output (word, "1")
- Reducer: sum all occurrences of each word and output (word, total count)

Word Count Solution
// Pseudo-code for "word counting"
map(String key, String value):
  // key: document name
  // value: document contents
  for each word w in value:
    EmitIntermediate(w, "1");

reduce(String key, Iterator values):
  // key: a word
  // values: a list of counts
  int word_count = 0;
  for each v in values:
    word_count += ParseInt(v);
  Emit(key, AsString(word_count));

No types, just strings*

Word Count Optimization: Combiner
Apply the reduce function to map output before it is sent to the reducer.
Reduces the number of records output by the mapper!
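A minimal sketch of the idea (names and structure are illustrative, not the Google MapReduce API): the combiner pre-aggregates one map task's (word, "1") records locally, so only one record per distinct word is shuffled to the reducers.

  from collections import defaultdict

  def combine(map_output):
      # map_output: iterable of (word, count_string) pairs from one map task.
      partial = defaultdict(int)
      for word, count in map_output:
          partial[word] += int(count)
      # One record per distinct word instead of one per occurrence.
      return [(word, str(total)) for word, total in partial.items()]

  map_output = [("the", "1"), ("quick", "1"), ("the", "1"), ("the", "1")]
  print(combine(map_output))  # [('the', '3'), ('quick', '1')]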

Word Frequency Example
Input: a large number of text documents
Task: compute the word frequency across all the documents
- Frequency is calculated using the total word count
A naive solution with the basic MapReduce model requires two MapReduces:
- MR1: count the number of all words in these documents (use combiners)
- MR2: count the number of each word and divide it by the total count from MR1

Word Frequency Example
Can we do better?
Two nice features of Google's MapReduce implementation:
- Ordering guarantee of reduce keys
- Auxiliary functionality: EmitToAllReducers(k, v)
A nice trick to compute the total number of words in all documents:
- Every map task sends its total word count with key "" to ALL reducer splits
- Key "" will be the first key processed by each reducer
- The sum of its values is the total number of words!

Word Frequency Solution: Mapper with Combiner
map(String key, String value):
  // key: document name, value: document contents
  int word_count = 0;
  for each word w in value:
    EmitIntermediate(w, "1");
    word_count++;
  EmitIntermediateToAllReducers("", AsString(word_count));

combine(String key, Iterator values):
  // Combiner for map output
  // key: a word, values: a list of counts
  int partial_word_count = 0;
  for each v in values:
    partial_word_count += ParseInt(v);
  Emit(key, AsString(partial_word_count));

Word Frequency Solution: Reducer
reduce(String key, Iterator values):
  // Actual reducer
  // key: a word
  // values: a list of counts
  if (is first key):
    assert("" == key);  // sanity check
    total_word_count_ = 0;
    for each v in values:
      total_word_count_ += ParseInt(v);
  else:
    assert("" != key);  // sanity check
    int word_count = 0;
    for each v in values:
      word_count += ParseInt(v);
    Emit(key, AsString(word_count / total_word_count_));

Application Examples
- Word frequency in a large set of documents
  - Power of sorted keys and values
  - Combiners for map output
- Computing average income in a city for a given year
  - Using customized readers to
    - Optimize MapReduce
    - Mimic rudimentary DBMS functionality
- Overlaying satellite images
  - Handling various input formats using protocol buffers

Average Income In a City
SSTable 1: (SSN, {Personal Information})
  123456: (John Smith; Sunnyvale, CA)
  123457: (Jane Brown; Mountain View, CA)
  123458: (Tom Little; Mountain View, CA)
SSTable 2: (SSN, {year, income})
  123456: (2007, 70000), (2006, 65000), (2005, 6000), ...
  123457: (2007, 72000), (2006, 70000), (2005, 6000), ...
  123458: (2007, 80000), (2006, 85000), (2005, 7500), ...
Task: compute the average income in each city in 2007
Note: both inputs are sorted by SSN

Average Income in a City: Basic Solution
Mapper 1a:
  Input: SSN -> Personal Information
  Output: (SSN, City)
Mapper 1b:
  Input: SSN -> Annual Incomes
  Output: (SSN, 2007 Income)
Reducer 1:
  Input: SSN -> {City, 2007 Income}
  Output: (SSN, [City, 2007 Income])
Mapper 2:
  Input: SSN -> [City, 2007 Income]
  Output: (City, 2007 Income)
Reducer 2:
  Input: City -> 2007 Incomes
  Output: (City, AVG(2007 Incomes))

Average Income in a City: Joined Solution
Mapper:
  Input: SSN -> Personal Information and Incomes
  Output: (City, 2007 Income)
Reducer:
  Input: City -> 2007 Incomes
  Output: (City, AVG(2007 Incomes))
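A rough sketch of the joined solution, under the assumption that a customized reader walks both SSN-sorted inputs in lockstep (a merge join), so a single mapper sees each person's city and incomes together; the data and helper names are made up:

  from collections import defaultdict

  personal = [(123456, "Sunnyvale, CA"), (123457, "Mountain View, CA"),
              (123458, "Mountain View, CA")]                      # sorted by SSN
  incomes  = [(123456, {2007: 70000}), (123457, {2007: 72000}),
              (123458, {2007: 80000})]                            # sorted by SSN

  def joined_map(personal_rows, income_rows):
      # Both inputs are sorted by SSN, so one pass joins them record by record.
      for (ssn_a, city), (ssn_b, by_year) in zip(personal_rows, income_rows):
          assert ssn_a == ssn_b
          if 2007 in by_year:
              yield city, by_year[2007]

  def reduce_avg(pairs):
      sums, counts = defaultdict(int), defaultdict(int)
      for city, income in pairs:
          sums[city] += income
          counts[city] += 1
      return {city: sums[city] / counts[city] for city in sums}

  print(reduce_avg(joined_map(personal, incomes)))
  # {'Sunnyvale, CA': 70000.0, 'Mountain View, CA': 76000.0}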

Application Examples
- Word frequency in a large set of documents
  - Power of sorted keys and values
  - Combiners for map output
- Computing average income in a city for a given year
  - Using customized readers to
    - Optimize MapReduce
    - Mimic rudimentary DBMS functionality
- Overlaying satellite images
  - Handling various input formats using protocol buffers

Stitch Imagery Data for Google Maps
A simplified version could be:
- Imagery data comes from different content providers
  - Different formats
  - Different coverages
  - Different timestamps
  - Different resolutions
  - Different exposures/tones
- Large amount of data to be processed
Goal: produce data to serve a "satellite" view to users

Stitch Imagery Data: Algorithm
1. Split the whole territory into "tiles" with fixed location IDs
2. Split each source image according to the tiles it covers
3. For a given tile, stitch contributions from different sources, based on freshness and resolution, or other preferences
4. Serve the merged imagery data for each tile, so it can be loaded into and served from an image server farm

Using Protocol Buffers to Encode Structured Data
Open sourced from Google, among many others: http://code.google.com/p/protobuf/
It supports C++, Java and Python.
A way of encoding structured data in an efficient yet extensible format. For example, we can define:

message Tile {
  required int64 location_id = 1;
  group coverage {
    double latitude = 2;
    double longitude = 3;
    double width = 4;   // in km
    double length = 5;  // in km
  }
  required bytes image_data = 6;  // bitmap image data
  required int64 timestamp = 7;
  optional float resolution = 8 [default = 10];
  optional string debug_info = 10;
}

Google uses Protocol Buffers for almost all its internal RPC protocols, file formats and, of course, in MapReduce.
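For a sense of how such a message is used, here is a hedged Python sketch; it assumes the Tile definition above has been compiled with protoc into a module named tile_pb2 (the module name and field access are assumptions based on standard protobuf code generation):

  import time
  import tile_pb2  # hypothetical module generated by protoc from the Tile definition

  tile = tile_pb2.Tile()
  tile.location_id = 1234
  tile.image_data = b"...bitmap bytes..."
  tile.timestamp = int(time.time())      # required fields must be set before serializing

  serialized = tile.SerializeToString()  # compact, extensible wire format

  parsed = tile_pb2.Tile()
  parsed.ParseFromString(serialized)
  assert parsed.location_id == 1234
  assert parsed.resolution == 10         # default value applies when the field is unset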

Stitch Imagery Data Solution: Mapper
map(String key, String value):
  // key: image file name
  // value: image data
  Tile whole_image;
  switch (file_type(key)):
    FROM_PROVIDER_A: Convert_A(value, &whole_image);
    FROM_PROVIDER_B: Convert_B(...);
    ...
  // split whole_image according to the grid into tiles
  for each Tile t in whole_image:
    EmitIntermediate(AsString(t.location_id()), ProtobufToString(t));

Stitch Imagery Data Solution: Reducer
reduce(String key, Iterator values):
  // key: location_id
  // values: tiles from different sources
  sort values according to v.resolution() and v.timestamp();
  Tile merged_tile;
  for each v in values:
    overlay pixels in v onto merged_tile based on v.coverage();
  Normalize merged_tile to the serve tile size;
  Emit(key, ProtobufToString(merged_tile));

Tutorial Overview
- MapReduce programming model
  - Brief intro to MapReduce
  - Use of MapReduce inside Google
  - MapReduce programming examples
  - MapReduce, similar and alternatives
- Implementation of Google MapReduce
  - Dealing with failures
  - Performance & scalability
  - Usability

Distributed Computing Landscape
Dimensions to compare apples and oranges:
- Data organization
- Programming model
- Execution model
- Target applications
- Assumed computing environment
- Overall operating cost

My Basket of Fruit

Nutritional Information of My Basket
What they are:
- MPI: a general parallel programming paradigm
- MapReduce: a programming paradigm and its associated execution system
- DBMS/SQL: a system to store, manipulate and serve data
Programming model:
- MPI: message passing between nodes
- MapReduce: restricted to Map/Reduce operations
- DBMS/SQL: declarative data query/retrieval; stored procedures
Data organization:
- MPI: no assumption
- MapReduce: "files" can be sharded
- DBMS/SQL: organized data structures
Data to be manipulated:
- MPI: any
- MapReduce: (k, v) pairs: string/protomsg
- DBMS/SQL: tables with rich types
Execution model:
- MPI: nodes are independent
- MapReduce: physical data locality
- DBMS/SQL: transactions; query/operation optimization; materialized views
Usability:
- MPI: steep learning curve*; difficult to debug
- MapReduce: simple concept; could be hard to optimize
- DBMS/SQL: declarative interface; could be hard to debug at runtime
Key selling point:
- MPI: flexible to accommodate various applications
- MapReduce: plow through large amounts of data with commodity hardware
- DBMS/SQL: interactive querying of the data; maintain a consistent view across clients
See what others say: [1], [2], [3]

Taste Them with Your Own Grain of Salt
Dimensions for an application developer to choose between apples and oranges:
- Target applications
  - Complex operations run frequently vs. a one-time plow
  - Off-line processing vs. real-time serving
- Assumed computing environment
  - Off-the-shelf, custom-made or donated
  - Formats and sources of your data
- Overall operating cost
  - Hardware maintenance, license fees
  - Manpower to develop, monitor and debug

Existing MapReduce and Similar Systems
Google MapReduce:
- Supports C++, Java, Python, Sawzall, etc.
- Based on proprietary infrastructures: GFS (SOSP'03), MapReduce (OSDI'04), Sawzall (SPJ'05), Chubby (OSDI'06), Bigtable (OSDI'06), and some open source libraries
Hadoop Map-Reduce:
- Open source! (Kudos to Doug and the team.)
- Plus the whole equivalent package, and more: HDFS, Map-Reduce, Pig, Zookeeper, HBase, Hive
- Used by Yahoo!, Facebook, Amazon and the Google-IBM NSF cluster
Dryad:
- Proprietary, based on Microsoft SQL servers
- Dryad (EuroSys'07), DryadLINQ (OSDI'08)
- Michael's Dryad TechTalk@Google (Nov.'07)
And others

Tutorial Overview
- MapReduce programming model
  - Brief intro to MapReduce
  - Use of MapReduce inside Google
  - MapReduce programming examples
  - MapReduce, similar and alternatives
- Implementation of Google MapReduce
  - Dealing with failures
  - Performance & scalability
  - Usability

Google Computing Infrastructure
Infrastructure must support:
- A diverse set of applications, increasing over time
- Ever-increasing application usage
- Ever-increasing computational requirements
- Cost effectiveness
Data centers:
- Google-specific mechanical, thermal and electrical design
- Highly-customized PC-class motherboards
- Running Linux
- In-house management & application software

Sharing is the Way of Life
Batch processing (MapReduce, Sawzall)

Major Challenges
To organize the world's information and make it universally accessible and useful.
- Failure handling
  - Bad apples appear now and then
- Scalability
  - Fast-growing datasets
  - Broad extension of Google services
- Performance and utilization
  - Minimizing run-time for individual jobs
  - Maximizing throughput across all services
- Usability
  - Troubleshooting
  - Performance tuning
  - Production monitoring

Failures in Literature
LANL data (DSN 2006):
- Data collected over 9 years
- Covered 4,750 machines and 24,101 CPUs
- Distribution of failures: hardware 60%, software 20%, network/environment/humans 5%, aliens 25%*
- Depending on the system, failures occurred between once a day and once a month
- Most of the systems in the survey were the cream of the crop at their time
PlanetLab (SIGMETRICS 2008 HotMetrics Workshop):
- Average frequency of failures per node in a 3-month period
  - Hard failures: 2.1
  - Soft failures: 41
- Approximately one failure every 4 days

Failures in Google Data Centers
DRAM error analysis (SIGMETRICS 2009):
- Data collected over 2.5 years
- 25,000 to 70,000 errors per billion device hours per Mbit
  - An order of magnitude more than under lab conditions
- 8% of DIMMs affected by errors
- Hard errors are the dominant cause of failure
Disk drive failure analysis (FAST 2007):
- Annualized failure rates vary from 1.7% for one-year-old drives to over 8.6% for three-year-old ones
- Utilization affects failure rates only in very young and very old disk drive populations
- Temperature change can cause an increase in failure rates, but mostly for old drives

Failures in Google
Failures are a part of everyday life:
- Mostly due to the scale and shared environment
Sources of job failures:
- Hardware
- Software
- Preemption by a more important job
- Unavailability of a resource due to overload
Failure types:
- Permanent
- Transient

Different Failures Require Different Actions
Fatal failure (the whole job dies):
- Simplest case around :)
- You'd prefer to resume computation rather than recompute
Transient failures:
- You'd want your job to adjust and finish when the issues resolve
Program hangs... forever:
- Define "forever"
- Can we figure out why?
- What to do?
"It's-Not-My-Fault" failures

MapReduce: Task Failure

Recover from Task Failure by Reexecution

Recover by Checkpointing Map Output

MapReduce: Master Failure

Master as a Single Point of Failure

Resume from Execution Log on GFS

MapReduce: Slow Worker/Task

Handle Unfixable Failures
- Input data is in a partially wrong format or is corrupted
  - Data is mostly well-formatted, but there are instances where your code crashes
  - Corruptions happen rarely, but they are possible at scale
- Your application depends on an external library which you do not control
  - Which happens to have a bug for a particular, yet very rare, input pattern
What would you do?
- Your job is critical to finish as soon as possible
- The problematic records are very rare
IGNORE IT!
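One way to picture the "ignore it" strategy (the wrapper, counter and threshold below are illustrative, not the actual library mechanism): wrap the user's Map() call, count records that crash it, and only fail the job if the skip count becomes suspiciously large.

  def run_map_with_skipping(user_map, records, emit, max_skipped=1000):
      skipped = 0
      for key, value in records:
          try:
              user_map(key, value, emit)
          except Exception:
              skipped += 1        # e.g. bump a "skipped-records" counter
              if skipped > max_skipped:
                  raise           # too many failures: something else is wrong
      return skipped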

Tutorial Overview
- MapReduce programming model
  - Brief intro to MapReduce
  - Use of MapReduce inside Google
  - MapReduce programming examples
  - MapReduce, similar and alternatives
- Implementation of Google MapReduce
  - Dealing with failures
  - Performance & scalability
    - Some techniques and tuning tips
    - Dealing with stragglers
  - Usability

Performance and Scalability of MapReduce
Terasort and Petasort with MapReduce in Nov 2008:
- Not particularly representative of production MRs, but an important benchmark to evaluate the whole stack
- Sorted 1 TB (as 10 billion 100-byte uncompressed text records) on 1,000 computers in 68 seconds
- Sorted 1 PB (10 trillion 100-byte records) on 4,000 computers in 6 hours and 2 minutes
With open-source Hadoop in May 2009 (TechReport):
- Terasort: 62 seconds on 1,460 nodes
- Petasort: 16 hours and 15 minutes on 3,658 nodes

Built upon Great Google Infrastructure
Google MapReduce is built upon a set of high-performance infrastructure components:
- Google File System (GFS) (SOSP'03)
- Chubby distributed lock service (OSDI'06)
- Bigtable for structured data storage (OSDI'06)
- Google cluster management system
- Powerful yet energy-efficient* hardware and fine-tuned platform software
- Other in-house libraries and services

Take Advantage of Locality Hints from GFS
Files in GFS:
- Divided into chunks (default 64 MB)
- Stored with replication, typically r = 3
- Reading from local disk is much faster and cheaper than reading from a remote server
MapReduce uses the locality hints from GFS:
- Try to assign a task to a machine with a local copy of the input
- Or, less preferably, to a machine on the same network switch as a server storing a copy
- Or, assign to any available worker
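A simple sketch of the resulting scheduling preference (the worker and rack lookups are hypothetical stand-ins for GFS metadata and cluster topology):

  def locality_rank(worker, replica_hosts, rack_of):
      # Lower is better: 0 = local copy, 1 = same rack as a copy, 2 = remote.
      if worker in replica_hosts:
          return 0
      if rack_of[worker] in {rack_of[h] for h in replica_hosts}:
          return 1
      return 2

  def pick_worker(idle_workers, replica_hosts, rack_of):
      return min(idle_workers, key=lambda w: locality_rank(w, replica_hosts, rack_of))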

Tuning Task Granularity
Questions often asked in production:
- How many map tasks should I split my input into?
- How many reduce splits should I have?
Implications on scalability:
- The master has to make O(M + R) scheduling decisions
- The system has to keep O(M*R) metadata for distributing map output to reducers
To balance locality, performance and scalability:
- By default, each map task is 64 MB (= GFS chunk size)
- Usually, the number of reduce tasks is a small multiple of the number of machines
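A back-of-the-envelope sketch of those implications, with made-up numbers: with the default 64 MB map shards, a 10 TB input yields about 164,000 map tasks, and the master's map-output metadata grows with M*R.

  TB, MB = 1 << 40, 1 << 20

  input_size = 10 * TB
  map_shard_size = 64 * MB           # default: roughly one map task per GFS chunk
  num_machines = 2000

  M = input_size // map_shard_size   # 163,840 map tasks
  R = 2 * num_machines               # a small multiple of the machine count
  print(M, R, M * R)                 # metadata entries scale with M*R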

More on Map Task Size
- Small map tasks allow fast failure recovery
  - Define "small": input size, output size or processing time
- Big map tasks may force mappers to read from multiple remote chunkservers
- Too many small map shards might lead to excessive overhead in map output distribution

Reduce Task Partitioning Function
It is relatively easy to control map input granularity: each map task is independent.
For reduce tasks, we can tweak the partitioning function instead.
Example: the reduce key *.blogspot.com has 7 GB of reduce input, while the average reduce input size for a given key is 300 KB.
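One possible way to tweak the partitioning function for such a hot key (the hot-key table and salting trick below are an illustration, not the library default): spread the oversized key across several reduce shards and merge its partial results in a follow-up step.

  HOT_KEYS = {"*.blogspot.com": 32}   # key -> number of shards to spread it over

  def partition(key, value, num_reducers):
      spread = HOT_KEYS.get(key)
      if spread:
          # Salt by the value so the hot key's records land on several reducers;
          # that key's partial outputs then need one more aggregation pass.
          return (hash(key) + hash(value) % spread) % num_reducers
      return hash(key) % num_reducers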

Tutorial Overview
- MapReduce programming model
  - Brief intro to MapReduce
  - Use of MapReduce inside Google
  - MapReduce programming examples
  - MapReduce, similar and alternatives
- Implementation of Google MapReduce
  - Dealing with failures
  - Performance & scalability
    - Dealing with stragglers
  - Usability

Dealing with Reduce Stragglers
Many reasons lead to stragglers, but reducing is inherently expensive:
- The reducer retrieves data remotely from many servers
- Sorting is expensive on local resources
- Reducing usually cannot start until mapping is done
- Re-execution due to machine failures could double the runtime

Dealing with Reduce Stragglers
Technique 1: create a backup instance as early and as necessary as possible
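One plausible trigger for launching such a backup ("speculative") instance, sketched with a hypothetical task object; the real scheduler's policy is more involved:

  def needs_backup(task, peer_tasks, slowdown_factor=1.5):
      # Back up a task that has run much longer than its typical finished peer.
      finished = sorted(t.runtime for t in peer_tasks if t.done)
      if task.done or not finished:
          return False
      typical = finished[len(finished) // 2]   # median runtime of finished peers
      return task.elapsed > slowdown_factor * typical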

Steal Reduce Input for Backups
Technique 2: retrieving map output and sorting are expensive, but we can transport the sorted input to the backup reducer

Reduce Task Splitting
Technique 3: divide a reduce task into smaller ones to take advantage of more parallelism

Tutorial Overview
- MapReduce programming model
  - Brief intro to MapReduce
  - Use of MapReduce inside Google
  - MapReduce programming examples
  - MapReduce, similar and alternatives
- Implementation of Google MapReduce
  - Dealing with failures
  - Performance & scalability
  - (Operational) Usability
    - monitoring, debugging, profiling, etc.

Tools for Google MapReduce
- Local run mode for debugging/profiling MapReduce applications
- Status page to monitor and track progress of MapReduce executions, also
  - Email notification
  - Replay progress postmortem
- Distributed counters used by the MapReduce library and applications for validation, debugging and tuning
  - System invariants
  - Performance profiling

MapReduce Counters
Lightweight stats with only an "increment" operation:
- Per-task counters: contributed by each M/R task
  - Only counted once, even when there are backup instances
- Per-worker counters: contributed by each worker process
  - Aggregated contributions from all instances
Can be easily added by developers.
Examples:
- num_map_output_records, num_reduce_input_records
- CPU time spent in Map() and Reduce() functions
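A sketch of how application code might use such counters (the Counter class and mapper hook below are illustrative, not the real API):

  class Counter:
      def __init__(self, name):
          self.name, self.value = name, 0
      def increment(self, delta=1):
          self.value += delta       # "increment" is the only supported operation

  num_map_output_records = Counter("num-map-output-records")

  def map_fn(key, value, emit):
      for word in value.split():
          emit(word, "1")
          num_map_output_records.increment()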

MapReduce Development inside Google
- Supports C++, Java, Python, Sawzall, etc.
- Nurtured greatly by the Google engineer community
  - Friendly internal user discussion groups
  - Fix-it! instead of complain-about-it! attitude
- Users contribute to both the core library and contrib
  - Thousands of Mapper and Reducer implementations
  - Tens of Input/Output formats
  - Endless new ideas and proposals

Summary
- MapReduce is a flexible programming framework for many applications, built from a couple of restricted Map()/Reduce() constructs
- Google invented and implemented MapReduce around its infrastructure to allow our engineers to scale with the growth of the Internet, and the growth of Google products/services
- Open source implementations of MapReduce, such as Hadoop, are creating a new ecosystem to enable large-scale computing over off-the-shelf clusters
So happy MapReducing!

Thank you!
