Transcription
UC BERKELEY
Conquering Big Data with BDAS (Berkeley Data Analytics Stack)
Ion Stoica
UC Berkeley / Databricks / Conviva
Extracting Value from Big Data
Insights, diagnosis, e.g.,
» Why is user engagement dropping?
» Why is the system slow?
» Detect spam, DDoS attacks
Decisions, e.g.,
» Decide what features to add to a product
» Personalized medical treatment
» Decide when to change an aircraft engine part
» Decide what ads to show
Data only as useful as the decisions it enables
What Do We Need?
Interactive queries: enable faster decisions
» E.g., identify why a site is slow and fix it
Queries on streaming data: enable decisions on real-time data
» E.g., fraud detection, detect DDoS attacks
Sophisticated data processing: enable "better" decisions
» E.g., anomaly detection, trend analysis
Our Goal
Support batch, streaming, and interactive computations in a single, unified framework
Make it easy to develop sophisticated algorithms (e.g., graph, ML algorithms)
The Berkeley AMPLab (Algorithms, Machines, People)
January 2011 – 2017
» 8 faculty
» 40 students
» 3 software engineer teams
Organized for collaboration
» 3-day retreats (twice a year)
» AMPCamp 3 (August 2013): 220 campers from 100 companies
The Berkeley AMPLab
Governmental and industrial funding
Goal: next generation of open source data analytics stack for industry & academia: the Berkeley Data Analytics Stack (BDAS)
Data Processing Stack
» Data Processing Layer
» Resource Management Layer
» Storage Layer
Hadoop Stack
» Data Processing Layer: Hadoop MR, Hive, Pig, Impala, Storm
» Resource Management Layer: YARN
» Storage Layer: HDFS, S3, …
BDAS
» Data Processing Layer: Spark, Shark SQL, MLlib, …
» Resource Management Layer: Mesos
» Storage Layer: Tachyon; HDFS, S3, …
How do BDAS & Hadoop fit together?
BDAS and Hadoop components run side by side:
» Data processing: Spark, Spark Streaming, Shark SQL, BlinkDB, GraphX, MLlib, MLBase (BDAS) alongside Hadoop MR, Hive, Pig, Impala, Storm (Hadoop)
» Resource management: Mesos alongside YARN
» Storage: Tachyon over HDFS, S3, …
Apache Mesos
Enables multiple frameworks to share the same cluster resources (e.g., Hadoop, Storm, Spark)
Twitter's large-scale deployment
» 10,000+ servers
» 500+ engineers running jobs on Mesos
Third-party Mesos schedulers
» Airbnb's Chronos
» Twitter's Aurora
Mesosphere: startup to commercialize Mesos
Apache Spark
Distributed execution engine
» Fault-tolerant, efficient in-memory storage
» Low-latency, large-scale task scheduler
» Powerful programming model and APIs: Python, Java, Scala
Fast: up to 100x faster than Hadoop MR
» Can run sub-second jobs on hundreds of nodes
Easy to use: 2-5x less code than Hadoop MR
General: supports interactive & iterative apps
Fault Tolerance
Need to achieve:
» High-throughput reads and writes
» Efficient memory usage
Replication?
» Writes bottlenecked by the network
» Inefficient: stores multiple replicas
Persist update logs?
» Big data processing can generate massive logs
Our solution: Resilient Distributed Datasets (RDDs)
» Partitioned collections of immutable records
» Use lineage to reconstruct lost data
RDD Example
Two-partition RDD A = {A1, A2} stored on disk
1) filter and cache → RDD B
2) join → RDD C
3) aggregate → RDD D
(A1 → filter → B1 → join → C1; A2 → filter → B2 → join → C2; C1, C2 → aggregate → D)
RDD Example
C1 is lost due to node failure before the aggregate finishes
RDD Example
C1 is lost due to node failure before the aggregate finishes
Reconstruct C1 from lineage, eventually on a different node
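The lineage idea behind RDDs can be sketched in a few lines of plain Python (illustrative only, not Spark's implementation): each dataset records which transformation of which parent produced it, so a lost partition can be recomputed on demand instead of being replicated.

```python
# Minimal sketch of RDD-style lineage-based recovery (not real Spark code).
# Each "RDD" remembers its parent and the per-partition transform that
# derives it, so a lost partition can be recomputed from lineage.

class SketchRDD:
    def __init__(self, partitions=None, parent=None, fn=None):
        self._partitions = partitions      # cached data, or None if lost
        self.parent = parent               # lineage: parent dataset
        self.fn = fn                       # lineage: per-partition transform

    def map_partitions(self, fn):
        # Lazy transformation: record lineage only, compute nothing yet.
        return SketchRDD(parent=self, fn=fn)

    def get_partition(self, i):
        # Serve from cache if present, otherwise recompute from lineage.
        if self._partitions is not None and self._partitions[i] is not None:
            return self._partitions[i]
        return self.fn(self.parent.get_partition(i))

# RDD A: two partitions "on disk"
A = SketchRDD(partitions=[[1, 2, 3], [4, 5, 6]])
B = A.map_partitions(lambda p: [x for x in p if x % 2 == 0])  # filter
C = B.map_partitions(lambda p: [x * 10 for x in p])           # stand-in for join

# Materialize C, then simulate losing partition C1 ...
C._partitions = [C.get_partition(0), C.get_partition(1)]
C._partitions[0] = None                                       # node failure
# ... and recover it by replaying the lineage from A.
print(C.get_partition(0))  # [20]
```

Note that recovery only re-runs the transforms for the lost partition; the surviving partition is served straight from its cache.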
Spark Streaming
Existing solutions: record-by-record processing (input records flow through filter nodes to an aggregation node)
» Low latency
» Hard to:
» Provide fault tolerance
» Mitigate stragglers
Spark Streaming
Implemented as a sequence of micro-jobs (≤ 1s): the live data stream is divided into batches of X seconds, each processed by Spark into results
» Fault tolerant
» Mitigates stragglers
» Ensures exactly-once semantics
Spark & Spark Streaming: batch, interactive, and streaming computations
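The micro-batch idea can be sketched in plain Python (illustrative; real Spark Streaming runs each batch as a distributed Spark job): the stream is chopped into small batches, and each batch is processed as an ordinary batch computation, so a failed batch can simply be re-run.

```python
# Sketch of micro-batch stream processing (not Spark Streaming itself).

def micro_batches(stream, batch_size):
    """Group a record stream into fixed-size batches."""
    batch = []
    for record in stream:
        batch.append(record)
        if len(batch) == batch_size:
            yield batch
            batch = []
    if batch:
        yield batch

def process_batch(batch):
    # A tiny "job" run per batch: count records above a threshold.
    return sum(1 for x in batch if x > 5)

stream = [3, 7, 9, 1, 6, 2, 8, 4]          # stand-in for a live stream
results = [process_batch(b) for b in micro_batches(stream, batch_size=4)]
print(results)  # [2, 2]
```

Because each batch is a deterministic function of its input records, fault tolerance and straggler mitigation reduce to re-executing or speculatively duplicating small batch jobs.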
Shark
Hive over Spark: full support for HQL and UDFs
» Up to 100x faster when input is in memory
» Up to 5-10x faster when input is on disk
Running on hundreds of nodes at Yahoo!
Not Only General, but Fast
[Benchmark charts: Interactive (SQL) response time in seconds for Shark (mem/disk) vs. Impala (mem/disk) vs. Hive; Streaming throughput in MB/s/node for Spark vs. Storm; Batch (ML) time per iteration in seconds for Spark vs. Hadoop]
Spark Distribution
Includes:
» Spark (core)
» Spark Streaming
» GraphX (alpha release)
» MLlib
In the future:
» Shark
» Tachyon
Explosive Growth
2,500+ Spark meetup users
180 contributors from 30 companies
» More contributors in the past year than Giraph, Storm, or Tez
1st Spark Summit
» 450 attendees
» 140 companies
2nd Spark Summit
» June 30 – July 2
Explosive Growth
Databricks: founded in 2013 to commercialize the Spark platform
Included in all major Hadoop distributions
» Cloudera
» MapR
» Hortonworks (technical preview)
Enterprise support: Cloudera, MapR, Datastax
Spark and Shark available on Amazon's EMR
BlinkDB
Trade between query performance and accuracy using sampling
Why?
» In-memory processing doesn't guarantee interactive processing
» E.g., 10s of seconds just to scan 512 GB of RAM at 40-60 GB/s (16 cores)
» The gap between memory capacity and transfer rate keeps increasing: capacity doubles every 18 months, transfer rate only every 36 months
Key Insight
Computations don't always need exact answers
» Input is often noisy: exact computations do not guarantee exact answers
» Error is often acceptable if small and bounded
Real-world examples:
» Speedometers: 2.5% error (edmunds.com)
» OmniPod Insulin Pump: 0.96% error (www.ncbi.nlm.nih.gov/pubmed/22226273)
Approach: Sampling
Compute results on samples instead of the full data
» Typically, the error depends on the sample size (n), not on the original data size, i.e., error ∝ 1/√n
Can trade between the answer's latency, accuracy, and cost
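A small stdlib-only demonstration (not BlinkDB code) that the error of a sample mean shrinks roughly like 1/√n: quadrupling the sample size should roughly halve the empirical standard error, regardless of how large the "full" dataset is.

```python
# Empirically check the error ∝ 1/sqrt(n) behavior of sample means.
import random
import statistics

random.seed(0)
population = [random.gauss(100.0, 20.0) for _ in range(200_000)]

def stderr_of_sample_mean(n, trials=200):
    """Empirical std. dev. of the sample mean over many samples of size n."""
    means = [statistics.fmean(random.sample(population, n)) for _ in range(trials)]
    return statistics.stdev(means)

e1 = stderr_of_sample_mean(100)
e2 = stderr_of_sample_mean(400)
# 4x the sample size -> roughly half the error (ratio ~2).
print(round(e1 / e2, 1))
```

This is why a fixed-size sample can answer queries over an arbitrarily large table with a predictable error bar.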
BlinkDB Interface
SELECT avg(sessionTime)
FROM Table
WHERE city = 'San Francisco' AND dt = '2012-9-2'
WITHIN 1 SECONDS
→ 234.23 ± 15.32
BlinkDB Interface
SELECT avg(sessionTime)
FROM Table
WHERE city = 'San Francisco' AND dt = '2012-9-2'
WITHIN 2 SECONDS
→ 239.46 ± 4.96

SELECT avg(sessionTime)
FROM Table
WHERE city = 'San Francisco' AND dt = '2012-9-2'
ERROR 0.1 CONFIDENCE 95.0%
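For an aggregate like avg, the "value ± error bar" answer can be derived from the sample itself via the central limit theorem. A minimal sketch (not BlinkDB's implementation; `session_times` is a made-up sample of the sessionTime column):

```python
# Closed-form ~95% confidence interval for a sample average.
import math
import statistics

def approx_avg(sample, z=1.96):
    """Sample mean plus a CLT-based ~95% confidence half-width."""
    n = len(sample)
    mean = statistics.fmean(sample)
    half_width = z * statistics.stdev(sample) / math.sqrt(n)
    return mean, half_width

session_times = [231.0, 244.5, 229.8, 251.2, 238.4, 227.9, 246.1, 233.6]
mean, err = approx_avg(session_times)
print(f"{mean:.2f} +/- {err:.2f}")  # analogous to "239.46 +/- 4.96"
```

A real system would run this over a much larger sample and pick the sample size to satisfy the WITHIN / ERROR clause.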
Quick Results
Dataset
» 365 million rows, 204 GB on disk
» 600 GB in memory (deserialized format)
Query: computing the 95th percentile
Cluster: 25 EC2 instances, each with
» 4 cores
» 15 GB RAM
Query Response Time
[Chart: response time (seconds, log scale) vs. fraction of full data, from 10^-5 to 1. About 10x faster on small fractions, as response time is dominated by I/O; error bars grow as the fraction shrinks: 0.02% at 10^-1, 0.07%, 1.1%, 3.4%, up to 11% at 10^-5]
BlinkDB Overview
Sampling module: offline sampling computes an optimal set of samples of the original table across different dimensions (columns or sets of columns) to support ad-hoc exploratory queries
BlinkDB Overview
Sample placement: samples are striped over 100s or 1,000s of machines, both on disk and in memory
BlinkDB Overview
Sample selection: given a HiveQL/SQL query (e.g., SELECT foo(*) FROM TABLE WITHIN 2), online sample selection picks the best sample(s) based on the query's latency requirement
BlinkDB Overview
Parallel query execution on multiple samples striped across multiple machines; the new query plan returns the result with error bars and confidence intervals, e.g., 182.23 ± 5.56 (95% confidence)
BlinkDB Challenges
» Which set of samples to build, given a storage budget?
» Which sample to run the query on?
» How to accurately estimate the error?
How to Accurately Estimate Error?
Closed-form formulas exist for a limited number of operators
» E.g., count, mean, percentiles
What about user-defined functions (UDFs)?
» Use the bootstrap technique
Bootstrap
Quantify the accuracy of a sample estimator f()
» We can't compute f(X), as we don't have the full data X
» Instead compute f(S) on a random sample S, |S| = N; what is f(S)'s error?
» Draw k resamples S1, …, Sk from S by sampling with replacement, each with |Si| = N
» Use f(S1), …, f(Sk) to compute the estimator, e.g., mean(f(Si)), and the error, e.g., stdev(f(Si))
Bootstrap for BlinkDB
Quantify the accuracy of a query Q on a sample table
» Q(T) on the original table T takes too long!
» Instead compute Q(S) on a sample S, |S| = N; what is Q(S)'s error?
» Draw k resamples S1, …, Sk from S by sampling with replacement, each with |Si| = N
» Use Q(S1), …, Q(Sk) to compute the error, e.g., stdev(Q(Si))
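The procedure above can be sketched in a few lines of stdlib Python (illustrative, not BlinkDB code): resample the sample with replacement k times, re-run the query on each resample, and report the spread of the results as the error.

```python
# Minimal bootstrap error estimate for an arbitrary query over a sample.
import random
import statistics

random.seed(1)

def bootstrap_error(sample, query, k=200):
    """Std. dev. of `query` over k with-replacement resamples of `sample`."""
    n = len(sample)
    results = []
    for _ in range(k):
        resample = random.choices(sample, k=n)   # sample WITH replacement
        results.append(query(resample))
    return statistics.stdev(results)

# A "query" that closed-form error estimation may not cover, e.g. a UDF.
def q(rows):
    return statistics.median(rows)

sample = [random.gauss(50.0, 10.0) for _ in range(500)]
estimate = q(sample)
error = bootstrap_error(sample, q)
print(f"{estimate:.2f} +/- {error:.2f}")
```

Nothing here depends on the query being a mean or count, which is exactly why bootstrap is attractive for UDFs.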
How Do You Know Error Estimation is Correct?
Assumption: f() is Hadamard differentiable
» How do you know a UDF is Hadamard differentiable?
» This is a sufficient, not a necessary, condition
Bootstrap gives only an approximation of the true error distribution (true for closed-form formulas as well)
Previous work doesn't address error-estimation correctness
How Bad Is It?
Workloads
» Conviva: 268 real-world queries, 113 with custom user-defined functions
» Facebook
Closed forms / bootstrap fail for
» 3 in 10 Conviva queries
» 4 in 10 Facebook queries
Need runtime diagnosis!
Error Diagnosis
» Compare bootstrapping with ground truth for small samples
» Check whether the error improves as the sample size increases
Ground Truth (Approximation)
From the original table T, draw p independent samples S1, …, Sp; compute Q(S1), …, Q(Sp), and assess estimator quality as ξ = stdev(Q(Sj))
Ground Truth and Bootstrap
Also bootstrap on individual samples: from each Si, draw k resamples Si1, …, Sik, compute Q(Si1), …, Q(Sik), and the per-sample bootstrap error ξ*_i = stdev(Q(Sij)), for i = 1, …, p
Ground Truth vs. Bootstrap
At each sample size, compute the bootstrap errors ξ*_i1 = stdev(Q(Si1j)), …, ξ*_ip = stdev(Q(Sipj)) and their mean ξ̄_i = mean(ξ*_ij), then compare against the ground truth
[Plot: relative error vs. sample size (n1 < n2 < n3) for bootstrap and ground truth]
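The diagnosis idea can be sketched with a toy population (illustrative, not the BlinkDB diagnostic): at several small sample sizes, compare the bootstrap error estimate against an approximate ground truth from many independent samples, and check that both shrink as the sample size grows.

```python
# Toy diagnosis: does bootstrap error track ground truth across sizes?
import random
import statistics

random.seed(2)
population = [random.expovariate(1 / 50.0) for _ in range(100_000)]

def query(rows):
    return statistics.fmean(rows)

def ground_truth_error(n, p=100):
    """Approx. ground truth: stdev of the query over p independent samples."""
    return statistics.stdev(
        [query(random.sample(population, n)) for _ in range(p)])

def bootstrap_error(sample, k=100):
    """Bootstrap estimate: stdev over k with-replacement resamples."""
    return statistics.stdev(
        [query(random.choices(sample, k=len(sample))) for _ in range(k)])

# Both error measures should decrease together as the sample size grows;
# a query for which they diverge fails the diagnostic.
for n in (100, 400, 1600):
    gt = ground_truth_error(n)
    bs = bootstrap_error(random.sample(population, n))
    print(n, round(gt, 2), round(bs, 2))
```

When the bootstrap curve fails to track the shrinking ground-truth curve, the query should not be approximated.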
How Well Does It Work in Practice?
Evaluated on the Conviva query workload
The diagnostic predicted that 207 (77%) queries can be approximated
» False negatives: 18
» False positives: 3 (conditional UDFs)
Overhead
Bootstrap and diagnostic overheads can be very large
» The diagnostic requires running ~30,000 small queries!
Optimizations
» Filter pushdown
» One-pass execution
Can reduce the overhead by orders of magnitude
Query + Bootstrap + Diagnosis
[Chart: response time vs. fraction of full data (10^-5 to 10^-1), split into query time, bootstrap overhead, and diagnosis overhead. Bootstrap + diagnosis take up to 53x more time than the query itself!]
Optimization: Filter Pushdown
Perform filtering before resampling (assume n « N)
» Naive: resample the N-element sample, then filter each N-element resample
» Optimized: filter the sample once (N → n elements), then resample only n elements
» Can dramatically reduce I/O
Optimization: One-Pass Execution
Poissonized resampling: construct all k resamples in one pass!
» For each resample, add a new column specifying how many times each row has been selected
» The query generates results for all k resamples in one pass
Query + Bootstrap + Diagnosis (with Filter Pushdown and Single Pass)
[Chart: response time (seconds) vs. fraction of full data, split into query time, bootstrap overhead, and diagnosis overhead. Less than 70% overhead (an 80x reduction)]
Open Source
BlinkDB is open-sourced and released at http://blinkdb.org
Open Source
Used regularly by 100 engineers at Facebook Inc.
Lessons Learned
» Focus on novel usage scenarios
» Be paranoid about simplicity
» Very hard to build real, complex systems in academia
» Basic functionality first, performance optimizations next
Example: Mesos
Focus on novel usage scenarios
» Let multiple frameworks share the same cluster
Be paranoid about simplicity
» Just enforce the allocation policy across frameworks, e.g., fair sharing
» Let frameworks decide which slots they accept, which tasks to run, and when to run them
» First release: 10K lines of code
Basic functionality first, performance optimizations next
» First, support arbitrary scheduling, even if inefficient
» Later, added filters to improve performance
Example: Spark
Focus on novel usage scenarios
» Interactive queries and iterative (ML) algorithms
Be paranoid about simplicity
» Immutable data; avoid complex consistency protocols
» First release: 2K lines of code
Basic functionality first, performance optimizations next
» First, no automatic checkpointing
» Later, added automatic checkpointing
Example: BlinkDB
Focus on novel usage scenarios
» Approximate computations; error diagnosis
Be paranoid about simplicity
» Use bootstrap as a generic technique; no support for closed-form formulas
Basic functionality first, performance optimizations next
» First, straightforward but very expensive error estimation
» Later, optimizations to reduce the overhead
» First, manual sample generation
» Later, automatic sample generation (to do)
Summary
BDAS: addresses the next Big Data challenges
Unifies batch, interactive, and streaming computations (Spark)
Enables users to trade between
» Response time, accuracy, and cost
Explosive adoption
» 30 companies, 180 individual contributors (Spark)
» Spark platform included in all major Hadoop distros
» Enterprise-grade support and professional services for both the Mesos and Spark platforms