Skip to main content

Services

Business Logic Layer

Services contain the business logic and are called by the API routes.

Architecture

┌─────────────────────────────────────────────────────┐
│                      API Layer                      │
│               (FastAPI Routes + Deps)               │
└─────────────────────────┬───────────────────────────┘
                          │
┌─────────────────────────▼───────────────────────────┐
│                    Service Layer                    │
│                                                     │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐           │
│  │  Ledger  │  │ Donations│  │  Stripe  │           │
│  └────┬─────┘  └────┬─────┘  └────┬─────┘           │
│       │             │             │                 │
│  ┌────┴─────┐  ┌────┴─────┐  ┌──────────┐           │
│  │   Auth   │  │   Orgs   │  │  AuthZ   │           │
│  └──────────┘  └──────────┘  │ (SpiceDB)│           │
│                              └──────────┘           │
└─────────────────────────┬───────────────────────────┘
                          │
┌─────────────────────────▼───────────────────────────┐
│                   Database Layer                    │
│                 (SQLAlchemy Models)                 │
└─────────────────────────────────────────────────────┘

Ledger Service

Location: src/amply/services/ledger.py

Purpose: Tamper-evident financial record keeping with hash chain integrity.

Hash Chain

# lib/crypto.py
import hashlib
import json
from datetime import datetime

def canonical_json(obj: dict) -> str:
    """Serialise *obj* to a deterministic JSON string suitable for hashing.

    Keys are emitted in sorted order, separators carry no whitespace, and
    non-ASCII characters are written verbatim rather than escaped.
    """
    compact_separators = (',', ':')
    return json.dumps(
        obj,
        ensure_ascii=False,
        separators=compact_separators,
        sort_keys=True,
    )

def compute_entry_hash(
    entry_id: str,
    timestamp: datetime,
    organisation_id: str,
    entry_type: str,
    amount: int,
    currency: str,
    metadata: dict,
    prev_entry_hash: str | None,
) -> str:
    """Compute the SHA-256 chain hash for a ledger entry.

    The hash input is the pipe-joined sequence of: entry id, timestamp
    (second precision, ``YYYY-MM-DDTHH:MM:SSZ``), organisation id, entry
    type, amount, upper-cased currency, canonical JSON of ``metadata`` and
    the previous entry's hash (the literal ``null`` when there is none).

    Args:
        entry_id: Identifier of the entry being hashed.
        timestamp: Creation time. Naive values are formatted as-is (treated
            as UTC); timezone-aware values are converted to UTC first.
        organisation_id: Owner of the ledger the entry belongs to.
        entry_type: Entry type string.
        amount: Entry amount; hashed via ``str(amount)``.
        currency: Currency code; upper-cased before hashing.
        metadata: Extra fields, serialised with ``canonical_json``.
        prev_entry_hash: Hash of the preceding entry; any falsy value is
            hashed as ``null``.

    Returns:
        The digest formatted as ``sha256:<hex>``.
    """
    # Local import: the snippet's header only imports ``datetime`` itself.
    from datetime import timezone

    # The format string below hard-codes a ``Z`` suffix. An aware timestamp
    # in another zone would otherwise be labelled UTC while carrying local
    # wall-clock digits, so the same instant could hash differently
    # depending on how the driver returned it.
    if timestamp.utcoffset() is not None:
        timestamp = timestamp.astimezone(timezone.utc)
    timestamp_str = timestamp.strftime('%Y-%m-%dT%H:%M:%SZ')

    parts = [
        entry_id,
        timestamp_str,
        organisation_id,
        entry_type,
        str(amount),
        currency.upper(),
        canonical_json(metadata),
        prev_entry_hash if prev_entry_hash else 'null',
    ]

    input_string = '|'.join(parts)
    hash_hex = hashlib.sha256(input_string.encode('utf-8')).hexdigest()

    return f'sha256:{hash_hex}'

Create Entry

