Zum Hauptinhalt springen

Testing Guide

Pytest Testing Strategy

Target: 100% code coverage

Test Structure

tests/
├── conftest.py # Shared fixtures
├── factories.py # Test data factories

├── unit/ # Unit tests (mocked dependencies)
│   ├── modules/
│   │   ├── test_ledger.py
│   │   ├── test_payments.py
│   │   ├── test_donations.py
│   │   ├── test_organisations.py
│   │   ├── test_users.py
│   │   ├── test_campaigns.py
│   │   ├── test_businesses.py
│   │   └── test_webhooks.py
│   ├── api/
│   │   ├── test_donations_api.py
│   │   ├── test_organisations_api.py
│   │   └── ...
│   └── lib/
│       ├── test_crypto.py
│       └── test_email.py

├── integration/ # Integration tests (real database)
│   ├── test_donation_flow.py
│   ├── test_ledger_integrity.py
│   ├── test_stripe_integration.py
│   └── test_webhook_dispatch.py

└── e2e/ # End-to-end tests
    ├── test_full_donation.py
    ├── test_organisation_onboarding.py
    └── test_recurring_donations.py

Configuration

# conftest.py
import pytest
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from amply.db.base import Base
from amply.config import settings

@pytest.fixture(scope="session")
def event_loop():
    """Provide a single asyncio event loop for the whole test session.

    The loop is created when the fixture is first requested and closed when
    the session-scoped fixtures are torn down.
    """
    import asyncio

    session_loop = asyncio.new_event_loop()
    yield session_loop
    session_loop.close()

@pytest.fixture(scope="session")
async def engine():
    """Create the test database engine and manage the schema for the session.

    All tables declared on ``Base.metadata`` are created before the engine is
    handed to the tests and dropped again afterwards.

    Yields:
        The async engine connected to ``settings.test_database_url``.
    """
    test_engine = create_async_engine(
        settings.test_database_url,
        echo=False,
    )
    try:
        async with test_engine.begin() as conn:
            await conn.run_sync(Base.metadata.create_all)
        yield test_engine
        async with test_engine.begin() as conn:
            await conn.run_sync(Base.metadata.drop_all)
    finally:
        # Release pooled connections even if creating or dropping the schema
        # fails, so a broken run does not leave connections open.
        await test_engine.dispose()

@pytest.fixture
async def db(engine):
    """Yield a new ``AsyncSession`` on the test engine for a single test.

    After the test has finished, the session is rolled back and then closed.
    """
    session = AsyncSession(engine)
    async with session:
        yield session
        await session.rollback()

@pytest.fixture
def client(db):
    """Yield a ``TestClient`` whose ``get_db`` dependency returns the test session.

    The application's ``get_db`` dependency is overridden so that request
    handlers receive the same ``db`` session the test itself uses. All
    overrides are cleared again when the fixture is torn down.
    """
    from fastapi.testclient import TestClient
    from amply.main import app
    from amply.dependencies import get_db

    app.dependency_overrides[get_db] = lambda: db
    try:
        with TestClient(app) as test_client:
            yield test_client
    finally:
        # Clear the override even if entering the TestClient context fails,
        # so it cannot leak into later tests.
        app.dependency_overrides.clear()

Test Factories

# factories.py
import factory
from factory.alchemy import SQLAlchemyModelFactory
from amply.db.models import Organisation, User, Fund, LedgerEntry

class OrganisationFactory(SQLAlchemyModelFactory):
    """Build ``Organisation`` rows: a verified German nonprofit by default."""

    class Meta:
        model = Organisation
        # Commit after each created object instead of only adding it to the session.
        # NOTE(review): no ``sqlalchemy_session`` is configured here;
        # presumably it is supplied elsewhere. Confirm.
        sqlalchemy_session_persistence = "commit"

    # Sequential ids: org_000000, org_000001, ...
    id = factory.Sequence(lambda seq: f"org_{seq:06d}")
    name = factory.Faker('company')
    # Derived from the generated name so the two values stay consistent.
    legal_name = factory.LazyAttribute(lambda org: f"{org.name} Inc.")
    type = "nonprofit"
    status = "verified"
    country = "DE"
    verification_level = "standard"

