Distributed Computing On PostgreSQL


Distributed Computing on PostgreSQL
Marco Slot, marco@citusdata.com

Small data architecture

Big data architecture

Big data architecture using Postgres: real-time analytics, messaging, records, data warehouse

PostgreSQL is a perfect building block for distributed systems

Features!
PostgreSQL contains many useful features for building a distributed system: a well-defined protocol and libpq, crash safety, concurrent execution, transactions, access controls, two-phase commit (2PC), replication, and custom functions.


Extensions!
Built-in / contrib: postgres_fdw, dblink (RPC!), plpgsql
Third-party open source: pglogical, pg_cron, citus
Yours!

dblink
Run queries on a remote Postgres server:

    -- open one named connection per node
    SELECT dblink_connect(node_id, format('host=%s port=%s dbname=postgres', node_name, node_port))
    FROM nodes;

    -- send the query to every node without waiting for the answers
    SELECT dblink_send_query(node_id, $$SELECT pg_database_size('postgres')$$)
    FROM nodes;

    -- collect the answers and add them up
    SELECT sum(size::bigint)
    FROM nodes, dblink_get_result(nodes.node_id) AS r(size text);

    SELECT dblink_disconnect(node_id)
    FROM nodes;
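dblink_send_query fires the query at every node without waiting, and dblink_get_result then collects each answer, so the nodes work in parallel. Below is a minimal sketch of the same scatter-gather pattern in Python. It assumes a thread pool in place of dblink connections and a hypothetical remote_database_size stand-in with made-up sizes. No PostgreSQL is involved.

```python
from concurrent.futures import ThreadPoolExecutor

# Hypothetical sizes in bytes, standing in for pg_database_size('postgres') on each node.
NODE_SIZES = {"node-1": 8_000_000, "node-2": 9_500_000, "node-3": 7_250_000}

def remote_database_size(node: str) -> str:
    """Stand-in for the remote query; the slide reads the result as text, then casts it."""
    return str(NODE_SIZES[node])

def distributed_sum(nodes: list) -> int:
    with ThreadPoolExecutor(max_workers=max(1, len(nodes))) as pool:
        # "send" phase: start the query on every node without waiting for any answer
        futures = [pool.submit(remote_database_size, node) for node in nodes]
        # "get result" phase: wait for each answer and cast it, like sum(size::bigint)
        return sum(int(future.result()) for future in futures)

print(distributed_sum(list(NODE_SIZES)))  # 24750000
```

The total latency is roughly that of the slowest node rather than the sum over all nodes, which is the point of the asynchronous send/get split.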

RPC using dblink
For every Postgres function, we can create a client-side stub using dblink:

    CREATE FUNCTION func(input text) ...

    -- client-side stub: runs func(input) on host:port and returns its output
    CREATE FUNCTION remote_func(host text, port int, input text) RETURNS text
    LANGUAGE sql AS $function$
      SELECT output
      FROM dblink(format('host=%s port=%s', host, port),
                  format('SELECT * FROM func(%L)', input))
           AS res(output text);
    $function$;
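The stub is only safe because format's %L quotes the argument as an SQL literal before the query text is shipped to the remote node. The Python sketch below mimics PostgreSQL's literal-quoting rules: embedded single quotes and backslashes are doubled, an E'' string is used when a backslash is present, and NULL is emitted for a missing value. The helper names quote_literal and remote_func_query are illustrative and are not part of dblink.

```python
from typing import Optional

def quote_literal(value: Optional[str]) -> str:
    """Approximates format('%L', value) for text values."""
    if value is None:
        return "NULL"
    # double every single quote and backslash
    doubled = "".join(ch * 2 if ch in ("'", "\\") else ch for ch in value)
    # PostgreSQL switches to an escape-string literal when backslashes are present
    prefix = "E" if "\\" in value else ""
    return f"{prefix}'{doubled}'"

def remote_func_query(input_text: Optional[str]) -> str:
    # same text the stub builds with format('SELECT * FROM func(%L)', input)
    return f"SELECT * FROM func({quote_literal(input_text)})"

print(remote_func_query("O'Reilly"))  # SELECT * FROM func('O''Reilly')
print(remote_func_query(None))        # SELECT * FROM func(NULL)
```

Using %s instead of %L would let an input such as `x'); DROP TABLE t; --` change the remote query.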

PL/pgSQL
Procedural language for Postgres:

    CREATE FUNCTION distributed_database_size(dbname text)
    RETURNS bigint LANGUAGE plpgsql AS $function$
    DECLARE
      total_size bigint;
    BEGIN
      -- ask every node for its database size (connections opened earlier with dblink_connect)
      PERFORM dblink_send_query(node_id, format('SELECT pg_database_size(%L)', dbname))
      FROM nodes;

      -- gather the answers and add them up
      SELECT sum(size::bigint) INTO total_size
      FROM nodes, dblink_get_result(nodes.node_id) AS r(size text);

      RETURN total_size;
    END;
    $function$;

Distributed system in progress
With these extensions, we can already create a simple distributed computing system.
[Diagram: a node runs SELECT transform_data(), which operates in parallel, using dblink, on nodes holding Data 1, Data 2 and Data 3; postgres_fdw is marked with a question mark as an alternative.]

pglogical / logical replication
Asynchronously replicate changes to another database.

pg_paxos
Consistently replicate changes between databases.

pg_cron
Cron-based job scheduler for Postgres:

    CREATE EXTENSION pg_cron;
    -- run transform_data() every 10 minutes
    SELECT cron.schedule('*/10 * * * *', 'SELECT transform_data()');

Internally it uses libpq, meaning it can also schedule jobs on other nodes. pg_cron provides a way for nodes to act autonomously.
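A cron expression has five fields: minute, hour, day of month, month, and day of week. A step such as */10 therefore means "every 10 minutes" only in the first field. In the day-of-week field it selects only day 0 (Sunday). The Python matcher below is a simplified illustration, not pg_cron's parser. It supports only `*`, `*/n`, plain numbers, and comma lists.

```python
from datetime import datetime

# lowest legal value of each field: minute, hour, day of month, month, day of week
LOWEST = [0, 0, 1, 1, 0]

def field_matches(field: str, value: int, lowest: int) -> bool:
    """Supports only '*', '*/n', plain numbers and comma-separated lists of those."""
    for part in field.split(","):
        if part == "*":
            return True
        if part.startswith("*/"):
            if (value - lowest) % int(part[2:]) == 0:
                return True
        elif int(part) == value:
            return True
    return False

def cron_matches(expr: str, t: datetime) -> bool:
    cron_dow = (t.weekday() + 1) % 7  # cron counts Sunday as 0, Python counts Monday as 0
    values = [t.minute, t.hour, t.day, t.month, cron_dow]
    return all(field_matches(f, v, low) for f, v, low in zip(expr.split(), values, LOWEST))

# 2017-01-02 was a Monday, 2017-01-01 a Sunday
print(cron_matches("*/10 * * * *", datetime(2017, 1, 2, 12, 20)))  # True
print(cron_matches("* * * * */10", datetime(2017, 1, 2, 12, 20)))  # False
```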

Citus
Transparently shards tables across multiple nodes:

    SELECT create_distributed_table('events', 'event_id');

[Diagram: the coordinator's events table is split into shards (E1, E4), (E2, E5) placed on worker nodes.]

Citus MX
Nodes can have the distributed tables too.
[Diagram: besides the coordinator, every node holding shards (E1, E4), (E2, E5) also has the events table.]

How to build a distributed system using only PostgreSQL & extensions?

Building a streaming publish-subscribe system
[Diagram: producers write to Postgres nodes (topic: adclick); consumers read from them.]

Storage nodes
Use Citus to create a distributed table.
[Diagram: CREATE TABLE on the coordinator creates the events table, with shards (E1, E4), (E2, E5) stored on the storage nodes.]

Distributed table creation

    $ psql -h coordinator
    CREATE TABLE events (
      event_id bigserial,
      ingest_time timestamptz default now(),
      topic_name text not null,
      payload jsonb
    );
    SELECT create_distributed_table('events', 'event_id');

    $ psql -h any-node
    INSERT INTO events (topic_name, payload) VALUES ('adclick', '{...}');

Sharding strategy
The shard is chosen by hashing the value in the partition column. Choices for the partition column:
Application-defined: stream_id text not null
Optimise data distribution: event_id bigserial
Optimise ingest capacity and availability: sid int default pick_local_value()
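Hash sharding maps the partition-column value to a 32-bit integer, and each shard owns a contiguous range of that hash space. The rough Python sketch below shows the idea. It assumes 4 equal ranges and uses zlib.crc32 as a stand-in hash. Citus uses PostgreSQL's own type-specific hash functions, so real shard placement differs.

```python
import zlib

INT32_MIN, INT32_MAX = -2**31, 2**31 - 1

def hash_value(value: str) -> int:
    """Stand-in signed 32-bit hash; NOT the hash function Citus actually uses."""
    h = zlib.crc32(value.encode("utf-8"))  # unsigned, 0 .. 2**32 - 1
    return h - 2**32 if h > INT32_MAX else h

def shard_ranges(shard_count: int) -> list:
    """Split the signed 32-bit hash space into shard_count contiguous ranges."""
    width = 2**32 // shard_count
    ranges = []
    for i in range(shard_count):
        low = INT32_MIN + i * width
        high = INT32_MAX if i == shard_count - 1 else low + width - 1
        ranges.append((low, high))
    return ranges

def shard_for(value: str, ranges: list) -> int:
    h = hash_value(value)
    for index, (low, high) in enumerate(ranges):
        if low <= h <= high:
            return index
    raise ValueError("hash value not covered by any shard range")

ranges = shard_ranges(4)
print(ranges[0])  # (-2147483648, -1073741825)
```

This is why the choice of column matters. Every event with the same application-defined stream_id lands in the same shard. Distinct values such as a bigserial event_id spread evenly over all shards.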

Producers
Producers connect to a random node and perform COPY or INSERT into events.
[Diagram: producers issue COPY / INSERT against the nodes holding events shards (E1, E4), (E2, E5).]

Consumers
Consumers in a group together consume events at least once or exactly once.
[Diagram: a consumer group reads the shards E1, E4, E2, E5 of topic adclick.]

Consumer leases
Consumers obtain leases for consuming a shard. Leases are kept in a separate table on each node:

    CREATE TABLE leases (
      consumer_group text not null,
      shard_id bigint not null,
      owner text,
      new_owner text,
      last_heartbeat timestamptz,
      PRIMARY KEY (consumer_group, shard_id)
    );

Consumer leases
Consumers obtain leases for consuming a shard:

    SELECT * FROM claim_lease('click-analytics', 'node-2', 102008);

Under the covers, claim_lease inserts a new lease, or sets new_owner to steal an existing one:

    CREATE FUNCTION claim_lease(group_name text, node_name text, shard_id int) ...
      INSERT INTO leases (consumer_group, shard_id, owner, last_heartbeat)
      VALUES (group_name, shard_id, node_name, now())
      ON CONFLICT (consumer_group, shard_id) DO UPDATE
      SET new_owner = node_name
      -- at most one pending steal per lease
      WHERE leases.new_owner IS NULL;
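The ON CONFLICT ... DO UPDATE ... WHERE leases.new_owner IS NULL clause makes stealing orderly. The first claimant becomes owner, the next one is recorded as new_owner, and further claimants are ignored while a steal is pending. SQLite 3.24 or later accepts the same upsert syntax, so the statement can be tried locally with Python's sqlite3 module. The sketch below works under that assumption, with now() replaced by a parameter and a hypothetical claim_lease wrapper. It is not the prototype's PostgreSQL function.

```python
import sqlite3
from datetime import datetime, timezone
from typing import Optional

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE leases (
        consumer_group text NOT NULL,
        shard_id bigint NOT NULL,
        owner text,
        new_owner text,
        last_heartbeat text,
        PRIMARY KEY (consumer_group, shard_id)
    )""")

# Same statement as the slide, with named parameters in place of function arguments.
CLAIM_LEASE = """
    INSERT INTO leases (consumer_group, shard_id, owner, last_heartbeat)
    VALUES (:group_name, :shard_id, :node_name, :now)
    ON CONFLICT (consumer_group, shard_id) DO UPDATE
    SET new_owner = :node_name
    WHERE leases.new_owner IS NULL
"""

def claim_lease(group_name: str, node_name: str, shard_id: int,
                now: Optional[str] = None) -> tuple:
    """Hypothetical wrapper: run the upsert, then return (owner, new_owner)."""
    params = {"group_name": group_name, "node_name": node_name, "shard_id": shard_id,
              "now": now or datetime.now(timezone.utc).isoformat()}
    conn.execute(CLAIM_LEASE, params)
    return conn.execute(
        "SELECT owner, new_owner FROM leases WHERE consumer_group = ? AND shard_id = ?",
        (group_name, shard_id)).fetchone()

print(claim_lease("click-analytics", "node-1", 102008))  # ('node-1', None): new lease
print(claim_lease("click-analytics", "node-2", 102008))  # ('node-1', 'node-2'): steal requested
print(claim_lease("click-analytics", "node-3", 102008))  # ('node-1', 'node-2'): ignored
```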

Distributing leases across consumers
A distributed algorithm distributes leases across nodes:

    SELECT * FROM obtain_leases('click-analytics', 'node-2');
    -- get all available lease tables
    -- claim all unclaimed shards
    -- claim random shards until #claims >= #shards / #consumers

Not perfect, but it ensures all shards are consumed, with load balancing (unless there are more consumers than shards, C > S).
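The two claiming steps can be simulated in memory to see how leases spread out. This Python sketch is an illustration under stated assumptions. Each lease is a dict with owner and new_owner, mirroring the leases table. Stealing sets new_owner, as claim_lease's ON CONFLICT branch does. The num_consumers argument is hypothetical, because the slides do not say how a consumer learns the number of consumers.

```python
import random
from typing import Dict, List, Optional

Leases = Dict[int, Dict[str, Optional[str]]]  # shard_id -> {"owner": ..., "new_owner": ...}

def obtain_leases(leases: Leases, consumer: str, num_consumers: int,
                  rng: random.Random) -> List[int]:
    # 1. Claim all unclaimed shards.
    for lease in leases.values():
        if lease["owner"] is None:
            lease["owner"] = consumer
    # Shards this consumer owns or is waiting to take over.
    claims = [s for s, l in leases.items() if consumer in (l["owner"], l["new_owner"])]
    # 2. Steal random shards until #claims >= #shards / #consumers.
    target = len(leases) / num_consumers
    candidates = [s for s, l in leases.items()
                  if l["owner"] != consumer and l["new_owner"] is None]
    rng.shuffle(candidates)
    while len(claims) < target and candidates:
        shard = candidates.pop()
        leases[shard]["new_owner"] = consumer  # steal request, handed over later
        claims.append(shard)
    return sorted(claims)

leases = {s: {"owner": None, "new_owner": None} for s in (102008, 102009, 102010, 102011)}
rng = random.Random(42)
print(obtain_leases(leases, "node-1", 1, rng))       # [102008, 102009, 102010, 102011]
print(len(obtain_leases(leases, "node-2", 2, rng)))  # 2
```

This reproduces the walkthrough that follows: the first consumer takes everything, and a second consumer requests half of the shards.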

Consumers
The first consumer calls obtain_leases and consumes all shards.
[Diagram: a leases table on each node; shards E1, E4, E2, E5.]


Consumers
A second consumer calls obtain_leases and steals leases from the first consumer.


Consuming events
A consumer wants to receive all events once. Several options: SQL-level logical decoding utility functions, a replication connection, or PG10 logical replication / pglogical.

Consuming events
Get a batch of events from a shard:

    SELECT * FROM poll_events('click-analytics', 'node-2', 102008, 'adclick', '<last-processed-event-id>');
    -- Check if the node has the lease
    --   Set owner = new_owner if new_owner is set
    -- Get all pending events (pg_logical_slot_peek_changes)
    -- Progress the replication slot (pg_logical_slot_get_changes)
    -- Return remaining events if still owner
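The steps of poll_events can be modelled in Python to show how the lease handover and at-least-once delivery interact. This is a sketch under assumptions. A plain list of pending events stands in for the replication slot, which the real function reads with pg_logical_slot_peek_changes and advances with pg_logical_slot_get_changes. Clearing new_owner on handover is also an assumption.

```python
from dataclasses import dataclass, field
from typing import List, Optional, Tuple

Event = Tuple[int, str]  # (event_id, payload)

@dataclass
class Shard:
    # Unconfirmed events in commit order: a stand-in for the shard's replication slot.
    pending: List[Event] = field(default_factory=list)
    owner: Optional[str] = None
    new_owner: Optional[str] = None

def poll_events(shard: Shard, node: str, last_processed_id: int) -> List[Event]:
    # Check if the node has the lease (as owner or as the designated new owner).
    if node not in (shard.owner, shard.new_owner):
        return []
    # Hand the lease over if a new owner is waiting (clearing new_owner is an assumption).
    if shard.new_owner is not None:
        shard.owner, shard.new_owner = shard.new_owner, None
    # Progress the "slot": events up to last_processed_id are confirmed and dropped.
    shard.pending = [e for e in shard.pending if e[0] > last_processed_id]
    # Return the remaining events only if the caller is still the owner.
    return list(shard.pending) if shard.owner == node else []

shard = Shard(pending=[(i, f"click-{i}") for i in range(1, 6)], owner="node-1")
print([e[0] for e in poll_events(shard, "node-1", 0)])  # [1, 2, 3, 4, 5]
shard.new_owner = "node-2"                               # node-2 stole the lease
print(poll_events(shard, "node-1", 3))                   # []: handed over, 1-3 confirmed
print([e[0] for e in poll_events(shard, "node-2", 0)])  # [4, 5]: redelivered to node-2
```

Events 4 and 5 reach both consumers because node-1 never confirmed them. That is the at-least-once behaviour the failure-handling slide relies on.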

Consumer loop
1. Call poll_events for each leased shard
2. Process the events from each batch
3. Repeat, passing the ID of the last event in each batch
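The three steps translate into a short loop that remembers the last event ID per shard. In this Python sketch, the in-memory store and poll function are hypothetical stand-ins for the shards and poll_events. The stand-in poll never deletes events; it simply returns everything after the ID it is given.

```python
# Hypothetical in-memory shards: shard_id -> list of (event_id, payload) in commit order.
store = {102008: [(1, "click-a"), (2, "click-b")], 102009: [(7, "click-c")]}

def poll(shard_id, last_processed_id):
    """Stand-in for poll_events: everything after the last confirmed event."""
    return [e for e in store[shard_id] if e[0] > last_processed_id]

def consume_once(leased_shards, last_ids, handle):
    """One pass of the consumer loop over all leased shards."""
    for shard_id in leased_shards:
        batch = poll(shard_id, last_ids.get(shard_id, 0))  # 1. poll each leased shard
        for event_id, payload in batch:                     # 2. process the batch
            handle(shard_id, event_id, payload)
        if batch:                                           # 3. remember the last event ID,
            last_ids[shard_id] = batch[-1][0]               #    passed to the next poll
```

A consumer that crashes between steps 2 and 3 restarts from its last confirmed ID, so some events may be processed twice but none are skipped.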

Failure handling
Producer / consumer fails to connect to a storage node: connect to a different node.
Storage node fails: use pick_local_value() for the partition column, and fail over to a hot standby.
Consumer fails to consume a batch: events are repeated until confirmed.
Consumer fails and does not come back: consumers periodically call obtain_leases, and old leases expire.
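Because every node accepts reads and writes, the first rule amounts to "try the nodes in random order until one answers". The Python sketch below is illustrative. The connect callable and fake_connect are hypothetical stand-ins for a real driver call, and ConnectionError stands in for whatever error that driver raises.

```python
import random

def connect_any(nodes, connect, rng=random):
    """Try the nodes in random order; return (node, connection) for the first that answers."""
    order = list(nodes)
    rng.shuffle(order)
    last_error = None
    for node in order:
        try:
            return node, connect(node)
        except ConnectionError as exc:  # node unreachable: try a different node
            last_error = exc
    raise ConnectionError("no storage node reachable") from last_error

DOWN = {"node-1", "node-3"}  # hypothetical failed nodes

def fake_connect(node):
    if node in DOWN:
        raise ConnectionError(node)
    return f"connection to {node}"

print(connect_any(["node-1", "node-2", "node-3"], fake_connect))  # ('node-2', 'connection to node-2')
```

Shuffling the order also spreads producers across nodes when all of them are healthy.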

Maintenance: Lease expiration
Use pg_cron to periodically expire leases on the coordinator:

    SELECT cron.schedule('* * * * *', 'SELECT expire_leases()');

    CREATE FUNCTION expire_leases() ...
      -- stale leases pass to new_owner (owner becomes NULL if nobody requested the lease)
      UPDATE leases
      SET owner = new_owner, last_heartbeat = now()
      WHERE last_heartbeat < now() - interval '2 minutes';
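expire_leases hands each lease whose heartbeat is older than two minutes to its new_owner, or clears it when nobody is waiting. The UPDATE can be tried in Python's built-in sqlite3 module. This stand-in assumes integer epoch seconds in place of timestamptz and a now parameter in place of now(); it is not the prototype's code.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE leases (
        consumer_group text NOT NULL,
        shard_id bigint NOT NULL,
        owner text,
        new_owner text,
        last_heartbeat integer,  -- epoch seconds instead of timestamptz
        PRIMARY KEY (consumer_group, shard_id)
    )""")

def expire_leases(now: int, timeout: int = 120) -> int:
    """The slide's UPDATE, with now() and interval '2 minutes' turned into parameters."""
    cur = conn.execute(
        "UPDATE leases SET owner = new_owner, last_heartbeat = :now "
        "WHERE last_heartbeat < :now - :timeout",
        {"now": now, "timeout": timeout})
    return cur.rowcount

conn.executemany("INSERT INTO leases VALUES (?, ?, ?, ?, ?)", [
    ("click-analytics", 102008, "node-1", "node-2", 0),    # stale, steal pending
    ("click-analytics", 102009, "node-1", None, 0),        # stale, nobody waiting
    ("click-analytics", 102010, "node-1", None, 100),      # recent heartbeat
])
print(expire_leases(now=150))  # 2
```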

Maintenance: Delete old events
Use pg_cron to periodically delete old events on the coordinator:

    $ psql -h coordinator
    SELECT cron.schedule('* * * * *', 'SELECT expire_events()');

    CREATE FUNCTION expire_events() ...
      -- keep one day of events
      DELETE FROM events
      WHERE ingest_time < now() - interval '1 day';

Prototyped a functional, highly available publish-subscribe system in 300 lines of code: https://goo.gl/R1suAo

Demo


Questions? marco@citusdata.com
