NewSQL, SQL on Hadoop (uni-leipzig.de)

Transcription

NewSQL, SQL on Hadoop
Prof. Dr. Andreas Thor
Hochschule für Telekommunikation Leipzig (HfTL)
thor@hft-leipzig.de
2nd International ScaDS Summer School on Big Data, July 12, 2016
Andreas Thor: NewSQL, SQL on Hadoop

Agenda

SQL on Hadoop
– Motivation: Why is MapReduce not enough?
– Hadoop-based frameworks
– Translating SQL to MapReduce, optimizing data flows

NewSQL
– Motivation: RDBMS and the cloud
– Types of NewSQL systems
– In-memory databases, data partitioning

No complete overview of all tools; focus on ideas / techniques

Data Analysis / Queries on Big Data

Simple aggregations, ad-hoc analyses
– Number of clicks / page views per day / month
– How many photo uploads on New Year's Eve 2015? How many tweets during the EURO 2016 final?

Preprocessing for data mining
– Identify user groups / types
– Find suspicious / frequent patterns in UGC (user-generated content)

If your data is in Hadoop
– use the query capabilities of your NoSQL store!
– write a MapReduce / Spark program to analyze it!
Really?

Data Analysis: Access Frequency Skew

Empirical analysis from companies reveals access frequency skew
– Zipf-like distribution: few files account for a very high number of accesses
– 90% of all files are accessed only once

Chen et al.: Interactive Analytical Processing in Big Data Systems: A Cross-Industry Study of MapReduce Workloads. VLDB 2012

SQL-based Data Analysis

Copy to relational database / data warehouse?
– Development overhead for rarely used files
– Import is inefficient

High-level language for Hadoop-based data analyses
– Data analysts do not need to be able to program MapReduce, Spark, etc.
– Efficient re-use of scripts / workflows for similar analysis tasks

SQL interface for Hadoop needed
– SQL is declarative and concise
– People know SQL
– Interfaces with existing analysis software
– Can be combined with MapReduce / Spark

Hadoop Ecosystem (simplified)

Data type / algorithm: SQL, Graph, Machine Learning
Execution engine: MapReduce, Spark, Tez
Cluster management: Hadoop YARN
Data storage: HDFS

Processing Frameworks for Hadoop

Mark Grover: Processing frameworks for Hadoop

Hadoop-based Data Analysis Frameworks

Source: Chen et al.: Interactive Analytical Processing in Big Data Systems: A Cross-Industry Study of MapReduce Workloads. VLDB 2012

Apache Hive

Data warehouse infrastructure on Hadoop
– Hive 2.1 (June 2016) for Hadoop 2.x

"Hive = MapReduce + SQL"
– SQL is simple to use
– MapReduce provides scalability and fault tolerance

HiveQL: SQL-like query language
– Extensible with MapReduce scripts and user-defined functions (e.g., in Python)
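The user-defined functions mentioned above are often hooked in via Hive streaming: HiveQL's TRANSFORM clause pipes rows to an external script as tab-separated lines on stdin and reads tab-separated lines back from stdout. A minimal sketch of such a script; the column layout (url, clicks) is illustrative, not from the slides:

```python
# Hive streaming UDF sketch: rows arrive as tab-separated lines.
# Under Hive the script would run:  for line in sys.stdin: print(normalize(line))
def normalize(line):
    """Lower-case the first column, pass the second through unchanged."""
    url, clicks = line.rstrip("\n").split("\t")
    return url.lower() + "\t" + clicks

rows = ["HTTP://Example.com/A\t3", "http://example.com/B\t5"]
out = [normalize(r) for r in rows]
```

In HiveQL this would be invoked roughly as `SELECT TRANSFORM(url, clicks) USING 'python normalize.py' AS (url, clicks) FROM clicks` (table and script names are hypothetical).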

Hive: Metastore

Mapping files to logical tables
– Flexible (de)serialization of tables (CSV, XML, JSON)
– (Diagram: Clicks table → partitions (multiple levels), e.g. d=20160710, d=20160711 → HDFS files, split into hash buckets)

Table corresponds to an HDFS directory: /clicks
– Subdirectories for partitioning (based on attributes): /clicks/d=20160710
– Bucketing: split files into parts

Advantage: direct data access, i.e., no transformation / loading into relational format
Disadvantage: no pre-processing (e.g., indexing)
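The layout described above can be sketched with two small helpers: one directory per table, one subdirectory per partition value, one file per hash bucket. The helper names are illustrative, not Hive APIs:

```python
# Sketch of the Hive-style file layout: /table/col=val/... plus bucket files.
import zlib

def partition_dir(table, partitions):
    """E.g. table 'clicks' partitioned by day -> /clicks/d=20160710"""
    suffix = "/".join(col + "=" + val for col, val in partitions)
    return "/" + table + ("/" + suffix if suffix else "")

def bucket_file(key, num_buckets):
    """Bucketing: split a partition's rows into files by a key hash
    (crc32 here just as a deterministic stand-in for Hive's hash)."""
    return "bucket_%05d" % (zlib.crc32(key.encode()) % num_buckets)

path = partition_dir("clicks", [("d", "20160710")])  # "/clicks/d=20160710"
```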

Hive: Workflow

1. User issues SQL query
2. Hive parses and plans query
3. Query converted to MapReduce
4. MapReduce run by Hadoop

Abadi et al.: SQL-on-Hadoop Tutorial. VLDB 2015

Hive: Query

SELECT g1.x, g1.avg, g2.cnt
FROM (SELECT a.x, AVG(a.y) AS avg
      FROM a
      GROUP BY a.x) g1
JOIN (SELECT b.x, COUNT(b.y) AS cnt
      FROM b
      GROUP BY b.x) g2
ON (g1.x = g2.x)
ORDER BY g1.avg

(Dataflow: two GROUP BY subqueries g1 and g2 → JOIN → ORDER)

Hive: Query Optimization

Query optimization employs ideas from database research
– Logical (rule-based) transformations
– Cost-based optimizations

Projection / selection pushdown
– Remove unnecessary attributes / records as early as possible

Adaptive implementations, e.g., joins
– Based on statistics (e.g., number of records, min-max values)

Semi-structured JSON data vs. relational data

JSON data (collection of objects), e.g.:
{"_id":"1", "info":{…, "size":12345}, "tags":["tuna","shark"]}
{"_id":"2", "info":{…, "size":32091}, "tags":["oak"]}

Relational: nested table with a multi-valued attribute (columns time, user, size, tags; the tags column holds lists such as [tuna, shark], [oak], [tahoe, powder], [maui, tuna], [maui])

Source: /04/icouch.html
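The mapping between the two representations can be sketched as a simple flattening step: one row per document, with nested `info.*` fields pulled up into columns and `tags` kept as a multi-valued attribute. Concrete field values are illustrative where the slide is unreadable:

```python
# Flattening JSON picture documents into relational-style rows.
docs = [
    {"_id": "1", "user": "bob",
     "info": {"camera": "canon", "size": 12345},
     "tags": ["tuna", "shark"]},
    {"_id": "2", "user": "john",
     "info": {"camera": "nikon", "size": 32091},
     "tags": ["oak"]},
]

def to_row(doc):
    # un-nest info.* into flat columns; tags stays a list-valued column
    return (doc["_id"], doc["user"], doc["info"]["camera"],
            doc["info"]["size"], doc["tags"])

rows = [to_row(d) for d in docs]
```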

SQL to MapReduce: Example

SELECT camera, AVG(info.size)
FROM Pictures
WHERE user = "john"
GROUP BY camera

map = function (doc) {
  if (doc.user == "john") {
    emit(doc.camera, doc.info.size);
  }
}

reduce = function (key, values) {
  var sum = 0;
  for (var i = 0; i < values.length; i++) sum += values[i];
  return sum / values.length;
}

Map output (key, value): (canon, 32091), (canon, 1253), (nikon, 92834), …
After shuffle & sort: (canon, [32091, 1253]), (nikon, [92834])
Reduce output: (canon, 16672), (nikon, 92834)
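The same query can be traced end to end with a small in-memory stand-in for the map / shuffle & sort / reduce pipeline (Python instead of the JavaScript above; data values are illustrative):

```python
# Tracing: SELECT camera, AVG(info.size) FROM Pictures
#          WHERE user = "john" GROUP BY camera
from collections import defaultdict

def map_fn(doc):
    if doc["user"] == "john":                 # WHERE user = "john"
        yield doc["camera"], doc["size"]      # key = grouping attribute

def reduce_fn(key, values):                   # AVG(size) per camera
    return key, sum(values) / len(values)

docs = [
    {"user": "john", "camera": "canon", "size": 32091},
    {"user": "john", "camera": "canon", "size": 1253},
    {"user": "john", "camera": "nikon", "size": 92834},
    {"user": "bob",  "camera": "canon", "size": 12539},   # filtered out
]

groups = defaultdict(list)                    # shuffle & sort: group by key
for doc in docs:
    for key, value in map_fn(doc):
        groups[key].append(value)

result = dict(reduce_fn(k, v) for k, v in groups.items())
# result == {"canon": 16672.0, "nikon": 92834.0}
```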

SQL to MapReduce

Selection (WHERE user = 'John'): filter in the map function; if (user == 'John') { emit(…); }
Projection (SELECT camera, size): map output value; emit(…, {camera, size});
Grouping (GROUP BY camera): map output key = grouping attribute(s); emit(camera, …);
Aggregation (SELECT AVG(size)): computation in the reduce function; average([size1, size2, …]);
Nested queries (FROM (SELECT … FROM …) AS T): sequence of MapReduce programs; output of MR1 (inner query) = input to MR2 (outer query)
Sorting (ORDER BY camera): map output key = sorting attribute(s); requires a single reducer or a range partitioner
Join (FROM R JOIN S ON (R.b = S.b)): see next slides
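The nested-query row of this mapping can be illustrated with a toy single-machine MapReduce round that is simply run twice: the output of MR1 (inner query) feeds MR2 (outer query). The helper and the data are illustrative:

```python
# Toy single-machine stand-in for one MapReduce round.
from itertools import groupby

def mapreduce(records, map_fn, reduce_fn):
    # map, then "shuffle & sort" (sort + group by key), then reduce
    shuffled = sorted(kv for r in records for kv in map_fn(r))
    return [reduce_fn(k, [v for _, v in grp])
            for k, grp in groupby(shuffled, key=lambda kv: kv[0])]

# MR1 (inner query): SELECT x, SUM(y) ... GROUP BY x
data = [("a", 1), ("a", 3), ("b", 5)]
mr1 = mapreduce(data,
                lambda r: [(r[0], r[1])],      # key = grouping attribute
                lambda k, vs: (k, sum(vs)))    # aggregate per group

# MR2 (outer query): ORDER BY the aggregate; key = sorting attribute
# (with a single reducer the sorted shuffle output is globally sorted)
mr2 = mapreduce(mr1,
                lambda r: [(r[1], r[0])],
                lambda k, vs: (vs[0], k))
```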

Repartition Join (for Equi-Join)

Naïve approach
– Map output: key = join attribute, value = relation name + tuple (relevant attributes)
– Reduce: build all pairs of tuples from different relations

Example: R(a, b) with tuples a1–a8 (join values b = 3, 3, 3, 2, 3, 2, 2, 2) and S(b, c) with tuples c1–c4 (join values b = 1, 2, 3, 4). After shuffle & sort the reducers see:
(1, [S:c1])
(2, [R:a4, S:c2, R:a6, R:a7, R:a8])
(3, [R:a1, R:a2, R:a3, S:c3, R:a5])
(4, [S:c4])
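A minimal sketch of this naive repartition join: the map step tags every tuple with its relation name, the shuffle groups by join attribute, and the reducer pairs R-tuples with S-tuples (buffering the whole value list per key). Tuples follow the slide's example:

```python
# Naive repartition join, simulated in memory.
from collections import defaultdict

R = [("a1", 3), ("a2", 3), ("a4", 2), ("a6", 2)]  # (payload, join attr b)
S = [("c2", 2), ("c3", 3)]                        # (payload, join attr b)

groups = defaultdict(list)                        # stand-in for shuffle & sort
for payload, b in R:
    groups[b].append(("R", payload))              # map: key = join attribute
for payload, b in S:
    groups[b].append(("S", payload))

joined = []
for b, values in groups.items():                  # reduce: buffer everything,
    r_side = [p for tag, p in values if tag == "R"]
    s_side = [p for tag, p in values if tag == "S"]
    joined += [(r, s) for r in r_side for s in s_side]
```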

Repartition Join: Extended Key

Reducer needs to buffer all values per key
– No specific order of the reduce values in the list; sequential access to the list only

Key extension (= adjusted grouping and sorting comparators)
– Extend the map output key by the relation name; group by the join attribute only
– Sort so that keys of the small relation (S) come before keys of the large relation (R)
– Reducer buffers tuples of S only

Example
Naïve: (2, R:a4), (2, S:c2), (2, R:a6), (2, R:a7), (2, R:a8), (2, S:c9)
Extended key: (2:S, c2), (2:S, c9), (2:R, a4), (2:R, a6), (2:R, a7), (2:R, a8)
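A sketch of the extended-key trick: the map output key carries the relation name, the sorting comparator places S before R within each join value, and the grouping comparator still uses the join attribute only, so the reducer streams through R while buffering S-tuples alone. Tuples follow the slide's example:

```python
# Extended-key repartition join: buffer only the small relation S.
from itertools import groupby

map_out = [(2, "R", "a4"), (2, "S", "c2"), (2, "R", "a6"),
           (2, "R", "a7"), (2, "R", "a8"), (2, "S", "c9")]

# sorting comparator: by join attribute, S-keys first
ordered = sorted(map_out, key=lambda t: (t[0], t[1] != "S"))

joined = []
# grouping comparator: join attribute only (relation tag ignored)
for b, group in groupby(ordered, key=lambda t: t[0]):
    s_buffer = []                      # only the small relation is buffered
    for _, tag, payload in group:
        if tag == "S":
            s_buffer.append(payload)
        else:                          # R-tuples stream straight through
            joined += [(payload, s) for s in s_buffer]
```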

Broadcast Join

Repartition join: large map output
– All tuples are sorted between map and reduce → high network traffic

Common scenario: |R| ≫ |S|
– Example: Logfile ⋈ User

Broadcast join: computation in the map phase; no reduce phase
– Use the small relation (S) as additional map input
– Small relation is sent to all n nodes
– No transfer of R: each map task consumes its local partition of R

Data transfer
– Broadcast join: n · |S|
– Repartition join: |R| + |S|
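A sketch of this map-side join: the small relation S is shipped to every node and indexed in a hash table, and each map task joins its local split of R against that index, so no shuffle or reduce phase is needed. Tuples follow the slide's example:

```python
# Broadcast (map-side) join, simulated for one node.
from collections import defaultdict

S = [(1, "c1"), (2, "c2"), (3, "c3"), (4, "c4")]  # small: (join attr b, payload)
R_local = [("a1", 3), ("a4", 2), ("a9", 7)]       # one node's local R split

s_index = defaultdict(list)                       # built once per node
for b, payload in S:
    s_index[b].append(payload)

def map_task(r_tuple):                            # the whole join happens here
    payload, b = r_tuple
    return [(payload, s) for s in s_index[b]] if b in s_index else []

joined = [pair for r in R_local for pair in map_task(r)]
# joined == [("a1", "c3"), ("a4", "c2")]
```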

Evaluation

Data size sent through the network:

#records (relation S) | Repartition (ext. key) | Broadcast
0.3 · 10^6            | 145 GB                 | 6 GB
10 · 10^6             | 145 GB                 | 195 GB
300 · 10^6            | 151 GB                 | 6240 GB

– Prefer broadcast for small S
– Repartitioning: benefit of the extended key

Blanas et al.: A Comparison of Join Algorithms for Log Processing in MapReduce. SIGMOD 2010

SQL on Hadoop

Apache Hive
– Operation: warehouse-like queries, ETL processing
– Query language: HiveQL (SQL-like)
– Data sources: Hadoop
– Schema: pre-defined
– Translates into: MapReduce & Spark

Apache Spark SQL
– Operation: complex data analysis algorithms (e.g., machine learning)
– Query language: mix Spark code (Java / Scala) with SQL
– Data sources: Hadoop, Hive tables, JDBC
– Schema: pre-defined, JSON
– Translates into: Spark

Apache Drill
– Operation: interactive data discovery (low latency)
– Query language: ANSI SQL
– Data sources: Hadoop, NoSQL (joining different sources)
– Schema: on-the-fly ("schema-free")
– Translates into: --

NewSQL

From SQL on Hadoop to NewSQL: shared-nothing cluster

NewSQL: Definition

"… delivers the scalability and flexibility promised by NoSQL while retaining the support for SQL queries and/or ACID, or to improve performance for appropriate workloads." (451 Group)

NewSQL: An Alternative to NoSQL and Old SQL for New OLTP Apps (by Michael Stonebraker)
– SQL as the primary interface
– ACID support for transactions
– Non-locking concurrency control
– High per-node performance
– Scalable, shared-nothing architecture

Matt Aslett, 451 Group, 2011: https://www.451research.com/report-short?entityId=66963
Michael Stonebraker, 2011: http://cacm.acm.org/blogs/blog-cacm/109710

RDBMS Design Principles

RDBMS were developed for shared-memory and (later) shared-disk architectures
– Cloud / data center: shared nothing

RDBMS store data on hard-disk drives; main memory is used for caching only
– Cloud / data center: large amounts of main memory are affordable; solid-state disks
– (Figure: Amazon EC2 price history for 1 TB main memory)

RDBMS implement recovery using disk-based log files
– Cloud / data center: fast recovery via data copying through the network is possible

RDBMS support multi-threading (on a single core)
– T2 can be started while T1 is still waiting for data (from disk), so long transactions do not block short transactions → low latency
– Cloud / data center: multi-core nodes, large main memory

RDBMS Overhead

"Removing those overheads and running the database in main memory would yield orders of magnitude improvements in database performance"

(Pie chart of where instructions go: useful work (retrieve / update data), index management, locking & latching, buffer management, logging; shares 12% / 11% / 28% / 29% / 20%)

Locking & latching
– Concurrency control (locking protocols), deadlock handling
– Short-term locks in multi-threading (latching)
– Reduce overhead for isolated execution (e.g., no multi-threading)

Buffer management
– Mapping records to pages for block-wise storage on disk
– Not needed anymore for in-memory databases

Logging
– Write & read log files (write-ahead logging)
– ReDo recovery (after outage), UnDo recovery (after transaction failures)
– ReDo by "copy from replica" possible; avoid UnDo cases

Harizopoulos, S. et al.: "OLTP: Through the Looking Glass, and What We Found There." SIGMOD, June 2008

HStore: Overview

Distributed, row-store-based, main-memory relational database
– Cluster of nodes (shared nothing); multiple sites per node
– Site = single-threaded daemon on a single CPU → no latching
– Row store (B-tree) in main memory → no buffer management

Transactions
– No ad-hoc SQL queries; pre-defined stored procedures (SPs) only
– Classification of transactions (e.g., "single / multi partition", "two phase")
– Global ordering of transactions → strong consistency
– ACID
– Direct data access / transfer (no ODBC)

Recovery
– Replica-based recovery → no logging needed

VoltDB (commercial); H-Store (open source / research prototype)

HStore: Site Architecture

Jones, Abadi, and Madden: "Low Overhead Concurrency Control for Partitioned Main Memory Databases." SIGMOD 2010

OLTP Transactions in Web Applications

Focus of web applications: scalability, scalability, scalability
– Limited flexibility for transactions is OK

Observations: transactions …
– often touch data of the current user only
– modify few records only
– are known a priori, i.e., no ad-hoc queries needed
– are comparatively simple

Data Partitioning: Tree Schema

Most schemas (for web applications) are "tree schemas"
– One (or more) root tables (e.g., warehouse)
– Other tables have (multiple) one-to-many relationships to the root table

Andy Pavlo: NewSQL, 2012

Horizontal Partitioning

Goal: single-partition transactions
– Horizontal partitioning of the root table
– Child tables are partitioned accordingly
– Replication of unrelated tables
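A minimal sketch of this placement scheme for the tree schema above: the root table (warehouse) is hash-partitioned on its key, child rows are placed on the partition of their root row, and unrelated tables are replicated everywhere. Table and column names are illustrative:

```python
# Partition routing for a tree schema (warehouse as root table).
NUM_PARTITIONS = 4

def partition_of(warehouse_id):
    """Root table: hash partitioning on the warehouse key."""
    return warehouse_id % NUM_PARTITIONS

def place_child(row):
    """Child tables are co-partitioned via their root foreign key, so a
    transaction touching one warehouse stays on a single partition."""
    return partition_of(row["w_id"])

def place_replicated(row):
    """Unrelated (typically read-mostly) tables go to every partition."""
    return list(range(NUM_PARTITIONS))

order = {"o_id": 17, "w_id": 6}   # an order row belonging to warehouse 6
```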

HStore: Infrastructure

Single Partition Transactions

Client sends a single-partition transaction to the (node of the) primary partition
– Primary forwards it to the secondaries (backups)
– Execute transactions ordered by node id + timestamp (nodes are time-synchronized)

Independent, parallel execution on all partitions
– Each node reaches the same result (commit or abort)
– Primary sends the result back to the client after receiving an "acknowledge" from all secondaries

Strong consistency
– If a node fails → copy partition replica → no ReDo logging

Transactions are executed sequentially on every node (single-threaded) → no concurrency control

"Two phase" transactions
– Format: "read(s), check for consistency, write(s)"
– No UnDo logging necessary
– Example: x = read(a); y = read(b); check y ≥ 100; write(a, x+100); write(b, y-100)
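The "two phase" pattern can be sketched against a toy in-memory store: all reads and the consistency check come first, writes only afterwards, so an abort never has to undo anything and no UnDo logging is needed. The transfer mirrors the slide's example (check y ≥ 100 before writing):

```python
# "Two phase" transaction sketch: read(s), check, then write(s).
store = {"a": 500, "b": 300}

def transfer(amount):
    x = store["a"]            # phase 1: read(s) ...
    y = store["b"]
    if y < amount:            # ... and consistency check
        return "abort"        # nothing written yet -> no undo required
    store["a"] = x + amount   # phase 2: write(s), can no longer abort
    store["b"] = y - amount
    return "commit"
```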

Multi Partition Transactions

Multi-partition transactions are controlled by a central coordinator
– Multiple coordinators are possible, but the global order of transactions must be preserved

Execution
– Divide the multi-partition transaction into fragments that are sent to all partitions
– UnDo buffer for undoing transactions in case of failures (e.g., consistency violations)

Two-phase commit protocol
– Coordination protocol to achieve a global result (commit / abort) in a distributed environment
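A minimal sketch of the two-phase commit protocol mentioned above: the coordinator asks every partition to prepare; only if all vote "yes" does it send commit, otherwise every partition aborts (rolling its fragment back via the UnDo buffer). Class and method names are illustrative:

```python
# Two-phase commit coordinator sketch.
class Partition:
    def __init__(self, can_commit=True):
        self.can_commit = can_commit
        self.state = "active"

    def prepare(self):                 # phase 1: vote yes/no
        return self.can_commit

    def commit(self):                  # phase 2a: make fragment durable
        self.state = "committed"

    def abort(self):                   # phase 2b: roll back via UnDo buffer
        self.state = "aborted"

def two_phase_commit(partitions):
    if all(p.prepare() for p in partitions):
        for p in partitions:
            p.commit()
        return "commit"
    for p in partitions:
        p.abort()
    return "abort"
```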

NewSQL: Overview

New architectures
– Type: developed "from scratch"
– Examples: H-Store / VoltDB, Google Spanner, MemSQL, NuoDB, Clustrix
– Characteristics: designed for in-memory (or flash) as primary data store

New SQL engines
– Type: "plugin" to an existing RDBMS (e.g., MySQL)
– Examples: MySQL Cluster, ScaleDB, Tokutek
– Characteristics: reuse components from the RDBMS framework

Middleware
– Type: additional layer on top of an RDBMS
– Examples: Schooner MySQL, ScaleArc, ScaleBase, dbShards
– Characteristics: transparent clustering / sharding for scalability

Summary

SQL on Hadoop: "Add SQL to NoSQL"
– Frameworks leveraging (parts of) the Hadoop infrastructure
– SQL-like queries on (semi-)structured data (files) and NoSQL (OLAP)
– Techniques: SQL-to-MapReduce translation, query optimization, metadata

NewSQL: "Add scalability to RDBMS"
– New type of RDBMS in a shared-nothing cluster
– SQL and ACID transactions (OLTP)
– Techniques: in-memory, data partitioning, pre-defined SQL statements