class UserFactory(SQLAlchemyModelFactory):
    """Build ``User`` rows: an active user with a verified email by default."""

    class Meta:
        model = User

    # Sequential ids: usr_000000, usr_000001, ...
    id = factory.Sequence(lambda seq: f"usr_{seq:06d}")
    email = factory.Faker('email')
    name = factory.Faker('name')
    status = "active"
    email_verified = True

class FundFactory(SQLAlchemyModelFactory):
    """Build ``Fund`` rows: an active, empty EUR general fund by default."""

    class Meta:
        model = Fund

    # Sequential ids: fund_000000, fund_000001, ...
    id = factory.Sequence(lambda seq: f"fund_{seq:06d}")
    # Unless an organisation_id is passed in, a new organisation is created
    # through OrganisationFactory and its id is used.
    organisation_id = factory.LazyAttribute(lambda _fund: OrganisationFactory().id)
    name = "General Fund"
    type = "general"
    status = "active"
    balance = 0
    currency = "EUR"

class LedgerEntryFactory(SQLAlchemyModelFactory):
    """Build ``LedgerEntry`` rows: a public 5000 EUR ``donation_received`` entry by default."""

    class Meta:
        model = LedgerEntry

    # Sequential ids: led_000000, led_000001, ...
    id = factory.Sequence(lambda n: f"led_{n:06d}")
    # Unless an organisation_id is passed in, a new organisation is created
    # through OrganisationFactory and its id is used.
    organisation_id = factory.LazyAttribute(lambda o: OrganisationFactory().id)
    type = "donation_received"
    amount = 5000
    currency = "EUR"
    visibility = "public_full"
    # Build a new dict per instance. A literal ``{}`` here would be a single
    # object shared by every entry the factory creates, so mutating one
    # entry's metadata would silently change the others.
    metadata = factory.LazyFunction(dict)

Unit Tests

Ledger Tests

# tests/unit/modules/test_ledger.py
import pytest
from amply.modules.ledger.hash_chain import compute_entry_hash, verify_entry_hash

class TestHashChain:
    """Unit tests for ``compute_entry_hash``."""

    @staticmethod
    def _entry_fields(**overrides):
        """Return a full set of keyword arguments for ``compute_entry_hash``.

        A new dict is built on every call so tests cannot affect each other.
        Keyword arguments replace individual default fields.
        """
        fields = {
            'entry_id': 'led_001',
            'timestamp': '2025-01-15T14:30:00Z',
            'organisation_id': 'org_xyz',
            'type': 'donation_received',
            'amount': 5000,
            'currency': 'EUR',
            'visibility': 'public_full',
            'metadata': {},
            'prev_entry_hash': None,
        }
        fields.update(overrides)
        return fields

    def test_compute_hash_deterministic(self):
        """Same input produces same hash."""
        data = self._entry_fields()
        hash1 = compute_entry_hash(**data)
        hash2 = compute_entry_hash(**data)
        assert hash1 == hash2

    def test_compute_hash_includes_prev_hash(self):
        """Hash changes when prev_entry_hash changes."""
        hash1 = compute_entry_hash(
            **self._entry_fields(entry_id='led_002', prev_entry_hash=None)
        )
        hash2 = compute_entry_hash(
            **self._entry_fields(entry_id='led_002', prev_entry_hash='sha256:abc123')
        )
        assert hash1 != hash2

    def test_hash_format(self):
        """Hash is SHA-256 in expected format."""
        # A concrete payload is required here: the placeholder ``{...}`` is a
        # set containing Ellipsis and cannot be unpacked with ``**``.
        result = compute_entry_hash(**self._entry_fields())
        assert result.startswith('sha256:')
        assert len(result) == len('sha256:') + 64  # SHA-256 = 64 hex chars

