Async/Await Support¶
MongoDB-ODM now provides comprehensive support for async/await operations, built on top of PyMongo's async capabilities. This enables you to build high-performance asynchronous applications with MongoDB while maintaining the familiar ODM interface.
Async/Await Features¶
We've added async/await support to MongoDB-ODM with the following key features:
Async Function Naming Convention¶
- Most async methods are prefixed with
aand aligned with existing methods (e.g.,find()→afind(),create()→acreate()). - Function arguments remain the same between sync and async versions
- Consistent API design makes migration straightforward
Backward Compatibility¶
- All existing synchronous functionality remains unchanged
- No breaking changes to current sync operations
- You can gradually migrate to async operations
Complete Async Coverage¶
- All major database operations have async equivalents
- Connection management, CRUD operations, aggregation, transactions
- Index management and relationship loading
Quick Example¶
Here's a simple comparison showing the similarity between sync and async operations:
Synchronous (Existing)¶
import os
from mongodb_odm import Document, Field, apply_indexes, connect, disconnect
MONGO_URL = os.environ.get("MONGO_URL", "mongodb://localhost:27017/testdb")
class User(Document):
name: str = Field()
email: str = Field()
def main():
# Connection
connect(MONGO_URL)
apply_indexes()
# Create document
user = User(name="John", email="john@example.com")
user.create()
# Find documents
_users = list(User.find({"name": "John"}))
# Update document
user.update({"$set": {"name": "Jane"}})
# Delete document
user.delete()
# Cleanup
disconnect()
if __name__ == "__main__":
main()
Asynchronous (New)¶
import asyncio
import os
from mongodb_odm import Document, Field, adisconnect, async_apply_indexes, connect
MONGO_URL = os.environ.get("MONGO_URL", "mongodb://localhost:27017/testdb")
class User(Document):
name: str = Field()
email: str = Field()
async def main():
connect(MONGO_URL, async_is_enabled=True)
await async_apply_indexes()
# Create document
user = User(name="John", email="john@example.com")
await user.acreate()
# Find documents
_users = [user async for user in User.afind({"name": "John"})]
# Update document
await user.aupdate({"$set": {"name": "Jane"}})
# Delete document
await user.adelete()
# Cleanup
await adisconnect()
asyncio.run(main())
Key Async Methods¶
All async methods follow the same pattern - they're prefixed with a and require await:
Connection¶
connect(..., async_is_enabled=True)- Enable async modeawait adisconnect()- Close async connectionawait adrop_database()- Drop database asynchronously
Index Management¶
await async_apply_indexes()- Apply indexes asynchronously
Operations¶
await doc.acreate()- Create documentModel.afind(filter)- Find documents (returns AsyncIterator)await Model.afind_one(filter)- Find single documentawait Model.aget(filter)- Get document (raises if not found)await Model.aget_or_create(filter, **data)- Get or create documentawait doc.aupdate(data)- Update documentawait Model.aupdate_one(filter, data)- Update one documentawait Model.aupdate_many(filter, data)- Update multiple documentsawait doc.adelete()- Delete documentawait Model.adelete_one(filter)- Delete one documentawait Model.adelete_many(filter)- Delete multiple documentsawait Model.acount_documents(filter)- Count documentsawait Model.aexists(filter)- Check if documents existModel.aaggregate(pipeline)- Aggregation (returns AsyncIterator)await Model.aget_random_one(filter)- Get random documentawait Model.abulk_write(operations)- Bulk operationsawait Model.aload_related(objects, fields)- Load relationships
Sessions & Transactions¶
await Model.astart_session()- Start async session for transactions
Troubleshooting¶
Event Loop Synchronization Issues¶
When using async functions in environments where an event loop is already running (like Jupyter notebooks, some web frameworks, or testing environments), you may encounter RuntimeError: asyncio.run() cannot be called from a running event loop. This happens because asyncio.run() tries to create a new event loop, but one already exists.
The solution is to detect if an event loop is running and handle it appropriately:
def run_async(func_call: Coroutine[Any, Any, T]) -> T:
"""
Run an async function, handling both cases where an event loop
is already running or not.
"""
try:
# Check if we're already in an event loop
asyncio.get_running_loop()
# If we are, run in a separate thread to avoid conflicts
with concurrent.futures.ThreadPoolExecutor() as executor:
future = executor.submit(asyncio.run, func_call)
return future.result()
except RuntimeError:
# No event loop running, safe to use asyncio.run()
return asyncio.run(func_call)
def create_indexes() -> None:
run_async(async_apply_indexes())