Pig, a High-Level Data Processing System on Hadoop

Transcription

Pig, a high-level data processing system on Hadoop

Is MapReduce Not Good Enough?
- Restricted programming model
  - Only two phases
  - Jobs must be chained for long data flows
- Too many lines of code even for simple logic
  - How many lines do you have for word count?
  - Programmers are responsible for this

Pig to the Rescue
- High-level dataflow language (Pig Latin)
  - Much simpler than Java
- Simplifies the data processing
  - Puts the operations at the appropriate phases
  - Chains multiple MR jobs
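
As a point of comparison with the word-count question on the previous slide, here is a minimal word-count sketch in Pig Latin (input and output paths are illustrative):

lines  = LOAD 'input/text' AS (line:chararray);
-- split each line into words; FLATTEN turns the bag of tokens into one record per word
words  = FOREACH lines GENERATE FLATTEN(TOKENIZE(line)) AS word;
grpd   = GROUP words BY word;
counts = FOREACH grpd GENERATE group, COUNT(words) AS cnt;
STORE counts INTO 'output/wordcount';

Five statements, compared with the full map/reduce classes a hand-written Java MapReduce word count requires.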

How Pig Is Used in Industry
- At Yahoo!, 70% of MapReduce jobs are written in Pig
- Used to:
  - Process web logs
  - Build user behavior models
  - Process images
  - Data mining
- Also used by Twitter, LinkedIn, eBay, AOL, ...

Motivation by Example
- Suppose we have user data in one file and website data in another file.
- We need to find the top 5 most visited pages by users aged 18-25.

In MapReduce

In Pig Latin
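
The slide shows the script only as an image; a sketch of the Pig Latin solution to the top-5-pages task (file paths and relation names are illustrative), in contrast to the multi-job Java implementation on the previous slide:

Users = LOAD 'users' AS (name, age);
Fltrd = FILTER Users BY age >= 18 AND age <= 25;
Pages = LOAD 'pages' AS (user, url);
Jnd   = JOIN Fltrd BY name, Pages BY user;
Grpd  = GROUP Jnd BY url;
Smmd  = FOREACH Grpd GENERATE group, COUNT(Jnd) AS clicks;
Srtd  = ORDER Smmd BY clicks DESC;
Top5  = LIMIT Srtd 5;
STORE Top5 INTO 'top5sites';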

Pig Runs over Hadoop

Wait a Minute
- How to map the data to records?
  - By default, one line is one record
  - The user can customize the loading process
- How to identify attributes and map them to the schema?
  - A delimiter separates the different attributes
  - By default, the delimiter is tab. Customizable.
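
For example, a custom delimiter can be passed to the built-in PigStorage load function (paths and schema here are illustrative):

-- default: tab-delimited, one line per record
A = LOAD 'input/users' AS (name:chararray, age:int);
-- customized: comma-delimited input
B = LOAD 'input/users.csv' USING PigStorage(',') AS (name:chararray, age:int);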

MapReduce vs. Pig cont.
- Join in MapReduce
  - Various algorithms. None of them are easy to implement in MapReduce
  - Multi-way join is more complicated
  - Hard to integrate into an SPJA workflow

MapReduce vs. Pig cont.
- Join in Pig
  - Various algorithms are already available
  - Some of them are generic and support multi-way join
  - No need to consider integration into an SPJA workflow. Pig does that for you!

A = LOAD 'input/join/A';
B = LOAD 'input/join/B';
C = JOIN A BY $0, B BY $1;
DUMP C;

Pig Latin
- Dataflow language
  - Users specify a sequence of operations to process data
  - More control over the process, compared with a declarative language
- Various data types are supported
- Schema is supported
- User-defined functions are supported

Statement
- A statement represents an operation, or a stage in the data flow
- Usually a variable is used to represent the result of the statement
- Not limited to data processing operations; also includes filesystem operations
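
A minimal sketch of both kinds of statement (paths are illustrative; rmf is one of Pig's Grunt filesystem commands and can also appear in scripts):

-- filesystem statement: remove old output, produces no result variable
rmf output/old_results
-- data processing statement: the variable (alias) A names its result
A = LOAD 'input/A' AS (name:chararray, age:int);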

Schema
- The user can optionally define the schema of the input data
- Once the schema of the source data is given, the schema of the intermediate relations will be induced by Pig

Schema cont.
- Why schema?
  - Scripts are more readable (fields referenced by alias)
  - Helps the system validate the input
- Similar to a database? Yes. But:
  - The schema here is optional
  - The schema is not fixed for a particular dataset, but changeable

Schema cont.

Schema 1
A = LOAD 'input/A' AS (name:chararray, age:int);
B = FILTER A BY age != 20;

Schema 2
A = LOAD 'input/A' AS (name:chararray, age:chararray);
B = FILTER A BY age != '20';

No Schema
A = LOAD 'input/A';
B = FILTER A BY $1 != '20';

Data Types
- Every attribute can always be interpreted as a bytearray, without further type definition
- Simple data types
  - For each attribute
  - Defined by the user in the schema
  - int, double, chararray, ...
- Complex data types
  - Usually constructed by relational operations
  - tuple, bag, map
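
A small sketch of how a complex type arises from a relational operation (schema and path are illustrative):

A = LOAD 'input/A' AS (name:chararray, age:int);
-- GROUP builds complex types: each result tuple holds the key plus a bag of A's tuples
B = GROUP A BY age;
DESCRIBE B;   -- roughly: B: {group: int, A: {(name: chararray, age: int)}}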

Data Types cont.
- Type casting
  - Pig will try to cast data types when a type inconsistency is seen
  - A warning is thrown if casting fails; processing still goes on
- Validation
  - Null replaces the inconvertible value in type casting
  - The user can tell a corrupted record by detecting whether a particular attribute is null
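
For example, records whose age field cannot be cast to int end up with a null in that position and can be separated out (field names are illustrative):

A    = LOAD 'input/A' AS (name:chararray, age:int);
-- corrupted records: age could not be cast to int
bad  = FILTER A BY age IS NULL;
good = FILTER A BY age IS NOT NULL;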

Data Types cont.

Operators
- Relational Operators
  - Represent an operation that will be added to the logical plan
  - LOAD, STORE, FILTER, JOIN, FOREACH...GENERATE

Operators
- Diagnostic Operators
  - Show the status/metadata of the relations
  - Used for debugging
  - Will not be integrated into the execution plan
  - DESCRIBE, EXPLAIN, ILLUSTRATE
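
A minimal sketch of the diagnostic operators in use (relations are illustrative):

A = LOAD 'input/A' AS (name:chararray, age:int);
B = FILTER A BY age > 20;
DESCRIBE B;    -- print the schema of B
EXPLAIN B;     -- print the logical, physical, and MapReduce plans for computing B
ILLUSTRATE B;  -- run the plan on a small sample and show example records at each step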

Functions
- Eval Functions
  - Record transformation
- Filter Functions
  - Test whether a record satisfies a particular predicate
- Comparison Functions
  - Impose an ordering between two records. Used by the ORDER operation
- Load Functions
  - Specify how to load data into relations
- Store Functions
  - Specify how to store relations to external storage

Functions
- Built-in Functions
  - Hard-coded routines offered by Pig
- User Defined Functions (UDFs)
  - Support customized functionalities
  - Piggy Bank, a warehouse for UDFs
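
A sketch of registering and calling a UDF packaged in a jar; myudfs.jar and its UPPER function are hypothetical placeholders:

REGISTER myudfs.jar;                             -- hypothetical jar containing the UDF
A = LOAD 'input/A' AS (name:chararray, age:int);
B = FOREACH A GENERATE myudfs.UPPER(name), age;  -- fully qualified UDF call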

View of Pig from inside

Pig Execution Modes
- Local mode
  - Launch a single JVM
  - Access the local file system
  - No MR job running
- Hadoop mode
  - Execute a sequence of MR jobs
  - Pig interacts with the Hadoop master node

Compilation

Parsing
- Type checking with schema
- Reference verification
- Logical plan generation
  - One-to-one fashion
  - Independent of execution platform
  - Limited optimization
- No execution until DUMP or STORE

Logical Plan

A = LOAD 'file1' AS (x, y, z);
B = LOAD 'file2' AS (t, u, v);
C = FILTER A BY y > 0;
D = JOIN C BY x, B BY u;
E = GROUP D BY z;
F = FOREACH E GENERATE group, COUNT(D);
STORE F INTO 'output';

(Diagram: the corresponding logical plan, a dataflow of LOAD, LOAD, FILTER, JOIN, GROUP, FOREACH, STORE operators.)

Physical Plan
- 1:1 correspondence with most logical operators
- Except for:
  - DISTINCT
  - (CO)GROUP
  - JOIN
  - ORDER

Joins in MapReduce
- Two typical types of join
  - Map-side join
  - Reduce-side join

Map-side Join
(Diagram: map tasks read table L and table R; the join is performed inside the map tasks.)

Reduce-side Join
(Diagram: records from table L, ratings.dat, and table R, movies.dat, are emitted as (join key, record) pairs tagged with their table of origin, grouped by the join key, buffered into two sets according to the table tag, and then cross-producted to form the joined output.)
- Drawback: all records for a key may have to be buffered
  - Out of memory when:
    - The key cardinality is small
    - The data is highly skewed
- Improvement:
  - Map function: the output key is changed to a composite of the join key and the table tag
  - Partitioning function: the hash code is computed from just the join key part of the composite key
  - Grouping function: records are grouped on just the join key
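
For reference, Pig exposes a choice of join strategies directly in Pig Latin; a minimal sketch (inputs are illustrative, and the replicated join requires the second input to fit in memory):

A = LOAD 'input/join/A';
B = LOAD 'input/join/B';
-- default: reduce-side join (both inputs are shuffled on the join key)
C = JOIN A BY $0, B BY $1;
-- map-side (fragment-replicate) join: B is loaded into memory in every map task
D = JOIN A BY $0, B BY $1 USING 'replicated';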

Physical Plan
- 1:1 correspondence with most logical operators
- Except for:
  - DISTINCT
  - (CO)GROUP
  - JOIN
  - ORDER

(Diagram: the logical plan LOAD, LOAD, FILTER, JOIN, GROUP, FOREACH, STORE is translated into a physical plan in which JOIN and GROUP each expand into LOCAL REARRANGE, GLOBAL REARRANGE, PACKAGE followed by FOREACH; the other operators map one-to-one.)

Physical Optimizations
- Always use the combiner for pre-aggregation
- Insert SPLIT to re-use intermediate results
- Early projection (logical or physical?)
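
A minimal sketch of the early-projection idea (relation and field names are illustrative): columns not needed downstream are dropped before the expensive shuffle.

A = LOAD 'input/A' AS (a, b, c, d, e);
-- project early: only a and b are needed by later operators
A_small = FOREACH A GENERATE a, b;
B = GROUP A_small BY a;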

MapReduce Plan
- Determine MapReduce boundaries
  - GLOBAL REARRANGE
  - STORE/LOAD
- Some operations are done by the MapReduce framework
- Coalesce other operators into Map & Reduce stages
- Generate the job jar file

(Diagram: the physical plan is cut at GLOBAL REARRANGE boundaries. The first job's Map stage runs LOAD, FILTER, LOCAL REARRANGE; the shuffle performs the GLOBAL REARRANGE; the Reduce stage runs PACKAGE and FOREACH. The next job's Map stage begins with another LOCAL REARRANGE, and so on until STORE.)

Execution in Hadoop Mode
- The MR jobs that do not depend on any other job in the MR plan are submitted for execution
- MR jobs are removed from the MR plan after completion
  - Jobs whose dependencies are satisfied are now ready for execution
- Currently, no support for inter-job fault tolerance

Discussion of the Two Readings on Pig (SIGMOD 2008 and VLDB 2009)

Discussion Points for Reading 1
- Examples of the nested data model, CoGroup, and Join (Figure 2)
- Nested query in Section 3.7

What are the Logical, Physical, and MapReduce plans for:
STORE answer INTO '/user/alan/answer';

(Diagram: the corresponding MapReduce plan; the Map stages contain LOAD and LOCAL REARRANGE operators, the GLOBAL REARRANGE steps are performed by the shuffle, and FOREACH, FILTER, and STORE follow in the later stages.)

Recall Operator Plumbing
(Query tree: π B,D over σ R.A = 'c' over the join of R and S)
- Materialization: output of one operator is written to disk; the next operator reads from the disk
- Pipelining: output of one operator is directly fed to the next operator

Materialization
(Same query tree, annotated to show where the intermediate result below the projection is materialized to disk.)

Iterators: Pipelining
(Same query tree.)
- Each operator supports:
  - Open()
  - GetNext()
  - Close()

How do these operators execute in Pig?
(Diagram: Map stage: FILTER, LOCAL REARRANGE; Reduce stage: PACKAGE, FOREACH)
Hints (based on Reading 2):
- What will Hadoop's map function and reduce function calls do in this case?
- How does each operator work? What does each operator do? (Section 4.3)
- Outermost operator graph (Section 5)
- Iterator model (Section 5)

Branching Flows in Pig

clicks = LOAD 'clicks' AS (userid, pageid, linkid, viewedat);
SPLIT clicks INTO
    pages IF pageid IS NOT NULL,
    links IF linkid IS NOT NULL;
cpages = FOREACH pages GENERATE userid,
    CanonicalizePage(pageid) AS cpage, viewedat;
clinks = FOREACH links GENERATE userid,
    CanonicalizeLink(linkid) AS clink, viewedat;
STORE cpages INTO 'pages';
STORE clinks INTO 'links';

Hints (based on Reading 2, Section 5.1, last two paras before Section 5.1.1):
- Outermost data flow graph
- New pause signal for iterators

Branching Flows in Pig
- Draw the MapReduce plan for this query

clicks = LOAD 'clicks' AS (userid, pageid, linkid, viewedat);
byuser = GROUP clicks BY userid;
result = FOREACH byuser {
    uniqPages = DISTINCT clicks.pageid;
    uniqLinks = DISTINCT clicks.linkid;
    GENERATE group, COUNT(uniqPages), COUNT(uniqLinks);
};

Branching Flows in Pig
- Draw the MapReduce plan for this query

clicks = LOAD 'clicks' AS (userid, pageid, linkid, viewedat);
byuser = GROUP clicks BY userid;
result = FOREACH byuser {
    fltrd = FILTER clicks BY viewedat IS NOT NULL;
    uniqPages = DISTINCT fltrd.pageid;
    uniqLinks = DISTINCT fltrd.linkid;
    GENERATE group, COUNT(uniqPages), COUNT(uniqLinks);
};

Performance and Future Improvement

Pig Performance
Images from http://wiki.apache.org/pig/PigTalksPapers

Future Improvements
- Query optimization
  - Currently a rule-based optimizer for plan rearrangement and join selection
  - Cost-based in the future
- Non-Java UDFs
- Grouping and joining on pre-partitioned/sorted data
  - Avoid data shuffling for grouping and joining
  - Building metadata facilities to keep track of data layout
- Skew handling
  - For load balancing

- Get more information at the Pig website
- You can work with the source code to implement something new in Pig
- Also take a look at Hive, a similar system from Facebook

References
Some of the content comes from the following presentations:
- Introduction to Data Processing Using Hadoop and Pig, by Ricardo Varela
- Pig, Making Hadoop Easy, by Alan F. Gates
- Large-scale Social Media Analysis with Hadoop, by Jake Hofman
- Getting Started on Hadoop, by Paco Nathan
- MapReduce Online, by Tyson Condie and Neil Conway