Druid For Real-time Analysis

Transcription

Druid for real-time analysis
Yann Esposito
7 April 2016

Abstract

Druid explained from a high-altitude point of view.

Contents

1 Druid the Sales Pitch
2 Intro
2.1 Experience
2.2 Real Time?
2.3 Demand
2.4 Reality
3 Origin (PHP)
4 1st Refactoring (Node.js)
5 Return of Experience
6 Return of Experience
7 2nd Refactoring
8 2nd Refactoring (FTW!)
9 2nd Refactoring return of experience
10 Demo
11 Pre Considerations
11.1 Discovered vs Invented
11.2 In the End
12 Druid
12.1 Who?
12.2 Goal
12.3 Concepts
12.4 Key Features
12.5 Right for me?
13 High Level Architecture
13.1 Inspiration
13.2 Index / Immutability
13.3 Storage
13.4 Specialized Nodes
14 Druid vs X
14.1 Elasticsearch
14.2 Key/Value Stores (HBase/Cassandra/OpenTSDB)
14.3 Spark
14.4 SQL-on-Hadoop (Impala/Drill/Spark SQL/Presto)
15 Data
15.1 Concepts
15.2 Indexing
15.3 Loading
15.4 Querying
15.5 Segments
16 Roll-up
16.1 Example
16.2 As SQL
17 Segments
17.1 Sharding
17.2 Core Data Structure
17.3 Example
17.4 Example (multiple matches)
17.5 Real-time Ingestion
17.6 Batch Ingestion
17.7 Real-time Ingestion
18 Querying
18.1 Query types
18.2 Example(s)
18.3 Result
18.4 Caching
19 Druid Components
19.1 Druid
19.2 Also
19.3 Coordinator
20 When not to choose Druid
21 Graphite (metrics)
22 Pivot (exploring data)
23 Caravel
24 Conclusions
24.1 Precompute your time series?
24.2 Don't reinvent it
24.3 Druid way is the right way!

1 Druid the Sales Pitch

- Sub-Second Queries
- Real-time Streams
- Scalable to Petabytes
- Deploy Anywhere
- Vibrant Community (Open Source)
- Ideal for powering user-facing analytic applications
- Deploy anywhere: cloud, on-premise; integrates with Hadoop, Spark, Kafka, Storm, Samza

2 Intro

2.1 Experience

- Real Time Social Media Analytics

2.2 Real Time?

- Ingestion latency: seconds
- Query latency: seconds

2.3 Demand

- Twitter: 20k msg/s, 1 msg ≈ 10 KB, during 24h
- Facebook public: 1000 to 2000 msg/s continuously
- Low latency

2.4 Reality

- Twitter: 400 msg/s continuously, bursts to 1500
- Facebook: 1000 to 2000 msg/s

3 Origin (PHP)

4 1st Refactoring (Node.js)

- Ingestion still in PHP
- Node.js, Perl, Java & R for sentiment analysis
- MongoDB
- Manually made time series (incremental Map/Reduce)
- Manually coded HyperLogLog in JS

5 Return of Experience

6 Return of Experience

- Ingestion still in PHP (600 msg/s max)
- Node.js, Perl, Java (10 msg/s max)

7 2nd Refactoring

- Haskell
- Clojure / ClojureScript
- Kafka / Zookeeper
- Mesos / Marathon
- Elasticsearch
- Druid

Figure 1: Too Slow, Bored

8 2nd Refactoring (FTW!)

9 2nd Refactoring return of experience

- No limit, everything is scalable
- High availability
- Low latency: ingestion & user-facing querying
- Cheap if done correctly
- Thanks Druid!

10 Demo

- Low Latency
- High Volume of Data Analysis
- Typically pulse
- DEMO Time

11 Pre Considerations

11.1 Discovered vs Invented

Try to conceptualize a system such that:

- Ingest Events
- Real-Time Queries
- Scalable
- Highly Available
- Analytics: timeseries, alerting system, top N, etc.

11.2 In the End

Druid concepts are always emerging naturally.

12 Druid

12.1 Who?

- Metamarkets
- Powered by Druid: Alibaba, Cisco, Criteo, eBay, Hulu, Netflix, Paypal

12.2 Goal

Druid is an open source store designed for real-time exploratory analytics on large data sets.

A hosted dashboard that would allow users to arbitrarily explore and visualize event streams.

12.3 Concepts

- Column-oriented storage layout
- Distributed, shared-nothing architecture
- Advanced indexing structure

12.4 Key Features

- Sub-second OLAP Queries
- Real-time Streaming Ingestion
- Power Analytic Applications
- Cost Effective
- Highly Available
- Scalable

12.5 Right for me?

- Require fast aggregations
- Exploratory analytics
- Analysis in real-time
- Lots of data (trillions of events, petabytes of data)
- No single point of failure

13 High Level Architecture

13.1 Inspiration

- Google's BigQuery/Dremel
- Google's PowerDrill

13.2 Index / Immutability

Druid indexes data to create mostly immutable views.
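The column-oriented layout in 12.3 is the basis for fast aggregation: a query only touches the columns it names, never full rows. A toy sketch in Python (data values and column names are made up for illustration):

```python
# Toy column-oriented layout: each column is stored as its own array, so an
# aggregation like SUM(added) WHERE page = 'Cthulhu' scans exactly two
# columns. All values here are invented for the example.
columns = {
    "page":    ["Cthulhu", "Cthulhu", "Azatoth", "Azatoth"],
    "added":   [20, 5, 30, 10],
    "deleted": [5, 1, 10, 2],
}

def sum_where(columns, metric, dim, value):
    """SUM(metric) over rows where dim == value, touching only 2 columns."""
    return sum(m for d, m in zip(columns[dim], columns[metric]) if d == value)

print(sum_where(columns, "added", "page", "Cthulhu"))  # 25
```

A row store would have to deserialize every field of every row to answer the same question; this is the design choice behind "highly optimized for aggregation & filter" below.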

13.3 Storage

Stores data in a custom column format highly optimized for aggregation & filtering.

13.4 Specialized Nodes

- A Druid cluster is composed of various types of nodes
- Each designed to do a small set of things very well
- Nodes don't need to be deployed on individual hardware
- Many node types can be colocated in production

14 Druid vs X

14.1 Elasticsearch

- Resource requirements much higher for ingestion & aggregation
- No data summarization (100x in real-world data)

14.2 Key/Value Stores (HBase/Cassandra/OpenTSDB)

- Must pre-compute results
  - Exponential storage
  - Hours of pre-processing time
- Use the dimensions as key (like in OpenTSDB)
  - No filter index other than range
  - Hard for complex predicates

14.3 Spark

- Druid can be used to accelerate OLAP queries in Spark
- Druid focuses on the latencies to ingest and serve queries
- Too long for end user to arbitrarily explore data

14.4 SQL-on-Hadoop (Impala/Drill/Spark SQL/Presto)

- Queries: more data transfer between nodes
- Data Ingestion: bottlenecked by backing store
- Query Flexibility: more flexible (full joins)
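The "exponential storage" point against key/value stores can be made concrete: every subset of dimensions a user might query needs its own pre-aggregated table. A quick back-of-envelope sketch (the dimension names are hypothetical):

```python
# Back-of-envelope: pre-computing aggregates for every combination of
# dimensions (the key/value-store approach) grows as 2^d with d dimensions.
from itertools import combinations

def precomputed_rollups(dimensions):
    """All non-empty dimension subsets you would have to pre-aggregate."""
    subsets = []
    for r in range(1, len(dimensions) + 1):
        subsets.extend(combinations(dimensions, r))
    return subsets

dims = ["country", "device", "page", "referrer", "campaign"]
print(len(precomputed_rollups(dims)))  # 2^5 - 1 = 31 pre-computed tables
```

Five dimensions already means 31 tables; twenty dimensions would mean over a million, which is why Druid indexes the raw (rolled-up) data instead.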

15 Data

15.1 Concepts

- Timestamp column: query centered on time axis
- Dimension columns: strings (used to filter or to group)
- Metric columns: used for aggregations (count, sum, mean, etc.)

15.2 Indexing

- Immutable snapshots of data
- Data structure highly optimized for analytic queries
- Each column is stored separately
- Indexes data on a per-shard (segment) level

15.3 Loading

- Real-Time
- Batch

15.4 Querying

- JSON over HTTP
- Single table operations, no joins

15.5 Segments

- Per time interval
  - Skip segments when querying
- Immutable
  - Cache friendly
  - No locking
- Versioned
  - No locking
  - Read-write concurrency
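The three column roles above can be illustrated on a single hypothetical event (field names are invented for the example):

```python
# One hypothetical ingested event, split into Druid's three column roles.
event = {
    "timestamp": "2016-04-07T12:00:00Z",  # time axis: every query is time-scoped
    "page": "Cthulhu",                    # dimension: string, used to filter/group
    "user": "lovecraft",                  # dimension
    "added": 57,                          # metric: aggregated (count, sum, ...)
    "deleted": 3,                         # metric
}

DIMENSIONS = {"page", "user"}
METRICS = {"added", "deleted"}

dim_values = {k: v for k, v in event.items() if k in DIMENSIONS}
met_values = {k: v for k, v in event.items() if k in METRICS}
print(dim_values)  # {'page': 'Cthulhu', 'user': 'lovecraft'}
print(met_values)  # {'added': 57, 'deleted': 3}
```

The split matters because the two kinds of columns are stored and indexed differently: dimensions get dictionaries and bitmaps, metrics get compressed numeric blocks.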

16 Roll-up

16.1 Example

[Table: raw events (timestamp, page, added, deleted) for pages Cthulhu and Azatoth, rolled up into rows (timestamp, page, nb, added, deleted).]

16.2 As SQL

GROUP BY timestamp, page
  :: nb = COUNT(1), added = SUM(added), deleted = SUM(deleted)

In practice this can dramatically reduce the size (up to 100x).

17 Segments

17.1 Sharding

Segment sampleData_2011-01-01T01:00:00Z_2011-01-01T02:00:00Z_v1:

page     nb  added  deleted
Cthulhu   1     20       45
Azatoth   1     30      106

Segment sampleData_2011-01-01T01:00:00Z_2011-01-01T02:00:00Z_v1:

page     nb  added  deleted
Cthulhu   1     12       45
Azatoth   2     30       80
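The roll-up above amounts to grouping raw events by (truncated timestamp, dimensions) and keeping only the aggregates. A minimal sketch, with made-up events and hourly granularity:

```python
# Minimal roll-up sketch, mimicking:
#   GROUP BY timestamp, page
#     :: nb = COUNT(1), added = SUM(added), deleted = SUM(deleted)
from collections import defaultdict

def rollup(events):
    agg = defaultdict(lambda: {"nb": 0, "added": 0, "deleted": 0})
    for e in events:
        hour = e["timestamp"][:13]        # truncate to hourly granularity
        key = (hour, e["page"])
        agg[key]["nb"] += 1
        agg[key]["added"] += e["added"]
        agg[key]["deleted"] += e["deleted"]
    return agg

events = [  # invented raw events
    {"timestamp": "2011-01-01T01:05:00Z", "page": "Cthulhu", "added": 20, "deleted": 5},
    {"timestamp": "2011-01-01T01:40:00Z", "page": "Cthulhu", "added": 5,  "deleted": 1},
    {"timestamp": "2011-01-01T01:10:00Z", "page": "Azatoth", "added": 30, "deleted": 10},
]
print(rollup(events)[("2011-01-01T01", "Cthulhu")])
# {'nb': 2, 'added': 25, 'deleted': 6}
```

Only the rolled-up rows are stored in segments; the raw events are gone, which is exactly where the up-to-100x size reduction comes from.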

17.2 Core Data Structure

- dictionary
- a bitmap for each value
- a list of the column's values encoded using the dictionary

17.3 Example

dictionary: { "Cthulhu": 0, "Azatoth": 1 }
column data: [0, 0, 1, 1]
bitmaps (one for each value of the column):
  value "Cthulhu": [1,1,0,0]
  value "Azatoth": [0,0,1,1]

17.4 Example (multiple matches)

dictionary: { "Cthulhu": 0, "Azatoth": 1 }
column data: [0, [0,1], 1, 1]
bitmaps (one for each value of the column):
  value "Cthulhu": [1,1,0,0]
  value "Azatoth": [0,1,1,1]

17.5 Real-time Ingestion

- Via Real-Time Node and Firehose
  - No redundancy or HA, thus not recommended
- Via Indexing Service and Tranquility API
  - Core API

  - Integration with Streaming Frameworks
  - HTTP Server
  - Kafka Consumer

17.6 Batch Ingestion

- File based (HDFS, S3, ...)

17.7 Real-time Ingestion

Task 1: [ Interval ][ Window ]
Task ---------------------------------> time

18 Querying

18.1 Query types

- Group by: group by multiple dimensions
- Top N: like grouping by a single dimension
- Timeseries: without grouping over dimensions
- Search: dimensions lookup
- Time Boundary: find available data timeframe
- Metadata queries

18.2 Example(s)

{
    "queryType": "groupBy",
    "dataSource": "druidtest",
    "granularity": "all",
    "dimensions": [],
    "aggregations": [
        {"type": "count", "name": "rows"},
        {"type": "longSum", "name": "imps", "fieldName": "impressions"},
        {"type": "doubleSum", "name": "wp", "fieldName": "wp"}
    ],
    "intervals": ["2010-01-01T00:00/2020-01-01T00"]
}
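A query like the one above is plain JSON POSTed over HTTP to a broker node. A minimal sketch using only the standard library; the broker URL in the commented call is an assumption for illustration, not a fixed address:

```python
# Sending a Druid groupBy query as JSON over HTTP (the "JSON over HTTP"
# querying model from 15.4). The broker URL below is illustrative.
import json
from urllib import request

query = {
    "queryType": "groupBy",
    "dataSource": "druidtest",
    "granularity": "all",
    "dimensions": [],
    "aggregations": [
        {"type": "count", "name": "rows"},
        {"type": "longSum", "name": "imps", "fieldName": "impressions"},
        {"type": "doubleSum", "name": "wp", "fieldName": "wp"},
    ],
    "intervals": ["2010-01-01T00:00/2020-01-01T00"],
}

def post_query(broker_url, q):
    """POST the query dict as JSON and return the decoded response."""
    req = request.Request(
        broker_url,
        data=json.dumps(q).encode("utf-8"),
        headers={"Content-Type": "application/json"},
    )
    with request.urlopen(req) as resp:
        return json.loads(resp.read())

# results = post_query("http://broker.example:8082/druid/v2/", query)
```

Because it is just HTTP + JSON, any language or even `curl` can act as a Druid client; no driver is needed.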

18.3 Result

[ {
    "version" : "v1",
    "timestamp" : "2010-01-01T00:00:00.000Z",
    "event" : {
        "imps" : 5,
        "wp" : 15000.0,
        "rows" : 5
    }
} ]

18.4 Caching

- Historical node level
  - By segment
- Broker level
  - By segment and query
  - groupBy is disabled on purpose!
- By default: local caching

19 Druid Components

19.1 Druid

- Real-time Nodes
- Historical Nodes
- Broker Nodes
- Coordinator
- For indexing:
  - Overlord
  - Middle Manager

19.2 Also

- Deep Storage (S3, HDFS, ...)
- Metadata Storage (SQL)
- Load Balancer
- Cache
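Client code typically flattens result rows like the one above, merging each "event" dict with its timestamp. A small sketch on that result:

```python
# Flattening a Druid groupBy result: each element carries a version, a
# timestamp, and an "event" dict holding the aggregator outputs.
result = [
    {"version": "v1",
     "timestamp": "2010-01-01T00:00:00.000Z",
     "event": {"imps": 5, "wp": 15000.0, "rows": 5}},
]

rows = [{"timestamp": r["timestamp"], **r["event"]} for r in result]
print(rows[0]["imps"])  # 5
```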

19.3 Coordinator

- Real-time Nodes (pull data, index it)
- Historical Nodes (keep old segments)
- Broker Nodes (route queries to real-time & historical nodes, merge results)
- Coordinator (manages segments)
- For indexing:
  - Overlord (distributes tasks to the Middle Managers)
  - Middle Manager (executes tasks via Peons)

20 When not to choose Druid

- Data is not time-series
- Cardinality is very high
- Number of dimensions is high
- Setup cost must be avoided

21 Graphite (metrics)

Graphite

22 Pivot (exploring data)

Pivot

23 Caravel

Caravel

24 Conclusions

24.1 Precompute your time series?

24.2 Don't reinvent it

- Need a user-facing API
- Need time series on many dimensions
- Need real-time
- Big volume of data

24.3 Druid way is the right way!

1. Push in Kafka
2. Add the right dimensions
3. Push in Druid
4. ?
5. Profit!
