Playhouse, extensions to Peewee

Peewee comes with numerous extension modules which are collected under the playhouse namespace. Despite the silly name, there are some very useful extensions, particularly those that expose vendor-specific database features like the Sqlite Extensions and Postgresql Extensions extensions.

Below you will find a loosely organized listing of the various modules that make up the playhouse.

Database drivers / vendor-specific database functionality

High-level features

Database management and framework integration

Sqlite Extensions

The SQLite extensions module provides support for some interesting sqlite-only features:

  • Define custom aggregates, collations and functions.
  • Support for FTS3/4 (sqlite full-text search) with BM25 ranking.
  • C extension providing fast implementations of ranking and other utility functions.
  • Support for the new FTS5 search extension.
  • Specify isolation level in transactions.
  • Support for virtual tables and SQLite C extensions.
  • Support for the closure table extension, which allows efficient querying of heirarchical tables.

sqlite_ext API notes

class SqliteExtDatabase(database, pragmas=(), c_extensions=True, **kwargs)
Parameters:
  • pragmas – A list or tuple of 2-tuples containing PRAGMA settings to configure on a per-connection basis.
  • c_extensions (bool) – Boolean flag indicating whether to use the fast implementations of various SQLite user-defined functions. If Cython was installed when you built peewee, then these functions should be available. If not, Peewee will fall back to using the slower pure-Python functions.

Subclass of the SqliteDatabase that provides some advanced features only offered by Sqlite.

  • Register custom aggregates, collations and functions
  • Support for SQLite virtual tables and C extensions
  • Specify a row factory
  • Advanced transactions (specify isolation level)
aggregate([name=None[, num_params=-1]])

Class-decorator for registering custom aggregation functions.

Parameters:
  • name – string name for the aggregate, defaults to the name of the class.
  • num_params – integer representing number of parameters the aggregate function accepts. The default value, -1, indicates the aggregate can accept any number of parameters.
@db.aggregate('product', 1)
class Product(object):
    """Like sum, except calculate the product of a series of numbers."""
    def __init__(self):
        self.product = 1

    def step(self, value):
        self.product *= value

    def finalize(self):
        return self.product

# To use this aggregate:
product = (Score
           .select(fn.product(Score.value))
           .scalar())
unregister_aggregate(name):

Unregister the given aggregate function.

collation([name])

Function decorator for registering a custom collation.

Parameters:name – string name to use for this collation.
@db.collation()
def collate_reverse(s1, s2):
    return -cmp(s1, s2)

# To use this collation:
Book.select().order_by(collate_reverse.collation(Book.title))

As you might have noticed, the original collate_reverse function has a special attribute called collation attached to it. This extra attribute provides a shorthand way to generate the SQL necessary to use our custom collation.

unregister_collation(name):

Unregister the given collation function.

func([name[, num_params]])

Function decorator for registering user-defined functions.

Parameters:
  • name – name to use for this function.
  • num_params – number of parameters this function accepts. If not provided, peewee will introspect the function for you.
@db.func()
def title_case(s):
    return s.title()

# Use in the select clause...
titled_books = Book.select(fn.title_case(Book.title))

@db.func()
def sha1(s):
    return hashlib.sha1(s).hexdigest()

# Use in the where clause...
user = User.select().where(
    (User.username == username) &
    (fn.sha1(User.password) == password_hash)).get()
unregister_function(name):

Unregister the given user-defiend function.

load_extension(extension)

Load the given C extension. If a connection is currently open in the calling thread, then the extension will be loaded for that connection as well as all subsequent connections.

For example, if you’ve compiled the closure table extension and wish to use it in your application, you might write:

db = SqliteExtDatabase('my_app.db')
db.load_extension('closure')
unload_extension(name):

Unload the given SQLite extension.

class VirtualModel

Subclass of Model that signifies the model operates using a virtual table provided by a sqlite extension.

Creating a virtual model is easy, simply subclass VirtualModel and specify the extension module and any options:

class MyVirtualModel(VirtualModel):
    class Meta:
        database = db
        extension_module = 'nextchar'
        extension_options = {}
Meta.extension_module = 'name of sqlite extension'
Meta.extension_options = {'tokenize': 'porter', etc}

SQLite virtual tables often support configuration via arbitrary key/value options which are included in the CREATE TABLE statement. To configure a virtual table, you can specify options like this:

class SearchIndex(FTSModel):
    content = SearchField()
    metadata = SearchField()

    class Meta:
        database = my_db
        extension_options = {
            'prefix': [2, 3],
            'tokenize': 'porter',
        }
class FTSModel

Model class that provides support for Sqlite’s full-text search extension. Models should be defined normally, however there are a couple caveats:

  • Unique constraints, not null constraints, check constraints and foreign keys are not supported.
  • Indexes on fields and multi-column indexes are ignored completely
  • Sqlite will treat all column types as TEXT (although you can store other data types, Sqlite will treat them as text).
  • FTS models contain a docid field which is automatically created and managed by SQLite (unless you choose to explicitly set it during model creation). Lookups on this column are performant.

sqlite_ext provides a SearchField field class which should be used on FTSModel implementations instead of the regular peewee field types. This will help prevent you accidentally creating invalid column constraints.

Because of the lack of secondary indexes, it usually makes sense to use the docid primary key as a pointer to a row in a regular table. For example:

class Document(Model):
    author = ForeignKeyField(User, related_name='documents')
    title = TextField(null=False, unique=True)
    content = TextField(null=False)
    timestamp = DateTimeField()

    class Meta:
        database = db


class DocumentIndex(FTSModel):
    title = SearchField()
    content = SearchField()

    class Meta:
        database = db
        # Use the porter stemming algorithm to tokenize content.
        extension_options = {'tokenize': 'porter'}

To store a document in the document index, we will INSERT a row into the DocumentIndex table, manually setting the docid:

def store_document(document):
    DocumentIndex.insert({
        DocumentIndex.docid: document.id,
        DocumentIndex.title: document.title,
        DocumentIndex.content: document.content}).execute()

To perform a search and return ranked results, we can query the Document table and join on the DocumentIndex:

def search(phrase):
    # Query the search index and join the corresponding Document
    # object on each search result.
    return (Document
            .select()
            .join(
                DocumentIndex,
                on=(Document.id == DocumentIndex.docid))
            .where(DocumentIndex.match(phrase))
            .order_by(DocumentIndex.bm25()))

Warning

All SQL queries on FTSModel classes will be slow except full-text searches and docid lookups.

Continued examples:

# Use the "match" operation for FTS queries.
matching_docs = (DocumentIndex
                 .select()
                 .where(DocumentIndex.match('some query')))

# To sort by best match, use the custom "rank" function.
best = (DocumentIndex
        .select()
        .where(DocumentIndex.match('some query'))
        .order_by(DocumentIndex.rank()))

# Or use the shortcut method:
best = DocumentIndex.search('some phrase')

# Peewee allows you to specify weights for columns.
# Matches in the title will be 2x more valuable than matches
# in the content field:
best = DocumentIndex.search(
    'some phrase',
    weights=[2.0, 1.0],
)

Examples using the BM25 ranking algorithm:

# you can also use the BM25 algorithm to rank documents:
best = (DocumentIndex
        .select()
        .where(DocumentIndex.match('some query'))
        .order_by(DocumentIndex.bm25()))

# There is a shortcut method for bm25 as well:
best_bm25 = DocumentIndex.search_bm25('some phrase')

# BM25 allows you to specify weights for columns.
# Matches in the title will be 2x more valuable than matches
# in the content field:
best_bm25 = DocumentIndex.search_bm25(
    'some phrase',
    weights=[2.0, 1.0],
)

If the primary source of the content you are indexing exists in a separate table, you can save some disk space by instructing SQLite to not store an additional copy of the search index content. SQLite will still create the metadata and data-structures needed to perform searches on the content, but the content itself will not be stored in the search index.

To accomplish this, you can specify a table or column using the content option. The FTS4 documentation has more information.

Here is a short code snippet illustrating how to implement this with peewee:

class Blog(Model):
    title = CharField()
    pub_date = DateTimeField()
    content = TextField()  # we want to search this.

    class Meta:
        database = db

class BlogIndex(FTSModel):
    content = SearchField()

    class Meta:
        database = db
        extension_options = {'content': Blog.content}

db.create_tables([Blog, BlogIndex])

# Now, we can manage content in the FTSBlog.  To populate it with
# content:
BlogIndex.rebuild()

# Optimize the index.
BlogIndex.optimize()

The content option accepts either a single Field or a Model and can reduce the amount of storage used. However, content will need to be manually moved to/from the associated FTSModel.

FTSModel API methods:

classmethod create_table([fail_silently=False[, **options]])
Parameters:
  • fail_silently (boolean) – do not re-create if table already exists.
  • options – options passed along when creating the table, e.g. content.
classmethod match(term)

Shorthand for generating a MATCH expression for the given term(s).

query = (DocumentIndex
         .select()
         .where(DocumentIndex.match('search phrase')))
for doc in query:
    print 'match: ', doc.title
classmethod search(term[, weights=None[, with_score=False[, score_alias='score']]])

Shorthand way of searching for a term and sorting results by the quality of the match. This is equivalent to the rank() example code presented below.

Parameters:
  • term (str) – Search term to use.
  • weights – A list of weights for the columns, ordered with respect to the column’s position in the table. Or, a dictionary keyed by the field or field name and mapped to a value.
  • with_score – Whether the score should be returned as part of the SELECT statement.
  • score_alias (str) – Alias to use for the calculated rank score. This is the attribute you will use to access the score if with_score=True.
# Simple search.
docs = DocumentIndex.search('search term')
for result in docs:
    print result.title

# More complete example.
docs = DocumentIndex.search(
    'search term',
    weights={'title': 2.0, 'content': 1.0},
    with_score=True,
    score_alias='search_score')
for result in docs:
    print result.title, result.search_score
classmethod rank([col1_weight, col2_weight...coln_weight])

Generate an expression that will calculate and return the quality of the search match. This rank can be used to sort the search results. The lower the rank, the better the match.

The rank function accepts optional parameters that allow you to specify weights for the various columns. If no weights are specified, all columns are considered of equal importance.

query = (DocumentIndex
         .select(
             DocumentIndex,
             DocumentIndex.rank().alias('score'))
         .where(DocumentIndex.match('search phrase'))
         .order_by(DocumentIndex.rank()))

for search_result in query:
    print search_result.title, search_result.score
classmethod search_bm25(term[, weights=None[, with_score=False[, score_alias='score']]])

Shorthand way of searching for a term and sorting results by the quality of the match, as determined by the BM25 algorithm. This is equivalent to the bm25() example code presented below.

Parameters:
  • term (str) – Search term to use.
  • weights – A list of weights for the columns, ordered with respect to the column’s position in the table. Or, a dictionary keyed by the field or field name and mapped to a value.
  • with_score – Whether the score should be returned as part of the SELECT statement.
  • score_alias (str) – Alias to use for the calculated rank score. This is the attribute you will use to access the score if with_score=True.
# Simple search.
docs = DocumentIndex.search('search term')
for result in docs:
    print result.title

# More complete example.
docs = DocumentIndex.search(
    'search term',
    weights={'title': 2.0, 'content': 1.0},
    with_score=True,
    score_alias='search_score')
for result in docs:
    print result.title, result.search_score
classmethod bm25([col1_weight, col2_weight...coln_weight])

Generate an expression that will calculate and return the quality of the search match using the BM25 algorithm. This value can be used to sort the search results, and the lower the value the better the match.

The bm25 function accepts optional parameters that allow you to specify weights for the various columns. If no weights are specified, all columns are considered of equal importance.

query = (DocumentIndex
         .select(
             DocumentIndex,
             DocumentIndex.bm25().alias('score'))
         .where(DocumentIndex.match('search phrase'))
         .order_by(DocumentIndex.bm25()))

for search_result in query:
    print search_result.title, search_result.score
classmethod rebuild()

Rebuild the search index – this only works when the content option was specified during table creation.

classmethod optimize()

Optimize the search index.

class SearchField([unindexed=False[, db_column=None[, coerce=None]]])
Parameters:
  • unindexed – Whether the contents of this field should be excluded from the full-text search index.
  • db_column – Name of the underlying database column.
  • coerce – Function used to convert the value from the database into the appropriate Python format.
class JSONField

Field class suitable for working with JSON stored and manipulated using the JSON1 extension.

Most functions that operate on JSON fields take a path argument. The JSON documents specify that the path should begin with '$' followed by zero or more instances of '.objectlabel' or '[arrayindex]'. Peewee simplifies this by allowing you to omit the '$' character and just specify the path you need or None for an empty path:

  • path='' –> '$'
  • path='tags' –> '$.tags'
  • path='[0][1].bar' –> '$[0][1].bar'
  • path='metadata[0]' –> '$.metadata[0]'
  • path='user.data.email' –> '$.user.data.email'
length([path=None])

Return the number of items in a JSON array at the given path. If the path is omitted, then return the number of items in the top-level array.

SQLite documentation.

extract(path)

Return the value at the given path. If the value is a JSON object or array, it will be decoded into a dict or list. If the value is a scalar type, string or null then it will be returned as the appropriate Python type.

SQLite documentation.

Example:

# data looks like {'post': {'title': 'post 1', 'body': '...'}, ...}
query = (Post
         .select(Post.data.json_extract('post.title'))
         .tuples())

# Only the `title` value is extracted from the JSON data.
for title, in query:
    print title
set(path, value[, path2, value2...])

Set values stored in the input JSON string using the given path/value pairs. The set function returns a new JSON string formed by updating the input JSON with the given path/value pairs.

If the path does not exist, it will be created.

Similarly, if the path does exist, it will be overwritten.

SQLite documentation.

Example:

PostAlias = Post.alias()
set_query = (PostAlias
             .select(PostAlias.data.set(
                 'title', 'New title',
                 'tags', ['list', 'of', 'new', 'tags'],
                 'totally.new.field', 3,
                 'status.published', True))
             .where(PostAlias.id == Post.id))

# Update multiple fields at one time on the Post
# with the title "Old title".
query = (Post
         .update(data=set_query)
         .where(Post.data.extract('title') == 'Old title'))
query.execute()

post = (Post
        .select()
        .where(Post.data.extract('title') == 'New title')
        .get())

# Our new data has been added, even nested objects that did not
# exist before. Any pre-existing data has also been preserved,
# provided it was not over-written.
assert post.data == {
    'title': 'New title',
    'tags': ['list', 'of', 'new', 'tags'],
    'totally': {'new': {'field: 3}},
    'status': {'published': True, 'draft': False},
    'other-field': ['this', 'was', 'here', 'before'],
    'another-old-field': 'etc, etc'}
insert(path, value[, path2, value2...])

Insert the given path/value pairs into the JSON string stored in the field. The insert function returns a new JSON string formed by updating the input JSON with the given path/value pairs.

If the path already exists, it will not be overwritten.

SQLite documentation.

replace(path, value[, path2, value2...])

Replace values stored in the input JSON string using the given path/value pairs. The replace function returns a new JSON string formed by updating the input JSON with the given path/value pairs.

If the path does not exist, it will not be created.

SQLite documentation.

remove(*paths)

Remove values referenced by the given path(s). The remove function returns a new JSON string formed by removing the specified paths from the input JSON string.

The process for removing fields from a JSON column is similar to the way you set() them. For a code example, see updating JSON data.

SQLite documentation.

json_type([path=None])

Return a string indicating the type of object stored in the field. You can optionally supply a path to specify a sub-item. The types of objects are:

  • object
  • array
  • integer
  • real
  • true
  • false
  • text
  • null <– the string “null” means an actual NULL value
  • NULL <– an actual NULL value means the path was not found

SQLite documentation.

children([path=None])

The children function corresponds to json_each, a table-valued function that walks the JSON value provided and returns the immediate children of the top-level array or object. If a path is specified, then that path is treated as the top-most element.

The rows returned by calls to children() have the following attributes:

  • key: the key of the current element relative to its parent.
  • value: the value of the current element.
  • type: one of the data-types (see json_type()).
  • atom: the scalar value for primitive types, NULL for arrays and objects.
  • id: a unique ID referencing the current node in the tree.
  • parent: the ID of the containing node.
  • fullkey: the full path describing the current element.
  • path: the path to the container of the current row.

For examples, see my blog post on JSON1.

SQLite documentation.

tree([path=None])

The tree function corresponds to json_tree, a table-valued function that walks the JSON value provided and recursively returns all descendants of the given root node. If a path is specified, then that path is treated as the root node element.

The rows returned by calls to tree() have the same attributes as rows returned by calls to children().

For examples, see my blog post on JSON1.

SQLite documentation.

class PrimaryKeyAutoIncrementField

Subclass of PrimaryKeyField that uses a monotonically-increasing value for the primary key. This differs from the default SQLite primary key, which simply uses the “max + 1” approach to determining the next ID.

class RowIDField

Subclass of PrimaryKeyField that provides access to the underlying rowid field used internally by SQLite.

Note

When added to a Model, this field will act as the primary key. However, this field will not be included by default when selecting rows from the table.

class DocIDField

Subclass of PrimaryKeyField that provides access to the underlying docid field used internally by SQLite’s FTS3/4 virtual tables.

Note

This field should not be created manually, as it is only needed on FTSModel classes, which include it already.

match(lhs, rhs)

Generate a SQLite MATCH expression for use in full-text searches.

Document.select().where(match(Document.content, 'search term'))
class FTS5Model

Model class that should be used to implement virtual tables using the FTS5 extension. Documentation on the FTS5 extension can be found here. This extension behaves very similarly to the FTS3 and FTS4 extensions, and the FTS5Model supports many of the same APIs as FTSModel.

The FTS5 extension is more strict in enforcing that no column define any type or constraints. For this reason, only SearchField objects can be used with FTS5Model implementations.

Additionally, FTS5 comes with a built-in implementation of the BM25 ranking function. Therefore, the search and search_bm25 methods have been overridden to use the builtin ranking functions rather than user-defined functions.

classmethod fts5_installed()

Return a boolean indicating whether the FTS5 extension is installed. If it is not installed, an attempt will be made to load the extension.

classmethod search(term[, weights=None[, with_score=False[, score_alias='score']]])

Shorthand way of searching for a term and sorting results by the quality of the match. This is equivalent to the built-in rank value provided by the FTS5 extension.

Parameters:
  • term (str) – Search term to use.
  • weights – A list of weights for the columns, ordered with respect to the column’s position in the table. Or, a dictionary keyed by the field or field name and mapped to a value.
  • with_score – Whether the score should be returned as part of the SELECT statement.
  • score_alias (str) – Alias to use for the calculated rank score. This is the attribute you will use to access the score if with_score=True.
# Simple search.
docs = DocumentIndex.search('search term')
for result in docs:
    print result.title

# More complete example.
docs = DocumentIndex.search(
    'search term',
    weights={'title': 2.0, 'content': 1.0},
    with_score=True,
    score_alias='search_score')
for result in docs:
    print result.title, result.search_score
classmethod search_bm25(term[, weights=None[, with_score=False[, score_alias='score']]])

With FTS5, the search_bm25 method is the same as the FTS5Model.search() method.

classmethod VocabModel([table_type='row'|'col'[, table_name=None]])
Parameters:
  • table_type – Either 'row' or 'col'.
  • table_name – Name for the vocab table. If not specified, will be “fts5tablename_v”.
ClosureTable(model_class[, foreign_key=None[, referencing_class=None, referencing_key=None]])

Factory function for creating a model class suitable for working with a transitive closure table. Closure tables are VirtualModel subclasses that work with the transitive closure SQLite extension. These special tables are designed to make it easy to efficiently query heirarchical data. The SQLite extension manages an AVL tree behind-the-scenes, transparently updating the tree when your table changes and making it easy to perform common queries on heirarchical data.

To use the closure table extension in your project, you need:

  1. A copy of the SQLite extension. The source code can be found in the SQLite code repository or by cloning this gist:

    $ git clone https://gist.github.com/coleifer/7f3593c5c2a645913b92 closure
    $ cd closure/
    
  2. Compile the extension as a shared library, e.g.

    $ gcc -g -fPIC -shared closure.c -o closure.so
    
  3. Create a model for your hierarchical data. The only requirement here is that the model has an integer primary key and a self-referential foreign key. Any additional fields are fine.

    class Category(Model):
        name = CharField()
        metadata = TextField()
        parent = ForeignKeyField('self', index=True, null=True)  # Required.
    
    # Generate a model for the closure virtual table.
    CategoryClosure = ClosureTable(Category)
    

    The self-referentiality can also be achieved via an intermediate table (for a many-to-many relation).

    class User(Model):
        name = CharField()
    
    class UserRelations(Model):
        user = ForeignKeyField(User)
        knows = ForeignKeyField(User, related_name='_known_by')
    
        class Meta:
            primary_key = CompositeKey('user', 'knows') # Alternatively, a unique index on both columns.
    
    # Generate a model for the closure virtual table, specifying the UserRelations as the referencing table
    UserClosure = ClosureTable(
        User,
        referencing_class=UserRelations,
        foreign_key=UserRelations.knows,
        referencing_key=UserRelations.user)
    
  4. In your application code, make sure you load the extension when you instantiate your Database object. This is done by passing the path to the shared library to the load_extension() method.

    db = SqliteExtDatabase('my_database.db')
    db.load_extension('/path/to/closure')
    
Parameters:
  • model_class – The model class containing the nodes in the tree.
  • foreign_key – The self-referential parent-node field on the model class. If not provided, peewee will introspect the model to find a suitable key.
  • referencing_class – The intermediate table for a many-to-many relationship.
  • referencing_key – For a many-to-many relationship: the originating side of the relation.
Returns:

Returns a VirtualModel for working with a closure table.

Warning

There are two caveats you should be aware of when using the transitive_closure extension. First, it requires that your source model have an integer primary key. Second, it is strongly recommended that you create an index on the self-referential foreign key.

Example code:

db = SqliteExtDatabase('my_database.db')
db.load_extension('/path/to/closure')

class Category(Model):
    name = CharField()
    parent = ForiegnKeyField('self', index=True, null=True)  # Required.

    class Meta:
        database = db

CategoryClosure = ClosureTable(Category)

# Create the tables if they do not exist.
db.create_tables([Category, CategoryClosure], True)

It is now possible to perform interesting queries using the data from the closure table:

# Get all ancestors for a particular node.
laptops = Category.get(Category.name == 'Laptops')
for parent in Closure.ancestors(laptops):
    print parent.name

# Computer Hardware
# Computers
# Electronics
# All products

# Get all descendants for a particular node.
hardware = Category.get(Category.name == 'Computer Hardware')
for node in Closure.descendants(hardware):
    print node.name

# Laptops
# Desktops
# Hard-drives
# Monitors
# LCD Monitors
# LED Monitors

The VirtualTable returned by this function contains a handful of interesting methods. The model will be a subclass of BaseClosureTable.

class BaseClosureTable
id

A field for the primary key of the given node.

depth

A field representing the relative depth of the given node.

root

A field representing the relative root node.

descendants(node[, depth=None[, include_node=False]])

Retrieve all descendants of the given node. If a depth is specified, only nodes at that depth (relative to the given node) will be returned.

node = Category.get(Category.name == 'Electronics')

# Direct child categories.
children = CategoryClosure.descendants(node, depth=1)

# Grand-child categories.
children = CategoryClosure.descendants(node, depth=2)

# Descendants at all depths.
all_descendants = CategoryClosure.descendants(node)
ancestors(node[, depth=None[, include_node=False]])

Retrieve all ancestors of the given node. If a depth is specified, only nodes at that depth (relative to the given node) will be returned.

node = Category.get(Category.name == 'Laptops')

# All ancestors.
all_ancestors = CategoryClosure.ancestors(node)

# Grand-parent category.
grandparent = CategoryClosure.ancestores(node, depth=2)
siblings(node[, include_node=False])

Retrieve all nodes that are children of the specified node’s parent.

Note

For an in-depth discussion of the SQLite transitive closure extension, check out this blog post, Querying Tree Structures in SQLite using Python and the Transitive Closure Extension.

SqliteQ

The playhouse.sqliteq module provides a subclass of SqliteExtDatabase, that will serialize concurrent writes to a SQLite database. SqliteQueueDatabase can be used as a drop-in replacement for the regular SqliteDatabase if you want simple read and write access to a SQLite database from multiple threads.

SQLite only allows one connection to write to the database at any given time. As a result, if you have a multi-threaded application (like a web-server, for example) that needs to write to the database, you may see occasional errors when one or more of the threads attempting to write cannot acquire the lock.

SqliteQueueDatabase is designed to simplify things by sending all write queries through a single, long-lived connection. The benefit is that you get the appearance of multiple threads writing to the database without conflicts or timeouts. The downside, however, is that you cannot issue write transactions that encompass multiple queries – all writes run in autocommit mode, essentially.

Note

The module gets its name from the fact that all write queries get put into a thread-safe queue. A single worker thread listens to the queue and executes all queries that are sent to it.

Transactions

Because all queries are serialized and executed by a single worker thread, it is possible for transactional SQL from separate threads to be executed out-of-order. In the example below, the transaction started by thread “B” is rolled back by thread “A” (with bad consequences!):

  • Thread A: UPDATE transplants SET organ=’liver’, …;
  • Thread B: BEGIN TRANSACTION;
  • Thread B: UPDATE life_support_system SET timer += 60 …;
  • Thread A: ROLLBACK; – Oh no….

Since there is a potential for queries from separate transactions to be interleaved, the transaction() and atomic() methods are disabled on SqliteQueueDatabase.

For cases when you wish to temporarily write to the database from a different thread, you can use the pause() and unpause() methods. These methods block the caller until the writer thread is finished with its current workload. The writer then disconnects and the caller takes over until unpause is called.

The stop(), start(), and is_stopped() methods can also be used to control the writer thread.

Note

Take a look at SQLite’s isolation documentation for more information about how SQLite handles concurrent connections.

Code sample

Creating a database instance does not require any special handling. The SqliteQueueDatabase accepts some special parameters which you should be aware of, though. If you are using gevent, you must specify use_gevent=True when instantiating your database – this way Peewee will know to use the appropriate objects for handling queueing, thread creation, and locking.

from playhouse.sqliteq import SqliteQueueDatabase

db = SqliteQueueDatabase(
    'my_app.db',
    use_gevent=False,  # Use the standard library "threading" module.
    autostart=False,  # The worker thread now must be started manually.
    queue_max_size=64,  # Max. # of pending writes that can accumulate.
    results_timeout=5.0)  # Max. time to wait for query to be executed.

If autostart=False, as in the above example, you will need to call start() to bring up the worker threads that will do the actual write query execution.

@app.before_first_request
def _start_worker_threads():
    db.start()

If you plan on performing SELECT queries or generally wanting to access the database, you will need to call connect() and close() as you would with any other database instance.

When your application is ready to terminate, use the stop() method to shut down the worker thread. If there was a backlog of work, then this method will block until all pending work is finished (though no new work is allowed).

import atexit

@atexit.register
def _stop_worker_threads():
    db.stop()

Lastly, the is_stopped() method can be used to determine whether the database writer is up and running.

Sqlite User-Defined Functions

The sqlite_udf playhouse module contains a number of user-defined functions, aggregates, and table-valued functions, which you may find useful. The functions are grouped in collections and you can register these user-defined extensions individually, by collection, or register everything.

Scalar functions are functions which take a number of parameters and return a single value. For example, converting a string to upper-case, or calculating the MD5 hex digest.

Aggregate functions are like scalar functions that operate on multiple rows of data, producing a single result. For example, calculating the sum of a list of integers, or finding the smallest value in a particular column.

Table-valued functions are simply functions that can return multiple rows of data. For example, a regular-expression search function that returns all the matches in a given string, or a function that accepts two dates and generates all the intervening days.

Note

To use table-valued functions, you will need to install the vtfunc module. The vtfunc module is available on GitHub or can be installed using pip.

Functions, listed by collection name

Scalar functions are indicated by (f), aggregate functions by (a), and table-valued functions by (t).

  • CONTROL_FLOW * if_then_else() (f)
  • DATE * strip_tz() (f) * human_delta() (f) * mintdiff() (a) * avgtdiff() (a) * duration() (a) * date_series() (t)
  • FILE * file_ext() (f) * file_read() (f)
  • HELPER * gzip() (f) * gunzip() (f) * hostname() (f) * toggle() (f) * setting() (f) * clear_toggles() (f) * clear_settings() (f)
  • MATH * randomrange() (f) * gauss_distribution() (f) * sqrt() (f) * tonumber() (f) * mode() (a) * minrange() (a) * avgrange() (a) * range() (a) * median() (a) (requires cython)
  • STRING * substr_count() (f) * strip_chars() (f) * md5() (f) * sha1() (f) * sha256() (f) * sha512() (f) * adler32() (f) * crc32() (f) * damerau_levenshtein_dist() (f) (requires cython) * levenshtein_dist() (f) (requires cython) * str_dist() (f) (requires cython) * regex_search() (t)

apsw, an advanced sqlite driver

The apsw_ext module contains a database class suitable for use with the apsw sqlite driver.

APSW Project page: https://github.com/rogerbinns/apsw

APSW is a really neat library that provides a thin wrapper on top of SQLite’s C interface, making it possible to use all of SQLite’s advanced features.

Here are just a few reasons to use APSW, taken from the documentation:

  • APSW gives all functionality of SQLite, including virtual tables, virtual file system, blob i/o, backups and file control.
  • Connections can be shared across threads without any additional locking.
  • Transactions are managed explicitly by your code.
  • APSW can handle nested transactions.
  • Unicode is handled correctly.
  • APSW is faster.

For more information on the differences between apsw and pysqlite, check the apsw docs.

How to use the APSWDatabase

from apsw_ext import *

db = APSWDatabase(':memory:')

class BaseModel(Model):
    class Meta:
        database = db

class SomeModel(BaseModel):
    col1 = CharField()
    col2 = DateTimeField()

apsw_ext API notes

APSWDatabase extends the SqliteExtDatabase and inherits its advanced features.

class APSWDatabase(database, **connect_kwargs)
Parameters:
  • database (string) – filename of sqlite database
  • connect_kwargs – keyword arguments passed to apsw when opening a connection
register_module(mod_name, mod_inst)

Provides a way of globally registering a module. For more information, see the documentation on virtual tables.

Parameters:
  • mod_name (string) – name to use for module
  • mod_inst (object) – an object implementing the Virtual Table interface
unregister_module(mod_name)

Unregister a module.

Parameters:mod_name (string) – name to use for module

Note

Be sure to use the Field subclasses defined in the apsw_ext module, as they will properly handle adapting the data types for storage.

For example, instead of using peewee.DateTimeField, be sure you are importing and using playhouse.apsw_ext.DateTimeField.

BerkeleyDB backend

BerkeleyDB provides a SQLite-compatible API. BerkeleyDB’s SQL API has many advantages over SQLite:

  • Higher transactions-per-second in multi-threaded environments.
  • Built-in replication and hot backup.
  • Fewer system calls, less resource utilization.
  • Multi-version concurrency control.

For more details, Oracle has published a short technical overview.

In order to use peewee with BerkeleyDB, you need to compile BerkeleyDB with the SQL API enabled. Then compile the Python SQLite driver against BerkeleyDB’s sqlite replacement.

Begin by downloading and compiling BerkeleyDB:

wget http://download.oracle.com/berkeley-db/db-6.0.30.tar.gz
tar xzf db-6.0.30.tar.gz
cd db-6.0.30/build_unix
export CFLAGS='-DSQLITE_ENABLE_FTS3=1 -DSQLITE_ENABLE_FTS3_PARENTHESIS=1 -DSQLITE_ENABLE_UPDATE_DELETE_LIMIT -DSQLITE_SECURE_DELETE -DSQLITE_SOUNDEX -DSQLITE_ENABLE_RTREE=1 -fPIC'
../dist/configure --enable-static --enable-shared --enable-sql --enable-sql-compat
make
sudo make prefix=/usr/local/ install

Then get a copy of the standard library SQLite driver and build it against BerkeleyDB:

git clone https://github.com/ghaering/pysqlite
cd pysqlite
sed -i "s|#||g" setup.cfg
python setup.py build
sudo python setup.py install

You can also find up-to-date step by step instructions on my blog.

class BerkeleyDatabase(database, **kwargs)
Parameters:
  • multiversion (bool) – Enable multiversion concurrency control. Default is False.
  • page_size (int) – Set the page size PRAGMA. This option only works on new databases.
  • cache_size (int) – Set the cache size PRAGMA.

Subclass of the SqliteExtDatabase that supports connecting to BerkeleyDB-backed version of SQLite.

classmethod check_pysqlite()

Check whether pysqlite2 was compiled against the BerkeleyDB SQLite. Returns True or False.

classmethod check_libsqlite()

Check whether libsqlite3 is the BerkeleyDB SQLite implementation. Returns True or False.

Sqlcipher backend

  • Although this extention’s code is short, it has not been properly peer-reviewed yet and may have introduced vulnerabilities.
  • The code contains minimum values for passphrase length and kdf_iter, as well as a default value for the later. Do not regard these numbers as advice. Consult the docs at http://sqlcipher.net/sqlcipher-api/ and security experts.

Also note that this code relies on pysqlcipher and sqlcipher, and the code there might have vulnerabilities as well, but since these are widely used crypto modules, we can expect “short zero days” there.

sqlcipher_ext API notes

class SqlCipherDatabase(database, passphrase, kdf_iter=64000, **kwargs)

Subclass of SqliteDatabase that stores the database encrypted. Instead of the standard sqlite3 backend, it uses pysqlcipher: a python wrapper for sqlcipher, which – in turn – is an encrypted wrapper around sqlite3, so the API is identical to SqliteDatabase’s, except for object construction parameters:

Parameters:
  • database – Path to encrypted database filename to open [or create].
  • passphrase – Database encryption passphrase: should be at least 8 character long (or an error is raised), but it is strongly advised to enforce better passphrase strength criteria in your implementation.
  • kdf_iter – [Optional] number of PBKDF2 iterations.
  • If the database file doesn’t exist, it will be created with encryption by a key derived from passhprase with kdf_iter PBKDF2 iterations.
  • When trying to open an existing database, passhprase and kdf_iter should be identical to the ones used when it was created.

Notes:

  • [Hopefully] there’s no way to tell whether the passphrase is wrong or the file is corrupt. In both cases – the first time we try to acces the database – a DatabaseError error is raised, with the exact message: "file is encrypted or is not a database".

    As mentioned above, this only happens when you access the databse, so if you need to know right away whether the passphrase was correct, you can trigger this check by calling [e.g.] get_tables() (see example below).

  • Most applications can expect failed attempts to open the database (common case: prompting the user for passphrase), so the database can’t be hardwired into the Meta of model classes. To defer initialization, pass None in to the database.

Example:

db = SqlCipherDatabase(None)

class BaseModel(Model):
    """Parent for all app's models"""
    class Meta:
        # We won't have a valid db until user enters passhrase.
        database = db

# Derive our model subclasses
class Person(BaseModel):
    name = CharField(primary_key=True)

right_passphrase = False
while not right_passphrase:
    db.init(
        'testsqlcipher.db',
        passphrase=get_passphrase_from_user())

    try:  # Actually execute a query against the db to test passphrase.
        db.get_tables()
    except DatabaseError as exc:
        # We only allow a specific [somewhat cryptic] error message.
        if exc.args[0] != 'file is encrypted or is not a database':
            raise exc
        else:
            tell_user_the_passphrase_was_wrong()
            db.init(None)  # Reset the db.
    else:
        # The password was correct.
        right_passphrase = True

See also: a slightly more elaborate example.

Postgresql Extensions

The postgresql extensions module provides a number of “postgres-only” functions, currently:

In the future I would like to add support for more of postgresql’s features. If there is a particular feature you would like to see added, please open a Github issue.

Warning

In order to start using the features described below, you will need to use the extension PostgresqlExtDatabase class instead of PostgresqlDatabase.

The code below will assume you are using the following database and base model:

from playhouse.postgres_ext import *

ext_db = PostgresqlExtDatabase('peewee_test', user='postgres')

class BaseExtModel(Model):
    class Meta:
        database = ext_db

hstore support

Postgresql hstore is an embedded key/value store. With hstore, you can store arbitrary key/value pairs in your database alongside structured relational data.

Currently the postgres_ext module supports the following operations:

  • Store and retrieve arbitrary dictionaries
  • Filter by key(s) or partial dictionary
  • Update/add one or more keys to an existing dictionary
  • Delete one or more keys from an existing dictionary
  • Select keys, values, or zip keys and values
  • Retrieve a slice of keys/values
  • Test for the existence of a key
  • Test that a key has a non-NULL value

Using hstore

To start with, you will need to import the custom database class and the hstore functions from playhouse.postgres_ext (see above code snippet). Then, it is as simple as adding a HStoreField to your model:

class House(BaseExtModel):
    address = CharField()
    features = HStoreField()

You can now store arbitrary key/value pairs on House instances:

>>> h = House.create(address='123 Main St', features={'garage': '2 cars', 'bath': '2 bath'})
>>> h_from_db = House.get(House.id == h.id)
>>> h_from_db.features
{'bath': '2 bath', 'garage': '2 cars'}

You can filter by keys or partial dictionary:

>>> f = House.features
>>> House.select().where(f.contains('garage')) # <-- all houses w/garage key
>>> House.select().where(f.contains(['garage', 'bath'])) # <-- all houses w/garage & bath
>>> House.select().where(f.contains({'garage': '2 cars'})) # <-- houses w/2-car garage

Suppose you want to do an atomic update to the house:

>>> f = House.features
>>> new_features = House.features.update({'bath': '2.5 bath', 'sqft': '1100'})
>>> query = House.update(features=new_features)
>>> query.where(House.id == h.id).execute()
1
>>> h = House.get(House.id == h.id)
>>> h.features
{'bath': '2.5 bath', 'garage': '2 cars', 'sqft': '1100'}

Or, alternatively an atomic delete:

>>> query = House.update(features=f.delete('bath'))
>>> query.where(House.id == h.id).execute()
1
>>> h = House.get(House.id == h.id)
>>> h.features
{'garage': '2 cars', 'sqft': '1100'}

Multiple keys can be deleted at the same time:

>>> query = House.update(features=f.delete('garage', 'sqft'))

You can select just keys, just values, or zip the two:

>>> f = House.features
>>> for h in House.select(House.address, f.keys().alias('keys')):
...     print h.address, h.keys

123 Main St [u'bath', u'garage']

>>> for h in House.select(House.address, f.values().alias('vals')):
...     print h.address, h.vals

123 Main St [u'2 bath', u'2 cars']

>>> for h in House.select(House.address, f.items().alias('mtx')):
...     print h.address, h.mtx

123 Main St [[u'bath', u'2 bath'], [u'garage', u'2 cars']]

You can retrieve a slice of data, for example, all the garage data:

>>> f = House.features
>>> for h in House.select(House.address, f.slice('garage').alias('garage_data')):
...     print h.address, h.garage_data

123 Main St {'garage': '2 cars'}

You can check for the existence of a key and filter rows accordingly:

>>> for h in House.select(House.address, f.exists('garage').alias('has_garage')):
...     print h.address, h.has_garage

123 Main St True

>>> for h in House.select().where(f.exists('garage')):
...     print h.address, h.features['garage'] # <-- just houses w/garage data

123 Main St 2 cars

Interval support

Postgres supports durations through the INTERVAL data-type (docs).

class IntervalField([null=False[, ...]])

Field class capable of storing Python datetime.timedelta instances.

Example:

from datetime import timedelta

from playhouse.postgres_ext import *

db = PostgresqlExtDatabase('my_db')

class Event(Model):
    location = CharField()
    duration = IntervalField()
    start_time = DateTimeField()

    class Meta:
        database = db

    @classmethod
    def get_long_meetings(cls):
        return cls.select().where(cls.duration > timedelta(hours=1))

JSON Support

peewee has basic support for Postgres’ native JSON data type, in the form of JSONField. As of version 2.4.7, peewee also supports the Postgres 9.4 binary json jsonb type, via BinaryJSONField.

Warning

Postgres supports a JSON data type natively as of 9.2 (full support in 9.3). In order to use this functionality you must be using the correct version of Postgres with psycopg2 version 2.5 or greater.

To use BinaryJSONField, which has many performance and querying advantages, you must have Postgres 9.4 or later.

Note

You must be sure your database is an instance of PostgresqlExtDatabase in order to use the JSONField.

Here is an example of how you might declare a model with a JSON field:

import json
import urllib2
from playhouse.postgres_ext import *

db = PostgresqlExtDatabase('my_database')  # note

class APIResponse(Model):
    url = CharField()
    response = JSONField()

    class Meta:
        database = db

    @classmethod
    def request(cls, url):
        fh = urllib2.urlopen(url)
        return cls.create(url=url, response=json.loads(fh.read()))

APIResponse.create_table()

# Store a JSON response.
offense = APIResponse.request('http://wtf.charlesleifer.com/api/offense/')
booking = APIResponse.request('http://wtf.charlesleifer.com/api/booking/')

# Query a JSON data structure using a nested key lookup:
offense_responses = APIResponse.select().where(
  APIResponse.response['meta']['model'] == 'offense')

# Retrieve a sub-key for each APIResponse. By calling .as_json(), the
# data at the sub-key will be returned as Python objects (dicts, lists,
# etc) instead of serialized JSON.
q = (APIResponse
     .select(
       APIResponse.data['booking']['person'].as_json().alias('person'))
     .where(
       APIResponse.data['meta']['model'] == 'booking'))

for result in q:
    print result.person['name'], result.person['dob']

The BinaryJSONField works the same and supports the same operations as the regular JSONField, but provides several additional operations for testing containment. Using the binary json field, you can test whether your JSON data contains other partial JSON structures (contains(), contains_any(), contains_all()), or whether it is a subset of a larger JSON document (contained_by()).

For more examples, see the JSONField and BinaryJSONField API documents below.

Server-side cursors

When psycopg2 executes a query, normally all results are fetched and returned to the client by the backend. This can cause your application to use a lot of memory when making large queries. Using server-side cursors, results are returned a little at a time (by default 2000 records). For the definitive reference, please see the psycopg2 documentation.

Note

To use server-side (or named) cursors, you must be using PostgresqlExtDatabase.

To execute a query using a server-side cursor, simply wrap your select query using the ServerSide() helper:

large_query = PageView.select()  # Build query normally.

# Iterate over large query inside a transaction.
for page_view in ServerSide(large_query):
    # do some interesting analysis here.
    pass

# Server-side resources are released.

If you would like all SELECT queries to automatically use a server-side cursor, you can specify this when creating your PostgresqlExtDatabase:

from postgres_ext import PostgresqlExtDatabase

ss_db = PostgresqlExtDatabase('my_db', server_side_cursors=True)

Note

Server-side cursors live only as long as the transaction, so for this reason peewee will not automatically call commit() after executing a SELECT query. If you do not commit after you are done iterating, you will not release the server-side resources until the connection is closed (or the transaction is committed later). Furthermore, since peewee will by default cache rows returned by the cursor, you should always call .iterator() when iterating over a large query.

If you are using the ServerSide() helper, the transaction and call to iterator() will be handled transparently.

postgres_ext API notes

class PostgresqlExtDatabase(database[, server_side_cursors=False[, register_hstore=True[, ...]]])

Identical to PostgresqlDatabase but required in order to support:

Parameters:
  • database (str) – Name of database to connect to.
  • server_side_cursors (bool) – Whether SELECT queries should utilize server-side cursors.
  • register_hstore (bool) – Register the HStore extension with the connection.

If using server_side_cursors, also be sure to wrap your queries with ServerSide().

If you do not wish to use the HStore extension, you can specify register_hstore=False.

Warning

The PostgresqlExtDatabase by default will attempt to register the HSTORE extension. Most distributions and recent versions include this, but in some cases the extension may not be available. If you do not plan to use the HStore features of peewee, you can pass register_hstore=False when initializing your PostgresqlExtDatabase.

ServerSide(select_query)

Wrap the given select query in a transaction, and call it’s iterator() method to avoid caching row instances. In order for the server-side resources to be released, be sure to exhaust the generator (iterate over all the rows).

Parameters:select_query – a SelectQuery instance.
Return type:generator

Usage:

large_query = PageView.select()
for page_view in ServerSide(large_query):
    # Do something interesting.
    pass

# At this point server side resources are released.
class ArrayField([field_class=IntegerField[, dimensions=1]])

Field capable of storing arrays of the provided field_class.

Parameters:
  • field_class – a subclass of Field, e.g. IntegerField.
  • dimensions (int) – dimensions of array.

You can store and retrieve lists (or lists-of-lists):

class BlogPost(BaseModel):
    content = TextField()
    tags = ArrayField(CharField)


post = BlogPost(content='awesome', tags=['foo', 'bar', 'baz'])

Additionally, you can use the __getitem__ API to query values or slices in the database:

# Get the first tag on a given blog post.
first_tag = (BlogPost
             .select(BlogPost.tags[0].alias('first_tag'))
             .where(BlogPost.id == 1)
             .dicts()
             .get())

# first_tag = {'first_tag': 'foo'}

Get a slice of values:

# Get the first two tags.
two_tags = (BlogPost
            .select(BlogPost.tags[:2].alias('two'))
            .dicts()
            .get())
# two_tags = {'two': ['foo', 'bar']}
contains(*items)
Parameters:items – One or more items that must be in the given array field.
# Get all blog posts that are tagged with both "python" and "django".
Blog.select().where(Blog.tags.contains('python', 'django'))
contains_any(*items)
Parameters:items – One or more items to search for in the given array field.

Like contains(), except will match rows where the array contains any of the given items.

# Get all blog posts that are tagged with "flask" and/or "django".
Blog.select().where(Blog.tags.contains_any('flask', 'django'))
class DateTimeTZField(*args, **kwargs)

A timezone-aware subclass of DateTimeField.

class HStoreField(*args, **kwargs)

A field for storing and retrieving arbitrary key/value pairs. For details on usage, see hstore support.

keys()

Returns the keys for a given row.

>>> f = House.features
>>> for h in House.select(House.address, f.keys().alias('keys')):
...     print h.address, h.keys

123 Main St [u'bath', u'garage']
values()

Return the values for a given row.

>>> for h in House.select(House.address, f.values().alias('vals')):
...     print h.address, h.vals

123 Main St [u'2 bath', u'2 cars']
items()

Like python’s dict, return the keys and values in a list-of-lists:

>>> for h in House.select(House.address, f.items().alias('mtx')):
...     print h.address, h.mtx

123 Main St [[u'bath', u'2 bath'], [u'garage', u'2 cars']]
slice(*args)

Return a slice of data given a list of keys.

>>> f = House.features
>>> for h in House.select(House.address, f.slice('garage').alias('garage_data')):
...     print h.address, h.garage_data

123 Main St {'garage': '2 cars'}
exists(key)

Query for whether the given key exists.

>>> for h in House.select(House.address, f.exists('garage').alias('has_garage')):
...     print h.address, h.has_garage

123 Main St True

>>> for h in House.select().where(f.exists('garage')):
...     print h.address, h.features['garage'] # <-- just houses w/garage data

123 Main St 2 cars
defined(key)

Query for whether the given key has a value associated with it.

update(**data)

Perform an atomic update to the keys/values for a given row or rows.

>>> query = House.update(features=House.features.update(
...     sqft=2000,
...     year_built=2012))
>>> query.where(House.id == 1).execute()
delete(*keys)

Delete the provided keys for a given row or rows.

Note

We will use an UPDATE query.


>>> query = House.update(features=House.features.delete(
...     'sqft', 'year_built'))
>>> query.where(House.id == 1).execute()
contains(value)
Parameters:value – Either a dict, a list of keys, or a single key.

Query rows for the existence of either:

  • a partial dictionary.
  • a list of keys.
  • a single key.
>>> f = House.features
>>> House.select().where(f.contains('garage')) # <-- all houses w/garage key
>>> House.select().where(f.contains(['garage', 'bath'])) # <-- all houses w/garage & bath
>>> House.select().where(f.contains({'garage': '2 cars'})) # <-- houses w/2-car garage
contains_any(*keys)
Parameters:keys – One or more keys to search for.

Query rows for the existince of any key.

class JSONField(dumps=None, *args, **kwargs)

Field class suitable for storing and querying arbitrary JSON. When using this on a model, set the field’s value to a Python object (either a dict or a list). When you retrieve your value from the database it will be returned as a Python data structure.

Parameters:dumps – The default is to call json.dumps() or the dumps function. You can override this method to create a customized JSON wrapper.

Note

You must be using Postgres 9.2 / psycopg2 2.5 or greater.

Note

If you are using Postgres 9.4, strongly consider using the BinaryJSONField instead as it offers better performance and more powerful querying options.

Example model declaration:

db = PostgresqlExtDatabase('my_db')

class APIResponse(Model):
    url = CharField()
    response = JSONField()

    class Meta:
        database = db

Example of storing JSON data:

url = 'http://foo.com/api/resource/'
resp = json.loads(urllib2.urlopen(url).read())
APIResponse.create(url=url, response=resp)

APIResponse.create(url='http://foo.com/baz/', response={'key': 'value'})

To query, use Python’s [] operators to specify nested key or array lookups:

APIResponse.select().where(
    APIResponse.response['key1']['nested-key'] == 'some-value')

To illustrate the use of the [] operators, imagine we have the following data stored in an APIResponse:

{
  "foo": {
    "bar": ["i1", "i2", "i3"],
    "baz": {
      "huey": "mickey",
      "peewee": "nugget"
    }
  }
}

Here are the results of a few queries:

def get_data(expression):
    # Helper function to just retrieve the results of a
    # particular expression.
    query = (APIResponse
             .select(expression.alias('my_data'))
             .dicts()
             .get())
    return query['my_data']

# Accessing the foo -> bar subkey will return a JSON
# representation of the list.
get_data(APIResponse.data['foo']['bar'])
# '["i1", "i2", "i3"]'

# In order to retrieve this list as a Python list,
# we will call .as_json() on the expression.
get_data(APIResponse.data['foo']['bar'].as_json())
# ['i1', 'i2', 'i3']

# Similarly, accessing the foo -> baz subkey will
# return a JSON representation of the dictionary.
get_data(APIResponse.data['foo']['baz'])
# '{"huey": "mickey", "peewee": "nugget"}'

# Again, calling .as_json() will return an actual
# python dictionary.
get_data(APIResponse.data['foo']['baz'].as_json())
# {'huey': 'mickey', 'peewee': 'nugget'}

# When dealing with simple values, either way works as
# you expect.
get_data(APIResponse.data['foo']['bar'][0])
# 'i1'

# Calling .as_json() when the result is a simple value
# will return the same thing as the previous example.
get_data(APIResponse.data['foo']['bar'][0].as_json())
# 'i1'
class BinaryJSONField(dumps=None, *args, **kwargs)

Store and query arbitrary JSON documents. Data should be stored using normal Python dict and list objects, and when data is returned from the database, it will be returned using dict and list as well.

For examples of basic query operations, see the above code samples for JSONField. The example queries below will use the same APIResponse model described above.

Parameters:dumps – The default is to call json.dumps() or the dumps function. You can override this method to create a customized JSON wrapper.

Note

You must be using Postgres 9.4 / psycopg2 2.5 or newer. If you are using Postgres 9.2 or 9.3, you can use the regular JSONField instead.

contains(other)

Test whether the given JSON data contains the given JSON fragment or key.

Example:

search_fragment = {
    'foo': {'bar': ['i2']}
}
query = (APIResponse
         .select()
         .where(APIResponse.data.contains(search_fragment)))

# If we're searching for a list, the list items do not need to
# be ordered in a particular way:
query = (APIResponse
         .select()
         .where(APIResponse.data.contains({
             'foo': {'bar': ['i2', 'i1']}})))

We can pass in simple keys as well. To find APIResponses that contain the key foo at the top-level:

APIResponse.select().where(APIResponse.data.contains('foo'))

We can also search sub-keys using square-brackets:

APIResponse.select().where(
    APIResponse.data['foo']['bar'].contains(['i2', 'i1']))
contains_any(*items)

Search for the presence of one or more of the given items.

APIResponse.select().where(
    APIResponse.data.contains_any('foo', 'baz', 'nugget'))

Like contains(), we can also search sub-keys:

APIResponse.select().where(
    APIResponse.data['foo']['bar'].contains_any('i2', 'ix'))
contains_all(*items)

Search for the presence of all of the given items.

APIResponse.select().where(
    APIResponse.data.contains_all('foo'))

Like contains_any(), we can also search sub-keys:

APIResponse.select().where(
    APIResponse.data['foo']['bar'].contains_all('i1', 'i2', 'i3'))
contained_by(other)

Test whether the given JSON document is contained by (is a subset of) the given JSON document. This method is the inverse of contains().

big_doc = {
    'foo': {
        'bar': ['i1', 'i2', 'i3'],
        'baz': {
            'huey': 'mickey',
            'peewee': 'nugget',
        }
    },
    'other_key': ['nugget', 'bear', 'kitten'],
}
APIResponse.select().where(
    APIResponse.data.contained_by(big_doc))
Match(field, query)

Generate a full-text search expression, automatically converting the left-hand operand to a tsvector, and the right-hand operand to a tsquery.

Example:

def blog_search(query):
    return Blog.select().where(
        (Blog.status == Blog.STATUS_PUBLISHED) &
        Match(Blog.content, query))
class TSVectorField

Field type suitable for storing tsvector data. This field will automatically be created with a GIN index for improved search performance.

Note

Data stored in this field will still need to be manually converted to the tsvector type.

Example usage:

class Blog(Model):
    content = TextField()
    search_content = TSVectorField()

content = 'this is a sample blog entry.'
blog_entry = Blog.create(
    content=content,
    search_content=fn.to_tsvector(content))  # Note `to_tsvector()`.

DataSet

The dataset module contains a high-level API for working with databases modeled after the popular project of the same name. The aims of the dataset module are to provide:

  • A simplified API for working with relational data, along the lines of working with JSON.
  • An easy way to export relational data as JSON or CSV.
  • An easy way to import JSON or CSV data into a relational database.

A minimal data-loading script might look like this:

from playhouse.dataset import DataSet

db = DataSet('sqlite:///:memory:')

table = db['sometable']
table.insert(name='Huey', age=3)
table.insert(name='Mickey', age=5, gender='male')

huey = table.find_one(name='Huey')
print huey
# {'age': 3, 'gender': None, 'id': 1, 'name': 'Huey'}

for obj in table:
    print obj
# {'age': 3, 'gender': None, 'id': 1, 'name': 'Huey'}
# {'age': 5, 'gender': 'male', 'id': 2, 'name': 'Mickey'}

You can export or import data using freeze() and thaw():

# Export table content to the `users.json` file.
db.freeze(table.all(), format='json', filename='users.json')

# Import data from a CSV file into a new table. Columns will be automatically
# created for each field in the CSV file.
new_table = db['stats']
new_table.thaw(format='csv', filename='monthly_stats.csv')

Getting started

DataSet objects are initialized by passing in a database URL of the format dialect://user:password@host/dbname. See the Database URL section for examples of connecting to various databases.

# Create an in-memory SQLite database.
db = DataSet('sqlite:///:memory:')

Storing data

To store data, we must first obtain a reference to a table. If the table does not exist, it will be created automatically:

# Get a table reference, creating the table if it does not exist.
table = db['users']

We can now insert() new rows into the table. If the columns do not exist, they will be created automatically:

table.insert(name='Huey', age=3, color='white')
table.insert(name='Mickey', age=5, gender='male')

To update existing entries in the table, pass in a dictionary containing the new values and filter conditions. The list of columns to use as filters is specified in the columns argument. If no filter columns are specified, then all rows will be updated.

# Update the gender for "Huey".
table.update(name='Huey', gender='male', columns=['name'])

# Update all records. If the column does not exist, it will be created.
table.update(favorite_orm='peewee')

Importing data

To import data from an external source, such as a JSON or CSV file, you can use the thaw() method. By default, new columns will be created for any attributes encountered. If you wish to only populate columns that are already defined on a table, you can pass in strict=True.

# Load data from a JSON file containing a list of objects.
table = dataset['stock_prices']
table.thaw(filename='stocks.json', format='json')
table.all()[:3]

# Might print...
[{'id': 1, 'ticker': 'GOOG', 'price': 703},
 {'id': 2, 'ticker': 'AAPL', 'price': 109},
 {'id': 3, 'ticker': 'AMZN', 'price': 300}]

Using transactions

DataSet supports nesting transactions using a simple context manager.

table = db['users']
with db.transaction() as txn:
    table.insert(name='Charlie')

    with db.transaction() as nested_txn:
        # Set Charlie's favorite ORM to Django.
        table.update(name='Charlie', favorite_orm='django', columns=['name'])

        # jk/lol
        nested_txn.rollback()

Inspecting the database

You can use the tables() method to list the tables in the current database:

>>> print db.tables
['sometable', 'user']

And for a given table, you can print the columns:

>>> table = db['user']
>>> print table.columns
['id', 'age', 'name', 'gender', 'favorite_orm']

We can also find out how many rows are in a table:

>>> print len(db['user'])
3

Reading data

To retrieve all rows, you can use the all() method:

# Retrieve all the users.
users = db['user'].all()

# We can iterate over all rows without calling `.all()`
for user in db['user']:
    print user['name']

Specific objects can be retrieved using find() and find_one().

# Find all the users who like peewee.
peewee_users = db['user'].find(favorite_orm='peewee')

# Find Huey.
huey = db['user'].find_one(name='Huey')

Exporting data

To export data, use the freeze() method, passing in the query you wish to export:

peewee_users = db['user'].find(favorite_orm='peewee')
db.freeze(peewee_users, format='json', filename='peewee_users.json')

API

class DataSet(url)

The DataSet class provides a high-level API for working with relational databases.

Parameters:url (str) – A database URL. See Database URL for examples.
tables

Return a list of tables stored in the database. This list is computed dynamically each time it is accessed.

__getitem__(table_name)

Provide a Table reference to the specified table. If the table does not exist, it will be created.

query(sql[, params=None[, commit=True]])
Parameters:
  • sql (str) – A SQL query.
  • params (list) – Optional parameters for the query.
  • commit (bool) – Whether the query should be committed upon execution.
Returns:

A database cursor.

Execute the provided query against the database.

transaction()

Create a context manager representing a new transaction (or savepoint).

freeze(query[, format='csv'[, filename=None[, file_obj=None[, **kwargs]]]])
Parameters:
  • query – A SelectQuery, generated using all() or ~Table.find.
  • format – Output format. By default, csv and json are supported.
  • filename – Filename to write output to.
  • file_obj – File-like object to write output to.
  • kwargs – Arbitrary parameters for export-specific functionality.
thaw(table[, format='csv'[, filename=None[, file_obj=None[, strict=False[, **kwargs]]]]])
Parameters:
  • table (str) – The name of the table to load data into.
  • format – Input format. By default, csv and json are supported.
  • filename – Filename to read data from.
  • file_obj – File-like object to read data from.
  • strict (bool) – Whether to store values for columns that do not already exist on the table.
  • kwargs – Arbitrary parameters for import-specific functionality.
connect()

Open a connection to the underlying database. If a connection is not opened explicitly, one will be opened the first time a query is executed.

close()

Close the connection to the underlying database.

class Table(dataset, name, model_class)

The Table class provides a high-level API for working with rows in a given table.

columns

Return a list of columns in the given table.

model_class

A dynamically-created Model class.

create_index(columns[, unique=False])

Create an index on the given columns:

# Create a unique index on the `username` column.
db['users'].create_index(['username'], unique=True)
insert(**data)

Insert the given data dictionary into the table, creating new columns as needed.

update(columns=None, conjunction=None, **data)

Update the table using the provided data. If one or more columns are specified in the columns parameter, then those columns’ values in the data dictionary will be used to determine which rows to update.

# Update all rows.
db['users'].update(favorite_orm='peewee')

# Only update Huey's record, setting his age to 3.
db['users'].update(name='Huey', age=3, columns=['name'])
find(**query)

Query the table for rows matching the specified equality conditions. If no query is specified, then all rows are returned.

peewee_users = db['users'].find(favorite_orm='peewee')
find_one(**query)

Return a single row matching the specified equality conditions. If no matching row is found then None will be returned.

huey = db['users'].find_one(name='Huey')
all()

Return all rows in the given table.

delete(**query)

Delete all rows matching the given equality conditions. If no query is provided, then all rows will be deleted.

# Adios, Django!
db['users'].delete(favorite_orm='Django')

# Delete all the secret messages.
db['secret_messages'].delete()
freeze([format='csv'[, filename=None[, file_obj=None[, **kwargs]]]])
Parameters:
  • format – Output format. By default, csv and json are supported.
  • filename – Filename to write output to.
  • file_obj – File-like object to write output to.
  • kwargs – Arbitrary parameters for export-specific functionality.
thaw([format='csv'[, filename=None[, file_obj=None[, strict=False[, **kwargs]]]]])
Parameters:
  • format – Input format. By default, csv and json are supported.
  • filename – Filename to read data from.
  • file_obj – File-like object to read data from.
  • strict (bool) – Whether to store values for columns that do not already exist on the table.
  • kwargs – Arbitrary parameters for import-specific functionality.

Django Integration

The Django ORM provides a very high-level abstraction over SQL and as a consequence is in some ways limited in terms of flexibility or expressiveness. I wrote a blog post describing my search for a “missing link” between Django’s ORM and the SQL it generates, concluding that no such layer exists. The djpeewee module attempts to provide an easy-to-use, structured layer for generating SQL queries for use with Django’s ORM.

A couple use-cases might be:

  • Joining on fields that are not related by foreign key (for example UUID fields).
  • Performing aggregate queries on calculated values.
  • Features that Django does not support such as CASE statements.
  • Utilizing SQL functions that Django does not support, such as SUBSTR.
  • Replacing nearly-identical SQL queries with reusable, composable data-structures.

Below is an example of how you might use this:

# Django model.
class Event(models.Model):
    start_time = models.DateTimeField()
    end_time = models.DateTimeField()
    title = models.CharField(max_length=255)

# Suppose we want to find all events that are longer than an hour.  Django
# does not support this, but we can use peewee.
from playhouse.djpeewee import translate
P = translate(Event)
query = (P.Event
         .select()
         .where(
             (P.Event.end_time - P.Event.start_time) > timedelta(hours=1)))

# Now feed our peewee query into Django's `raw()` method:
sql, params = query.sql()
Event.objects.raw(sql, params)

Foreign keys and Many-to-many relationships

The translate() function will recursively traverse the graph of models and return a dictionary populated with everything it finds. Back-references are not searched by default, but can be included by specifying backrefs=True.

Example:

>>> from django.contrib.auth.models import User, Group
>>> from playhouse.djpeewee import translate
>>> translate(User, Group)
{'ContentType': peewee.ContentType,
 'Group': peewee.Group,
 'Group_permissions': peewee.Group_permissions,
 'Permission': peewee.Permission,
 'User': peewee.User,
 'User_groups': peewee.User_groups,
 'User_user_permissions': peewee.User_user_permissions}

As you can see in the example above, although only User and Group were passed in to translate(), several other models which are related by foreign key were also created. Additionally, the many-to-many “through” tables were created as separate models since peewee does not abstract away these types of relationships.

Using the above models it is possible to construct joins. The following example will get all users who belong to a group that starts with the letter “A”:

>>> P = translate(User, Group)
>>> query = P.User.select().join(P.User_groups).join(P.Group).where(
...     fn.Lower(fn.Substr(P.Group.name, 1, 1)) == 'a')
>>> sql, params = query.sql()
>>> print sql  # formatted for legibility
SELECT t1."id", t1."password", ...
FROM "auth_user" AS t1
INNER JOIN "auth_user_groups" AS t2 ON (t1."id" = t2."user_id")
INNER JOIN "auth_group" AS t3 ON (t2."group_id" = t3."id")
WHERE (Lower(Substr(t3."name", %s, %s)) = %s)

djpeewee API

translate(*models, **options)

Translate the given Django models into roughly equivalent peewee models suitable for use constructing queries. Foreign keys and many-to-many relationships will be followed and models generated, although back references are not traversed.

Parameters:
  • models – One or more Django model classes.
  • options – A dictionary of options, see note below.
Returns:

A dict-like object containing the generated models, but which supports dotted-name style lookups.

The following are valid options:

  • recurse: Follow foreign keys and many to many (default: True).
  • max_depth: Maximum depth to recurse (default: None, unlimited).
  • backrefs: Follow backrefs (default: False).
  • exclude: A list of models to exclude.

Fields

This module also contains several field classes that implement additional logic like encryption and compression. There is also a ManyToManyField that makes it easy to work with simple many-to-many relationships.

These fields can be found in the playhouse.fields module.

class ManyToManyField(rel_model[, related_name=None[, through_model=None]])
Parameters:
  • rel_modelModel class.
  • related_name (str) – Name for the automatically-created backref. If not provided, the pluralized version of the model will be used.
  • through_modelModel to use for the intermediary table. If not provided, a simple through table will be automatically created.

The ManyToManyField provides a simple interface for working with many-to-many relationships, inspired by Django. A many-to-many relationship is typically implemented by creating a junction table with foreign keys to the two models being related. For instance, if you were building a syllabus manager for college students, the relationship between students and courses would be many-to-many. Here is the schema using standard APIs:

class Student(Model):
    name = CharField()

class Course(Model):
    name = CharField()

class StudentCourse(Model):
    student = ForeignKeyField(Student)
    course = ForeignKeyField(Course)

To query the courses for a particular student, you would join through the junction table:

# List the courses that "Huey" is enrolled in:
courses = (Course
           .select()
           .join(StudentCourse)
           .join(Student)
           .where(Student.name == 'Huey'))
for course in courses:
    print course.name

The ManyToManyField is designed to simplify this use-case by providing a field-like API for querying and modifying data in the junction table. Here is how our code looks using ManyToManyField:

class Student(Model):
    name = CharField()

class Course(Model):
    name = CharField()
    students = ManyToManyField(Student, related_name='courses')

Note

It does not matter from Peewee’s perspective which model the ManyToManyField goes on, since the back-reference is just the mirror image. In order to write valid Python, though, you will need to add the ManyToManyField on the second model so that the name of the first model is in the scope.

We still need a junction table to store the relationships between students and courses. This model can be accessed by calling the get_through_model() method. This is useful when creating tables.

# Create tables for the students, courses, and relationships between
# the two.
db.create_tables([
    Student,
    Course,
    Course.students.get_through_model()])

When accessed from a model instance, the ManyToManyField exposes a SelectQuery representing the set of related objects. Let’s use the interactive shell to see how all this works:

>>> huey = Student.get(Student.name == 'huey')
>>> [course.name for course in huey.courses]
['English 101', 'CS 101']

>>> engl_101 = Course.get(Course.name == 'English 101')
>>> [student.name for student in engl_101.students]
['Huey', 'Mickey', 'Zaizee']

To add new relationships between objects, you can either assign the objects directly to the ManyToManyField attribute, or call the add() method. The difference between the two is that simply assigning will clear out any existing relationships, whereas add() can preserve existing relationships.

>>> huey.courses = Course.select().where(Course.name.contains('english'))
>>> for course in huey.courses.order_by(Course.name):
...     print course.name
English 101
English 151
English 201
English 221

>>> cs_101 = Course.get(Course.name == 'CS 101')
>>> cs_151 = Course.get(Course.name == 'CS 151')
>>> huey.courses.add([cs_101, cs_151])
>>> [course.name for course in huey.courses.order_by(Course.name)]
['CS 101', 'CS151', 'English 101', 'English 151', 'English 201',
 'English 221']

This is quite a few courses, so let’s remove the 200-level english courses. To remove objects, use the remove() method.

>>> huey.courses.remove(Course.select().where(Course.name.contains('2'))
2
>>> [course.name for course in huey.courses.order_by(Course.name)]
['CS 101', 'CS151', 'English 101', 'English 151']

To remove all relationships from a collection, you can use the clear() method. Let’s say that English 101 is canceled, so we need to remove all the students from it:

>>> engl_101 = Course.get(Course.name == 'English 101')
>>> engl_101.students.clear()

Note

For an overview of implementing many-to-many relationships using standard Peewee APIs, check out the Implementing Many to Many section. For all but the most simple cases, you will be better off implementing many-to-many using the standard APIs.

add(value[, clear_existing=True])
Parameters:
  • value – Either a Model instance, a list of model instances, or a SelectQuery.
  • clear_existing (bool) – Whether to remove existing relationships first.

Associate value with the current instance. You can pass in a single model instance, a list of model instances, or even a SelectQuery.

Example code:

# Huey needs to enroll in a bunch of courses, including all
# the English classes, and a couple Comp-Sci classes.
huey = Student.get(Student.name == 'Huey')

# We can add all the objects represented by a query.
english_courses = Course.select().where(
    Course.name.contains('english'))
huey.courses.add(english_courses)

# We can also add lists of individual objects.
cs101 = Course.get(Course.name == 'CS 101')
cs151 = Course.get(Course.name == 'CS 151')
huey.courses.add([cs101, cs151])
remove(value)
Parameters:value – Either a Model instance, a list of model instances, or a SelectQuery.

Disassociate value from the current instance. Like add(), you can pass in a model instance, a list of model instances, or even a SelectQuery.

Example code:

# Huey is currently enrolled in a lot of english classes
# as well as some Comp-Sci. He is changing majors, so we
# will remove all his courses.
english_courses = Course.select().where(
    Course.name.contains('english'))
huey.courses.remove(english_courses)

# Remove the two Comp-Sci classes Huey is enrolled in.
cs101 = Course.get(Course.name == 'CS 101')
cs151 = Course.get(Course.name == 'CS 151')
huey.courses.remove([cs101, cs151])
clear()

Remove all associated objects.

Example code:

# English 101 is canceled this semester, so remove all
# the enrollments.
english_101 = Course.get(Course.name == 'English 101')
english_101.students.clear()
get_through_model()

Return the Model representing the many-to-many junction table. This can be specified manually when the field is being instantiated using the through_model parameter. If a through_model is not specified, one will automatically be created.

When creating tables for an application that uses ManyToManyField, you must create the through table expicitly.

# Get a reference to the automatically-created through table.
StudentCourseThrough = Course.students.get_through_model()

# Create tables for our two models as well as the through model.
db.create_tables([
    Student,
    Course,
    StudentCourseThrough])
class DeferredThroughModel

In some instances, you may need to obtain a reference to a through model before that model is actually defined. In order to avoid weird circular logic, you can use the DeferredThroughModel as a placeholder, then “fill it in” when you’re ready.

Example:

class User(Model):
    username = CharField()

NoteThroughDeferred = DeferredThroughModel()  # Create placeholder.

class Note(Model):
    text = TextField()
    users = ManyToManyField(User, through_model=NoteThroughDeferred)

class NoteThrough(Model):
    user = ForeignKeyField(User)
    note = ForeignKeyField(Note)
    sort_order = IntegerField(default=0)

# Now that all the models are defined, we can replace the placeholder
# with the actual through model implementation.
NoteThroughDeferred.set_model(NoteThrough)
set_model(model_class)

Initialize the deferred placeholder with the appropriate model class.

class CompressedField([compression_level=6[, algorithm='zlib'[, **kwargs]]])

CompressedField stores compressed data using the specified algorithm. This field extends BlobField, transparently storing a compressed representation of the data in the database.

Parameters:
  • compression_level (int) – A value from 0 to 9.
  • algorithm (str) – Either 'zlib' or 'bz2'.
class PasswordField([iterations=12[, **kwargs]])

PasswordField stores a password hash and lets you verify it. The password is hashed when it is saved to the database and after reading it from the database you can call check_password (password) -> bool on it.

Parameters:iterations (int) – Indicates the work factor, it does 2^n iterations.

Note

This field requires bcrypt, which can be installed by running pip install bcrypt.

class PickledField([**kwargs])

A field capable of storing arbitrary Python objects.

Note

If the cPickle module is available, it will be used.

Generic foreign keys

The gfk module provides a Generic ForeignKey (GFK), similar to Django. A GFK is composed of two columns: an object ID and an object type identifier. The object types are collected in a global registry (all_models).

How a GFKField is resolved:

  1. Look up the object type in the global registry (returns a model class)
  2. Look up the model instance by object ID

Note

In order to use Generic ForeignKeys, your application’s models must subclass playhouse.gfk.Model. This ensures that the model class will be added to the global registry.

Note

GFKs themselves are not actually a field and will not add a column to your table.

Like regular ForeignKeys, GFKs support a “back-reference” via the ReverseGFK descriptor.

How to use GFKs

  1. Be sure your model subclasses playhouse.gfk.Model
  2. Add a CharField to store the object_type
  3. Add a field to store the object_id (usually a IntegerField)
  4. Add a GFKField and instantiate it with the names of the object_type and object_id fields.
  5. (optional) On any other models, add a ReverseGFK descriptor

Example:

from playhouse.gfk import *

class Tag(Model):
    tag = CharField()
    object_type = CharField(null=True)
    object_id = IntegerField(null=True)
    object = GFKField('object_type', 'object_id')

class Blog(Model):
    tags = ReverseGFK(Tag, 'object_type', 'object_id')

class Photo(Model):
    tags = ReverseGFK(Tag, 'object_type', 'object_id')

How you use these is pretty straightforward hopefully:

>>> b = Blog.create(name='awesome post')
>>> Tag.create(tag='awesome', object=b)
>>> b2 = Blog.create(name='whiny post')
>>> Tag.create(tag='whiny', object=b2)

>>> b.tags # <-- a select query
<class '__main__.Tag'> SELECT t1."id", t1."tag", t1."object_type", t1."object_id" FROM "tag" AS t1 WHERE ((t1."object_type" = ?) AND (t1."object_id" = ?)) [u'blog', 1]

>>> [x.tag for x in b.tags]
[u'awesome']

>>> [x.tag for x in b2.tags]
[u'whiny']

>>> p = Photo.create(name='picture of cat')
>>> Tag.create(object=p, tag='kitties')
>>> Tag.create(object=p, tag='cats')

>>> [x.tag for x in p.tags]
[u'kitties', u'cats']

>>> [x.tag for x in Blog.tags]
[u'awesome', u'whiny']

>>> t = Tag.get(Tag.tag == 'awesome')
>>> t.object
<__main__.Blog at 0x268f450>

>>> t.object.name
u'awesome post'

GFK API

class GFKField([model_type_field='object_type'[, model_id_field='object_id']])

Provide a clean API for storing “generic” foreign keys. Generic foreign keys are comprised of an object type, which maps to a model class, and an object id, which maps to the primary key of the related model class.

Setting the GFKField on a model will automatically populate the model_type_field and model_id_field. Similarly, getting the GFKField on a model instance will “resolve” the two fields, first looking up the model class, then looking up the instance by ID.

class ReverseGFK(model[, model_type_field='object_type'[, model_id_field='object_id']])

Back-reference support for GFKField.

Hybrid Attributes

Hybrid attributes encapsulate functionality that operates at both the Python and SQL levels. The idea for hybrid attributes comes from a feature of the same name in SQLAlchemy. Consider the following example:

class Interval(Model):
    start = IntegerField()
    end = IntegerField()

    @hybrid_property
    def length(self):
        return self.end - self.start

    @hybrid_method
    def contains(self, point):
        return (self.start <= point) & (point < self.end)

The hybrid attribute gets its name from the fact that the length attribute will behave differently depending on whether it is accessed via the Interval class or an Interval instance.

If accessed via an instance, then it behaves just as you would expect.

If accessed via the Interval.length class attribute, however, the length calculation will be expressed as a SQL expression. For example:

query = Interval.select().where(Interval.length > 5)

This query will be equivalent to the following SQL:

SELECT "t1"."id", "t1"."start", "t1"."end"
FROM "interval" AS t1
WHERE (("t1"."end" - "t1"."start") > 5)

The hybrid module also contains a decorator for implementing hybrid methods which can accept parameters. As with hybrid properties, when accessed via a model instance, then the function executes normally as-written. When the hybrid method is called on the class, however, it will generate a SQL expression.

Example:

query = Interval.select().where(Interval.contains(2))

This query is equivalent to the following SQL:

SELECT "t1"."id", "t1"."start", "t1"."end"
FROM "interval" AS t1
WHERE (("t1"."start" <= 2) AND (2 < "t1"."end"))

There is an additional API for situations where the python implementation differs slightly from the SQL implementation. Let’s add a radius method to the Interval model. Because this method calculates an absolute value, we will use the Python abs() function for the instance portion and the fn.ABS() SQL function for the class portion.

class Interval(Model):
    start = IntegerField()
    end = IntegerField()

    @hybrid_property
    def length(self):
        return self.end - self.start

    @hybrid_property
    def radius(self):
        return abs(self.length) / 2

    @radius.expression
    def radius(cls):
        return fn.ABS(cls.length) / 2

What is neat is that both the radius implementations refer to the length hybrid attribute! When accessed via an Interval instance, the radius calculation will be executed in Python. When invoked via an Interval class, we will get the appropriate SQL.

Example:

query = Interval.select().where(Interval.radius < 3)

This query is equivalent to the following SQL:

SELECT "t1"."id", "t1"."start", "t1"."end"
FROM "interval" AS t1
WHERE ((abs("t1"."end" - "t1"."start") / 2) < 3)

Pretty neat, right? Thanks for the cool idea, SQLAlchemy!

Hybrid API

class hybrid_method(func[, expr=None])

Method decorator that allows the definition of a Python object method with both instance-level and class-level behavior.

Example:

class Interval(Model):
    start = IntegerField()
    end = IntegerField()

    @hybrid_method
    def contains(self, point):
        return (self.start <= point) & (point < self.end)

When called with an Interval instance, the contains method will behave as you would expect. When called as a classmethod, though, a SQL expression will be generated:

query = Interval.select().where(Interval.contains(2))

Would generate the following SQL:

SELECT "t1"."id", "t1"."start", "t1"."end"
FROM "interval" AS t1
WHERE (("t1"."start" <= 2) AND (2 < "t1"."end"))
expression(expr)

Method decorator for specifying the SQL-expression producing method.

class hybrid_property(fget[, fset=None[, fdel=None[, expr=None]]])

Method decorator that allows the definition of a Python object property with both instance-level and class-level behavior.

Examples:

class Interval(Model):
    start = IntegerField()
    end = IntegerField()

    @hybrid_property
    def length(self):
        return self.end - self.start

    @hybrid_property
    def radius(self):
        return abs(self.length) / 2

    @radius.expression
    def radius(cls):
        return fn.ABS(cls.length) / 2

When accessed on an Interval instance, the length and radius properties will behave as you would expect. When accessed as class attributes, though, a SQL expression will be generated instead:

query = (Interval
         .select()
         .where(
             (Interval.length > 6) &
             (Interval.radius >= 3)))

Would generate the following SQL:

SELECT "t1"."id", "t1"."start", "t1"."end"
FROM "interval" AS t1
WHERE (
    (("t1"."end" - "t1"."start") > 6) AND
    ((abs("t1"."end" - "t1"."start") / 2) >= 3)
)

Key/Value Store

Provides a simple key/value store, using a dictionary API. By default the the KeyStore will use an in-memory sqlite database, but any database will work.

To start using the key-store, create an instance and pass it a field to use for the values.

>>> kv = KeyStore(TextField())
>>> kv['a'] = 'A'
>>> kv['a']
'A'

Note

To store arbitrary python objects, use the PickledKeyStore, which stores values in a pickled BlobField.

If your objects are JSON-serializable, you can also use the JSONKeyStore, which stores the values as JSON-encoded strings.

Using the KeyStore it is possible to use “expressions” to retrieve values from the dictionary. For instance, imagine you want to get all keys which contain a certain substring:

>>> keys_matching_substr = kv[kv.key % '%substr%']
>>> keys_start_with_a = kv[fn.Lower(fn.Substr(kv.key, 1, 1)) == 'a']

KeyStore API

class KeyStore(value_field[, ordered=False[, database=None]])

Lightweight dictionary interface to a model containing a key and value. Implements common dictionary methods, such as __getitem__, __setitem__, get, pop, items, keys, and values.

Parameters:
  • value_field (Field) – Field instance to use as value field, e.g. an instance of TextField.
  • ordered (boolean) – Whether the keys should be returned in sorted order
  • database (Database) – Database class to use for the storage backend. If none is supplied, an in-memory Sqlite DB will be used.

Example:

>>> from playhouse.kv import KeyStore
>>> kv = KeyStore(TextField())
>>> kv['a'] = 'foo'
>>> for k, v in kv:
...     print k, v
a foo

>>> 'a' in kv
True
>>> 'b' in kv
False
class JSONKeyStore([ordered=False[, database=None]])

Identical to the KeyStore except the values are stored as JSON-encoded strings, so you can store complex data-types like dictionaries and lists.

Example:

>>> from playhouse.kv import JSONKeyStore
>>> jkv = JSONKeyStore()
>>> jkv['a'] = 'A'
>>> jkv['b'] = [1, 2, 3]
>>> list(jkv.items())
[(u'a', 'A'), (u'b', [1, 2, 3])]
class PickledKeyStore([ordered=False[, database=None]])

Identical to the KeyStore except anything can be stored as a value in the dictionary. The storage for the value will be a pickled BlobField.

Example:

>>> from playhouse.kv import PickledKeyStore
>>> pkv = PickledKeyStore()
>>> pkv['a'] = 'A'
>>> pkv['b'] = 1.0
>>> list(pkv.items())
[(u'a', 'A'), (u'b', 1.0)]

Shortcuts

This module contains helper functions for expressing things that would otherwise be somewhat verbose or cumbersome using peewee’s APIs. There are also helpers for serializing models to dictionaries and vice-versa.

case(predicate, expression_tuples, default=None)
Parameters:
  • predicate – A SQL expression or can be None.
  • expression_tuples – An iterable containing one or more 2-tuples comprised of an expression and return value.
  • default – default if none of the cases match.

Example SQL case statements:

-- case with predicate --
SELECT "username",
  CASE "user_id"
    WHEN 1 THEN "one"
    WHEN 2 THEN "two"
    ELSE "?"
  END
FROM "users";

-- case with no predicate (inline expressions) --
SELECT "username",
  CASE
    WHEN "user_id" = 1 THEN "one"
    WHEN "user_id" = 2 THEN "two"
    ELSE "?"
  END
FROM "users";

Equivalent function invocations:

User.select(User.username, case(User.user_id, (
  (1, "one"),
  (2, "two")), "?"))

User.select(User.username, case(None, (
  (User.user_id == 1, "one"),  # note the double equals
  (User.user_id == 2, "two")), "?"))

You can specify a value for the CASE expression using the alias() method:

User.select(User.username, case(User.user_id, (
  (1, "one"),
  (2, "two")), "?").alias("id_string"))
cast(node, as_type)
Parameters:
  • node – A peewee Node, for instance a Field or an Expression.
  • as_type (str) – The type name to cast to, e.g. 'int'.
Returns:

a function call to cast the node as the given type.

Example:

# Find all data points whose numbers are palindromes. We do this by
# casting the number to string, reversing it, then casting the reversed
# string back to an integer.
reverse_val = cast(fn.REVERSE(cast(DataPoint.value, 'str')), 'int')

query = (DataPoint
         .select()
         .where(DataPoint.value == reverse_val))
model_to_dict(model[, recurse=True[, backrefs=False[, only=None[, exclude=None[, extra_attrs=None[, fields_from_query=None]]]]]])

Convert a model instance (and optionally any related instances) to a dictionary.

Parameters:
  • recurse (bool) – Whether foreign-keys should be recursed.
  • backrefs (bool) – Whether lists of related objects should be recursed.
  • only – A list (or set) of field instances which should be included in the result dictionary.
  • exclude – A list (or set) of field instances which should be excluded from the result dictionary.
  • extra_attrs – A list of attribute or method names on the instance which should be included in the dictionary.
  • fields_from_query (SelectQuery) – The SelectQuery that created this model instance. Only the fields and values explicitly selected by the query will be serialized.

Examples:

>>> user = User.create(username='charlie')
>>> model_to_dict(user)
{'id': 1, 'username': 'charlie'}

>>> model_to_dict(user, backrefs=True)
{'id': 1, 'tweets': [], 'username': 'charlie'}

>>> t1 = Tweet.create(user=user, message='tweet-1')
>>> t2 = Tweet.create(user=user, message='tweet-2')
>>> model_to_dict(user, backrefs=True)
{
  'id': 1,
  'tweets': [
    {'id': 1, 'message': 'tweet-1'},
    {'id': 2, 'message': 'tweet-2'},
  ],
  'username': 'charlie'
}

>>> model_to_dict(t1)
{
  'id': 1,
  'message': 'tweet-1',
  'user': {
    'id': 1,
    'username': 'charlie'
  }
}

>>> model_to_dict(t2, recurse=False)
{'id': 1, 'message': 'tweet-2', 'user': 1}
dict_to_model(model_class, data[, ignore_unknown=False])

Convert a dictionary of data to a model instance, creating related instances where appropriate.

Parameters:
  • model_class (Model) – The model class to construct.
  • data (dict) – A dictionary of data. Foreign keys can be included as nested dictionaries, and back-references as lists of dictionaries.
  • ignore_unknown (bool) – Whether to allow unrecognized (non-field) attributes.

Examples:

>>> user_data = {'id': 1, 'username': 'charlie'}
>>> user = dict_to_model(User, user_data)
>>> user
<__main__.User at 0x7fea8fa4d490>

>>> user.username
'charlie'

>>> note_data = {'id': 2, 'text': 'note text', 'user': user_data}
>>> note = dict_to_model(Note, note_data)
>>> note.text
'note text'
>>> note.user.username
'charlie'

>>> user_with_notes = {
...     'id': 1,
...     'username': 'charlie',
...     'notes': [{'id': 1, 'text': 'note-1'}, {'id': 2, 'text': 'note-2'}]}
>>> user = dict_to_model(User, user_with_notes)
>>> user.notes[0].text
'note-1'
>>> user.notes[0].user.username
'charlie'
class RetryOperationalError

When mixed-in with a vendor-specific Database subclass, this class overrides the execute_sql() method to automatically reconnect and retry queries that fail due to an OperationalError. The query that failed will be retried only once, and if it fails twice an exception will be raised.

Usage:

from peewee import *
from playhouse.shortcuts import RetryOperationalError


class MyRetryDB(RetryOperationalError, MySQLDatabase):
    pass


db = MyRetryDB('my_app')

Signal support

Models with hooks for signals (a-la django) are provided in playhouse.signals. To use the signals, you will need all of your project’s models to be a subclass of playhouse.signals.Model, which overrides the necessary methods to provide support for the various signals.

from playhouse.signals import Model, post_save


class MyModel(Model):
    data = IntegerField()

@post_save(sender=MyModel)
def on_save_handler(model_class, instance, created):
    put_data_in_cache(instance.data)

Warning

For what I hope are obvious reasons, Peewee signals do not work when you use the Model.insert(), Model.update(), or Model.delete() methods. These methods generate queries that execute beyond the scope of the ORM, and the ORM does not know about which model instances might or might not be affected when the query executes.

Signals work by hooking into the higher-level peewee APIs like Model.save() and Model.delete_instance(), where the affected model instance is known ahead of time.

The following signals are provided:

pre_save
Called immediately before an object is saved to the database. Provides an additional keyword argument created, indicating whether the model is being saved for the first time or updated.
post_save
Called immediately after an object is saved to the database. Provides an additional keyword argument created, indicating whether the model is being saved for the first time or updated.
pre_delete
Called immediately before an object is deleted from the database when Model.delete_instance() is used.
post_delete
Called immediately after an object is deleted from the database when Model.delete_instance() is used.
pre_init
Called when a model class is first instantiated
post_init
Called after a model class has been instantiated and the fields have been populated, for example when being selected as part of a database query.

Connecting handlers

Whenever a signal is dispatched, it will call any handlers that have been registered. This allows totally separate code to respond to events like model save and delete.

The Signal class provides a connect() method, which takes a callback function and two optional parameters for “sender” and “name”. If specified, the “sender” parameter should be a single model class and allows your callback to only receive signals from that one model class. The “name” parameter is used as a convenient alias in the event you wish to unregister your signal handler.

Example usage:

from playhouse.signals import *

def post_save_handler(sender, instance, created):
    print '%s was just saved' % instance

# our handler will only be called when we save instances of SomeModel
post_save.connect(post_save_handler, sender=SomeModel)

All signal handlers accept as their first two arguments sender and instance, where sender is the model class and instance is the actual model being acted upon.

If you’d like, you can also use a decorator to connect signal handlers. This is functionally equivalent to the above example:

@post_save(sender=SomeModel)
def post_save_handler(sender, instance, created):
    print '%s was just saved' % instance

Signal API

class Signal

Stores a list of receivers (callbacks) and calls them when the “send” method is invoked.

connect(receiver[, sender=None[, name=None]])

Add the receiver to the internal list of receivers, which will be called whenever the signal is sent.

Parameters:
  • receiver (callable) – a callable that takes at least two parameters, a “sender”, which is the Model subclass that triggered the signal, and an “instance”, which is the actual model instance.
  • sender (Model) – if specified, only instances of this model class will trigger the receiver callback.
  • name (string) – a short alias
from playhouse.signals import post_save
from project.handlers import cache_buster

post_save.connect(cache_buster, name='project.cache_buster')
disconnect([receiver=None[, name=None]])

Disconnect the given receiver (or the receiver with the given name alias) so that it no longer is called. Either the receiver or the name must be provided.

Parameters:
  • receiver (callable) – the callback to disconnect
  • name (string) – a short alias
post_save.disconnect(name='project.cache_buster')
send(instance, *args, **kwargs)

Iterates over the receivers and will call them in the order in which they were connected. If the receiver specified a sender, it will only be called if the instance is an instance of the sender.

Parameters:instance – a model instance

pwiz, a model generator

pwiz is a little script that ships with peewee and is capable of introspecting an existing database and generating model code suitable for interacting with the underlying data. If you have a database already, pwiz can give you a nice boost by generating skeleton code with correct column affinities and foreign keys.

If you install peewee using setup.py install, pwiz will be installed as a “script” and you can just run:

python -m pwiz -e postgresql -u postgres my_postgres_db

This will print a bunch of models to standard output. So you can do this:

python -m pwiz -e postgresql my_postgres_db > mymodels.py
python # <-- fire up an interactive shell
>>> from mymodels import Blog, Entry, Tag, Whatever
>>> print [blog.name for blog in Blog.select()]
Option Meaning Example
-h show help  
-e database backend -e mysql
-H host to connect to -H remote.db.server
-p port to connect on -p 9001
-u database user -u postgres
-P database password -P secret
-s postgres schema -s public

The following are valid parameters for the engine:

  • sqlite
  • mysql
  • postgresql

Schema Migrations

Peewee now supports schema migrations, with well-tested support for Postgresql, SQLite and MySQL. Unlike other schema migration tools, peewee’s migrations do not handle introspection and database “versioning”. Rather, peewee provides a number of helper functions for generating and running schema-altering statements. This engine provides the basis on which a more sophisticated tool could some day be built.

Migrations can be written as simple python scripts and executed from the command-line. Since the migrations only depend on your applications Database object, it should be easy to manage changing your model definitions and maintaining a set of migration scripts without introducing dependencies.

Example usage

Begin by importing the helpers from the migrate module:

from playhouse.migrate import *

Instantiate a migrator. The SchemaMigrator class is responsible for generating schema altering operations, which can then be run sequentially by the migrate() helper.

# Postgres example:
my_db = PostgresqlDatabase(...)
migrator = PostgresqlMigrator(my_db)

# SQLite example:
my_db = SqliteDatabase('my_database.db')
migrator = SqliteMigrator(my_db)

Use migrate() to execute one or more operations:

title_field = CharField(default='')
status_field = IntegerField(null=True)

migrate(
    migrator.add_column('some_table', 'title', title_field),
    migrator.add_column('some_table', 'status', status_field),
    migrator.drop_column('some_table', 'old_column'),
)

Warning

Migrations are not run inside a transaction. If you wish the migration to run in a transaction you will need to wrap the call to migrate in a transaction block, e.g.

with my_db.transaction():
    migrate(...)

Supported Operations

Add new field(s) to an existing model:

# Create your field instances. For non-null fields you must specify a
# default value.
pubdate_field = DateTimeField(null=True)
comment_field = TextField(default='')

# Run the migration, specifying the database table, field name and field.
migrate(
    migrator.add_column('comment_tbl', 'pub_date', pubdate_field),
    migrator.add_column('comment_tbl', 'comment', comment_field),
)

Renaming a field:

# Specify the table, original name of the column, and its new name.
migrate(
    migrator.rename_column('story', 'pub_date', 'publish_date'),
    migrator.rename_column('story', 'mod_date', 'modified_date'),
)

Dropping a field:

migrate(
    migrator.drop_column('story', 'some_old_field'),
)

Making a field nullable or not nullable:

# Note that when making a field not null that field must not have any
# NULL values present.
migrate(
    # Make `pub_date` allow NULL values.
    migrator.drop_not_null('story', 'pub_date'),

    # Prevent `modified_date` from containing NULL values.
    migrator.add_not_null('story', 'modified_date'),
)

Renaming a table:

migrate(
    migrator.rename_table('story', 'stories_tbl'),
)

Adding an index:

# Specify the table, column names, and whether the index should be
# UNIQUE or not.
migrate(
    # Create an index on the `pub_date` column.
    migrator.add_index('story', ('pub_date',), False),

    # Create a multi-column index on the `pub_date` and `status` fields.
    migrator.add_index('story', ('pub_date', 'status'), False),

    # Create a unique index on the category and title fields.
    migrator.add_index('story', ('category_id', 'title'), True),
)

Dropping an index:

# Specify the index name.
migrate(migrator.drop_index('story', 'story_pub_date_status'))

Migrations API

migrate(*operations)

Execute one or more schema altering operations.

Usage:

migrate(
    migrator.add_column('some_table', 'new_column', CharField(default='')),
    migrator.create_index('some_table', ('new_column',)),
)
class SchemaMigrator(database)
Parameters:database – a Database instance.

The SchemaMigrator is responsible for generating schema-altering statements.

add_column(table, column_name, field)
Parameters:
  • table (str) – Name of the table to add column to.
  • column_name (str) – Name of the new column.
  • field (Field) – A Field instance.

Add a new column to the provided table. The field provided will be used to generate the appropriate column definition.

Note

If the field is not nullable it must specify a default value.

Note

For non-null fields, the field will initially be added as a null field, then an UPDATE statement will be executed to populate the column with the default value. Finally, the column will be marked as not null.

drop_column(table, column_name[, cascade=True])
Parameters:
  • table (str) – Name of the table to drop column from.
  • column_name (str) – Name of the column to drop.
  • cascade (bool) – Whether the column should be dropped with CASCADE.
rename_column(table, old_name, new_name)
Parameters:
  • table (str) – Name of the table containing column to rename.
  • old_name (str) – Current name of the column.
  • new_name (str) – New name for the column.
add_not_null(table, column)
Parameters:
  • table (str) – Name of table containing column.
  • column (str) – Name of the column to make not nullable.
drop_not_null(table, column)
Parameters:
  • table (str) – Name of table containing column.
  • column (str) – Name of the column to make nullable.
rename_table(old_name, new_name)
Parameters:
  • old_name (str) – Current name of the table.
  • new_name (str) – New name for the table.
add_index(table, columns[, unique=False])
Parameters:
  • table (str) – Name of table on which to create the index.
  • columns (list) – List of columns which should be indexed.
  • unique (bool) – Whether the new index should specify a unique constraint.
drop_index(table, index_name)

:param str table Name of the table containing the index to be dropped. :param str index_name: Name of the index to be dropped.

class PostgresqlMigrator(database)

Generate migrations for Postgresql databases.

class SqliteMigrator(database)

Generate migrations for SQLite databases.

class MySQLMigrator(database)

Generate migrations for MySQL databases.

Reflection

The reflection module contains helpers for introspecting existing databases. This module is used internally by several other modules in the playhouse, including DataSet and pwiz, a model generator.

class Introspector(metadata[, schema=None])

Metadata can be extracted from a database by instantiating an Introspector. Rather than instantiating this class directly, it is recommended to use the factory method from_database().

classmethod from_database(database[, schema=None])

Creates an Introspector instance suitable for use with the given database.

Parameters:
  • database – a Database instance.
  • schema (str) – an optional schema (supported by some databases).

Usage:

db = SqliteDatabase('my_app.db')
introspector = Introspector.from_database(db)
models = introspector.generate_models()

# User and Tweet (assumed to exist in the database) are
# peewee Model classes generated from the database schema.
User = models['user']
Tweet = models['tweet']
generate_models()

Introspect the database, reading in the tables, columns, and foreign key constraints, then generate a dictionary mapping each database table to a dynamically-generated Model class.

Returns:A dictionary mapping table-names to model classes.

Database URL

This module contains a helper function to generate a database connection from a URL connection string.

connect(url, **connect_params)

Create a Database instance from the given connection URL.

Examples:

  • sqlite:///my_database.db will create a SqliteDatabase instance for the file my_database.db in the current directory.
  • sqlite:///:memory: will create an in-memory SqliteDatabase instance.
  • postgresql://postgres:my_password@localhost:5432/my_database will create a PostgresqlDatabase instance. A username and password are provided, as well as the host and port to connect to.
  • mysql://user:passwd@ip:port/my_db will create a MySQLDatabase instance for the local MySQL database my_db.
  • mysql+pool://user:passwd@ip:port/my_db?max_connections=20&stale_timeout=300 will create a PooledMySQLDatabase instance for the local MySQL database my_db with max_connections set to 20 and a stale_timeout setting of 300 seconds.

Supported schemes:

Usage:

import os
from playhouse.db_url import connect

# Connect to the database URL defined in the environment, falling
# back to a local Sqlite database if no database URL is specified.
db = connect(os.environ.get('DATABASE') or 'sqlite:///default.db')
parse(url)

Parse the information in the given URL into a dictionary containing database, host, port, user and/or password. Additional connection arguments can be passed in the URL query string.

If you are using a custom database class, you can use the parse() function to extract information from a URL which can then be passed in to your database object.

register_database(db_class, *names)
Parameters:
  • db_class – A subclass of Database.
  • names – A list of names to use as the scheme in the URL, e.g. ‘sqlite’ or ‘firebird’

Register additional database class under the specified names. This function can be used to extend the connect() function to support additional schemes. Suppose you have a custom database class for Firebird named FirebirdDatabase.

from playhouse.db_url import connect, register_database

register_database(FirebirdDatabase, 'firebird')
db = connect('firebird://my-firebird-db')

CSV Utils

This module contains helpers for dumping queries into CSV, and for loading CSV data into a database. CSV files can be introspected to generate an appropriate model class for working with the data. This makes it really easy to explore the data in a CSV file using Peewee and SQL.

Here is how you would load a CSV file into an in-memory SQLite database. The call to load_csv() returns a Model instance suitable for working with the CSV data:

from peewee import *
from playhouse.csv_loader import load_csv
db = SqliteDatabase(':memory:')
ZipToTZ = load_csv(db, 'zip_to_tz.csv')

Now we can run queries using the new model.

# Get the timezone for a zipcode.
>>> ZipToTZ.get(ZipToTZ.zip == 66047).timezone
'US/Central'

# Get all the zipcodes for my town.
>>> [row.zip for row in ZipToTZ.select().where(
...     (ZipToTZ.city == 'Lawrence') && (ZipToTZ.state == 'KS'))]
[66044, 66045, 66046, 66047, 66049]

For more information and examples check out this blog post.

CSV Loader API

load_csv(db_or_model, filename[, fields=None[, field_names=None[, has_header=True[, sample_size=10[, converter=None[, db_table=None[, **reader_kwargs]]]]]]])

Load a CSV file into the provided database or model class, returning a Model suitable for working with the CSV data.

Parameters:
  • db_or_model – Either a Database instance or a Model class. If a model is not provided, one will be automatically generated for you.
  • filename (str) – Path of CSV file to load.
  • fields (list) – A list of Field instances mapping to each column in the CSV. This allows you to manually specify the column types. If not provided, and a model is not provided, the field types will be determined automatically.
  • field_names (list) – A list of strings to use as field names for each column in the CSV. If not provided, and a model is not provided, the field names will be determined by looking at the header row of the file. If no header exists, then the fields will be given generic names.
  • has_header (bool) – Whether the first row is a header.
  • sample_size (int) – Number of rows to look at when introspecting data types. If set to 0, then a generic field type will be used for all fields.
  • converter (RowConverter) – a RowConverter instance to use for introspecting the CSV. If not provided, one will be created.
  • db_table (str) – The name of the database table to load data into. If this value is not provided, it will be determined using the filename of the CSV file. If a model is provided, this value is ignored.
  • reader_kwargs – Arbitrary keyword arguments to pass to the csv.reader object, such as the dialect, separator, etc.
Return type:

A Model suitable for querying the CSV data.

Basic example – field names and types will be introspected:

from peewee import *
from playhouse.csv_loader import *
db = SqliteDatabase(':memory:')
User = load_csv(db, 'users.csv')

Using a pre-defined model:

class ZipToTZ(Model):
    zip = IntegerField()
    timezone = CharField()

load_csv(ZipToTZ, 'zip_to_tz.csv')

Specifying fields:

fields = [DecimalField(), IntegerField(), IntegerField(), DateField()]
field_names = ['amount', 'from_acct', 'to_acct', 'timestamp']
Payments = load_csv(db, 'payments.csv', fields=fields, field_names=field_names, has_header=False)

Dumping CSV

dump_csv(query, file_or_name[, include_header=True[, close_file=True[, append=True[, csv_writer=None]]]])
Parameters:
  • query – A peewee SelectQuery to dump as CSV.
  • file_or_name – Either a filename or a file-like object.
  • include_header – Whether to generate a CSV header row consisting of the names of the selected columns.
  • close_file – Whether the file should be closed after writing the query data.
  • append – Whether new data should be appended to the end of the file.
  • csv_writer – A python csv.writer instance to use.

Example usage:

with open('account-export.csv', 'w') as fh:
    query = Account.select().order_by(Account.id)
    dump_csv(query, fh)

Connection pool

The pool module contains a number of Database classes that provide connection pooling for PostgreSQL and MySQL databases. The pool works by overriding the methods on the Database class that open and close connections to the backend. The pool can specify a timeout after which connections are recycled, as well as an upper bound on the number of open connections.

In a multi-threaded application, up to max_connections will be opened. Each thread (or, if using gevent, greenlet) will have it’s own connection.

In a single-threaded application, only one connection will be created. It will be continually recycled until either it exceeds the stale timeout or is closed explicitly (using .manual_close()).

By default, all your application needs to do is ensure that connections are closed when you are finished with them, and they will be returned to the pool. For web applications, this typically means that at the beginning of a request, you will open a connection, and when you return a response, you will close the connection.

Simple Postgres pool example code:

# Use the special postgresql extensions.
from playhouse.pool import PooledPostgresqlExtDatabase

db = PooledPostgresqlExtDatabase(
    'my_app',
    max_connections=32,
    stale_timeout=300,  # 5 minutes.
    user='postgres')

class BaseModel(Model):
    class Meta:
        database = db

That’s it! If you would like finer-grained control over the pool of connections, check out the Advanced Connection Management section.

Pool APIs

class PooledDatabase(database[, max_connections=20[, stale_timeout=None[, timeout=None[, **kwargs]]]])

Mixin class intended to be used with a subclass of Database.

Parameters:
  • database (str) – The name of the database or database file.
  • max_connections (int) – Maximum number of connections. Provide None for unlimited.
  • stale_timeout (int) – Number of seconds to allow connections to be used.
  • timeout (int) – Number of seconds block when pool is full. By default peewee does not block when the pool is full but simply throws an exception. To block indefinitely set this value to 0.
  • kwargs – Arbitrary keyword arguments passed to database class.

Note

Connections will not be closed exactly when they exceed their stale_timeout. Instead, stale connections are only closed when a new connection is requested.

Note

If the number of open connections exceeds max_connections, a ValueError will be raised.

_connect(*args, **kwargs)

Request a connection from the pool. If there are no available connections a new one will be opened.

_close(conn[, close_conn=False])

By default conn will not be closed and instead will be returned to the pool of available connections. If close_conn=True, then conn will be closed and not be returned to the pool.

manual_close()

Close the currently-open connection without returning it to the pool.

class PooledPostgresqlDatabase

Subclass of PostgresqlDatabase that mixes in the PooledDatabase helper.

class PooledPostgresqlExtDatabase

Subclass of PostgresqlExtDatabase that mixes in the PooledDatabase helper. The PostgresqlExtDatabase is a part of the Postgresql Extensions module and provides support for many Postgres-specific features.

class PooledMySQLDatabase

Subclass of MySQLDatabase that mixes in the PooledDatabase helper.

class PooledSqliteDatabase

Persistent connections for SQLite apps.

class PooledSqliteExtDatabase

Persistent connections for SQLite apps, using the Sqlite Extensions advanced database driver SqliteExtDatabase.

Read Slaves

The read_slave module contains a Model subclass that can be used to automatically execute SELECT queries against different database(s). This might be useful if you have your databases in a master / slave configuration.

class ReadSlaveModel

Model subclass that will route SELECT queries to a different database.

Master and read-slaves are specified using Model.Meta:

# Declare a master and two read-replicas.
master = PostgresqlDatabase('master')
replica_1 = PostgresqlDatabase('replica_1')
replica_2 = PostgresqlDatabase('replica_2')

# Declare a BaseModel, the normal best-practice.
class BaseModel(ReadSlaveModel):
    class Meta:
        database = master
        read_slaves = (replica_1, replica_2)

# Declare your models.
class User(BaseModel):
    username = CharField()

When you execute writes (or deletes), they will be executed against the master database:

User.create(username='Peewee')  # Executed against master.

When you execute a read query, it will run against one of the replicas:

users = User.select().where(User.username == 'Peewee')

Note

To force a SELECT query against the master database, manually create the SelectQuery.

SelectQuery(User)  # master database.

Note

Queries will be dispatched among the read_slaves in round-robin fashion.

Test Utils

Contains utilities helpful when testing peewee projects.

class test_database(db, models[, create_tables=True[, fail_silently=False]])

Context manager that lets you use a different database with a set of models. Models can also be automatically created and dropped.

This context manager helps make it possible to test your peewee models using a “test-only” database.

Parameters:
  • db (Database) – Database to use with the given models
  • models – a list or tuple of Model classes to use with the db
  • create_tables (boolean) – Whether tables should be automatically created and dropped.
  • fail_silently (boolean) – Whether the table create / drop should fail silently.

Example:

from unittest import TestCase
from playhouse.test_utils import test_database
from peewee import *

from my_app.models import User, Tweet

test_db = SqliteDatabase(':memory:')

class TestUsersTweets(TestCase):
    def create_test_data(self):
        # ... create a bunch of users and tweets
        for i in range(10):
            User.create(username='user-%d' % i)

    def test_timeline(self):
        with test_database(test_db, (User, Tweet)):
            # This data will be created in `test_db`
            self.create_test_data()

            # Perform assertions on test data inside ctx manager.
            self.assertEqual(Tweet.timeline('user-0') [...])

        with test_database(test_db, (User,)):
            # Test something that just affects user.
            self.test_some_user_thing()

        # once we exit the context manager, we're back to using the normal database
class count_queries([only_select=False])

Context manager that will count the number of queries executed within the context.

Parameters:only_select (bool) – Only count SELECT queries.
with count_queries() as counter:
    huey = User.get(User.username == 'huey')
    huey_tweets = [tweet.message for tweet in huey.tweets]

assert counter.count == 2
count

The number of queries executed.

get_queries()

Return a list of 2-tuples consisting of the SQL query and a list of parameters.

assert_query_count(expected[, only_select=False])

Function or method decorator that will raise an AssertionError if the number of queries executed in the decorated function does not equal the expected number.

class TestMyApp(unittest.TestCase):
    @assert_query_count(1)
    def test_get_popular_blogs(self):
        popular_blogs = Blog.get_popular()
        self.assertEqual(
            [blog.title for blog in popular_blogs],
            ["Peewee's Playhouse!", "All About Huey", "Mickey's Adventures"])

This function can also be used as a context manager:

class TestMyApp(unittest.TestCase):
    def test_expensive_operation(self):
        with assert_query_count(1):
            perform_expensive_operation()

pskel

I often find myself writing very small scripts with peewee. pskel will generate the boilerplate code for a basic peewee script.

Usage:

pskel [options] model1 model2 ...

pskel accepts the following options:

Option Default Description
-l,--logging False Log all queries to stdout.
-e,--engine sqlite Database driver to use.
-d,--database :memory: Database to connect to.

Example:

$ pskel -e postgres -d my_database User Tweet

This will print the following code to stdout (which you can redirect into a file using >):

#!/usr/bin/env python

import logging

from peewee import *
from peewee import create_model_tables

db = PostgresqlDatabase('my_database')

class BaseModel(Model):
    class Meta:
        database = db

class User(BaseModel):
    pass

class Tweet(BaseModel):
    pass

def main():
    create_model_tables([User, Tweet], fail_silently=True)

if __name__ == '__main__':
    main()

Flask Utils

The playhouse.flask_utils module contains several helpers for integrating peewee with the Flask web framework.

Database Wrapper

The FlaskDB class is a wrapper for configuring and referencing a Peewee database from within a Flask application. Don’t let it’s name fool you: it is not the same thing as a peewee database. FlaskDB is designed to remove the following boilerplate from your flask app:

  • Dynamically create a Peewee database instance based on app config data.
  • Create a base class from which all your application’s models will descend.
  • Register hooks at the start and end of a request to handle opening and closing a database connection.

Basic usage:

import datetime
from flask import Flask
from peewee import *
from playhouse.flask_utils import FlaskDB

DATABASE = 'postgresql://postgres:password@localhost:5432/my_database'

app = Flask(__name__)
app.config.from_object(__name__)

db_wrapper = FlaskDB(app)

class User(db_wrapper.Model):
    username = CharField(unique=True)

class Tweet(db_wrapper.Model):
    user = ForeignKeyField(User, related_name='tweets')
    content = TextField()
    timestamp = DateTimeField(default=datetime.datetime.now)

The above code example will create and instantiate a peewee PostgresqlDatabase specified by the given database URL. Request hooks will be configured to establish a connection when a request is received, and automatically close the connection when the response is sent. Lastly, the FlaskDB class exposes a FlaskDB.Model property which can be used as a base for your application’s models.

Here is how you can access the wrapped Peewee database instance that is configured for you by the FlaskDB wrapper:

# Obtain a reference to the Peewee database instance.
peewee_db = db_wrapper.database

@app.route('/transfer-funds/', methods=['POST'])
def transfer_funds():
    with peewee_db.atomic():
        # ...

    return jsonify({'transfer-id': xid})

Note

The actual peewee database can be accessed using the FlaskDB.database attribute.

Here is another way to configure a Peewee database using FlaskDB:

app = Flask(__name__)
db_wrapper = FlaskDB(app, 'sqlite:///my_app.db')

While the above examples show using a database URL, for more advanced usages you can specify a dictionary of configuration options, or simply pass in a peewee Database instance:

DATABASE = {
    'name': 'my_app_db',
    'engine': 'playhouse.pool.PooledPostgresqlDatabase',
    'user': 'postgres',
    'max_connections': 32,
    'stale_timeout': 600,
}

app = Flask(__name__)
app.config.from_object(__name__)

wrapper = FlaskDB(app)
pooled_postgres_db = wrapper.database

Using a peewee Database object:

peewee_db = PostgresqlExtDatabase('my_app')
app = Flask(__name__)
db_wrapper = FlaskDB(app, peewee_db)

Database with Application Factory

If you prefer to use the application factory pattern, the FlaskDB class implements an init_app() method.

Using as a factory:

db_wrapper = FlaskDB()

# Even though the database is not yet initialized, you can still use the
# `Model` property to create model classes.
class User(db_wrapper.Model):
    username = CharField(unique=True)


def create_app():
    app = Flask(__name__)
    app.config['DATABASE'] = 'sqlite:////home/code/apps/my-database.db'
    db_wrapper.init_app(app)
    return app

Query utilities

The flask_utils module provides several helpers for managing queries in your web app. Some common patterns include:

get_object_or_404(query_or_model, *query)

Retrieve the object matching the given query, or return a 404 not found response. A common use-case might be a detail page for a weblog. You want to either retrieve the post matching the given URL, or return a 404.

Parameters:
  • query_or_model – Either a Model class or a pre-filtered SelectQuery.
  • query – An arbitrarily complex peewee expression.

Example:

@app.route('/blog/<slug>/')
def post_detail(slug):
    public_posts = Post.select().where(Post.published == True)
    post = get_object_or_404(public_posts, (Post.slug == slug))
    return render_template('post_detail.html', post=post)
object_list(template_name, query[, context_variable='object_list'[, paginate_by=20[, page_var='page'[, check_bounds=True[, **kwargs]]]]])

Retrieve a paginated list of objects specified by the given query. The paginated object list will be dropped into the context using the given context_variable, as well as metadata about the current page and total number of pages, and finally any arbitrary context data passed as keyword-arguments.

The page is specified using the page GET argument, e.g. /my-object-list/?page=3 would return the third page of objects.

Parameters:
  • template_name – The name of the template to render.
  • query – A SelectQuery instance to paginate.
  • context_variable – The context variable name to use for the paginated object list.
  • paginate_by – Number of objects per-page.
  • page_var – The name of the GET argument which contains the page.
  • check_bounds – Whether to check that the given page is a valid page. If check_bounds is True and an invalid page is specified, then a 404 will be returned.
  • kwargs – Arbitrary key/value pairs to pass into the template context.

Example:

@app.route('/blog/')
def post_index():
    public_posts = (Post
                    .select()
                    .where(Post.published == True)
                    .order_by(Post.timestamp.desc()))

    return object_list(
        'post_index.html',
        query=public_posts,
        context_variable='post_list',
        paginate_by=10)

The template will have the following context:

  • post_list, which contains a list of up to 10 posts.
  • page, which contains the current page based on the value of the page GET parameter.
  • pagination, a PaginatedQuery instance.
class PaginatedQuery(query_or_model, paginate_by[, page_var='page'[, check_bounds=False]])

Helper class to perform pagination based on GET arguments.

Parameters:
  • query_or_model – Either a Model or a SelectQuery instance containing the collection of records you wish to paginate.
  • paginate_by – Number of objects per-page.
  • page_var – The name of the GET argument which contains the page.
  • check_bounds – Whether to check that the given page is a valid page. If check_bounds is True and an invalid page is specified, then a 404 will be returned.
get_page()

Return the currently selected page, as indicated by the value of the page_var GET parameter. If no page is explicitly selected, then this method will return 1, indicating the first page.

get_page_count()

Return the total number of possible pages.

get_object_list()

Using the value of get_page(), return the page of objects requested by the user. The return value is a SelectQuery with the appropriate LIMIT and OFFSET clauses.

If check_bounds was set to True and the requested page contains no objects, then a 404 will be raised.