Skip to content

Transaction

Introduction

MongoDB transactions are needed to ensure data consistency in complex operations that involve multiple documents and collections. By grouping multiple operations into a single transaction, you can ensure that all operations either succeed or fail together, preventing data inconsistencies and race conditions.

We suggest reading the doc from MongoDB about transactions.

Start transaction

To implement transactions, we will use the default mechanism of PyMongo Transactions.

First, we will start a session using with to implement transactions.

Then start the transaction using the session object.

import os
from typing import Optional

from mongodb_odm import Document, connect
from pymongo.errors import OperationFailure


class Player(Document):
    name: str
    country_code: str
    rating: Optional[int] = None


def create_documents():
    with Document.start_session() as session:
        with session.start_transaction():
            try:
                Player(
                    name="Pelé",
                    country_code="BRA",
                ).create(session=session)

                maradona = Player(name="Diego Maradona", country_code="ARG")
                maradona.create(session=session)
            except OperationFailure:
                session.abort_transaction()

# Code omitted below
Full file preview
import os
from typing import Optional

from mongodb_odm import Document, connect
from pymongo.errors import OperationFailure


class Player(Document):
    name: str
    country_code: str
    rating: Optional[int] = None


def create_documents():
    with Document.start_session() as session:
        with session.start_transaction():
            try:
                Player(
                    name="Pelé",
                    country_code="BRA",
                ).create(session=session)

                maradona = Player(name="Diego Maradona", country_code="ARG")
                maradona.create(session=session)
            except OperationFailure:
                session.abort_transaction()


def update_documents():
    with Document.start_session() as session:
        with session.start_transaction():
            try:
                pele = Player.get({Player.name: "Pelé"})
                pele.rating = 98
                pele.update(session=session)

                maradona = Player.get({Player.name: "Diego Maradona"})
                pele.rating = 97
                maradona.update(session=session)
            except OperationFailure:
                session.abort_transaction()


def delete_documents():
    with Document.start_session() as session:
        with session.start_transaction():
            try:
                pele = Player.get({Player.name: "Pelé"})
                pele.delete(session=session)

                maradona = Player.get({Player.name: "Diego Maradona"})
                maradona.delete(session=session)
            except OperationFailure:
                session.abort_transaction()


def main():
    # Use a database that has a replica set.
    connect(os.environ.get("MONGO_URL", "mongodb://localhost:27017/testdb"))

    create_documents()
    update_documents()
    delete_documents()


if __name__ == "__main__":
    main()

Create document using transaction

To create an object with a transaction mechanism, we will pass the session object as kwargs in the create method.

If something goes wrong, then we need to abort the actions by manually calling the abort function. Otherwise, partial actions will be applied.

# Code omitted above

def create_documents():
    with Document.start_session() as session:
        with session.start_transaction():
            try:
                Player(
                    name="Pelé",
                    country_code="BRA",
                ).create(session=session)

                maradona = Player(name="Diego Maradona", country_code="ARG")
                maradona.create(session=session)
            except OperationFailure:
                session.abort_transaction()

# Code omitted below
Full file preview
import os
from typing import Optional

from mongodb_odm import Document, connect
from pymongo.errors import OperationFailure


class Player(Document):
    name: str
    country_code: str
    rating: Optional[int] = None


def create_documents():
    with Document.start_session() as session:
        with session.start_transaction():
            try:
                Player(
                    name="Pelé",
                    country_code="BRA",
                ).create(session=session)

                maradona = Player(name="Diego Maradona", country_code="ARG")
                maradona.create(session=session)
            except OperationFailure:
                session.abort_transaction()


def update_documents():
    with Document.start_session() as session:
        with session.start_transaction():
            try:
                pele = Player.get({Player.name: "Pelé"})
                pele.rating = 98
                pele.update(session=session)

                maradona = Player.get({Player.name: "Diego Maradona"})
                pele.rating = 97
                maradona.update(session=session)
            except OperationFailure:
                session.abort_transaction()


def delete_documents():
    with Document.start_session() as session:
        with session.start_transaction():
            try:
                pele = Player.get({Player.name: "Pelé"})
                pele.delete(session=session)

                maradona = Player.get({Player.name: "Diego Maradona"})
                maradona.delete(session=session)
            except OperationFailure:
                session.abort_transaction()


def main():
    # Use a database that has a replica set.
    connect(os.environ.get("MONGO_URL", "mongodb://localhost:27017/testdb"))

    create_documents()
    update_documents()
    delete_documents()


if __name__ == "__main__":
    main()

Warning

We need to call the session.abort_transaction() explicitly to abort the actions. If a partial function is executed and something goes wrong, then we need to call the abort_transaction; otherwise, partial actions will be applied.

Update document using transaction

We can use transactions on update operations.

We need to pass the session object as kwargs in the update method.

# Code omitted above

def update_documents():
    with Document.start_session() as session:
        with session.start_transaction():
            try:
                pele = Player.get({Player.name: "Pelé"})
                pele.rating = 98
                pele.update(session=session)

                maradona = Player.get({Player.name: "Diego Maradona"})
                pele.rating = 97
                maradona.update(session=session)
            except OperationFailure:
                session.abort_transaction()

