add TransactionType to SQLiteEngine (piccolo-orm#688)
* added ``TransactionType``

* make `deferred` the default, to match SQLite's default behaviour

* add docs for SQLite transaction types

* added `TestTransactionType`

Making sure that SQLite transaction types work correctly.

* transaction and atomic improvements

* add test for running `add_m2m` inside a transaction
dantownsend committed Dec 2, 2022
1 parent 6a5ef4e commit 13c9d69
Showing 11 changed files with 483 additions and 240 deletions.
8 changes: 8 additions & 0 deletions docs/src/piccolo/engines/sqlite_engine.rst
@@ -23,3 +23,11 @@ Source
.. currentmodule:: piccolo.engine.sqlite

.. autoclass:: SQLiteEngine

-------------------------------------------------------------------------------

Production tips
---------------

If you're planning on using SQLite in production with Piccolo, with lots of
concurrent queries, then here are some :ref:`useful tips <UsingSQLitAndAsyncioEffectively>`.
22 changes: 22 additions & 0 deletions docs/src/piccolo/query_types/transactions.rst
@@ -47,3 +47,25 @@ async.
If an exception is raised within the body of the context manager, then the
transaction is automatically rolled back. The exception is still propagated
though.

``transaction_exists``
~~~~~~~~~~~~~~~~~~~~~~

You can check whether your code is currently inside a transaction using the
following:

.. code-block:: python

    >>> Band._meta.db.transaction_exists()
    True

-------------------------------------------------------------------------------

Transaction types
-----------------

SQLite
~~~~~~

For SQLite you may want to specify the :ref:`transaction type <SQLiteTransactionTypes>`,
as it can have an effect on how well the database handles concurrent requests.
1 change: 1 addition & 0 deletions docs/src/piccolo/tutorials/index.rst
@@ -8,3 +8,4 @@ help you solve common problems:
:maxdepth: 1

./migrate_existing_project
./using_sqlite_and_asyncio_effectively
103 changes: 103 additions & 0 deletions docs/src/piccolo/tutorials/using_sqlite_and_asyncio_effectively.rst
@@ -0,0 +1,103 @@
.. _UsingSQLitAndAsyncioEffectively:

Using SQLite and asyncio effectively
====================================

When using Piccolo with SQLite, there are some best practices to follow.

asyncio => lots of connections
------------------------------

With asyncio, we can potentially open lots of database connections, and attempt
to perform concurrent database writes.

SQLite doesn't support such concurrent behavior as effectively as Postgres, so
we need to be careful.

One write at a time
~~~~~~~~~~~~~~~~~~~

SQLite can easily support lots of transactions concurrently if they are reading,
but only one write can be performed at a time.
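To see this outside of Piccolo, here is a minimal sketch using Python's
built-in ``sqlite3`` module directly (the file path, table, and connection
names are purely illustrative): one connection holds the write lock, a
concurrent reader still works, but a second writer is turned away.

```python
import os
import sqlite3
import tempfile

path = os.path.join(tempfile.mkdtemp(), "demo.db")

# isolation_level=None gives us manual control over transactions.
writer = sqlite3.connect(path, timeout=0.1, isolation_level=None)
writer.execute("CREATE TABLE band (name TEXT)")
writer.execute("BEGIN IMMEDIATE")  # take the write lock
writer.execute("INSERT INTO band VALUES ('Pythonistas')")

# A concurrent connection can still read...
reader = sqlite3.connect(path, timeout=0.1, isolation_level=None)
count = reader.execute("SELECT count(*) FROM band").fetchall()[0][0]
# ...though it doesn't see the uncommitted insert:
print(count)  # 0

# But a second writer can't get the write lock while `writer` holds it:
second = sqlite3.connect(path, timeout=0.1, isolation_level=None)
try:
    second.execute("BEGIN IMMEDIATE")
    locked = False
except sqlite3.OperationalError:
    locked = True
print(locked)  # True

writer.execute("COMMIT")
```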

-------------------------------------------------------------------------------

.. _SQLiteTransactionTypes:

Transactions
------------

SQLite has several transaction types, as specified by Piccolo's
``TransactionType`` enum:

.. currentmodule:: piccolo.engine.sqlite

.. autoclass:: TransactionType
:members:
:undoc-members:

Which to use?
~~~~~~~~~~~~~

When creating a transaction, Piccolo uses ``DEFERRED`` by default (to be
consistent with SQLite).

This means that the first SQL query executed within the transaction determines
whether it's a **READ** or **WRITE**:

* **READ** - if the first query is a ``SELECT``
* **WRITE** - if the first query is something like an ``INSERT`` / ``UPDATE`` / ``DELETE``

If a transaction starts off with a ``SELECT``, but then tries to perform an ``INSERT`` / ``UPDATE`` / ``DELETE``,
SQLite tries to 'promote' the transaction so it can write.

The problem is that if multiple concurrent connections try to do this at the
same time, SQLite returns a ``database is locked`` error.

So if you're creating a transaction which you know will perform writes, then
create an ``IMMEDIATE`` transaction:

.. code-block:: python

    from piccolo.engine.sqlite import TransactionType

    async with Band._meta.db.transaction(
        transaction_type=TransactionType.immediate
    ):
        # We perform a SELECT first, but as it's an IMMEDIATE transaction,
        # we can later perform writes without getting a database locked
        # error.
        if not await Band.exists().where(Band.name == 'Pythonistas'):
            await Band.objects().create(name="Pythonistas")

Multiple ``IMMEDIATE`` transactions can exist concurrently - SQLite uses a lock
to make sure only one of them writes at a time.
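That serialisation can be sketched with plain ``sqlite3`` and a thread (the
names are illustrative): the second ``IMMEDIATE`` transaction simply waits its
turn for the write lock.

```python
import os
import sqlite3
import tempfile
import threading
import time

path = os.path.join(tempfile.mkdtemp(), "demo.db")
conn = sqlite3.connect(path, isolation_level=None)
conn.execute("CREATE TABLE band (name TEXT)")

events = []

def writer(name: str) -> None:
    c = sqlite3.connect(path, timeout=5, isolation_level=None)
    c.execute("BEGIN IMMEDIATE")  # blocks until the write lock is free
    c.execute("INSERT INTO band VALUES (?)", (name,))
    c.execute("COMMIT")
    events.append(name)
    c.close()

conn.execute("BEGIN IMMEDIATE")
conn.execute("INSERT INTO band VALUES ('first')")

thread = threading.Thread(target=writer, args=("second",))
thread.start()
time.sleep(0.3)  # 'second' is queued, waiting on the write lock
events.append("first committed")
conn.execute("COMMIT")
thread.join()

print(events)  # ['first committed', 'second']
```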

If your transaction will only be performing ``SELECT`` queries, then use the
default ``DEFERRED`` transactions - you will get improved performance, as no
write lock is taken:

.. code-block:: python

    async with Band._meta.db.transaction():
        bands = await Band.select()
        managers = await Manager.select()

-------------------------------------------------------------------------------

timeout
-------

It's recommended to specify the ``timeout`` argument in :class:`SQLiteEngine <piccolo.engine.sqlite.SQLiteEngine>`.

.. code-block:: python

    DB = SQLiteEngine(timeout=60)

Imagine you have a web app, and each endpoint creates a transaction which runs
multiple queries. With SQLite, only a single write operation can happen at a
time, so if several connections are open, they may be queued for a while.

Increasing ``timeout`` means that queries are less likely to time out while
waiting for their turn to write.

To find out more about ``timeout`` see the Python :func:`sqlite3 docs <sqlite3.connect>`.
63 changes: 37 additions & 26 deletions piccolo/columns/m2m.py
@@ -254,39 +254,50 @@ def __post_init__(self):
for i, j in self.extra_column_values.items()
}

async def run(self):
async def _run(self):
rows = self.rows
unsaved = [i for i in rows if not i._exists_in_db]

async with rows[0]._meta.db.transaction():
if unsaved:
await rows[0].__class__.insert(*unsaved).run()
if unsaved:
await rows[0].__class__.insert(*unsaved).run()

joining_table = self.m2m._meta.resolved_joining_table
joining_table = self.m2m._meta.resolved_joining_table

joining_table_rows = []
joining_table_rows = []

for row in rows:
joining_table_row = joining_table(**self.extra_column_values)
setattr(
joining_table_row,
self.m2m._meta.primary_foreign_key._meta.name,
getattr(
self.target_row,
self.target_row._meta.primary_key._meta.name,
),
)
setattr(
joining_table_row,
self.m2m._meta.secondary_foreign_key._meta.name,
getattr(
row,
row._meta.primary_key._meta.name,
),
)
joining_table_rows.append(joining_table_row)
for row in rows:
joining_table_row = joining_table(**self.extra_column_values)
setattr(
joining_table_row,
self.m2m._meta.primary_foreign_key._meta.name,
getattr(
self.target_row,
self.target_row._meta.primary_key._meta.name,
),
)
setattr(
joining_table_row,
self.m2m._meta.secondary_foreign_key._meta.name,
getattr(
row,
row._meta.primary_key._meta.name,
),
)
joining_table_rows.append(joining_table_row)

return await joining_table.insert(*joining_table_rows).run()

return await joining_table.insert(*joining_table_rows).run()
async def run(self):
"""
Run the queries, making sure they are either within an existing
transaction, or wrapped in a new transaction.
"""
engine = self.rows[0]._meta.db
if engine.transaction_exists():
await self._run()
else:
async with engine.transaction():
await self._run()

def run_sync(self):
return run_sync(self.run())
23 changes: 22 additions & 1 deletion piccolo/engine/base.py
@@ -1,5 +1,6 @@
from __future__ import annotations

import contextvars
import logging
import typing as t
from abc import ABCMeta, abstractmethod
@@ -19,7 +20,10 @@ class Batch:
pass


class Engine(metaclass=ABCMeta):
TransactionClass = t.TypeVar("TransactionClass")


class Engine(t.Generic[TransactionClass], metaclass=ABCMeta):

__slots__ = ()

@@ -116,3 +120,20 @@ async def close_connection_pool(self):
The database driver doesn't implement connection pooling.
"""
self._connection_pool_warning()

###########################################################################

current_transaction: contextvars.ContextVar[t.Optional[TransactionClass]]

def transaction_exists(self) -> bool:
"""
Find out if a transaction is currently active.
:returns:
``True`` if a transaction is already active for the current
asyncio task. This is useful to know, because nested transactions
aren't currently supported, so you can check if an existing
transaction is already active, before creating a new one.
"""
return self.current_transaction.get() is not None
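The ``contextvars`` approach above means the active transaction is tracked per
asyncio task. A minimal sketch of the same pattern (``FakeEngine`` and
``FakeTransaction`` are invented stand-ins, not Piccolo classes): a
transaction set in one task is invisible to a concurrent task.

```python
import asyncio
import contextvars
from typing import Optional

class FakeTransaction:
    pass

class FakeEngine:
    def __init__(self) -> None:
        # Each asyncio task sees its own value for this ContextVar.
        self.current_transaction: contextvars.ContextVar[
            Optional[FakeTransaction]
        ] = contextvars.ContextVar("current_transaction", default=None)

    def transaction_exists(self) -> bool:
        return self.current_transaction.get() is not None

async def with_transaction(engine: FakeEngine) -> bool:
    token = engine.current_transaction.set(FakeTransaction())
    try:
        await asyncio.sleep(0)  # yield to other tasks
        return engine.transaction_exists()
    finally:
        engine.current_transaction.reset(token)

async def without_transaction(engine: FakeEngine) -> bool:
    await asyncio.sleep(0)
    return engine.transaction_exists()

async def main():
    engine = FakeEngine()
    # gather() runs both coroutines as separate tasks, each with its
    # own copy of the context.
    return await asyncio.gather(
        with_transaction(engine), without_transaction(engine)
    )

inside, outside = asyncio.run(main())
print(inside, outside)  # True False
```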
86 changes: 8 additions & 78 deletions piccolo/engine/cockroach.py
@@ -1,80 +1,19 @@
from __future__ import annotations

import contextvars
import typing as t

from piccolo.engine.exceptions import TransactionError
from piccolo.query.base import Query
from piccolo.utils.lazy_loader import LazyLoader
from piccolo.utils.warnings import Level, colored_warning

from .postgres import Atomic as PostgresAtomic
from .postgres import PostgresEngine
from .postgres import Transaction as PostgresTransaction

asyncpg = LazyLoader("asyncpg", globals(), "asyncpg")

if t.TYPE_CHECKING: # pragma: no cover
from asyncpg.pool import Pool


###############################################################################


class Atomic(PostgresAtomic):
"""
This is useful if you want to build up a transaction programmatically, by
adding queries to it.
Usage::
transaction = engine.atomic()
transaction.add(Foo.create_table())
# Either:
transaction.run_sync()
await transaction.run()
"""

def __init__(self, engine: CockroachEngine):
self.engine = engine
self.queries: t.List[Query] = []
super(Atomic, self).__init__(engine)


###############################################################################


class Transaction(PostgresTransaction):
"""
Used for wrapping queries in a transaction, using a context manager.
Currently it's async only.
Usage::
async with engine.transaction():
# Run some queries:
await Band.select().run()
"""

def __init__(self, engine: CockroachEngine):
self.engine = engine
if self.engine.transaction_connection.get():
raise TransactionError(
"A transaction is already active - nested transactions aren't "
"currently supported."
)
super(Transaction, self).__init__(engine)


###############################################################################


class CockroachEngine(PostgresEngine):
"""
An extension of the cockroach backend.
An extension of
:class:`PostgresEngine <piccolo.engine.postgres.PostgresEngine>`.
"""

engine_type = "cockroach"
@@ -85,23 +24,14 @@ def __init__(
config: t.Dict[str, t.Any],
extensions: t.Sequence[str] = (),
log_queries: bool = False,
extra_nodes: t.Dict[str, PostgresEngine] = None,
extra_nodes: t.Dict[str, CockroachEngine] = None,
) -> None:
if extra_nodes is None:
extra_nodes = {}

self.config = config
self.extensions = extensions
self.log_queries = log_queries
self.extra_nodes = extra_nodes
self.pool: t.Optional[Pool] = None
database_name = config.get("database", "Unknown")
self.transaction_connection = contextvars.ContextVar(
f"pg_transaction_connection_{database_name}", default=None
super().__init__(
config=config,
extensions=extensions,
log_queries=log_queries,
extra_nodes=extra_nodes,
)
super(
PostgresEngine, self
).__init__() # lgtm[py/super-not-enclosing-class]

async def prep_database(self):
try: