Transaction¶
Introduction¶
MongoDB transactions are needed to ensure data consistency in complex operations that involve multiple documents and collections. By grouping multiple operations into a single transaction, you can ensure that all operations either succeed or fail together, preventing data inconsistencies and race conditions.
We suggest reading the MongoDB documentation on transactions.
Start transaction¶
To implement transactions, we will use the default mechanism of PyMongo Transactions.
First, we start a session using a with statement.
Then we start the transaction using the session object.
import os
from typing import Optional
from mongodb_odm import Document, connect
from pymongo.errors import OperationFailure
class Player(Document):
    """Document model describing a player."""

    name: str
    country_code: str
    # Defaults to None when no rating is given.
    rating: Optional[int] = None
def create_documents():
    """Insert two players inside a single transaction.

    Both inserts receive the same session. If an ``OperationFailure`` is
    raised, the transaction is aborted explicitly.
    """
    with Document.start_session() as session, session.start_transaction():
        try:
            pele = Player(name="Pelé", country_code="BRA")
            pele.create(session=session)
            Player(name="Diego Maradona", country_code="ARG").create(session=session)
        except OperationFailure:
            # Abort explicitly so that a partial insert is not applied.
            session.abort_transaction()
# Code omitted below
Full file preview
import os
from typing import Optional
from mongodb_odm import Document, connect
from pymongo.errors import OperationFailure
class Player(Document):
    """Document model describing a player."""

    name: str
    country_code: str
    # Defaults to None when no rating is given.
    rating: Optional[int] = None
def create_documents():
    """Insert two players inside a single transaction.

    Both inserts receive the same session. If an ``OperationFailure`` is
    raised, the transaction is aborted explicitly.
    """
    with Document.start_session() as session, session.start_transaction():
        try:
            pele = Player(name="Pelé", country_code="BRA")
            pele.create(session=session)
            Player(name="Diego Maradona", country_code="ARG").create(session=session)
        except OperationFailure:
            # Abort explicitly so that a partial insert is not applied.
            session.abort_transaction()
def update_documents():
    """Update the ratings of two players inside a single transaction.

    Both updates receive the same session. If an ``OperationFailure`` is
    raised, the transaction is aborted explicitly.
    """
    with Document.start_session() as session, session.start_transaction():
        try:
            pele = Player.get({Player.name: "Pelé"})
            pele.rating = 98
            pele.update(session=session)

            maradona = Player.get({Player.name: "Diego Maradona"})
            # The rating must be set on the object that is updated next.
            maradona.rating = 97
            maradona.update(session=session)
        except OperationFailure:
            # Abort explicitly so that a partial update is not applied.
            session.abort_transaction()
def delete_documents():
    """Delete two players inside a single transaction.

    Both deletes receive the same session. If an ``OperationFailure`` is
    raised, the transaction is aborted explicitly.
    """
    with Document.start_session() as session, session.start_transaction():
        try:
            # Fetch and delete one player at a time, in this order.
            for player_name in ("Pelé", "Diego Maradona"):
                Player.get({Player.name: player_name}).delete(session=session)
        except OperationFailure:
            # Abort explicitly so that a partial delete is not applied.
            session.abort_transaction()
def main():
    """Connect to MongoDB and run the three transaction examples in order."""
    # Use a database that has a replica set.
    mongo_url = os.environ.get("MONGO_URL", "mongodb://localhost:27017/testdb")
    connect(mongo_url)

    for step in (create_documents, update_documents, delete_documents):
        step()


if __name__ == "__main__":
    main()
Create document using transaction¶
To create an object inside a transaction, we pass the session object as a keyword argument to the create method.
If something goes wrong, we need to abort the transaction by calling session.abort_transaction() manually. Otherwise, partial actions will be applied.
# Code omitted above
def create_documents():
    """Insert two players inside a single transaction.

    Both inserts receive the same session. If an ``OperationFailure`` is
    raised, the transaction is aborted explicitly.
    """
    with Document.start_session() as session, session.start_transaction():
        try:
            pele = Player(name="Pelé", country_code="BRA")
            pele.create(session=session)
            Player(name="Diego Maradona", country_code="ARG").create(session=session)
        except OperationFailure:
            # Abort explicitly so that a partial insert is not applied.
            session.abort_transaction()
# Code omitted below
Full file preview
import os
from typing import Optional
from mongodb_odm import Document, connect
from pymongo.errors import OperationFailure
class Player(Document):
    """Document model describing a player."""

    name: str
    country_code: str
    # Defaults to None when no rating is given.
    rating: Optional[int] = None
def create_documents():
    """Insert two players inside a single transaction.

    Both inserts receive the same session. If an ``OperationFailure`` is
    raised, the transaction is aborted explicitly.
    """
    with Document.start_session() as session, session.start_transaction():
        try:
            pele = Player(name="Pelé", country_code="BRA")
            pele.create(session=session)
            Player(name="Diego Maradona", country_code="ARG").create(session=session)
        except OperationFailure:
            # Abort explicitly so that a partial insert is not applied.
            session.abort_transaction()