# Code omitted below
Full file preview
import os
from typing import Optional

from mongodb_odm import Document, connect
from pymongo.errors import OperationFailure


class Player(Document):
    name: str
    country_code: str
    rating: Optional[int] = None


def create_documents():
    with Document.start_session() as session:
        with session.start_transaction():
            try:
                Player(
                    name="Pelé",
                    country_code="BRA",
                ).create(session=session)

                maradona = Player(name="Diego Maradona", country_code="ARG")
                maradona.create(session=session)
            except OperationFailure:
                session.abort_transaction()


def update_documents():
    with Document.start_session() as session:
        with session.start_transaction():
            try:
                pele = Player.get({Player.name: "Pelé"})
                pele.rating = 98
                pele.update(session=session)

                maradona = Player.get({Player.name: "Diego Maradona"})
                pele.rating = 97
                maradona.update(session=session)
            except OperationFailure:
                session.abort_transaction()


def delete_documents():
    with Document.start_session() as session:
        with session.start_transaction():
            try:
                pele = Player.get({Player.name: "Pelé"})
                pele.delete(session=session)

                maradona = Player.get({Player.name: "Diego Maradona"})
                maradona.delete(session=session)
            except OperationFailure:
                session.abort_transaction()


def main():
    # Use a database that has a replica set.
    connect(os.environ.get("MONGO_URL", "mongodb://localhost:27017/testdb"))

    create_documents()
    update_documents()
    delete_documents()


if __name__ == "__main__":
    main()

Delete document using transaction

We can use transactions on delete operations.

We need to pass the session object as kwargs in the delete method.

# Code omitted above

def delete_documents():
    with Document.start_session() as session:
        with session.start_transaction():
            try:
                pele = Player.get({Player.name: "Pelé"})
                pele.delete(session=session)

                maradona = Player.get({Player.name: "Diego Maradona"})
                maradona.delete(session=session)
            except OperationFailure:
                session.abort_transaction()

# Code omitted below
Full file preview
import os
from typing import Optional

from mongodb_odm import Document, connect
from pymongo.errors import OperationFailure


class Player(Document):
    name: str
    country_code: str
    rating: Optional[int] = None


def create_documents():
    with Document.start_session() as session:
        with session.start_transaction():
            try:
                Player(
                    name="Pelé",
                    country_code="BRA",
                ).create(session=session)

                maradona = Player(name="Diego Maradona", country_code="ARG")
                maradona.create(session=session)
            except OperationFailure:
                session.abort_transaction()


def update_documents():
    with Document.start_session() as session:
        with session.start_transaction():
            try:
                pele = Player.get({Player.name: "Pelé"})
                pele.rating = 98
                pele.update(session=session)

                maradona = Player.get({Player.name: "Diego Maradona"})
                pele.rating = 97
                maradona.update(session=session)
            except OperationFailure:
                session.abort_transaction()


def delete_documents():
    with Document.start_session() as session:
        with session.start_transaction():
            try:
                pele = Player.get({Player.name: "Pelé"})
                pele.delete(session=session)

                maradona = Player.get({Player.name: "Diego Maradona"})
                maradona.delete(session=session)
            except OperationFailure:
                session.abort_transaction()


def main():
    # Use a database that has a replica set.
    connect(os.environ.get("MONGO_URL", "mongodb://localhost:27017/testdb"))

    create_documents()
    update_documents()
    delete_documents()


if __name__ == "__main__":
    main()

Full Code

import os
from typing import Optional

from mongodb_odm import Document, connect
from pymongo.errors import OperationFailure


class Player(Document):
    name: str
    country_code: str
    rating: Optional[int] = None


def create_documents():
    with Document.start_session() as session:
        with session.start_transaction():
            try:
                Player(
                    name="Pelé",
                    country_code="BRA",
                ).create(session=session)

                maradona = Player(name="Diego Maradona", country_code="ARG")
                maradona.create(session=session)
            except OperationFailure:
                session.abort_transaction()


def update_documents():
    with Document.start_session() as session:
        with session.start_transaction():
            try:
                pele = Player.get({Player.name: "Pelé"})
                pele.rating = 98
                pele.update(session=session)

                maradona = Player.get({Player.name: "Diego Maradona"})
                pele.rating = 97
                maradona.update(session=session)
            except OperationFailure:
                session.abort_transaction()


def delete_documents():
    with Document.start_session() as session:
        with session.start_transaction():
            try:
                pele = Player.get({Player.name: "Pelé"})
                pele.delete(session=session)

                maradona = Player.get({Player.name: "Diego Maradona"})
                maradona.delete(session=session)
            except OperationFailure:
                session.abort_transaction()


def main():
    # Use a database that has a replica set.
    connect(os.environ.get("MONGO_URL", "mongodb://localhost:27017/testdb"))

    create_documents()
    update_documents()
    delete_documents()


if __name__ == "__main__":
    main()