class TestLedgerService:
    """Tests for hash-chain linking and chain verification in ``LedgerService``."""

    # NOTE(review): LedgerService is not imported in this snippet; add the
    # import from its defining module. The ``organisation`` fixture is also
    # not defined in the conftest.py shown in this guide.

    @pytest.mark.asyncio
    async def test_create_entry_links_to_previous(self, db, organisation):
        """New entry references previous entry's hash."""
        service = LedgerService()

        entry1 = await service.create_entry(
            db, organisation.id, 'donation_received', 5000, 'EUR', 'public_full', {}
        )
        entry2 = await service.create_entry(
            db, organisation.id, 'donation_received', 3000, 'EUR', 'public_full', {}
        )

        assert entry2.prev_entry_hash == entry1.entry_hash

    @pytest.mark.asyncio
    async def test_verify_chain_detects_tampering(self, db, organisation):
        """Chain verification fails if entry is tampered."""
        from amply.db.models import LedgerEntry

        service = LedgerService()

        # Create three real entries. Concrete arguments are used because a
        # literal ``...`` would be passed to the service as an Ellipsis value.
        entries = []
        for amount in (5000, 3000, 1000):
            entries.append(
                await service.create_entry(
                    db, organisation.id, 'donation_received',
                    amount, 'EUR', 'public_full', {}
                )
            )

        # Use the id the service actually assigned to the middle entry rather
        # than a hard-coded one. Read it before committing, because a commit
        # may expire attributes that were already loaded.
        tampered_id = entries[1].id

        # Tamper with middle entry (simulate attack)
        entry = await db.get(LedgerEntry, tampered_id)
        entry.amount = 9999999
        await db.commit()

        # Verify chain
        result = await service.verify_chain(db, organisation.id)
        assert result.valid is False
        assert tampered_id in result.broken_at

Payment Tests

# tests/unit/modules/test_payments.py
import pytest
from unittest.mock import patch, MagicMock

class TestPaymentService:
    """Unit tests for ``PaymentService``."""

    # NOTE(review): PaymentService is not imported in this snippet; add the
    # import from its defining module. The ``organisation`` and
    # ``payment_intent`` fixtures are not defined in the conftest.py shown
    # in this guide.

    @pytest.mark.asyncio
    @patch('amply.modules.payments.stripe_connect.stripe')
    async def test_create_payment_intent(self, mock_stripe, db, organisation):
        """PaymentIntent created on connected account."""
        mock_stripe.PaymentIntent.create.return_value = MagicMock(
            id='pi_test123',
            client_secret='pi_test123_secret_xxx'
        )

        service = PaymentService()
        # The return value is not inspected; the assertions below check the
        # arguments that were sent to Stripe.
        await service.create_payment_intent(
            organisation_id=organisation.id,
            amount=5000,
            currency='EUR'
        )

        mock_stripe.PaymentIntent.create.assert_called_once()
        call_kwargs = mock_stripe.PaymentIntent.create.call_args.kwargs
        assert call_kwargs['amount'] == 5000
        assert call_kwargs['currency'] == 'eur'
        assert call_kwargs['transfer_data']['destination'] == organisation.stripe_account_id

    @pytest.mark.asyncio
    async def test_process_successful_payment_creates_ledger_entry(
        self, db, organisation, payment_intent
    ):
        """Successful payment creates ledger entry."""
        from sqlalchemy import select
        from amply.db.models import LedgerEntry

        service = PaymentService()

        result = await service.process_successful_payment(
            db, payment_intent.id
        )

        # Check ledger entry created. AsyncSession has no ``query()`` method,
        # so the lookup uses select() with execute().
        rows = await db.execute(
            select(LedgerEntry).filter_by(transaction_id=result.id)
        )
        entry = rows.scalars().first()
        assert entry is not None
        assert entry.amount == payment_intent.amount
        assert entry.type == 'donation_received'

Integration Tests

# tests/integration/test_donation_flow.py
import pytest
from httpx import AsyncClient

class TestDonationFlow:
    """Integration test for the donation lifecycle, from the API to the ledger."""

    @pytest.mark.asyncio
    async def test_full_donation_flow(self, client, organisation, fund):
        """Test complete donation flow from API to ledger."""
        # ``client`` is the synchronous TestClient fixture from conftest.py.
        # Its request methods return the response directly, so they must not
        # be awaited.

        # 1. Create donation
        response = client.post('/v1/donations', json={
            'organisation_id': organisation.id,
            'fund_id': fund.id,
            'amount': 5000,
            'currency': 'EUR',
            'donor': {
                'email': 'test@example.com',
                'name': 'Test Donor'
            }
        })
        assert response.status_code == 201
        donation = response.json()['data']
        assert donation['status'] == 'pending'

        # 2. Simulate Stripe webhook (payment success)
        # NOTE(review): simulate_stripe_webhook is not defined or imported in
        # this snippet; it must be provided by the test module.
        await simulate_stripe_webhook('payment_intent.succeeded', {
            'id': donation['stripe_payment_id'],
            'amount': 5000,
            'currency': 'eur'
        })

        # 3. Verify donation completed
        response = client.get(f'/v1/donations/{donation["id"]}')
        assert response.json()['data']['status'] == 'completed'

        # 4. Verify ledger entry created
        response = client.get(
            f'/v1/organisations/{organisation.id}/ledger'
        )
        entries = response.json()['data']
        assert len(entries) == 1
        assert entries[0]['amount'] == 5000
        assert entries[0]['type'] == 'donation_received'

        # 5. Verify fund balance updated
        response = client.get(f'/v1/funds/{fund.id}')
        assert response.json()['data']['balance'] == 5000

