Conquering Big Data With BDAS (Berkeley Data Analytics)

Transcription

UC Berkeley
Conquering Big Data with BDAS (Berkeley Data Analytics)
Ion Stoica
UC Berkeley / Databricks / Conviva

Extracting Value from Big Data

Insights, diagnosis, e.g.,
» Why is user engagement dropping?
» Why is the system slow?
» Detect spam, DDoS attacks

Decisions, e.g.,
» Decide what features to add to a product
» Personalized medical treatment
» Decide when to change an aircraft engine part
» Decide what ads to show

Data is only as useful as the decisions it enables.

What Do We Need?

Interactive queries: enable faster decisions
» E.g., identify why a site is slow and fix it

Queries on streaming data: enable decisions on real-time data
» E.g., fraud detection, detect DDoS attacks

Sophisticated data processing: enable "better" decisions
» E.g., anomaly detection, trend analysis

Our Goal

Support batch, streaming, and interactive computations in a single unified framework.
Make it easy to develop sophisticated algorithms (e.g., graph and ML algorithms).

The Berkeley AMPLab (Algorithms, Machines, People)

January 2011 - 2017
» 8 faculty
» 40 students
» a software engineering team of 3

Organized for collaboration:
» 3-day retreats (twice a year)
» AMPCamp 3 (August 2013): 220 campers from 100 companies

The Berkeley AMPLab

Governmental and industrial funding.
Goal: the next generation of the open source data analytics stack for industry & academia: the Berkeley Data Analytics Stack (BDAS).

Data Processing Stack

» Data processing layer
» Resource management layer
» Storage layer

Hadoop Stack

» Data processing layer: Hadoop MR, Hive, Pig, Impala, Storm
» Resource management layer: YARN
» Storage layer: HDFS, S3, ...

BDAS Stack

» Data processing layer: Spark, Shark SQL, MLlib, ...
» Resource management layer: Mesos
» Storage layer: Tachyon, HDFS, S3, ...

How Do BDAS & Hadoop Fit Together?

[Diagram: the BDAS stack (BlinkDB, MLBase, Spark Streaming, GraphX, Shark SQL, and MLlib over Spark; Mesos; Tachyon; HDFS, S3, ...) shown side by side with the Hadoop stack (Hive, Pig, Impala, and Storm over Hadoop MR; YARN; HDFS, S3, ...), sharing the same cluster]

Apache Mesos

Enables multiple frameworks to share the same cluster resources (e.g., Hadoop, Storm, Spark).

Twitter's large-scale deployment:
» 10,000 servers
» 500 engineers running jobs on Mesos

Third-party Mesos schedulers:
» Airbnb's Chronos
» Twitter's Aurora

Mesosphere: a startup to commercialize Mesos.

Apache Spark

Distributed execution engine:
» Fault-tolerant, efficient in-memory storage
» Low-latency, large-scale task scheduler
» Powerful programming model and APIs: Python, Java, Scala

Fast: up to 100x faster than Hadoop MR
» Can run sub-second jobs on hundreds of nodes

Easy to use: 2-5x less code than Hadoop MR
General: supports interactive & iterative apps

Fault Tolerance

Need to achieve:
» High-throughput reads and writes
» Efficient memory usage

Replication?
» Writes bottlenecked by the network
» Inefficient: stores multiple replicas

Persist update logs?
» Big data processing can generate massive logs

Our solution: Resilient Distributed Datasets (RDDs)
» Partitioned collections of immutable records
» Use lineage to reconstruct lost data

RDD Example

Two-partition RDD A = {A1, A2} stored on disk
1) filter and cache → RDD B
2) join → RDD C
3) aggregate → RDD D

[Diagram: A1 → filter → B1 → join → C1 → agg. → D; A2 → filter → B2 → join → C2 → agg. → D]

RDD Example

C1 is lost due to a node failure before the reduce finishes.
Reconstruct C1, eventually, on a different node using its lineage.

[Diagram: filter and join re-executed on A1 on another node to rebuild C1]
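The recovery scheme above can be sketched in a few lines of Python. This is a toy illustration of lineage-based recovery, not Spark's implementation or API; the `RDD` class and its methods are hypothetical:

```python
class RDD:
    """Toy RDD: an immutable, partitioned dataset that records its lineage
    (parent dataset + transformation) so a lost partition can be recomputed
    instead of being restored from a replica. Illustrative only."""

    def __init__(self, partitions, lineage=None):
        self._parts = list(partitions)   # a None entry means "partition lost"
        self._lineage = lineage          # (parent RDD, per-partition function)

    def map_partitions(self, fn):
        # Narrow dependency: output partition i depends only on input partition i.
        return RDD([fn(p) for p in self._parts], lineage=(self, fn))

    def lose_partition(self, i):
        self._parts[i] = None            # simulate a node failure

    def collect(self):
        for i, part in enumerate(self._parts):
            if part is None:             # recompute the lost partition from lineage
                parent, fn = self._lineage
                self._parts[i] = fn(parent._parts[i])
        return [x for part in self._parts for x in part]

# RDD A = {A1, A2}; a filter-like transformation produces RDD B.
A = RDD([[1, 2, 3], [4, 5, 6]])
B = A.map_partitions(lambda p: [x * 10 for x in p if x % 2 == 0])
B.lose_partition(0)                      # like C1 being lost before the job finishes
print(B.collect())                       # the lost partition is rebuilt on demand
```

Because the records are immutable and the transformation is deterministic, re-running the lineage reproduces exactly the data that was lost, with no replicas kept.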

Spark Streaming

Existing solutions: record-by-record processing
» Low latency
But hard to:
» Provide fault tolerance
» Mitigate stragglers

[Diagram: input records → filter (node 1, node 2) → agg. (node 3)]

Spark Streaming

Implemented as a sequence of micro-jobs (~1s)
» Fault tolerant
» Mitigates stragglers
» Ensures exactly-once semantics

[Diagram: live data stream → Spark Streaming → batches of X seconds → Spark → processed results]

Spark & Spark Streaming: batch, interactive, and streaming computations
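The micro-batch idea itself is easy to sketch. Below is a minimal, hypothetical Python illustration (not the Spark Streaming API): the stream is chopped into small batches, and each batch is processed as an ordinary deterministic batch job, which is what makes re-execution after failures and speculative execution for stragglers possible:

```python
import statistics
from itertools import islice

def micro_batches(stream, batch_size):
    """Chop a (possibly unbounded) record stream into small batches.
    Each batch is then handled as an ordinary deterministic batch job,
    so any batch can simply be re-run after a failure, or re-run
    speculatively on a straggler's behalf."""
    it = iter(stream)
    while True:
        batch = list(islice(it, batch_size))
        if not batch:
            return
        yield batch

# Toy stream of measurements; the per-batch "job" is a simple aggregate.
stream = (x % 7 for x in range(20))
results = [statistics.fmean(batch) for batch in micro_batches(stream, 5)]
print(results)
```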

Shark

Hive over Spark: full support for HQL and UDFs
» Up to 100x faster when the input is in memory
» Up to 5-10x faster when the input is on disk
Running on hundreds of nodes at Yahoo!

Not Only General, but Fast

[Charts: interactive SQL response time (Shark mem/disk vs. Impala mem/disk vs. Hive), streaming throughput in MB/s/node (Spark vs. Storm), and batch ML time per iteration (Spark vs. Hadoop)]

Spark Distribution

Includes:
» Spark (core)
» Spark Streaming
» GraphX (alpha release)
» MLlib

In the future:
» Shark
» Tachyon

Explosive Growth

2,500 Spark meetup users
180 contributors from 30 companies

1st Spark Summit:
» 450 attendees
» 140 companies

2nd Spark Summit:
» June 30 - July 2

[Chart: contributors in the past year, Spark vs. Giraph, Storm, and Tez]

Explosive Growth

Databricks: founded in 2013 to commercialize the Spark platform.
Included in all major Hadoop distributions:
» Cloudera
» MapR
» Hortonworks (technical preview)
Enterprise support: Cloudera, MapR, Datastax
Spark and Shark available on Amazon's EMR

BlinkDB

Trade between query performance and accuracy using sampling.

Why?
» In-memory processing doesn't guarantee interactive processing: e.g., tens of seconds just to scan 512 GB of RAM at 40-60 GB/s (16 cores)
» The gap between memory capacity and transfer rate keeps increasing: capacity doubles every 18 months, transfer rate every 36 months

Key Insight

Computations don't always need exact answers
» Input is often noisy: exact computations do not guarantee exact answers
» Error is often acceptable if small and bounded

Examples:
» Best scale: 200 g error
» Speedometers: 2.5% error (edmunds.com)
» OmniPod Insulin Pump: 0.96% error (www.ncbi.nlm.nih.gov/pubmed/22226273)

Approach: Sampling

Compute results on samples instead of the full data
» Typically, the error depends on the sample size (n), not on the original data size, i.e., error ∝ 1/√n
Can trade between an answer's latency, accuracy, and cost
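The error ∝ 1/√n behavior is easy to check empirically. A small Python sketch with synthetic data (the dataset and all numbers are illustrative, not from the talk): quadrupling the sample size should roughly halve the error of a sampled mean.

```python
import random
import statistics

def sampling_error(data, n, trials=200, seed=0):
    """Mean absolute error of a sampled mean, over many random size-n samples."""
    rng = random.Random(seed)
    true_mean = statistics.fmean(data)
    return statistics.fmean(
        abs(statistics.fmean(rng.choice(data) for _ in range(n)) - true_mean)
        for _ in range(trials)
    )

# Hypothetical "full table": 100,000 session times from a skewed distribution.
rng = random.Random(42)
data = [rng.expovariate(1 / 200) for _ in range(100_000)]

e_100 = sampling_error(data, 100)   # error with samples of size 100
e_400 = sampling_error(data, 400)   # error with samples 4x larger
print(round(e_100, 1), round(e_400, 1), round(e_100 / e_400, 2))
```

Note that the error depends only on the sample size: making `data` ten times larger leaves both numbers essentially unchanged.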

BlinkDB Interface

SELECT avg(sessionTime)
FROM Table
WHERE city = 'San Francisco' AND dt = '2012-9-2'
WITHIN 1 SECONDS

Result: 234.23 ± 15.32

BlinkDB Interface

SELECT avg(sessionTime)
FROM Table
WHERE city = 'San Francisco' AND dt = '2012-9-2'
WITHIN 2 SECONDS

Result: 239.46 ± 4.96 (vs. 234.23 ± 15.32 within 1 second)

Alternatively, bound the error instead of the time:

SELECT avg(sessionTime)
FROM Table
WHERE city = 'San Francisco' AND dt = '2012-9-2'
ERROR 0.1 CONFIDENCE 95.0%

Quick Results

Dataset:
» 365 million rows, 204 GB on disk
» 600 GB in memory (deserialized format)
Query: computes the 95th percentile
25 EC2 instances, each with:
» 4 cores
» 15 GB RAM

Query Response Time

[Chart: response time (s, log scale) vs. fraction of full data (10^-5 to 10^-1); samples give roughly a 10x speedup over the full data, as response time is dominated by I/O]

Query Response Time

[Chart: same response times annotated with error bars, from 0.02% at fraction 10^-1 up to 11% at 10^-5]

BlinkDB Overview

Offline sampling: build an optimal set of samples across different dimensions (columns or sets of columns) to support ad-hoc exploratory queries.

[Diagram: original data TABLE → sampling module]

BlinkDB Overview

Sample placement: samples are striped over 100s or 1,000s of machines, both on disk and in memory.

[Diagram: sampling module → on-disk samples and in-memory samples]

BlinkDB Overview

Online sample selection: pick the best sample(s) based on the query's latency bound.

[Diagram: HiveQL/SQL query (SELECT foo(*) FROM TABLE WITHIN 2) → query plan → sample selection]

BlinkDB Overview

Parallel query execution on multiple samples striped across multiple machines.

[Diagram: new query plan → error bars & confidence intervals → result: 182.23 ± 5.56 (95% confidence)]

BlinkDB Challenges

» Which set of samples to build given a storage budget?
» Which sample to run the query on?
» How to accurately estimate the error?

How to Accurately Estimate Error?

Closed-form formulas exist for a limited number of operators
» E.g., count, mean, percentiles
What about user-defined functions (UDFs)?
» Use the bootstrap technique

Bootstrap

Quantify the accuracy of a sample estimator f():
» f(X): can't compute, as we don't have the full data X
» f(S): computed on a random sample S with |S| = N; what is f(S)'s error?
» Draw k resamples S1, ..., Sk from S by sampling with replacement, each with |Si| = N
» Use f(S1), ..., f(Sk) to compute the estimator, mean(f(Si)), and its error, e.g., stdev(f(Si))
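A minimal Python sketch of the bootstrap as described above, applied to a percentile-style "UDF" for which no simple closed-form error formula is convenient (the data and parameters are made up):

```python
import random
import statistics

def bootstrap(sample, f, k=100, seed=0):
    """Estimate f(sample) and its error: draw k resamples of the same size
    with replacement, apply f to each, and report the mean and standard
    deviation of the k results."""
    rng = random.Random(seed)
    n = len(sample)
    estimates = [f([rng.choice(sample) for _ in range(n)]) for _ in range(k)]
    return statistics.fmean(estimates), statistics.stdev(estimates)

def p95(xs):
    """A UDF-like estimator: the 95th percentile."""
    ys = sorted(xs)
    return ys[int(0.95 * (len(ys) - 1))]

# A random sample S (|S| = 1,000) standing in for data we can afford to touch.
rng = random.Random(1)
sample = [rng.expovariate(1 / 200) for _ in range(1_000)]

estimate, err = bootstrap(sample, p95)
print(f"{estimate:.1f} +/- {err:.1f}")
```

Nothing here needs to know anything about f() beyond being able to call it, which is why the bootstrap generalizes to arbitrary UDFs where closed-form error formulas do not.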

Bootstrap for BlinkDB

Quantify the accuracy of a query Q on a sample table:
» Q(T) on the original table T takes too long!
» Q(S) on a sample S with |S| = N; what is Q(S)'s error?
» Draw k resamples S1, ..., Sk from S by sampling with replacement, each with |Si| = N
» Use Q(S1), ..., Q(Sk) to compute the estimator and its error, e.g., stdev(Q(Si))

How Do You Know Error Estimation is Correct?

Assumption: f() is Hadamard differentiable
» How do you know a UDF is Hadamard differentiable?
» This is a sufficient, not a necessary, condition
Bootstrap gives only approximations of the true error distribution (true for closed-form formulas as well).
Previous work doesn't address error estimation correctness.

How Bad Is It?

Workloads:
» Conviva: 268 real-world queries, 113 of which had custom user-defined functions
» Facebook
Closed forms/bootstrap fail for:
» 3 in 10 Conviva queries
» 4 in 10 Facebook queries
Need runtime diagnosis!

Error Diagnosis

» Compare bootstrap estimates with ground truth on small samples
» Check whether the error improves as the sample size increases
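The diagnostic can be sketched as follows (all data and parameters are illustrative): compute the ground-truth spread of the query over many fresh small samples, compute the bootstrap spread on a single sample of the same size, and check that the two agree and that both shrink as the sample size grows. Here the query is a well-behaved mean, so the diagnostic should pass:

```python
import random
import statistics

def ground_truth_error(table, q, n, p, rng):
    """Ground truth: spread of q over p independent size-n samples of the table."""
    return statistics.stdev(
        q([rng.choice(table) for _ in range(n)]) for _ in range(p)
    )

def bootstrap_error(sample, q, k, rng):
    """Bootstrap: spread of q over k with-replacement resamples of one sample."""
    n = len(sample)
    return statistics.stdev(
        q([rng.choice(sample) for _ in range(n)]) for _ in range(k)
    )

rng = random.Random(42)
table = [rng.expovariate(1 / 200) for _ in range(50_000)]
q = statistics.fmean   # the "query" under test

results = []
for n in (100, 400, 1600):
    gt = ground_truth_error(table, q, n, p=30, rng=rng)
    bs = bootstrap_error([rng.choice(table) for _ in range(n)], q, k=30, rng=rng)
    results.append((n, gt, bs))
    print(n, round(gt, 1), round(bs, 1))
```

For a query where the bootstrap is unreliable (e.g., some conditional UDFs), the bootstrap column would diverge from the ground truth or fail to shrink with n, and the query would be flagged as not approximable.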

Ground Truth (Approximation)

Draw p samples S1, ..., Sp from the original table T and compute Q(S1), ..., Q(Sp).
Estimator quality assessment: ξ = stdev(Q(S_j))

Ground Truth and Bootstrap

Bootstrap on individual samples: for each sample S_i of T, draw resamples S_i1, ..., S_ik and compute ξ*_i = stdev(Q(S_ij)), giving ξ*_1, ..., ξ*_p.

Ground Truth vs. Bootstrap

At each sample size n_i, compute the bootstrap estimates ξ*_i1 = stdev(Q(S_i1j)), ..., ξ*_ip = stdev(Q(S_ipj)), summarize them as ξ_i = mean(ξ*_ij), and compare against the ground truth.

[Chart: relative error of bootstrap vs. ground truth at sample sizes n1, n2, n3]

How Well Does It Work in Practice?

Evaluated on the Conviva query workload.
The diagnostic predicted that 207 (77%) of the queries can be approximated:
» False negatives: 18
» False positives: 3 (conditional UDFs)

Overhead

Bootstrap and diagnostic overheads can be very large
» The diagnostic requires running 30,000 small queries!
Optimizations:
» Filter pushdown
» One-pass execution
Can reduce the overhead by orders of magnitude.

Query + Bootstrap + Diagnosis

[Chart: response time vs. fraction of full data (10^-5 to 10^-1), broken into query, bootstrap, and diagnosis overheads; bootstrap + diagnosis take up to 53x more time than the query itself]

Optimization: Filter Pushdown

Perform filtering before resampling
» Can dramatically reduce I/O, assuming n ≪ N (N elements in the sample, n of which pass the filter)

[Diagram: Sample (N) → filter (n) → k resamples of n elements each, instead of Sample (N) → k resamples of N elements each → filter]
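A toy Python illustration of why the pushdown pays off (the table, predicate, and selectivity are made up): both orderings give statistically similar bootstrap error estimates for a filtered average, but filtering first touches far fewer rows per resample:

```python
import random
import statistics

def bootstrap_naive(rows, pred, k, seed=0):
    """Resample all N rows, then filter each resample (N rows per resample)."""
    rng = random.Random(seed)
    N = len(rows)
    outs, touched = [], 0
    for _ in range(k):
        resample = [rows[rng.randrange(N)] for _ in range(N)]
        touched += N
        outs.append(statistics.fmean(v for c, v in resample if pred(c)))
    return statistics.stdev(outs), touched

def bootstrap_pushdown(rows, pred, k, seed=0):
    """Filter pushdown: filter once, then resample only the n matching rows."""
    rng = random.Random(seed)
    matching = [v for c, v in rows if pred(c)]
    n = len(matching)
    outs, touched = [], len(rows)       # one filtering pass over the data
    for _ in range(k):
        outs.append(statistics.fmean(matching[rng.randrange(n)] for _ in range(n)))
        touched += n
    return statistics.stdev(outs), touched

# Hypothetical sample: (city, sessionTime) rows; ~5% match the predicate.
rng = random.Random(7)
rows = [("SF" if rng.random() < 0.05 else "LA", rng.expovariate(1 / 200))
        for _ in range(10_000)]
pred = lambda city: city == "SF"

err_naive, io_naive = bootstrap_naive(rows, pred, k=50)
err_push, io_push = bootstrap_pushdown(rows, pred, k=50)
print(round(err_naive, 1), round(err_push, 1), io_naive // io_push)
```

With 5% selectivity and 50 resamples, the pushdown version touches roughly an order of magnitude fewer rows, and the gap grows as the predicate becomes more selective.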

Optimization: One-Pass Execution

Poissonized resampling: construct all k resamples in one pass!
» For each resample, add a new column specifying how many times each row has been selected
» The query then generates results for all k resamples in a single pass
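A sketch of Poissonized resampling in Python (illustrative, not BlinkDB's implementation): for each row and each resample j, draw a Poisson(1) multiplicity, conceptually the "new column" on the slide, and accumulate weighted sums, so all k resamples are computed during a single scan of the data:

```python
import math
import random
import statistics

def poisson1(rng):
    """Draw a Poisson(1) count (Knuth's method; fine for small means)."""
    L = math.exp(-1.0)
    k, p = 0, 1.0
    while True:
        p *= rng.random()
        if p <= L:
            return k
        k += 1

def one_pass_bootstrap_mean(values, k=50, seed=0):
    """One scan of the data builds all k resamples: each row contributes to
    resample j with a Poisson(1) multiplicity, instead of materializing k
    resampled copies of the table."""
    rng = random.Random(seed)
    sums, counts = [0.0] * k, [0] * k
    for v in values:                 # single pass over the table
        for j in range(k):
            c = poisson1(rng)        # how many times this row appears in resample j
            if c:
                sums[j] += c * v
                counts[j] += c
    means = [s / c for s, c in zip(sums, counts)]
    return statistics.fmean(means), statistics.stdev(means)

rng = random.Random(3)
values = [rng.expovariate(1 / 200) for _ in range(2_000)]
estimate, err = one_pass_bootstrap_mean(values)
print(round(estimate, 1), round(err, 1))
```

Each Poissonized resample has a size that varies slightly around n rather than exactly n, but the resulting error estimates are equivalent for bootstrap purposes, and the k result columns can be computed by a single grouped aggregation.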

Query + Bootstrap + Diagnosis (with Filter Pushdown and Single Pass)

[Chart: response times vs. fraction of full data; with both optimizations, bootstrap + diagnosis add less than 70% overhead, roughly an 80x reduction]

Open Source

BlinkDB is open sourced and released at http://blinkdb.org

Open Source

Used regularly by 100 engineers at Facebook Inc.

Lessons Learned

Focus on novel usage scenarios.
Be paranoid about simplicity
» It is very hard to build real, complex systems in academia
Basic functionality first, performance optimization next.

Example: Mesos

Focus on novel usage scenarios
» Let multiple frameworks share the same cluster
Be paranoid about simplicity
» Just enforce the allocation policy across frameworks, e.g., fair sharing
» Let frameworks decide which slots they accept, which tasks to run, and when to run them
» First release: 10K lines of code
Basic functionality first, performance optimization next
» First, support arbitrary scheduling, even if inefficient
» Later, added filters to improve performance

Example: Spark

Focus on novel usage scenarios
» Interactive queries and iterative (ML) algorithms
Be paranoid about simplicity
» Immutable data; avoid complex consistency protocols
» First release: 2K lines of code
Basic functionality first, performance optimization next
» First, no automatic checkpointing
» Later, added automatic checkpointing

Example: BlinkDB

Focus on novel usage scenarios
» Approximate computations; error diagnosis
Be paranoid about simplicity
» Use bootstrap as a generic technique; no support for closed-form formulas
Basic functionality first, performance optimization next
» First, straightforward error estimation, very expensive
» Later, optimizations to reduce the overhead
» First, manual sample generation
» Later, automatic sample generation (to do)

Summary

BDAS: addresses the next Big Data challenges.
Unifies batch, interactive, and streaming computations (Spark).
Enables users to trade between response time, accuracy, and cost.
Explosive adoption:
» 30 companies, 180 individual contributors (Spark)
» The Spark platform is included in all major Hadoop distros
» Enterprise-grade support and professional services for both the Mesos and Spark platforms