# services/ledger.py
async def create_entry(
    db: AsyncSession,
    organisation_id: str,
    entry_type: str,
    amount: int,
    currency: str,
    metadata: dict,
) -> LedgerEntry:
    """Append a new entry to an organisation's hash-chained ledger.

    The entry is linked to the organisation's latest entry (if any), hashed,
    and added to the session. This function neither flushes nor commits;
    that is left to the caller.
    """
    # Take a per-organisation advisory lock so concurrent writers cannot
    # both link to the same chain head. The organisation id travels as a
    # bound parameter, never interpolated into the SQL text.
    lock_stmt = text("SELECT pg_advisory_xact_lock(hashtext(:org_id))")
    await db.execute(lock_stmt, {"org_id": organisation_id})

    # Current chain head; a brand-new ledger has none.
    latest = await get_latest_entry(db, organisation_id)
    prev_hash = None if not latest else latest.entry_hash

    new_id = generate_id("led")
    created = datetime.utcnow()

    # Hash exactly the fields that are persisted below.
    digest = compute_entry_hash(
        entry_id=new_id,
        timestamp=created,
        organisation_id=organisation_id,
        entry_type=entry_type,
        amount=amount,
        currency=currency,
        metadata=metadata,
        prev_entry_hash=prev_hash,
    )

    entry = LedgerEntry(
        id=new_id,
        organisation_id=organisation_id,
        type=entry_type,
        amount=amount,
        currency=currency,
        metadata=metadata,
        prev_entry_hash=prev_hash,
        entry_hash=digest,
        created_at=created,
    )
    db.add(entry)
    return entry

Verify Chain

async def verify_chain(
    db: AsyncSession,
    organisation_id: str,
) -> ChainVerificationResult:
    """Verify hash chain integrity for an organisation.

    Every entry must (a) carry the hash recomputed from its own fields and
    (b) point at the hash of the entry before it. The first entry must have
    no predecessor hash; without that check, deleting the oldest entries
    would leave a chain that still verifies.

    Assumes ``get_all_entries`` returns entries in chain order, oldest
    first (the original link check relies on the same ordering).

    Returns:
        ``valid=True`` with ``entry_count`` when the chain is intact,
        otherwise ``valid=False`` with the id of the first offending entry
        and ``error`` set to ``hash_mismatch`` or ``chain_link_broken``.
    """
    entries = await get_all_entries(db, organisation_id)

    previous = None
    for entry in entries:
        # Verify entry's own hash
        expected_hash = compute_entry_hash(
            entry_id=entry.id,
            timestamp=entry.created_at,
            organisation_id=entry.organisation_id,
            entry_type=entry.type,
            amount=entry.amount,
            currency=entry.currency,
            metadata=entry.metadata,
            prev_entry_hash=entry.prev_entry_hash,
        )

        if entry.entry_hash != expected_hash:
            return ChainVerificationResult(
                valid=False,
                broken_at=entry.id,
                error="hash_mismatch",
            )

        # Verify link to previous. The genesis entry must have no
        # predecessor; falsy matches how compute_entry_hash treats "no
        # previous hash".
        if previous is None:
            link_ok = not entry.prev_entry_hash
        else:
            link_ok = entry.prev_entry_hash == previous.entry_hash

        if not link_ok:
            return ChainVerificationResult(
                valid=False,
                broken_at=entry.id,
                error="chain_link_broken",
            )

        previous = entry

    return ChainVerificationResult(valid=True, entry_count=len(entries))

Entry Types

class EntryType(str, Enum):
    """Closed set of ledger entry types.

    Members are also ``str`` instances, so they compare equal to their
    plain string values.
    """

    # Money coming in from a donor
    DONATION_RECEIVED = "donation_received"
    # Money spent by the organisation
    EXPENSE = "expense"
    # Movements in and out
    TRANSFER_IN = "transfer_in"
    TRANSFER_OUT = "transfer_out"
    # Money returned to a donor
    REFUND_ISSUED = "refund_issued"
    # Fees
    FEE = "fee"

Donations Service

Location: src/amply/services/donations.py

Purpose: Donation lifecycle from payment to ledger.

Create Donation

async def create_donation(
    db: AsyncSession,
    data: DonationCreate,
) -> DonationResult:
    """Create a new donation (pending payment).

    Creates a Stripe PaymentIntent on the organisation's connected account,
    stores a ``pending`` donation row referencing it, and commits.

    Returns:
        The donation id, the PaymentIntent ``client_secret`` for the
        frontend, and the ``pending`` status.

    Raises:
        ValueError: If the organisation does not exist or has no connected
            Stripe account.
    """
    organisation = await get_organisation(db, data.organisation_id)
    # Guard the lookup: an unknown id would otherwise surface as an
    # AttributeError on None.
    if not organisation:
        raise ValueError("Organisation not found")
    if not organisation.stripe_account_id:
        raise ValueError("Organisation not connected to Stripe")

    donation_id = generate_id("don")

    # Create PaymentIntent on connected account. The donation id is put in
    # the intent's metadata so the two records can be matched.
    payment_intent = await stripe_service.create_payment_intent(
        amount=data.amount,
        currency=data.currency,
        stripe_account_id=organisation.stripe_account_id,
        metadata={"donation_id": donation_id},
    )

    # NOTE(review): if the commit below fails, the PaymentIntent created
    # above has no matching donation row.
    donation = Donation(
        id=donation_id,
        organisation_id=data.organisation_id,
        fund_id=data.fund_id,
        amount=data.amount,
        currency=data.currency,
        donor_email=data.donor_email,
        donor_name=data.donor_name,
        stripe_payment_intent_id=payment_intent.id,
        status="pending",
    )
    db.add(donation)
    await db.commit()

    return DonationResult(
        id=donation.id,
        client_secret=payment_intent.client_secret,
        status="pending",
    )

Complete Donation

async def complete_donation(
    db: AsyncSession,
    payment_intent_id: str,
) -> Donation:
    """Complete donation after successful Stripe payment.

    Idempotent: a donation that is already ``completed`` is returned
    unchanged and no second ledger entry is written.

    Raises:
        ValueError: If no donation references ``payment_intent_id``.
    """
    donation = await get_donation_by_payment_intent(db, payment_intent_id)
    if not donation:
        raise ValueError("Donation not found")

    if donation.status == "completed":
        return donation  # Idempotent

    # The lookup above has already auto-begun a transaction on the session,
    # so opening another with ``db.begin()`` would be rejected. Reuse the
    # open transaction and commit / roll back explicitly instead.
    try:
        # Lock the row and re-read it: two deliveries of the same webhook
        # can both pass the unlocked status check above, and only one of
        # them may write a ledger entry.
        await db.refresh(donation, with_for_update=True)
        if donation.status == "completed":
            await db.commit()  # nothing changed; just release the row lock
            return donation

        # Create ledger entry with Stripe reference for third-party verification
        entry = await ledger_service.create_entry(
            db,
            organisation_id=donation.organisation_id,
            entry_type="donation_received",
            amount=donation.amount,
            currency=donation.currency,
            metadata={
                "donation_id": donation.id,
                "stripe_payment_intent_id": donation.stripe_payment_intent_id,  # Third-party anchor
                "donor_name": donation.donor_name,
            },
        )

        # Update donation
        donation.status = "completed"
        donation.ledger_entry_id = entry.id
        donation.completed_at = datetime.utcnow()

        await db.commit()
    except Exception:
        # Ledger entry and status change succeed or fail together.
        await db.rollback()
        raise

    return donation

Donation Flow

1. POST /donations
   └── Create Donation (pending)
   └── Create Stripe PaymentIntent
   └── Return client_secret

2. Frontend: stripe.confirmPayment()
   └── Stripe processes card

3. POST /webhooks/stripe (payment_intent.succeeded)
   └── complete_donation()
       └── Create ledger entry
       └── Update donation status

4. Donor sees confirmation
   └── Transaction visible in public ledger

Stripe Service

Location: src/amply/services/stripe.py

Purpose: Stripe Connect integration for payments.

Create Payment Intent

async def create_payment_intent(
    amount: int,
    currency: str,
    stripe_account_id: str,
    metadata: dict,
) -> stripe.PaymentIntent:
    """Create PaymentIntent on connected account (direct charge).

    Args:
        amount: Charge amount, passed to Stripe unchanged.
        currency: Currency code; lower-cased before it is sent.
        stripe_account_id: Connected account the charge is created on.
        metadata: Key/value pairs attached to the PaymentIntent.

    Returns:
        The PaymentIntent object returned by Stripe.
    """
    import asyncio

    # ``stripe.PaymentIntent.create`` is a synchronous HTTP call; running it
    # directly in a coroutine would stall the event loop for the whole
    # round trip. Hand it to a worker thread instead.
    return await asyncio.to_thread(
        stripe.PaymentIntent.create,
        amount=amount,
        currency=currency.lower(),
        stripe_account=stripe_account_id,  # Direct charge
        metadata=metadata,
        automatic_payment_methods={"enabled": True},
    )

Handle Webhook

async def handle_webhook(
    payload: bytes,
    signature: str,
) -> None:
    """Handle incoming Stripe webhook.

    Security: Verifies Stripe-Signature header to ensure authenticity.
    Idempotency: complete_donation() checks status before processing.

    Args:
        payload: Raw request body, exactly as received.
        signature: Value of the ``Stripe-Signature`` header.

    Raises:
        HTTPException: 400 when the payload cannot be parsed or the
            signature does not verify.
    """
    # CRITICAL: Verify webhook signature to prevent spoofing
    try:
        event = stripe.Webhook.construct_event(
            payload,
            signature,
            settings.stripe_webhook_secret,
        )
    except stripe.error.SignatureVerificationError as err:
        raise HTTPException(status_code=400, detail="Invalid signature") from err
    except ValueError as err:
        # construct_event raises ValueError for a body that is not valid
        # JSON; reject it as a client error rather than letting it escape.
        raise HTTPException(status_code=400, detail="Invalid payload") from err

    # NOTE(review): ``db`` is not a parameter or local of this function; it
    # must be supplied by the enclosing module — confirm where it comes from.
    if event.type == "payment_intent.succeeded":
        payment_intent = event.data.object
        # Idempotent: returns existing donation if already completed
        await donations_service.complete_donation(
            db, payment_intent.id
        )

    elif event.type == "payment_intent.payment_failed":
        payment_intent = event.data.object
        await donations_service.fail_donation(
            db, payment_intent.id, payment_intent.last_payment_error
        )
    # Any other event type is ignored.

Organisation Onboarding

def get_connect_url(organisation_id: str, user_id: str) -> str:
    """Build the Stripe Connect OAuth authorisation URL.

    A state token derived from the organisation and user ids is embedded in
    the query string; ``complete_connect`` passes it to
    ``verify_state_token``.
    """
    query = {
        "response_type": "code",
        "client_id": settings.stripe_client_id,
        "scope": "read_write",
        "redirect_uri": f"{settings.api_url}/stripe/callback",
        "state": generate_state_token(organisation_id, user_id),
    }
    authorize_endpoint = "https://connect.stripe.com/oauth/authorize?"
    return authorize_endpoint + urlencode(query)

async def complete_connect(code: str, state: str) -> Organisation:
    """Finish the Stripe Connect OAuth flow.

    Verifies the state token, exchanges the authorisation code for the
    connected account id, and records that id and the connection time on
    the organisation.
    """
    # The user id carried in the state token is unpacked but not used here.
    organisation_id, _user_id = verify_state_token(state)

    token_response = stripe.OAuth.token(
        grant_type="authorization_code",
        code=code,
    )
    connected_account_id = token_response["stripe_user_id"]

    return await update_organisation(
        organisation_id,
        stripe_account_id=connected_account_id,
        stripe_connected_at=datetime.utcnow(),
    )

Auth Service

Location: src/amply/services/auth.py

Purpose: User authentication and secure session management.

Session Security Model

Sessions are protected by multiple security layers:

| Layer | Purpose | Mechanism |
| --- | --- | --- |
| Security stamps | Invalidate on password change | Stamp comparison |
| IP binding | Detect location anomalies | Configurable strictness |
| User-agent tracking | Identify devices | Fingerprint logging |
| Session revocation | User/admin control | Delete or rotate stamp |

Login (with Security Features)

async def login(
    db: AsyncSession,
    email: str,
    password: str,
    request: Request,
) -> Session:
    """Authenticate user and create secure session.

    On success the session is stored in Redis with a seven-day TTL, indexed
    under the user, and a ``session_created`` security event is logged. A
    new-device notification is sent when ``is_new_device`` reports one.

    Raises:
        AuthenticationError: If the email is unknown or the password does
            not verify (a ``login_failed`` event is logged first).
    """
    user = await get_user_by_email(db, email)
    if not user or not verify_password(password, user.password_hash):
        await log_security_event(None, "login_failed", {"email": email})
        raise AuthenticationError("Invalid credentials")

    # Parse client information
    client_info = parse_client_info(request)

    # Determine IP binding from user preferences; tolerate a user whose
    # settings were never populated.
    ip_binding = (user.security_settings or {}).get("ip_binding", "country")

    # Read the clock once and keep one lifetime value, so created_at,
    # last_activity, expires_at and the Redis TTL all agree.
    now = datetime.utcnow()
    session_lifetime = timedelta(days=7)

    # Create session with security context
    session_id = generate_id("ses")
    session_data = {
        "user_id": user.id,
        "security_stamp": user.security_stamp,  # Copy current stamp
        "ip_binding": ip_binding,
        "created_at": now.isoformat(),
        "last_activity": now.isoformat(),
        "expires_at": (now + session_lifetime).isoformat(),
        "client": {
            "ip_address": client_info.ip_address,
            "ip_country": client_info.ip_country,
            "ip_city": client_info.ip_city,
            "user_agent": client_info.user_agent,
            "browser_family": client_info.browser_family,
            "os_family": client_info.os_family,
            "device_type": client_info.device_type,
            "fingerprint": client_info.fingerprint,
        },
    }

    # Store in Redis with TTL
    await redis.setex(
        f"session:{session_id}",
        int(session_lifetime.total_seconds()),  # 7 days
        json.dumps(session_data),
    )

    # Add to user's session index
    await redis.sadd(f"user_sessions:{user.id}", session_id)

    # Check for new device notification
    if await is_new_device(user.id, client_info.fingerprint):
        await send_new_device_notification(user, client_info)

    await log_security_event(user.id, "session_created", {"device": client_info.fingerprint})

    return Session(id=session_id, **session_data)

Session Validation (Complete Pipeline)

async def validate_session(
    session_id: str,
    request: Request,
) -> User | None:
    """Run the full validation pipeline for a session.

    Checks, in order: existence, expiry, user existence, security stamp,
    IP binding, and client fingerprint. Expired, orphaned, or
    stamp-mismatched sessions are deleted and ``None`` is returned. A
    fingerprint change is only logged.

    Raises:
        IPBindingViolation: If the request IP breaks the session's binding
            policy.
    """
    # 1. The session must exist
    session = await get_session(session_id)
    if not session:
        return None

    # 2. ...and must not be past its expiry
    expiry = datetime.fromisoformat(session["expires_at"])
    if expiry < datetime.utcnow():
        await delete_session(session_id, session["user_id"])
        return None

    # 3. The owning user must still exist, with an unchanged security stamp
    user = await get_user(session["user_id"])
    if not user:
        await delete_session(session_id, session["user_id"])
        return None

    live_stamp = await get_user_security_stamp(user.id)
    if session["security_stamp"] != live_stamp:
        # Stamp rotated since this session was issued (password change or
        # "logout everywhere")
        await delete_session(session_id, user.id)
        await log_security_event(user.id, "session_invalidated_stamp", session_id)
        return None

    # 4. IP binding is a hard failure
    request_ip = get_client_ip(request)
    if not validate_ip_binding(session, request_ip):
        violation = {
            "session_id": session_id,
            "original_ip": session["client"]["ip_address"],
            "current_ip": request_ip,
        }
        await log_security_event(user.id, "ip_binding_violation", violation)
        raise IPBindingViolation("Session IP mismatch")

    # 5. Fingerprint drift is only recorded; it could be a browser update
    request_client = parse_client_info(request)
    if request_client.fingerprint != session["client"]["fingerprint"]:
        drift = {
            "session_id": session_id,
            "original": session["client"]["fingerprint"],
            "current": request_client.fingerprint,
        }
        await log_security_event(user.id, "ua_change_detected", drift)

    # 6. Record the activity
    await update_session_activity(session_id)

    return user

Security Stamp Management

def generate_security_stamp() -> str:
    """Return a fresh random, URL-safe security stamp."""
    entropy_bytes = 32
    stamp = secrets.token_urlsafe(entropy_bytes)
    return stamp

async def rotate_security_stamp(
    db: AsyncSession,
    user_id: str,
    reason: str,
):
    """Replace the user's security stamp, invalidating ALL sessions.

    Intended for password change, email change, "logout everywhere", 2FA
    changes and admin action. The new stamp is committed, the cached copy
    is dropped, and a ``security_stamp_rotated`` event is logged with the
    given reason.
    """
    fresh_stamp = generate_security_stamp()

    stmt = update(User).where(User.id == user_id).values(security_stamp=fresh_stamp)
    await db.execute(stmt)
    await db.commit()

    # Drop the cached stamp so the next lookup reads the new one
    stamp_cache_key = f"user_stamp:{user_id}"
    await redis.delete(stamp_cache_key)

    await log_security_event(user_id, "security_stamp_rotated", {"reason": reason})