class TestLedgerIntegrity:
    """Integration tests for hash-chain integrity under concurrent writes."""

    # NOTE(review): LedgerService is not imported in this snippet; add the
    # import from its defining module.

    @pytest.mark.asyncio
    async def test_concurrent_entries_maintain_chain(self, db, engine, organisation):
        """Concurrent ledger entries maintain hash chain integrity."""
        import asyncio
        from sqlalchemy.ext.asyncio import AsyncSession

        service = LedgerService()
        organisation_id = organisation.id

        async def create_entry(n):
            # Each task opens its own session: a single AsyncSession must not
            # be used by several concurrently running tasks.
            # NOTE(review): assumes the organisation row is already committed
            # so that other sessions can see it. Confirm against the fixture.
            async with AsyncSession(engine) as session:
                await service.create_entry(
                    session, organisation_id, 'donation_received',
                    1000 * n, 'EUR', 'public_full', {'n': n}
                )
                # Commit so the entry is visible to the verifying session.
                await session.commit()

        # Create 10 entries concurrently
        await asyncio.gather(*(create_entry(i) for i in range(10)))

        # Verify chain integrity
        result = await service.verify_chain(db, organisation_id)
        assert result.valid is True
        assert result.entry_count == 10

Running Tests

# Run all tests
pytest

# Run with coverage (needs the pytest-cov plugin): writes an HTML report
# and fails the run if total coverage is below 100%
pytest --cov=amply --cov-report=html --cov-fail-under=100

# Run specific test file
pytest tests/unit/modules/test_ledger.py

# Run specific test (path::Class::test_name)
pytest tests/unit/modules/test_ledger.py::TestHashChain::test_compute_hash_deterministic

# Run with verbose output
pytest -v

# Run only integration tests
pytest tests/integration/

# Run in parallel (needs the pytest-xdist plugin)
pytest -n auto

CI Configuration

# .github/workflows/test.yml
name: Test

# Run on every push and on every pull request.
on: [push, pull_request]

jobs:
  test:
    runs-on: ubuntu-latest

    # Backing services started alongside the job.
    services:
      postgres:
        image: postgres:15
        env:
          POSTGRES_DB: amply_test
          POSTGRES_USER: test
          POSTGRES_PASSWORD: test
        # Health check using pg_isready.
        options: >-
          --health-cmd pg_isready
          --health-interval 10s
          --health-timeout 5s
          --health-retries 5
        ports:
          - 5432:5432

      redis:
        image: redis:7
        ports:
          - 6379:6379

    steps:
      - uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.12'

      - name: Install dependencies
        run: pip install -e ".[dev]"

      - name: Run tests
        env:
          # Credentials and database name match the postgres service above.
          DATABASE_URL: postgresql+asyncpg://test:test@localhost:5432/amply_test
          REDIS_URL: redis://localhost:6379
        # Fails the job if total coverage is below 100%.
        run: pytest --cov=amply --cov-report=xml --cov-fail-under=100

      - name: Upload coverage
        uses: codecov/codecov-action@v4

Coverage Requirements

Minimum: 100% line coverage

Exclusions (in pyproject.toml):

# Files left out of coverage measurement entirely.
[tool.coverage.run]
omit = [
"*/migrations/*",
"*/__init__.py",
]

# Lines matching any of these patterns are excluded from the coverage report.
[tool.coverage.report]
exclude_lines = [
"pragma: no cover",
"if TYPE_CHECKING:",
"raise NotImplementedError",
]

Related: