pgsql connection manager for scalability freaks.
pip install hv
This is the implementation of the pattern described by Instagram in "Sharding & IDs at Instagram" article.
An instance of the
Key represents a unique key (64-bits long) for an entity and has the following attributes:
created returns the UTC datetime corresponding to the first 41-bits of the numeric id.
shard_id holds a 13-bits integer and represents the logical shard.
added_id is the remaining 10-bits and represents an auto-incrementing sequence, modulus 1024. This means we can generate 1024 IDs, per shard, per millisecond.
hv.datastore.Datastore(connections, pool_max, pool_block_timeout, logger)
connections holds an array of dicts which are being passed to psycopg2 respectively. However those dicts should also contain a special
shards value which adds meaning to all that fuss going around.
This example shows the bare minimum you need to create a
connections = [ dict(shards='1-9', host='192.168.2.23', port='5432', user='x', password='x', database='x'), dict(shards='9-17', host='192.168.2.24', port='5432', user='x', password='x', database='x') ] db = Datastore(connections)
In this case, we assume PostgreSQL running on 192.168.2.23 contains shards (schemas) starting from 1 to 9 (9 not included) and on 192.168.2.24 we have shards from 9 to 17.
pool_max is the maximum number of psycopg2 connections that are going to be kept alive for every connection we have passed. (default: 10)
pool_block_timeout is the maximum number of seconds to wait for getting a connection from pool before the request is dropped. (default: 5)
Datastore instance has the following methods:
Returns a psycopg2 connection for the given
Beware that this connection should be sent back into the pool when you are finished, or otherwise you know- universe will collapse and Trinity will die :(
connection back into the pool where it belonged.
Returns a context manager delivering a connection for the given
This is a convenience method that saves you from forgetting to call
with db.cursor(5) as cur: cur.execute('SELECT version()') ver = cur.fetchone()
put(shard_id, kind, **kwargs)
Writes data to the specified shard, where
kind is an integer which is not stored within hstore field and used for differentiating between entity types.
data = dict(beep='boop') key = db.put(12, 1, **data)
Fetches the data with the given
key must be of type
key = Key(307821103844175873) res = db.get(key)
Closes every connection in every pool.
Reinstantiates connection pools. Make sure you have closed every connection before calling this method.