async def get_user_security_stamp(user_id: str) -> str:
    """Get user's security stamp (cached for performance).

    Reads the stamp from Redis when present; otherwise loads the user,
    caches the stamp for five minutes and returns it. The result is always
    a ``str``.
    """
    cache_key = f"user_stamp:{user_id}"

    # Try cache first
    cached = await redis.get(cache_key)
    if cached:
        # A client not configured to decode responses returns bytes, which
        # would never equal the str stamp stored in the session and would
        # invalidate every session on a cache hit.
        if isinstance(cached, bytes):
            return cached.decode("utf-8")
        return cached

    # Fetch from database
    user = await get_user(user_id)
    stamp = user.security_stamp

    # Cache for 5 minutes
    await redis.setex(cache_key, 300, stamp)

    return stamp

IP Binding Validation

def validate_ip_binding(session: dict, current_ip: str) -> bool:
    """Check the request IP against the session's binding policy.

    Policies: ``strict`` requires the same address, ``subnet`` the same /24
    network, ``country`` the same country as at login, and ``none`` accepts
    anything. The policy defaults to ``country`` when the session does not
    name one.
    """
    policy = session.get("ip_binding", "country")
    session_ip = session["client"]["ip_address"]

    if policy == "strict":
        allowed = current_ip == session_ip
    elif policy == "subnet":
        # Compare the /24 networks of both addresses
        allowed = get_subnet(current_ip, 24) == get_subnet(session_ip, 24)
    elif policy == "country":
        session_country = session["client"]["ip_country"]
        allowed = get_country_from_ip(current_ip) == session_country
    else:
        # "none", and any policy name not recognised above, lets the
        # request through.
        # NOTE(review): an unknown policy fails open — confirm this is intended.
        allowed = True

    return allowed

Session Revocation

async def logout(session_id: str, user_id: str):
    """End the given session and log the revocation."""
    await delete_session(session_id, user_id)

    event_name = "session_revoked"
    await log_security_event(user_id, event_name, session_id)

async def logout_everywhere(
    user_id: str,
    reason: str = "user_requested",
    *,
    db: "AsyncSession | None" = None,
):
    """Invalidate ALL user sessions by rotating security stamp.

    Args:
        user_id: User whose sessions are invalidated.
        reason: Recorded with the rotation event.
        db: Database session forwarded to ``rotate_security_stamp``, which
            needs it to persist the new stamp. Keyword-only so existing
            positional call sites keep their meaning.

    Raises:
        TypeError: If ``db`` is not supplied. The earlier form of this
            function failed with the same exception type because it called
            ``rotate_security_stamp`` without its ``db`` argument.
    """
    if db is None:
        raise TypeError("logout_everywhere() requires a database session (db=...)")

    await rotate_security_stamp(db, user_id, reason)
    # All sessions now invalid on next validation attempt

async def revoke_specific_session(user_id: str, target_session_id: str):
    """Revoke one session chosen from the user's 'Active Sessions' list.

    Raises:
        PermissionError: If the session does not exist or belongs to a
            different user.
    """
    target = await get_session(target_session_id)

    owned_by_user = bool(target) and target["user_id"] == user_id
    if not owned_by_user:
        raise PermissionError("Cannot revoke this session")

    await delete_session(target_session_id, user_id)
    await log_security_event(user_id, "session_revoked", target_session_id)

async def delete_session(session_id: str, user_id: str):
    """Remove a session record and its entry in the user's session index."""
    session_key = f"session:{session_id}"
    index_key = f"user_sessions:{user_id}"

    await redis.delete(session_key)
    await redis.srem(index_key, session_id)

Active Sessions List