def update_documents():
    """Update the ratings of two players inside a single transaction.

    Both updates receive the same session. If an ``OperationFailure`` is
    raised, the transaction is aborted explicitly.
    """
    with Document.start_session() as session, session.start_transaction():
        try:
            pele = Player.get({Player.name: "Pelé"})
            pele.rating = 98
            pele.update(session=session)

            maradona = Player.get({Player.name: "Diego Maradona"})
            # The rating must be set on the object that is updated next.
            maradona.rating = 97
            maradona.update(session=session)
        except OperationFailure:
            # Abort explicitly so that a partial update is not applied.
            session.abort_transaction()
def delete_documents():
    """Delete two players inside a single transaction.

    Both deletes receive the same session. If an ``OperationFailure`` is
    raised, the transaction is aborted explicitly.
    """
    with Document.start_session() as session, session.start_transaction():
        try:
            # Fetch and delete one player at a time, in this order.
            for player_name in ("Pelé", "Diego Maradona"):
                Player.get({Player.name: player_name}).delete(session=session)
        except OperationFailure:
            # Abort explicitly so that a partial delete is not applied.
            session.abort_transaction()
def main():
    """Connect to MongoDB and run the three transaction examples in order."""
    # Use a database that has a replica set.
    mongo_url = os.environ.get("MONGO_URL", "mongodb://localhost:27017/testdb")
    connect(mongo_url)

    for step in (create_documents, update_documents, delete_documents):
        step()


if __name__ == "__main__":
    main()
Warning
We need to call session.abort_transaction() explicitly to abort the actions. If some of the operations have already been executed when something goes wrong, we must call abort_transaction(); otherwise, the partial actions will be applied.
Update document using transaction¶
We can use transactions on update operations.
We need to pass the session object as a keyword argument to the update method.
# Code omitted above
def update_documents():
    """Update the ratings of two players inside a single transaction.

    Both updates receive the same session. If an ``OperationFailure`` is
    raised, the transaction is aborted explicitly.
    """
    with Document.start_session() as session, session.start_transaction():
        try:
            pele = Player.get({Player.name: "Pelé"})
            pele.rating = 98
            pele.update(session=session)

            maradona = Player.get({Player.name: "Diego Maradona"})
            # The rating must be set on the object that is updated next.
            maradona.rating = 97
            maradona.update(session=session)
        except OperationFailure:
            # Abort explicitly so that a partial update is not applied.
            session.abort_transaction()
# Code omitted below
Full file preview
import os
from typing import Optional
from mongodb_odm import Document, connect
from pymongo.errors import OperationFailure
class Player(Document):
    """Document model describing a player."""

    name: str
    country_code: str
    # Defaults to None when no rating is given.
    rating: Optional[int] = None
def create_documents():
    """Insert two players inside a single transaction.

    Both inserts receive the same session. If an ``OperationFailure`` is
    raised, the transaction is aborted explicitly.
    """
    with Document.start_session() as session, session.start_transaction():
        try:
            pele = Player(name="Pelé", country_code="BRA")
            pele.create(session=session)
            Player(name="Diego Maradona", country_code="ARG").create(session=session)
        except OperationFailure:
            # Abort explicitly so that a partial insert is not applied.
            session.abort_transaction()
def update_documents():
    """Update the ratings of two players inside a single transaction.

    Both updates receive the same session. If an ``OperationFailure`` is
    raised, the transaction is aborted explicitly.
    """
    with Document.start_session() as session, session.start_transaction():
        try:
            pele = Player.get({Player.name: "Pelé"})
            pele.rating = 98
            pele.update(session=session)

            maradona = Player.get({Player.name: "Diego Maradona"})
            # The rating must be set on the object that is updated next.
            maradona.rating = 97
            maradona.update(session=session)
        except OperationFailure:
            # Abort explicitly so that a partial update is not applied.
            session.abort_transaction()
def delete_documents():
    """Delete two players inside a single transaction.

    Both deletes receive the same session. If an ``OperationFailure`` is
    raised, the transaction is aborted explicitly.
    """
    with Document.start_session() as session, session.start_transaction():
        try:
            # Fetch and delete one player at a time, in this order.
            for player_name in ("Pelé", "Diego Maradona"):
                Player.get({Player.name: player_name}).delete(session=session)
        except OperationFailure:
            # Abort explicitly so that a partial delete is not applied.
            session.abort_transaction()
def main():
    """Connect to MongoDB and run the three transaction examples in order."""
    # Use a database that has a replica set.
    mongo_url = os.environ.get("MONGO_URL", "mongodb://localhost:27017/testdb")
    connect(mongo_url)

    for step in (create_documents, update_documents, delete_documents):
        step()


if __name__ == "__main__":
    main()
