Clustering Lecture 8: MapReduce - University Of Arkansas

Transcription

Clustering Lecture 8: MapReduce
Jing Gao
SUNY Buffalo

Divide and Conquer

[Figure: a big piece of work is partitioned among several “worker” processes; each produces a partial result (r1, r2, r3), and the partial results are combined into the final “Result”.]

Distributed Grep

[Figure: a very big data set is split into pieces; grep runs on each split to find matches, and the per-split matches are concatenated (cat) into all matches.]
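As a rough illustration (not from the slides), the same pattern can be written as map and reduce functions in Python; the names grep_map and grep_reduce and the in-memory splits are invented for this sketch:

    import re

    def grep_map(filename, line, pattern):
        # Map: emit the line (keyed by its source file) only if it matches
        if re.search(pattern, line):
            yield (filename, line)

    def grep_reduce(filename, matching_lines):
        # Reduce: essentially identity, just collect the matches ("cat")
        return (filename, list(matching_lines))

    # Tiny local usage example over in-memory "splits"
    splits = {"part-0": ["big data", "hello"], "part-1": ["more data here"]}
    matches = {}
    for fname, lines in splits.items():
        for line in lines:
            for key, value in grep_map(fname, line, r"data"):
                matches.setdefault(key, []).append(value)
    print([grep_reduce(k, v) for k, v in matches.items()])
    # [('part-0', ['big data']), ('part-1', ['more data here'])]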

Distributed Word Count

[Figure: a very big data set is split into pieces; each split is counted in parallel, and the partial counts are merged into a merged count.]

Parallelization Challenges

- How do we assign work units to workers?
- What if we have more work units than workers?
- What if workers need to share partial results?
- How do we aggregate partial results?
- How do we know all the workers have finished?
- What if workers die?

Common Theme?

- Parallelization problems arise from
  – Communication between workers (e.g., to exchange state)
  – Access to shared resources (e.g., data)
- Thus, we need a synchronization mechanism

Source: Ricardo Guimarães Herrmann

Managing Multiple Workers

- Difficult because
  – We don’t know the order in which workers run
  – We don’t know when workers interrupt each other
  – We don’t know the order in which workers access shared data
- Thus, we need
  – Semaphores (lock, unlock)
  – Conditional variables (wait, notify, broadcast)
  – Barriers
- Still, lots of problems
  – Deadlock, race conditions, ...
- Moral of the story: be careful!
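A minimal Python sketch of two of these primitives, a lock protecting shared state and a barrier making workers wait for each other; this is an illustrative toy, not something from the lecture:

    import threading

    counter = 0
    lock = threading.Lock()         # mutual exclusion (lock/unlock)
    barrier = threading.Barrier(4)  # wait until all 4 workers reach this point

    def worker(worker_id):
        global counter
        with lock:                  # without this, concurrent increments can race
            counter += 1
        barrier.wait()              # no worker proceeds until all have arrived

    threads = [threading.Thread(target=worker, args=(i,)) for i in range(4)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    print(counter)                  # 4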

What’s the point?

- Right level of abstraction
  – multi-core/cluster environment
- Hide system-level details from the developers
  – No more race conditions, lock contention, etc.
- Separating the what from the how
  – Developer specifies the computation that needs to be performed
  – Execution framework (“runtime”) handles actual execution

MapReduce

- Key properties
  – Google has used it successfully in processing its “big-data” sets (20000 petabytes per day)
  – Users specify the computation in terms of a map and a reduce function
  – Underlying runtime system automatically parallelizes the computation across large-scale clusters of machines
  – Underlying system also handles machine failures, efficient communications, and performance issues

MapReduce can refer to

- The programming model
- The execution framework (aka “runtime”)
- The specific implementation

Usage is usually clear from context!

Typical Large-Data Problem

- Iterate over a large number of records
- Extract something of interest from each (map)
- Shuffle and sort intermediate results
- Aggregate intermediate results (reduce)
- Generate final output

Key idea: provide a functional abstraction for these two operations.

(Dean and Ghemawat, OSDI 2004)

MapReduce Programming Model

- Programmers specify two functions:
  – map(k, v) → [(k’, v’)]
  – reduce(k’, [v’]) → [(k’, v’)]
  – All values with the same key are sent to the same reducer
- The execution framework handles everything else
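A toy, single-machine sketch of this contract; the helper name run_mapreduce is invented for illustration, and a real framework distributes this work across a cluster:

    from collections import defaultdict

    def run_mapreduce(records, map_fn, reduce_fn):
        # Map phase: apply map_fn to every input (k, v) record
        intermediate = defaultdict(list)
        for k, v in records:
            for k2, v2 in map_fn(k, v):
                intermediate[k2].append(v2)   # shuffle: group values by key
        # Reduce phase: all values with the same key go to one reduce call
        output = []
        for k2 in sorted(intermediate):       # sort keys, as the framework does
            output.extend(reduce_fn(k2, intermediate[k2]))
        return output

The word-count example later in the lecture plugs concrete map and reduce functions into exactly this shape.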

“Everything Else”

- The execution framework handles
  – Scheduling: assigns workers to map and reduce tasks
  – “Data distribution”: moves processes to data
  – Synchronization: gathers, sorts, and shuffles intermediate data
  – Errors and faults: detects worker failures and restarts
- Limited control over data and execution flow
  – All algorithms must be expressed in m, r, c, p (map, reduce, combine, partition)
- You don’t know:
  – Where mappers and reducers run
  – When a mapper or reducer begins or finishes
  – Which input a particular mapper is processing
  – Which intermediate key a particular reducer is processing

MapReduce Implementations

- Google MapReduce
  – Not available outside Google
- Hadoop
  – An open-source implementation in Java
  – Development led by Yahoo, used in production
  – Now an Apache project
  – Rapidly expanding software ecosystem
- Custom research implementations
  – For GPUs, cell processors, etc.

Who uses Hadoop?

- Amazon/A9
- Facebook
- Google
- IBM
- Joost
- Last.fm
- New York Times
- PowerSet
- Veoh
- Yahoo!

How do we get data to the workers?

[Figure: compute nodes fetch data over the network from centralized storage (NAS/SAN).]

What’s the problem here?

Distributed File System

- Move workers to the data
  – Store data on the local disks of nodes in the cluster
  – Start up the workers on the node that has the data local
- Why?
  – Not enough RAM to hold all the data in memory
  – Disk access is slow, but disk throughput is reasonable
- A distributed file system
  – GFS (Google File System) for Google’s MapReduce
  – HDFS (Hadoop Distributed File System) for Hadoop

Distributed File System Design

- Chunk servers
  – File is split into contiguous chunks
  – Typically each chunk is 16-64 MB
  – Each chunk replicated (usually 2x or 3x)
  – Try to keep replicas in different racks
- Master node
  – a.k.a. NameNode in HDFS
  – Stores metadata
  – Might be replicated
- Client library for file access
  – Talks to master to find chunk servers
  – Connects directly to chunk servers to access data
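A toy in-memory sketch of the read path just described; the class and method names (MasterNode, get_chunk_locations, read_chunk) are invented for illustration and are not the actual GFS or HDFS API:

    class ChunkServer:
        def __init__(self, chunks):
            self.chunks = chunks                 # chunk_id -> bytes
        def read_chunk(self, chunk_id):
            return self.chunks[chunk_id]

    class MasterNode:
        # Stores only metadata: which chunks make up a file, and where replicas live
        def __init__(self, file_table):
            self.file_table = file_table         # path -> [(chunk_id, [servers])]
        def get_chunk_locations(self, path):
            return self.file_table[path]

    def read_file(master, path):
        data = b""
        # 1. Ask the master for metadata only; it never serves file bytes itself
        for chunk_id, replicas in master.get_chunk_locations(path):
            # 2. Read each chunk directly from one of its replica servers
            data += replicas[0].read_chunk(chunk_id)
        return data

    # Usage: a file split into two chunks, each replicated on one or two servers
    s1 = ChunkServer({"c1": b"very big "})
    s2 = ChunkServer({"c1": b"very big ", "c2": b"data"})
    master = MasterNode({"/logs/day1": [("c1", [s1, s2]), ("c2", [s2])]})
    print(read_file(master, "/logs/day1"))       # b'very big data'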

Hadoop Cluster Architecture

[Figure: a client submits jobs to the job submission node, which runs the JobTracker; the HDFS master runs the NameNode; each slave node runs a TaskTracker and a DataNode. From Jimmy Lin’s slides.]

Map Reduce

[Figure: very big data flows through MAP and then REDUCE to produce the Result.]

- Map:
  – Accepts input key/value pair
  – Emits intermediate key/value pair
- Reduce:
  – Accepts intermediate key/value* pair
  – Emits output key/value pair

The Map Step

[Figure: each input key-value pair (k, v) is fed to map, which emits a list of intermediate key-value pairs.]

The Reduce Step

[Figure: intermediate key-value pairs are grouped by key; each reduce call takes a key and its list of values and emits output key-value pairs.]

MapReduce

- Input: a set of key/value pairs
- User supplies two functions:
  – map(k, v) → list(k1, v1)
  – reduce(k1, list(v1)) → (k1, v2)
- (k1, v1) is an intermediate key/value pair
- Output is the set of (k1, v2) pairs

Word Count

- We have a large collection of documents
- Count the number of times each distinct word appears in the collection of documents

Word Count Execution

[Figure: three input documents (“the quick brown fox”, “the fox ate the mouse”, “how now brown cow”) pass through Map, Shuffle & Sort, and Reduce. One reducer outputs brown 2, fox 2, how 1, now 1, the 3; the other outputs ate 1, cow 1, mouse 1, quick 1.]

Word Count using MapReduce

map(key, value):
    // key: document name; value: text of document
    for each word w in value:
        emit(w, 1)

reduce(key, values):
    // key: a word; values: an iterator over counts
    result = 0
    for each count v in values:
        result += v
    emit(result)
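A runnable Python equivalent of this pseudocode, simulating the map, shuffle/sort, and reduce phases locally (a toy simulation, not Hadoop code):

    from collections import defaultdict

    def word_count_map(doc_name, text):
        # key: document name; value: text of document
        for word in text.split():
            yield (word, 1)

    def word_count_reduce(word, counts):
        # key: a word; values: an iterable of counts
        return (word, sum(counts))

    # Local simulation of map -> shuffle/sort -> reduce
    docs = {"d1": "the quick brown fox", "d2": "the fox ate the mouse"}
    intermediate = defaultdict(list)
    for name, text in docs.items():
        for word, one in word_count_map(name, text):
            intermediate[word].append(one)
    print(sorted(word_count_reduce(w, c) for w, c in intermediate.items()))
    # [('ate', 1), ('brown', 1), ('fox', 2), ('mouse', 1), ('quick', 1), ('the', 3)]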

Combiners

- Often a map task will produce many pairs of the form (k, v1), (k, v2), ... for the same key k
  – E.g., popular words in Word Count
- Can save network time by pre-aggregating at the mapper
  – For associative ops. like sum, count, max
  – Decreases size of intermediate data
- Example: local counting for Word Count:

    def combiner(key, values):
        output(key, sum(values))
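A small sketch of what the combiner changes: counts are pre-aggregated on each mapper before anything is sent over the network. The function name map_with_combiner is made up for this illustration:

    from collections import Counter

    def map_with_combiner(doc_name, text):
        # Run the normal map, then combine the (word, 1) pairs locally,
        # emitting one (word, local_count) pair per distinct word instead
        # of one pair per occurrence -> less intermediate data to shuffle
        local_counts = Counter(text.split())
        return list(local_counts.items())

    print(map_with_combiner("d2", "the fox ate the mouse"))
    # [('the', 2), ('fox', 1), ('ate', 1), ('mouse', 1)]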

Word Count with Combiner

[Figure: the same pipeline, but each mapper combines its output locally; for example, the mapper for “the fox ate the mouse” now emits (the, 2) instead of two separate (the, 1) pairs. The final reduce output is unchanged.]

Partition Function

- Inputs to map tasks are created by contiguous splits of the input file
- For reduce, we need to ensure that records with the same intermediate key end up at the same worker
- System uses a default partition function, e.g., hash(key) mod R
- Sometimes useful to override
  – Balance the loads
  – Specific requirement on which key-value pairs should be in the same output files
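A sketch of the default hash-based partitioning next to one possible override; in Hadoop the override would be a custom Partitioner class, and the alphabetical ranges below are just an assumed example requirement:

    R = 4  # number of reducers

    def default_partition(key):
        # Default: equal keys always hash to the same reducer
        return hash(key) % R

    def custom_partition(key):
        # Override example: route keys by first letter so each output file
        # covers a contiguous alphabetical range (an assumed requirement)
        first = key[0].lower()
        if first < "g":
            return 0
        elif first < "n":
            return 1
        elif first < "t":
            return 2
        return 3

    print(default_partition("fox"), custom_partition("fox"))
    # e.g. "2 0" (the first number varies because Python salts string hashes)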

[Figure: mappers process input pairs (k1, v1) through (k6, v6) and emit intermediate pairs such as (a, 1), (b, 2), (c, 3), (c, 6), (a, 5), (c, 2), (b, 7), (c, 8). Shuffle and sort aggregates values by key: a → [1, 5], b → [2, 7], c → [2, 3, 6, 8]. Three reducers then produce outputs (r1, s1), (r2, s2), (r3, s3).]

[Figure: the same data flow with combine and partition steps on each mapper. For example, one mapper’s (c, 3) and (c, 6) are combined locally into (c, 9) before partitioning. Shuffle and sort then aggregates a → [1, 5], b → [2, 7], c → [2, 9, 8], and the three reducers again produce (r1, s1), (r2, s2), (r3, s3).]