async def get_active_sessions(user_id: str, current_session_id: str) -> list[dict]:
    """Get all active sessions for user display.

    Ids in the user's session index whose session record no longer exists
    are skipped.

    Returns:
        One dict per live session (device, location, masked IP,
        timestamps, ``is_current``), with the current session first and
        the rest ordered by most recent activity.
    """
    session_ids = await redis.smembers(f"user_sessions:{user_id}")

    sessions = []
    for raw_id in session_ids:
        # Normalise ids from a client that returns bytes so the
        # ``is_current`` comparison below is str-to-str.
        session_id = raw_id.decode("utf-8") if isinstance(raw_id, bytes) else raw_id
        session = await get_session(session_id)
        if not session:
            continue

        client = session["client"]
        sessions.append({
            "id": session_id,
            "device": f"{client['browser_family']} on {client['os_family']}",
            "device_type": client["device_type"],
            # login() always stores an "ip_city" key, possibly empty, so a
            # .get() default never applies; fall back on a falsy value.
            "location": client.get("ip_city") or client["ip_country"],
            "ip_address": mask_ip(client["ip_address"]),
            "last_active": session["last_activity"],
            "created_at": session["created_at"],
            "is_current": session_id == current_session_id,
        })

    # Current session first, then most recent activity. With reverse=True
    # the key must rank the current session highest; the earlier key
    # negated the flag and so placed it last.
    sessions.sort(key=lambda s: (s["is_current"], s["last_active"]), reverse=True)

    return sessions

Password Change (Triggers Stamp Rotation)

async def change_password(
    db: AsyncSession,
    user_id: str,
    current_password: str,
    new_password: str,
    current_session_id: str,
):
    """Change the user's password and rotate their security stamp.

    The rotation invalidates every existing session. ``current_session_id``
    is accepted for re-creating the caller's session afterwards; that step
    is elided in this snippet.

    Raises:
        AuthenticationError: If ``current_password`` does not verify.
    """
    account = await get_user(user_id)

    password_ok = verify_password(current_password, account.password_hash)
    if not password_ok:
        raise AuthenticationError("Current password incorrect")

    # Store the new hash
    replacement_hash = hash_password(new_password)
    stmt = update(User).where(User.id == user_id).values(password_hash=replacement_hash)
    await db.execute(stmt)

    # Rotating the stamp invalidates ALL sessions
    await rotate_security_stamp(db, user_id, "password_changed")

    # Re-create current session with new stamp (keep user logged in)
    # ... session recreation logic ...

    await log_security_event(user_id, "password_changed", None)

Organisations Service

Location: src/amply/services/organisations.py

Purpose: Organisation management.

Create Organisation

async def create_organisation(
    db: AsyncSession,
    data: OrganisationCreate,
    owner_user_id: str,
) -> Organisation:
    """Create an organisation with its default fund and owner membership.

    The organisation starts in ``pending`` status. All three rows are
    committed together.
    """
    org_id = generate_id("org")

    organisation = Organisation(
        id=org_id,
        name=data.name,
        legal_name=data.legal_name,
        country=data.country,
        status="pending",
    )

    # Every organisation starts with a general-purpose fund
    default_fund = Fund(
        id=generate_id("fund"),
        organisation_id=org_id,
        name="General Fund",
        type="general",
    )

    # The creating user becomes the owner
    owner_membership = OrganisationMembership(
        user_id=owner_user_id,
        organisation_id=org_id,
        role="owner",
    )

    db.add_all([organisation, default_fund, owner_membership])
    await db.commit()
    return organisation

Authorization Service

Location: app/services/authorization.py

Purpose: Sync SpiceDB relationships when entities are created, updated, or deleted.

Relationship Sync

Every domain event that changes authorization state has a corresponding sync function:

| Function | Trigger | SpiceDB Effect |
| --- | --- | --- |
| `sync_organization_created` | Org created | Write `organization#parent@platform:amply` |
| `sync_member_added` | Member joins org | Write `organization#<role>@user:X` |
| `sync_member_removed` | Member leaves org | Delete `organization#<role>@user:X` |
| `sync_member_role_changed` | Role updated | Delete old role, write new role |
| `sync_fund_created` | Fund created | Write `fund#parent@organization:X` |
| `sync_campaign_created` | Campaign created | Write `campaign#parent@organization:X` + `campaign#owner@user:Z` |
| `sync_fund_deleted` | Fund deleted | Delete all relationships for fund |
| `sync_campaign_deleted` | Campaign deleted | Delete all relationships for campaign |
| `sync_organization_deleted` | Org deleted | Delete all relationships for org |

Retry Strategy

All write operations use exponential backoff (3 attempts with 0.1s, 0.5s, 2.0s delays). If all retries fail, the error is logged and re-raised.

Delete operations do not retry — a failed delete is fail-safe because the resource no longer exists in the DB, so permission checks fail at the DB lookup stage before reaching SpiceDB.

Full implementation details: Authorization


Related: