Intro To Apache Spark - GitHub Pages

Transcription

Intro to Apache Spark
http://databricks.com/
download slides: training.databricks.com/workshop/itas_workshop.pdf
Licensed under a Creative Commons Attribution-NonCommercial-NoDerivatives 4.0 International License

00: Getting Started - Introduction
installs + intros, while people arrive: 20 min

Intro: Online Course Materials
Resources for the course are available online; download slides + code + data to your laptop:
training.databricks.com/workshop/itas
(should have been provided on USB sticks)
Also, please complete our course survey: http://goo.gl/QpBSnR

Intro: Success Criteria
By end of day, participants will be comfortable with the following:
- open a Spark Shell
- develop Spark apps for typical use cases
- use of some ML algorithms
- explore data sets loaded from HDFS, etc.
- review of Spark SQL, Spark Streaming, MLlib
- follow-up courses and certification
- developer community resources, events, etc.
- return to workplace and demo use of Spark!

Intro: Preliminaries
- intros: tell us briefly about your background
- anyone need to use AWS instead of laptops? PEM key, if needed? See tutorial: Connect to Your Amazon EC2 Instance from Windows Using PuTTY

01: Getting Started - Installation
hands-on lab: 20 min

Installation:
Let's get started using Apache Spark, in just four easy steps.
databricks.com/spark-training-resources#itas
For class, copy from the USB sticks!
NB: please do not install/run Spark using:
- Homebrew on MacOSX
- Cygwin on Windows

Step 1: Install Java JDK 6/7 on MacOSX or Windows
(JDK downloads page: .../jdk7-downloads-1880260.html)
- follow the license agreement instructions
- then click the download for your OS
- need JDK instead of JRE (for Maven, etc.)

Step 2: Download Spark
we will use Spark 1.1.0
1. copy from the USB sticks
2. double click the archive file to open it
3. connect into the newly created directory
for a fallback: spark.apache.org/downloads.html

Step 3: Run Spark Shell
we'll run Spark's interactive shell; within the "spark" directory, run:
./bin/spark-shell
then from the "scala>" REPL prompt, let's create some data:
val data = 1 to 10000

Step 4: Create an RDD
create an RDD based on that data:
val distData = sc.parallelize(data)
then use a filter to select values less than 10:
distData.filter(_ < 10).collect()
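Putting steps 3 and 4 together, a minimal spark-shell session might look roughly like the following sketch (output abbreviated; exact types, RDD ids, and console line numbers may differ slightly on your machine):

// launched with ./bin/spark-shell from the Spark directory
scala> val data = 1 to 10000
data: scala.collection.immutable.Range.Inclusive = Range(1, 2, 3, ...)

scala> val distData = sc.parallelize(data)
distData: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:14

scala> distData.filter(_ < 10).collect()
res0: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)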

Step 4: Create an RDD
Checkpoint: what do you get for distData.filter(_ < 10).collect()?
(compare with the gist: #file-01-repl-txt)

Installation: Optional Downloads: Python
For Python 2.7, check out Anaconda by Continuum Analytics.

Installation: Optional Downloads: Maven
Java builds later also require Maven, which you can download at: maven.apache.org/download.cgi

03: Getting Started - Spark Deconstructed
lecture: 20 min

Spark Deconstructed:
Let's spend a few minutes on this Scala thing: scala-lang.org/

Spark Deconstructed: Log Mining Example

// load error messages from a log into memory
// then interactively search for various patterns

// base RDD
val lines = sc.textFile("hdfs://...")

// transformed RDDs
val errors = lines.filter(_.startsWith("ERROR"))
val messages = errors.map(_.split("\t")).map(r => r(1))
messages.cache()

// action 1
messages.filter(_.contains("mysql")).count()

// action 2
messages.filter(_.contains("php")).count()

Spark Deconstructed: Log Mining Example
We start with Spark running on a cluster, submitting code to be evaluated on it:
[diagram: a Driver connected to three Workers]

Spark Deconstructed: Log Mining Example
[the same code as above, with an overlay labeled "discussing the other part" marking the portion not currently under discussion]

Spark Deconstructed: Log Mining Example
At this point, take a look at the transformed RDD operator graph:

scala> messages.toDebugString
res5: String =
MappedRDD[4] at map at <console>:16 (3 partitions)
MappedRDD[3] at map at <console>:16 (3 partitions)
FilteredRDD[2] at filter at <console>:14 (3 partitions)
MappedRDD[1] at textFile at <console>:12 (3 partitions)
HadoopRDD[0] at textFile at <console>:12 (3 partitions)

Spark Deconstructed: Log Mining Example
[a sequence of diagrams steps through the same code on the cluster:
- the Driver ships the code to the three Workers, each holding one HDFS block (block 1, 2, 3)
- for action 1, each Worker reads its HDFS block, processes the data, and caches it (cache 1, 2, 3)
- for action 2, each Worker processes directly from its cache, with no further HDFS reads]

Spark Deconstructed:
Looking at the RDD transformations and actions from another perspective: the transformations build a chain of RDDs (RDD -> RDD -> RDD -> RDD), and an action finally produces a value.

// load error messages from a log into memory
// then interactively search for various patterns

// base RDD
val lines = sc.textFile("hdfs://...")

// transformed RDDs
val errors = lines.filter(_.startsWith("ERROR"))
val messages = errors.map(_.split("\t")).map(r => r(1))
messages.cache()

// action 1
messages.filter(_.contains("mysql")).count()

// action 2
messages.filter(_.contains("php")).count()

Spark Deconstructed: RDD
// base RDD
val lines = sc.textFile("hdfs://...")

Spark Deconstructed: transformations (RDD -> RDD -> RDD -> RDD)
// transformed RDDs
val errors = lines.filter(_.startsWith("ERROR"))
val messages = errors.map(_.split("\t")).map(r => r(1))
messages.cache()

Spark Deconstructed: action (RDD -> value)
// action 1
messages.filter(_.contains("mysql")).count()
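Since most laptops won't have an HDFS cluster handy, here is a minimal sketch of the same pipeline against a local file; the /tmp/error.log path and its tab-separated format are assumptions for illustration only:

// assumes a local, tab-separated log file such as /tmp/error.log (hypothetical)
val lines = sc.textFile("/tmp/error.log")
val errors = lines.filter(_.startsWith("ERROR"))
val messages = errors.map(_.split("\t")).map(r => r(1))
messages.cache()

// nothing has executed yet: transformations are lazy
messages.filter(_.contains("mysql")).count()   // first action: reads the file, fills the cache
messages.filter(_.contains("php")).count()     // second action: served from the cached RDD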

04: Getting Started - Simple Spark Apps
lab: 20 min

Simple Spark Apps: WordCount
Definition: count how often each word appears in a collection of text documents.

This simple program provides a good test case for parallel processing, since it:
- requires a minimal amount of code
- demonstrates use of both symbolic and numeric values
- isn't many steps away from search indexing
- serves as a "Hello World" for Big Data apps

A distributed computing framework that can run WordCount efficiently in parallel at scale can likely handle much larger and more interesting compute problems.

void map (String doc_id, String text):
  for each word w in segment(text):
    emit(w, "1");

void reduce (String word, Iterator group):
  int count = 0;
  for each pc in group:
    count += Int(pc);
  emit(word, String(count));

Simple Spark Apps: WordCountScala:val f sc.textFile("README.md")!val wc f.flatMap(l l.split(" ")).map(word (word, 1)).reduceByKey( )!wc.saveAsTextFile("wc out")Python:from operator import add!f sc.textFile("README.md")!wc f.flatMap(lambda x: x.split(' ')).map(lambda x: (x, 1)).reduceByKey(add)!wc.saveAsTextFile("wc out")

Simple Spark Apps: WordCount
Checkpoint: how many "Spark" keywords?

Simple Spark Apps: Code + Data
The code + data for the following example of a join is available online.
[operator graph diagram: stages 1-3 with map() and join() steps over RDDs A, B, C, D, E]

Simple Spark Apps: Source Code

val format = new java.text.SimpleDateFormat("yyyy-MM-dd")

case class Register (d: java.util.Date, uuid: String, cust_id: String, lat: Float, lng: Float)
case class Click (d: java.util.Date, uuid: String, landing_page: Int)

val reg = sc.textFile("reg.tsv").map(_.split("\t")).map(
  r => (r(1), Register(format.parse(r(0)), r(1), r(2), r(3).toFloat, r(4).toFloat))
)

val clk = sc.textFile("clk.tsv").map(_.split("\t")).map(
  c => (c(1), Click(format.parse(c(0)), c(1), c(2).trim.toInt))
)

reg.join(clk).collect()

Simple Spark Apps: Operator Graph

scala> reg.join(clk).toDebugString
res5: String =
FlatMappedValuesRDD[46] at join at <console>:23 (1 partitions)
MappedValuesRDD[45] at join at <console>:23 (1 partitions)
CoGroupedRDD[44] at join at <console>:23 (1 partitions)
MappedRDD[36] at map at <console>:16 (1 partitions)
MappedRDD[35] at map at <console>:16 (1 partitions)
MappedRDD[34] at textFile at <console>:16 (1 partitions)
HadoopRDD[33] at textFile at <console>:16 (1 partitions)
MappedRDD[40] at map at <console>:16 (1 partitions)
MappedRDD[39] at map at <console>:16 (1 partitions)
MappedRDD[38] at textFile at <console>:16 (1 partitions)
HadoopRDD[37] at textFile at <console>:16 (1 partitions)

Simple Spark Apps: Operator Graph
[operator graph diagram: stages 1-3 with map() and join() steps over RDDs A, B, C, D, E, and a cached partition]
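To make the join result concrete, here is a brief sketch of one way to continue from reg.join(clk); the field selection is purely illustrative and not part of the original lab:

// reg.join(clk) has type RDD[(String, (Register, Click))]: for each uuid key,
// a pair of the matching Register and Click records
val joined = reg.join(clk)

// hypothetical follow-up: pull out the landing page and location for each match
joined.map { case (uuid, (r, c)) => (uuid, c.landing_page, r.lat, r.lng) }.collect()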

Simple Spark Apps: Assignment
Using the README.md and CHANGES.txt files in the Spark directory:
1. create RDDs to filter each line for the keyword "Spark"
2. perform a WordCount on each, i.e., so the results are (K, V) pairs of (word, count)
3. join the two RDDs

Simple Spark Apps: Assignment
Checkpoint: how many "Spark" keywords?
(one possible approach is sketched below)
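A minimal sketch of one way to do the assignment, assuming the shell was started from the Spark directory so README.md and CHANGES.txt resolve; variable names are just illustrative:

// WordCount restricted to lines containing "Spark", for each file
val readmeWC = sc.textFile("README.md")
  .filter(_.contains("Spark"))
  .flatMap(_.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _)

val changesWC = sc.textFile("CHANGES.txt")
  .filter(_.contains("Spark"))
  .flatMap(_.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _)

// join the two (word, count) RDDs on the word key
readmeWC.join(changesWC).collect()

// checkpoint: the count of the "Spark" keyword itself in each file
readmeWC.lookup("Spark")
changesWC.lookup("Spark")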

05: Getting Started - A Brief History
lecture: 35 min

A Brief History:
Timeline: 2002 - MapReduce @ Google; 2004 - MapReduce paper; 2006 - Hadoop @ Yahoo!; 2008 - Hadoop Summit; 2010 - Spark paper; 2014 - Apache Spark becomes a top-level project.

A Brief History: MapReduce
circa 1979 - Stanford, MIT, CMU, etc.: set/list operations in LISP, Prolog, etc., for parallel processing
circa 2004 - Google: MapReduce: Simplified Data Processing on Large Clusters, by Jeffrey Dean and Sanjay Ghemawat
circa 2006 - Apache: Hadoop, originating from the Nutch Project, by Doug Cutting (research.yahoo.com/files/cutting.pdf)
circa 2008 - Yahoo: web scale search indexing; Hadoop Summit, HUG, etc. (developer.yahoo.com/hadoop/)
circa 2009 - Amazon AWS: Elastic MapReduce; Hadoop modified for EC2/S3, plus support for Hive, Pig, Cascading, etc. (aws.amazon.com/elasticmapreduce/)

A Brief History: MapReduce
Open Discussion: enumerate several changes in data center technologies since 2002.

A Brief History: MapReduce
[charts: Rich Freitas, IBM, on the growing processor/storage performance gap; meanwhile, spinny disks haven't changed all that much]

A Brief History: MapReduce
MapReduce use cases showed two major limitations:
1. difficulty of programming directly in MR
2. performance bottlenecks, or batch not fitting the use cases
In short, MR doesn't compose well for large applications.
Therefore, people built specialized systems as workarounds.

A Brief History:
General batch processing (MapReduce) vs. specialized systems (e.g., GraphLab, Storm, Tez, S4): iterative, interactive, streaming, graph, etc.
The State of Spark, and Where We're Going Next - Matei Zaharia, Spark Summit (2013): youtu.be/nU6vO2EJAb4

A Brief History: Spark
Timeline: 2002 - MapReduce @ Google; 2004 - MapReduce paper; 2006 - Hadoop @ Yahoo!; 2008 - Hadoop Summit; 2010 - Spark paper; 2014 - Apache Spark becomes a top-level project.

Spark: Cluster Computing with Working Sets
Matei Zaharia, Mosharaf Chowdhury, Michael J. Franklin, Scott Shenker, Ion Stoica
USENIX HotCloud (2010): hotcloud_spark.pdf

Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing
Matei Zaharia, Mosharaf Chowdhury, Tathagata Das, Ankur Dave, Justin Ma, Murphy McCauley, Michael J. Franklin, Scott Shenker, Ion Stoica
NSDI (2012): nsdi12-final138.pdf

A Brief History: Spark
Unlike the various specialized systems, Spark's goal was to generalize MapReduce to support new apps within the same engine.
Two reasonably small additions are enough to express the previous models:
- fast data sharing
- general DAGs
This allows for an approach which is more efficient for the engine, and much simpler for the end users.

A Brief History: Spark
Some key points about Spark:
- handles batch, interactive, and real-time within a single framework
- native integration with Java, Python, Scala
- programming at a higher level of abstraction
- more general: map/reduce is just one set of supported constructs

A Brief History: Spark
used as libs, instead of specialized systems


(break): 15 min

03: Intro Spark Apps - Spark Essentials
lecture/lab: 45 min

Spark Essentials:
Intro apps, showing examples in both Scala and Python.
Let's start with the basic concepts, following the Spark programming guide.
Using PySpark natively with IPython Notebook:
IPYTHON_OPTS="notebook --pylab inline" ./bin/pyspark

Spark Essentials: SparkContext
First thing that a Spark program does is create a SparkContext object, which tells Spark how to access a cluster.
In the shell for either Scala or Python, this is the sc variable, which is created automatically.
Other programs must use a constructor to instantiate a new SparkContext.
Then in turn SparkContext gets used to create other variables.

Spark Essentials: SparkContext

Scala:
scala> sc
res: spark.SparkContext = spark.SparkContext@470d1f30

Python:
>>> sc
<pyspark.context.SparkContext object at 0x7f7570783350>
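For a standalone app (outside the shell), a minimal sketch of constructing a SparkContext might look like this; the app name and master URL are placeholders:

import org.apache.spark.{SparkConf, SparkContext}

// build a configuration, then the context (the shell does this for you as `sc`)
val conf = new SparkConf()
  .setAppName("MyApp")          // placeholder app name
  .setMaster("local[2]")        // placeholder master; often supplied via spark-submit instead
val sc = new SparkContext(conf)

// ... use sc to create RDDs ...

sc.stop()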

Spark Essentials: Master
The master parameter for a SparkContext determines which cluster to use:
- local: run Spark locally with one worker thread (no parallelism)
- local[K]: run Spark locally with K worker threads (ideally set to # cores)
- spark://HOST:PORT: connect to a Spark standalone cluster; PORT depends on config (7077 by default)
- mesos://HOST:PORT: connect to a Mesos cluster; PORT depends on config (5050 by default)
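As a quick illustration of how the master is usually supplied in practice (rather than hard-coded), the shell accepts a --master flag; the host name below is a placeholder:

# run the shell locally with 4 worker threads
./bin/spark-shell --master local[4]

# or connect to a standalone cluster (placeholder host)
./bin/spark-shell --master spark://master-host:7077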

Spark Essentials: Clusters
[cluster diagram: a Driver Program (SparkContext) talks to a Cluster Manager, which manages Worker Nodes; each Worker Node runs an Executor with tasks and a cache]

Spark Essentials: Clusters
1. the master connects to a cluster manager to allocate resources across applications
2. acquires executors on cluster nodes - processes that run compute tasks and cache data
3. sends app code to the executors
4. sends tasks for the executors to run
[same cluster diagram as above]
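In practice, the driver program for a packaged app is typically launched with spark-submit, which handles shipping the app code to the executors; the class, jar path, and master below are placeholders:

# submit a packaged Scala/Java app to a standalone cluster (all names are placeholders)
./bin/spark-submit \
  --class com.example.MyApp \
  --master spark://master-host:7077 \
  path/to/my-app.jar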

Spark Essentials: RDD
Resilient Distributed Datasets (RDD) are the primary abstraction in Spark - a fault-tolerant collection of elements that can be operated on in parallel.
There are currently two types:
- parallelized collections - take an existing Scala collection and run functions on it in parallel
- Hadoop datasets - run functions on each record of a file in Hadoop distributed file system or any other storage system supported by Hadoop

Spark Essentials: RDD
Two types of operations on RDDs: transformations and actions.
- transformations are lazy (not computed immediately)
- the transformed RDD gets recomputed when an action is run on it (default)
- however, an RDD can be persisted into storage in memory or on disk
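A small sketch of the lazy evaluation and persistence behavior, assuming a spark-shell session (variable names are illustrative):

val nums = sc.parallelize(1 to 1000)

// building transformations returns immediately: nothing is computed yet
val squares = nums.map(n => n * n)
val evens   = squares.filter(_ % 2 == 0)
evens.cache()      // also lazy: just marks the RDD to be persisted

// the first action triggers the whole chain and fills the cache
evens.count()

// later actions on the same RDD reuse the cached partitions
evens.take(5)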

Spark Essentials: RDD

Scala:
scala> val data = Array(1, 2, 3, 4, 5)
data: Array[Int] = Array(1, 2, 3, 4, 5)

scala> val distData = sc.parallelize(data)
distData: spark.RDD[Int] = spark.ParallelCollection@10d13e3e

Python:
>>> data = [1, 2, 3, 4, 5]
>>> data
[1, 2, 3, 4, 5]
>>> distData = sc.parallelize(data)
>>> distData
ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:229

Spark Essentials: RDD
Spark can create RDDs from any file stored in HDFS or other storage systems supported by Hadoop, e.g., local file system, Amazon S3, Hypertable, HBase, etc.
Spark supports text files, SequenceFiles, and any other Hadoop InputFormat, and can also take a directory or a glob.

Spark Essentials: RDD

Scala:
scala> val distFile = sc.textFile("README.md")
distFile: spark.RDD[String] = spark.HadoopRDD@1d4cee08

Python:
>>> distFile = sc.textFile("README.md")
14/04/19 23:42:40 INFO storage.MemoryStore: ensureFreeSpace(36827) called with curMem=0, maxMem=318111744
14/04/19 23:42:40 INFO storage.MemoryStore: Block broadcast_0 stored as values to memory (estimated size 36.0 KB, free 303.3 MB)
>>> distFile
MappedRDD[2] at textFile at NativeMethodAccessorImpl.java:-2

Spark Essentials: Transformations
Transformations create a new dataset from an existing one.
All transformations in Spark are lazy: they do not compute their results right away - instead they remember the transformations applied to some base dataset.
- optimize the required calculations
- recover from lost data partitions

Spark Essentials: Transformations
- map(func): return a new distributed dataset formed by passing each element of the source through a function func
- filter(func): return a new dataset formed by selecting those elements of the source on which func returns true
- flatMap(func): similar to map, but each input item can be mapped to 0 or more output items (so func should return a Seq rather than a single item)
- sample(withReplacement, fraction, seed): sample a fraction fraction of the data, with or without replacement, using a given random number generator seed
- union(otherDataset): return a new dataset that contains the union of the elements in the source dataset and the argument
- distinct([numTasks]): return a new dataset that contains the distinct elements of the source dataset

Spark Essentials: Transformations
- groupByKey([numTasks]): when called on a dataset of (K, V) pairs, returns a dataset of (K, Seq[V]) pairs
- reduceByKey(func, [numTasks]): when called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function
- sortByKey([ascending], [numTasks]): when called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the boolean ascending argument
- join(otherDataset, [numTasks]): when called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key
- cogroup(otherDataset, [numTasks]): when called on datasets of type (K, V) and (K, W), returns a dataset of (K, Seq[V], Seq[W]) tuples - also called groupWith
- cartesian(otherDataset): when called on datasets of types T and U, returns a dataset of (T, U) pairs (all pairs of elements)
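A tiny sketch exercising a few of the pair-RDD transformations above on in-memory data, so the shapes of the results are easy to see (the data values are made up for illustration):

val sales  = sc.parallelize(Seq(("apple", 2), ("pear", 1), ("apple", 3)))
val colors = sc.parallelize(Seq(("apple", "red"), ("pear", "green")))

sales.groupByKey().collect()          // roughly: apple -> [2, 3], pear -> [1]
sales.reduceByKey(_ + _).collect()    // (apple, 5), (pear, 1)
sales.join(colors).collect()          // (apple, (2, red)), (apple, (3, red)), (pear, (1, green))
sales.sortByKey().collect()           // same pairs, sorted by the String key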

Spark Essentials: Transformations

Scala:
val distFile = sc.textFile("README.md")
distFile.map(l => l.split(" ")).collect()
distFile.flatMap(l => l.split(" ")).collect()

Python:
distFile = sc.textFile("README.md")
distFile.map(lambda x: x.split(' ')).collect()
distFile.flatMap(lambda x: x.split(' ')).collect()

distFile is a collection of lines; the functions passed to map() and flatMap() are closures.
Looking at the output, how would you compare results for map() vs. flatMap()? (see the sketch below)
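A short sketch of the difference, run against any small text file; the example line shown in the comments is hypothetical:

// map() keeps one output element per input line, so splitting gives an
// Array[Array[String]]: each element is the array of words from one line.
val perLine = distFile.map(l => l.split(" ")).collect()    // Array[Array[String]]

// flatMap() flattens the per-line arrays into a single collection of words.
val words = distFile.flatMap(l => l.split(" ")).collect()  // Array[String]

// e.g. for a (hypothetical) line "Apache Spark is fast":
//   map     -> Array(Array("Apache", "Spark", "is", "fast"), ...)
//   flatMap -> Array("Apache", "Spark", "is", "fast", ...)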

Spark Essentials: Transformations
Using closures is now possible in Java 8 with lambda expressions support; see the Databricks tutorial on Spark with Java 8.
[diagram: transformations RDD -> RDD -> RDD -> RDD, action -> value]

Spark Essentials: Transformations

Java 7:
JavaRDD<String> distFile = sc.textFile("README.md");

// Map each line to multiple words
JavaRDD<String> words = distFile.flatMap(
  new FlatMapFunction<String, String>() {
    public Iterable<String> call(String line) {
      return Arrays.asList(line.split(" "));
    }
  });

Java 8:
JavaRDD<String> distFile = sc.textFile("README.md");
JavaRDD<String> words = distFile.flatMap(line -> Arrays.asList(line.split(" ")));

Spark Essentials: Actions
- reduce(func): aggregate the elements of the dataset using a function func (which takes two arguments and returns one); func should also be commutative and associative so that it can be computed correctly in parallel
- collect(): return all the elements of the dataset as an array at the driver program - usually useful after a filter or other operation that returns a sufficiently small subset of the data
- count(): return the number of elements in the dataset
- first(): return the first element of the dataset - similar to take(1)
- take(n): return an array with the first n elements of the dataset - currently not executed in parallel; instead the driver program computes all the elements
- takeSample(withReplacement, fraction, seed): return an array with a random sample of num elements of the dataset, with or without replacement, using the given random number generator seed

Spark Essentials: Actions
- saveAsTextFile(path): write the elements of the dataset as a text file (or set of text files) in a given directory in the local filesystem, HDFS or any other Hadoop-supported file system; Spark will call toString on each element to convert it to a line of text in the file
- saveAsSequenceFile(path): write the elements of the dataset as a Hadoop SequenceFile in a given path in the local filesystem, HDFS or any other Hadoop-supported file system; only available on RDDs of key-value pairs that either implement Hadoop's Writable interface or are implicitly convertible to Writable (Spark includes conversions for basic types like Int, Double, String, etc.)
- countByKey(): only available on RDDs of type (K, V); returns a Map of (K, Int) pairs with the count of each key
- foreach(func): run a function func on each element of the dataset - usually done for side effects such as updating an accumulator variable or interacting with external storage systems
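A quick sketch exercising several of these actions on a small in-memory RDD (the values are made up for illustration):

val nums = sc.parallelize(Seq(5, 3, 8, 1, 8))

nums.reduce(_ + _)      // 25
nums.count()            // 5
nums.first()            // 5 (the first element)
nums.take(3)            // Array of the first 3 elements
nums.collect()          // all elements, back at the driver

val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
pairs.countByKey()      // Map(a -> 2, b -> 1)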

Spark Essentials: Actions

Scala:
val f = sc.textFile("README.md")
val words = f.flatMap(l => l.split(" ")).map(word => (word, 1))
words.reduceByKey(_ + _).collect.foreach(println)

Python:
from operator import add
f = sc.textFile("README.md")
words = f.flatMap(lambda x: x.split(' ')).map(lambda x: (x, 1))
words.reduceByKey(add).collect()

Spark Essentials: Persistence
Spark can persist (or cache) a dataset in memory across operations.
Each node stores in memory any slices of it that it computes and reuses them in other actions on that dataset - often making future actions more than 10x faster.
The cache is fault-tolerant: if any partition of an RDD is lost, it will automatically be recomputed using the transformations that originally created it.

Spark Essentials: Persistence
- MEMORY_ONLY: store RDD as deserialized Java objects in the JVM; if the RDD does not fit in memory, some partitions will not be cached and will be recomputed on the fly each time they're needed (this is the default level)
- MEMORY_AND_DISK: store RDD as deserialized Java objects in the JVM; if the RDD does not fit in memory, store the partitions that don't fit on disk, and read them from there when they're needed
- MEMORY_ONLY_SER: store RDD as serialized Java objects (one byte array per partition); this is generally more space-efficient than deserialized objects, especially when using a fast serializer, but more CPU-intensive to read
- MEMORY_AND_DISK_SER: similar to MEMORY_ONLY_SER, but spill partitions that don't fit in memory to disk instead of recomputing them on the fly each time they're needed
- DISK_ONLY: store the RDD partitions only on disk
- MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc.: same as the levels above, but replicate each partition on two cluster nodes
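For levels other than the default, persist() takes an explicit StorageLevel; a minimal sketch:

import org.apache.spark.storage.StorageLevel

val lines = sc.textFile("README.md")

// cache() is shorthand for persist(StorageLevel.MEMORY_ONLY)
lines.cache()

// choose a different level explicitly, e.g. spill to disk rather than recompute
val words = lines.flatMap(_.split(" ")).persist(StorageLevel.MEMORY_AND_DISK)

words.count()        // first action materializes and persists the partitions
words.unpersist()    // release the storage when no longer needed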

Spark Essentials: Persistence

Scala:
val f = sc.textFile("README.md")
val w = f.flatMap(l => l.split(" ")).map(word => (word, 1)).cache()
w.reduceByKey(_ + _).collect.foreach(println)

Python:
from operator import add
f = sc.textFile("README.md")
w = f.flatMap(lambda x: x.split(' ')).map(lambda x: (x, 1)).cache()
w.reduceByKey(add).collect()

Spark Essentials: Broadcast Variables
Broadcast variables let the programmer keep a read-only variable cached on each machine rather than shipping a copy of it with tasks.
For example, to give every node a copy of a large input dataset efficiently.
Spark also attempts to distribute broadcast variables using efficient broadcast algorithms to reduce communication cost.

Spark Essentials: Broadcast Variables

Scala:
val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar.value

Python:
broadcastVar = sc.broadcast(list(range(1, 4)))
broadcastVar.value
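A small sketch of the typical use: ship a lookup table once per node and reference it inside tasks (the table contents are made up for illustration):

// a small lookup table, broadcast once instead of shipped with every task
val countryNames = sc.broadcast(Map("us" -> "United States", "jp" -> "Japan"))

val codes = sc.parallelize(Seq("us", "jp", "us"))

// tasks read the broadcast value via .value
codes.map(c => countryNames.value.getOrElse(c, "unknown")).collect()
// Array(United States, Japan, United States)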

Spark Essentials: Accumulators
Accumulators are variables that can only be "added" to through an associative operation.
Used to implement counters and sums, efficiently in parallel.
Spark natively supports accumulators of numeric value types and standard mutable collections, and programmers can extend for new types.
Only the driver program can read an accumulator's value, not the tasks.

Spark Essentials: Accumulators

Scala:
val accum = sc.accumulator(0)
sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
accum.value

Python:
accum = sc.accumulator(0)
rdd = sc.parallelize([1, 2, 3, 4])
def f(x):
    global accum
    accum += x

rdd.foreach(f)
accum.value

Spark Essentials: Accumulators
(same example as above, highlighting that accum.value is read driver-side: tasks only add to the accumulator, while the value is read back in the driver program)

Spark Essentials: (K, V) pairs

Scala:
val pair = (a, b)
pair._1 // => a
pair._2 // => b

Python:
pair = (a, b)
pair[0] # => a
pair[1] # => b

Java:
Tuple2 pair = new Tuple2(a, b);
pair._1 // => a
pair._2 // => b

Spark Essentials: API Details
For more details about the Scala/Java API: spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.package
For more details about the Python API: spark.apache.org/docs/latest/api/python/

03: Intro Spark Apps - Spark Examples
lecture/lab: 10 min

Spark Examples: Estimate Pi
Next, try using a Monte Carlo method to estimate the value of Pi.
(a sketch of the approach is shown below)
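A minimal sketch of the standard Monte Carlo estimate (count random points that fall inside the unit circle); this mirrors the well-known SparkPi example, with the sample count chosen arbitrarily for illustration:

import scala.math.random

val numSamples = 100000   // arbitrary sample count for illustration

// throw darts at the unit square; the fraction landing inside the
// quarter circle approximates Pi / 4
val inside = sc.parallelize(1 to numSamples).map { _ =>
  val x = random
  val y = random
  if (x * x + y * y < 1) 1 else 0
}.reduce(_ + _)

println("Pi is roughly " + 4.0 * inside / numSamples)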