Delete document using transaction¶
We can use transactions on delete operations.
We need to pass the session object as a keyword argument to the delete method.
# Code omitted above
def delete_documents():
    """Delete two players inside a single transaction.

    Both deletes receive the same session. If an ``OperationFailure`` is
    raised, the transaction is aborted explicitly.
    """
    with Document.start_session() as session, session.start_transaction():
        try:
            # Fetch and delete one player at a time, in this order.
            for player_name in ("Pelé", "Diego Maradona"):
                Player.get({Player.name: player_name}).delete(session=session)
        except OperationFailure:
            # Abort explicitly so that a partial delete is not applied.
            session.abort_transaction()
# Code omitted below
Full file preview
import os
from typing import Optional
from mongodb_odm import Document, connect
from pymongo.errors import OperationFailure
class Player(Document):
    """Document model describing a player."""

    name: str
    country_code: str
    # Defaults to None when no rating is given.
    rating: Optional[int] = None
def create_documents():
    """Insert two players inside a single transaction.

    Both inserts receive the same session. If an ``OperationFailure`` is
    raised, the transaction is aborted explicitly.
    """
    with Document.start_session() as session, session.start_transaction():
        try:
            pele = Player(name="Pelé", country_code="BRA")
            pele.create(session=session)
            Player(name="Diego Maradona", country_code="ARG").create(session=session)
        except OperationFailure:
            # Abort explicitly so that a partial insert is not applied.
            session.abort_transaction()
def update_documents():
    """Update the ratings of two players inside a single transaction.

    Both updates receive the same session. If an ``OperationFailure`` is
    raised, the transaction is aborted explicitly.
    """
    with Document.start_session() as session, session.start_transaction():
        try:
            pele = Player.get({Player.name: "Pelé"})
            pele.rating = 98
            pele.update(session=session)

            maradona = Player.get({Player.name: "Diego Maradona"})
            # The rating must be set on the object that is updated next.
            maradona.rating = 97
            maradona.update(session=session)
        except OperationFailure:
            # Abort explicitly so that a partial update is not applied.
            session.abort_transaction()
def delete_documents():
    """Delete two players inside a single transaction.

    Both deletes receive the same session. If an ``OperationFailure`` is
    raised, the transaction is aborted explicitly.
    """
    with Document.start_session() as session, session.start_transaction():
        try:
            # Fetch and delete one player at a time, in this order.
            for player_name in ("Pelé", "Diego Maradona"):
                Player.get({Player.name: player_name}).delete(session=session)
        except OperationFailure:
            # Abort explicitly so that a partial delete is not applied.
            session.abort_transaction()
def main():
    """Connect to MongoDB and run the three transaction examples in order."""
    # Use a database that has a replica set.
    mongo_url = os.environ.get("MONGO_URL", "mongodb://localhost:27017/testdb")
    connect(mongo_url)

    for step in (create_documents, update_documents, delete_documents):
        step()


if __name__ == "__main__":
    main()
Full Code¶
import os
from typing import Optional
from mongodb_odm import Document, connect
from pymongo.errors import OperationFailure
class Player(Document):
    """Document model describing a player."""

    name: str
    country_code: str
    # Defaults to None when no rating is given.
    rating: Optional[int] = None
def create_documents():
    """Insert two players inside a single transaction.

    Both inserts receive the same session. If an ``OperationFailure`` is
    raised, the transaction is aborted explicitly.
    """
    with Document.start_session() as session, session.start_transaction():
        try:
            pele = Player(name="Pelé", country_code="BRA")
            pele.create(session=session)
            Player(name="Diego Maradona", country_code="ARG").create(session=session)
        except OperationFailure:
            # Abort explicitly so that a partial insert is not applied.
            session.abort_transaction()
def update_documents():
    """Update the ratings of two players inside a single transaction.

    Both updates receive the same session. If an ``OperationFailure`` is
    raised, the transaction is aborted explicitly.
    """
    with Document.start_session() as session, session.start_transaction():
        try:
            pele = Player.get({Player.name: "Pelé"})
            pele.rating = 98
            pele.update(session=session)

            maradona = Player.get({Player.name: "Diego Maradona"})
            # The rating must be set on the object that is updated next.
            maradona.rating = 97
            maradona.update(session=session)
        except OperationFailure:
            # Abort explicitly so that a partial update is not applied.
            session.abort_transaction()
def delete_documents():
    """Delete two players inside a single transaction.

    Both deletes receive the same session. If an ``OperationFailure`` is
    raised, the transaction is aborted explicitly.
    """
    with Document.start_session() as session, session.start_transaction():
        try:
            # Fetch and delete one player at a time, in this order.
            for player_name in ("Pelé", "Diego Maradona"):
                Player.get({Player.name: player_name}).delete(session=session)
        except OperationFailure:
            # Abort explicitly so that a partial delete is not applied.
            session.abort_transaction()
def main():
    """Connect to MongoDB and run the three transaction examples in order."""
    # Use a database that has a replica set.
    mongo_url = os.environ.get("MONGO_URL", "mongodb://localhost:27017/testdb")
    connect(mongo_url)

    for step in (create_documents, update_documents, delete_documents):
        step()


if __name__ == "__main__":
    main()