Available in: v2.2.0+ | Latest: v2.4.0

Overview

MCP Server with LangGraph provides technical controls and architecture to support compliance with data protection regulations and industry standards. This guide covers implementing controls for GDPR, SOC 2, HIPAA, and other compliance frameworks.
Important Compliance Disclaimer: This codebase provides technical controls that support compliance requirements. However:
  • Compliance ≠ Technical Controls Alone: Achieving actual compliance requires organizational policies, procedures, legal review, employee training, and often third-party audits
  • No Certification Included: This software does not come with compliance certification or attestation
  • Legal Review Required: Always consult with legal counsel and compliance experts for your specific regulatory requirements
  • Shared Responsibility: You are responsible for configuring, deploying, and operating the system in a compliant manner
See Sources & References for links to official compliance documentation.

Compliance Frameworks

GDPR

General Data Protection Regulation
  • EU data protection law
  • Applies to EU residents’ data
  • Heavy penalties for non-compliance
  • Focus on privacy and consent
We provide: Technical controls for data access, export, deletion

SOC 2

Service Organization Control 2
  • Trust Services Criteria
  • Security, availability, confidentiality
  • Annual audits required
  • Customer trust & due diligence
We provide: Audit logging, access controls, monitoring infrastructure

HIPAA

Health Insurance Portability and Accountability Act
  • US healthcare data protection
  • Protected Health Information (PHI)
  • Business Associate Agreements
  • Technical safeguards required
We provide: Encryption, access controls, audit trails, PHI handling capabilities

ISO 27001

Information Security Management
  • International standard
  • Risk-based approach
  • Continuous improvement
  • Certification available
We provide: Security architecture aligned with ISO 27001 controls

GDPR Compliance

GDPR Compliance Data Flow

The following diagram illustrates the complete GDPR compliance data flow, including data subject requests (DSR), data classification, consent management, retention policies, right to erasure, and audit logging.

Key GDPR Data Flow Components:
  1. Data Subject Requests (DSR) - Turquoise: Entry points for user rights (access, rectification, erasure, portability)
  2. Data Classification - Light Yellow: Automated PII identification and classification
  3. Consent Management - Coral Red: User consent capture, storage, and audit trail
  4. Data Retention - Purple: Automated cleanup based on retention policies (365 days default)
  5. Right to Erasure - Gold: Complete data deletion workflow with anonymization
  6. Audit Logging - Green: Comprehensive logging of all GDPR-related actions
  7. Data Storage - Blue: Encrypted storage with backup capabilities
All flows integrate with the audit logging system to maintain compliance evidence and detect anomalies.
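Consent management (component 3 above) is not shown in code elsewhere in this guide. As a minimal sketch, assuming hypothetical names (`ConsentRecord`, `record_consent`, `has_consent`), a consent store keeps every decision as an immutable record so the audit trail required by GDPR Article 7 is preserved:

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from enum import Enum

class ConsentPurpose(str, Enum):
    ANALYTICS = "analytics"
    MARKETING = "marketing"
    PROCESSING = "processing"

@dataclass(frozen=True)
class ConsentRecord:
    """One consent decision, kept immutably for the audit trail."""
    user_id: str
    purpose: ConsentPurpose
    granted: bool
    timestamp: datetime
    source: str  # e.g. "signup_form", "settings_page"

def record_consent(user_id: str, purpose: ConsentPurpose,
                   granted: bool, source: str) -> ConsentRecord:
    """Create a consent record; persistence is left to the caller."""
    return ConsentRecord(user_id, purpose, granted,
                         datetime.now(timezone.utc), source)

def has_consent(records, user_id: str, purpose: ConsentPurpose) -> bool:
    """Latest decision wins; no record at all means no consent."""
    decisions = [r for r in records
                 if r.user_id == user_id and r.purpose == purpose]
    if not decisions:
        return False
    return max(decisions, key=lambda r: r.timestamp).granted
```

Withdrawal is just another record with `granted=False`, which keeps consent changes auditable rather than overwriting history.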

Data Protection Principles

Right to Access (Article 15): Users can request their data. Implementation:
from datetime import datetime

from fastapi import APIRouter, Depends
from src.auth.middleware import get_current_user

router = APIRouter()

@router.get("/api/v1/users/me/data")
async def export_user_data(user = Depends(get_current_user)):
    """Export all user data (GDPR Article 15)"""

    # Gather all user data
    data = {
        "profile": await get_user_profile(user.id),
        "conversations": await get_user_conversations(user.id),
        "sessions": await get_user_sessions(user.id),
        "preferences": await get_user_preferences(user.id),
        "audit_log": await get_user_audit_log(user.id)
    }

    # Log the access request
    await audit_log.log(
        user_id=user.id,
        action="data_export",
        timestamp=datetime.utcnow()
    )

    return data
Testing:
# Test data export
curl -X GET https://api.yourdomain.com/api/v1/users/me/data \
  -H "Authorization: Bearer $TOKEN"
Right to Erasure (Article 17): Users can request data deletion. Implementation:
from fastapi import HTTPException, Query

@router.delete("/api/v1/users/me")
async def delete_user_account(
    user = Depends(get_current_user),
    confirm: bool = Query(...)
):
    """Delete user account and all data (GDPR Article 17)"""

    if not confirm:
        raise HTTPException(
            status_code=400,
            detail="Confirmation required for account deletion"
        )

    # Log deletion request
    await audit_log.log(
        user_id=user.id,
        action="account_deletion_requested",
        timestamp=datetime.utcnow()
    )

    # Delete user data
    await delete_user_conversations(user.id)
    await delete_user_sessions(user.id)
    await delete_user_preferences(user.id)
    await anonymize_audit_logs(user.id)  # Keep for compliance

    # Remove the user's OpenFGA relationship tuples. Note: OpenFGA delete
    # requests take exact tuples, so in practice the user's tuples must be
    # enumerated first (e.g. via the Read API); the wildcard form below is
    # illustrative only.
    await openfga_client.delete_tuples([
        {"user": f"user:{user.id}", "relation": "*", "object": "*"}
    ])

    # Delete user account
    await delete_user(user.id)

    # Final audit log (anonymized)
    await audit_log.log(
        user_id="deleted",
        action="account_deleted",
        original_user_id=user.id,
        timestamp=datetime.utcnow()
    )

    return {"message": "Account deleted successfully"}
Right to Rectification (Article 16): Users can correct their data. Implementation:
@router.patch("/api/v1/users/me")
async def update_user_profile(
    profile_update: UserProfileUpdate,
    user = Depends(get_current_user)
):
    """Update user profile (GDPR Article 16)"""

    # Validate update
    validated_data = profile_update.dict(exclude_unset=True)

    # Log the update
    await audit_log.log(
        user_id=user.id,
        action="profile_updated",
        changes=validated_data,
        timestamp=datetime.utcnow()
    )

    # Update profile
    updated_user = await update_user(user.id, validated_data)

    return updated_user
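The `UserProfileUpdate` model referenced above is not defined in this guide. A plain-Python sketch of its partial-update semantics (names hypothetical) shows the key behavior: only fields the client actually sent are applied, which is what pydantic's `dict(exclude_unset=True)` provides:

```python
# Sentinel distinguishing "not provided" from an explicit None,
# mirroring what pydantic's exclude_unset=True achieves.
_UNSET = object()

class UserProfileUpdate:
    """Partial update: only fields the client supplied are applied."""

    def __init__(self, name=_UNSET, email=_UNSET):
        self.name = name
        self.email = email

    def dict(self, exclude_unset: bool = False) -> dict:
        fields = {"name": self.name, "email": self.email}
        if exclude_unset:
            # Drop fields the client never sent
            return {k: v for k, v in fields.items() if v is not _UNSET}
        return fields

def apply_update(profile: dict, update: UserProfileUpdate) -> dict:
    """Merge a partial update into an existing profile dict."""
    merged = dict(profile)
    merged.update(update.dict(exclude_unset=True))
    return merged
```

This matters for Article 16 rectification: a request that only corrects `name` must not silently blank out `email`.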
Right to Data Portability (Article 20): Export data in a machine-readable format. Implementation:
from fastapi.responses import JSONResponse, Response

@router.get("/api/v1/users/me/export")
async def export_user_data_portable(
    format: str = Query("json", regex="^(json|csv)$"),
    user = Depends(get_current_user)
):
    """Export data in portable format (GDPR Article 20)"""

    data = await gather_user_data(user.id)

    if format == "json":
        return JSONResponse(content=data)
    elif format == "csv":
        csv_data = convert_to_csv(data)
        return Response(
            content=csv_data,
            media_type="text/csv",
            headers={
                "Content-Disposition": f"attachment; filename=user_data_{user.id}.csv"
            }
        )
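The `convert_to_csv` helper used above is not defined in this guide. One possible sketch (flattening strategy is an assumption) turns the sectioned export data into `(section, key, value)` rows:

```python
import csv
import io

def convert_to_csv(data: dict) -> str:
    """Flatten sectioned export data into a single CSV string.

    Each row is (section, key, value); lists of dicts are expanded
    with an index, e.g. "conversations[0]".
    """
    buf = io.StringIO()
    writer = csv.writer(buf)
    writer.writerow(["section", "key", "value"])
    for section, content in data.items():
        if isinstance(content, list):
            for i, item in enumerate(content):
                for k, v in item.items():
                    writer.writerow([f"{section}[{i}]", k, v])
        elif isinstance(content, dict):
            for k, v in content.items():
                writer.writerow([section, k, v])
        else:
            writer.writerow([section, "", content])
    return buf.getvalue()
```

A flat key/value layout keeps the export machine-readable without forcing every section into the same column schema.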
Data Minimization: Collect only necessary data. Implementation:
from typing import Optional

from pydantic import BaseModel, EmailStr, Field

class UserRegistration(BaseModel):
    """Minimal user data collection"""
    email: EmailStr
    # Password handled by Keycloak

    # Optional fields
    name: Optional[str] = None

    class Config:
        # Reject extra fields
        extra = "forbid"

# Auto-delete old data
async def cleanup_old_data():
    """Delete data older than retention period"""

    retention_days = 365  # 1 year
    cutoff_date = datetime.utcnow() - timedelta(days=retention_days)

    # Delete old conversations
    await db.execute(
        "DELETE FROM conversations WHERE created_at < :cutoff AND archived = true",
        {"cutoff": cutoff_date}
    )

    # Delete old sessions
    await db.execute(
        "DELETE FROM sessions WHERE last_accessed < :cutoff",
        {"cutoff": cutoff_date}
    )

    logger.info("Deleted data older than retention period")

# Schedule cleanup
from apscheduler.schedulers.asyncio import AsyncIOScheduler

scheduler = AsyncIOScheduler()
scheduler.add_job(cleanup_old_data, 'cron', hour=3)  # Daily at 3 AM
Privacy by Design: Build privacy into the system. Implementation:
# Pseudonymization
import hashlib

def pseudonymize(user_id: str) -> str:
    """Pseudonymize user ID for analytics"""
    salt = settings.pseudonymization_salt
    return hashlib.sha256(f"{user_id}{salt}".encode()).hexdigest()

# Anonymization for logs
class AuditLog(BaseModel):
    user_id: str
    action: str
    timestamp: datetime

    @classmethod
    def create_anonymized(cls, user_id: str, action: str):
        """Create anonymized audit log"""
        return cls(
            user_id=pseudonymize(user_id),
            action=action,
            timestamp=datetime.utcnow()
        )

# Data encryption at rest
from cryptography.fernet import Fernet

class EncryptedField:
    """Encrypt sensitive fields"""

    def __init__(self, key: bytes):
        self.cipher = Fernet(key)

    def encrypt(self, value: str) -> bytes:
        return self.cipher.encrypt(value.encode())

    def decrypt(self, encrypted: bytes) -> str:
        return self.cipher.decrypt(encrypted).decode()

# Usage
encryption_key = settings.encryption_key
encryptor = EncryptedField(encryption_key)

# Encrypt before storage
encrypted_data = encryptor.encrypt(sensitive_data)
await db.execute("INSERT INTO secrets (data) VALUES (:data)",
                 {"data": encrypted_data})

GDPR Checklist

### GDPR Compliance Checklist

#### Lawful Basis
- [ ] Consent mechanism implemented
- [ ] Consent records stored with metadata
- [ ] Easy consent withdrawal

#### Data Subject Rights
- [ ] Right to access (data export)
- [ ] Right to rectification (profile updates)
- [ ] Right to erasure (account deletion)
- [ ] Right to data portability (JSON/CSV export)
- [ ] Right to object (opt-out mechanisms)

#### Data Protection
- [ ] Data minimization practiced
- [ ] Purpose limitation enforced
- [ ] Storage limitation (retention policy)
- [ ] Encryption at rest and in transit
- [ ] Pseudonymization where applicable

#### Accountability
- [ ] Privacy policy published
- [ ] Data Processing Agreement (DPA) available
- [ ] Privacy impact assessment completed
- [ ] Data breach notification process
- [ ] DPO appointed (if required)

#### Cross-Border Transfers
- [ ] Standard Contractual Clauses (SCCs)
- [ ] Adequacy decision verified
- [ ] Data localization requirements met

SOC 2 Compliance

Trust Services Criteria

Common Criteria - Security. Controls:
# Access Control
- Multi-factor authentication required
- Role-based access control (RBAC)
- Principle of least privilege
- Regular access reviews

# Network Security
- Firewall rules configured
- Network segmentation
- TLS 1.2+ enforced
- DDoS protection enabled

# Encryption
- Data encrypted at rest (AES-256)
- Data encrypted in transit (TLS 1.3)
- Key management via KMS/Vault
- Certificate management automated

# Monitoring
- Security event logging
- Intrusion detection
- Vulnerability scanning
- Security alerts configured
Evidence Collection:
# Automated evidence collection
async def collect_security_evidence():
    """Collect SOC 2 security evidence"""

    evidence = {
        "access_control": {
            "mfa_enabled_users": await count_mfa_users(),
            "rbac_roles": await get_rbac_roles(),
            "last_access_review": await get_last_access_review_date()
        },
        "encryption": {
            "tls_version": "1.3",
            "cipher_suites": await get_cipher_suites(),
            "certificate_expiry": await get_cert_expiry_dates()
        },
        "monitoring": {
            "security_events_logged": await count_security_events(),
            "alerts_configured": await get_alert_count(),
            "incidents_this_month": await count_incidents()
        }
    }

    return evidence
System Availability. Controls:
# High Availability
- Multi-zone deployment
- Auto-scaling configured
- Load balancing
- Health checks

# Backup & Recovery
- Automated daily backups
- Point-in-time recovery
- Disaster recovery plan
- RTO: 4 hours, RPO: 1 hour

# Monitoring
- Uptime monitoring
- Performance metrics
- Capacity planning
- SLA tracking (99.9% uptime)
SLA Monitoring:
from prometheus_client import Gauge

uptime_percentage = Gauge(
    'sla_uptime_percentage',
    'SLA uptime percentage'
)

async def calculate_sla():
    """Calculate SLA uptime"""

    # Get uptime data
    total_time = 30 * 24 * 60 * 60  # 30 days in seconds
    downtime = await get_total_downtime_seconds()

    uptime_pct = ((total_time - downtime) / total_time) * 100
    uptime_percentage.set(uptime_pct)

    # Alert if below SLA
    if uptime_pct < 99.9:
        await send_alert(
            severity="critical",
            message=f"SLA breach: {uptime_pct:.2f}% uptime"
        )

    return uptime_pct
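The `get_total_downtime_seconds` helper above is not shown. As a sketch, downtime can be approximated from periodic health-check samples, where each failed sample pessimistically counts a whole scrape interval as down (interval length and names are assumptions, similar in spirit to summing `up == 0` samples in Prometheus):

```python
def downtime_seconds(samples, interval_seconds: int = 60) -> int:
    """Approximate downtime from boolean health-check samples.

    Each sample covers one scrape interval; a False sample counts
    the entire interval as downtime.
    """
    return sum(interval_seconds for up in samples if not up)

def uptime_percent(samples, interval_seconds: int = 60) -> float:
    """Uptime percentage over the sampled window; empty window counts as up."""
    total = len(samples) * interval_seconds
    if total == 0:
        return 100.0
    return (total - downtime_seconds(samples, interval_seconds)) / total * 100
```

At a 60-second scrape interval, a 99.9% monthly SLA allows roughly 43 minutes of downtime, i.e. about 43 failed samples in a 30-day window.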
Data Confidentiality. Controls:
# Access Restrictions
- Need-to-know basis
- Data classification
- Confidentiality agreements
- Secure data disposal

# Data Protection
- Encryption at rest
- Encryption in transit
- Secure key storage
- Data masking in logs

# Monitoring
- Data access logging
- Anomaly detection
- DLP (Data Loss Prevention)
Data Classification:
from enum import Enum

class DataClassification(str, Enum):
    PUBLIC = "public"
    INTERNAL = "internal"
    CONFIDENTIAL = "confidential"
    RESTRICTED = "restricted"

class SecureData(BaseModel):
    classification: DataClassification
    data: str

    def log_access(self, user_id: str):
        """Log access to confidential data"""
        if self.classification in [
            DataClassification.CONFIDENTIAL,
            DataClassification.RESTRICTED
        ]:
            audit_log.log(
                user_id=user_id,
                action="confidential_data_access",
                classification=self.classification,
                timestamp=datetime.utcnow()
            )

    def mask_for_logs(self) -> str:
        """Mask confidential data in logs"""
        if self.classification == DataClassification.RESTRICTED:
            return "***REDACTED***"
        elif self.classification == DataClassification.CONFIDENTIAL:
            # Show first 4 characters
            return self.data[:4] + "****"
        return self.data
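The masking rules in `mask_for_logs` can be exercised in isolation; the following standalone function (a sketch, not part of the codebase) mirrors the same logic for quick verification:

```python
def mask_value(value: str, classification: str) -> str:
    """Mirror SecureData.mask_for_logs: fully redact restricted data,
    keep only a 4-character prefix for confidential data."""
    if classification == "restricted":
        return "***REDACTED***"
    if classification == "confidential":
        return value[:4] + "****"
    return value  # public / internal data passes through unchanged
```

Note that a fixed-length prefix can still leak information for short or low-entropy values (e.g. a 5-character token), so the prefix length may need tightening per data type.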
Data Processing Integrity. Controls:
# Quality Assurance
- Input validation
- Output verification
- Error handling
- Transaction logging

# Change Management
- Code reviews required
- Testing before deployment
- Rollback procedures
- Change approval process

# Monitoring
- Processing error rates
- Data quality metrics
- Reconciliation processes
Input Validation:
from pydantic import BaseModel, validator, Field

class ChatRequest(BaseModel):
    query: str = Field(..., min_length=1, max_length=10000)
    conversation_id: Optional[str] = None

    @validator('query')
    def validate_query(cls, v):
        """Validate query content"""
        # Check for malicious content
        if any(keyword in v.lower() for keyword in ['<script>', 'javascript:']):
            raise ValueError("Invalid query content")

        # Log validation
        logger.info("Query validated", query_length=len(v))

        return v

    @validator('conversation_id')
    def validate_conversation_id(cls, v):
        """Validate conversation ID format"""
        if v and not v.startswith('conv_'):
            raise ValueError("Invalid conversation ID format")
        return v

# Transaction integrity
from sqlalchemy.orm import Session

async def create_conversation_with_integrity(
    user_id: str,
    initial_message: str,
    db: Session
):
    """Create conversation with transaction integrity"""

    try:
        # Begin transaction; the context manager commits on successful
        # exit and rolls back automatically on exception
        async with db.begin():
            # Create conversation
            conversation = Conversation(
                user_id=user_id,
                created_at=datetime.utcnow()
            )
            db.add(conversation)
            await db.flush()

            # Create message
            message = Message(
                conversation_id=conversation.id,
                content=initial_message,
                role="user",
                created_at=datetime.utcnow()
            )
            db.add(message)

            # Audit log
            await audit_log.log(
                user_id=user_id,
                action="conversation_created",
                conversation_id=conversation.id
            )

        return conversation

    except Exception as e:
        # Transaction already rolled back by the context manager
        logger.error(
            "Failed to create conversation",
            user_id=user_id,
            error=str(e)
        )
        raise
Privacy Protection. Controls:
# Privacy Practices
- Privacy policy published
- Consent management
- Data minimization
- Purpose limitation

# Data Protection
- PII encryption
- Data anonymization
- Secure deletion
- Privacy impact assessments

# User Rights
- Access to personal data
- Data portability
- Right to deletion
- Opt-out mechanisms
See GDPR section above for implementation details.

SOC 2 Checklist

### SOC 2 Type II Readiness

#### Organizational Controls
- [ ] Security policies documented
- [ ] Risk assessment completed
- [ ] Vendor management program
- [ ] Background checks for employees
- [ ] Security training program

#### Technical Controls
- [ ] Multi-factor authentication
- [ ] Encryption at rest and in transit
- [ ] Network segmentation
- [ ] Intrusion detection
- [ ] Vulnerability management
- [ ] Patch management
- [ ] Backup and recovery
- [ ] Incident response plan

#### Monitoring Controls
- [ ] Security event logging
- [ ] Log retention (1 year minimum)
- [ ] Security monitoring 24/7
- [ ] Performance monitoring
- [ ] Uptime monitoring
- [ ] Alert management

#### Evidence Collection
- [ ] Automated evidence gathering
- [ ] Quarterly access reviews
- [ ] Annual penetration testing
- [ ] Vulnerability scans
- [ ] Change management logs
- [ ] Incident reports

#### Audit Preparation
- [ ] Control documentation
- [ ] Evidence repository
- [ ] Auditor access provisioned
- [ ] Pre-audit assessment

HIPAA Compliance

Technical Safeguards

Access Control (§ 164.312(a)(1)). Implementation:
# Unique user identification
class HIPAAUser(BaseModel):
    user_id: str  # Unique identifier
    username: str
    role: str
    last_login: datetime
    mfa_enabled: bool = True  # Required

# Emergency access procedure
async def emergency_access(
    user_id: str,
    reason: str,
    approver_id: str
):
    """Grant emergency access to PHI"""

    # Log emergency access
    await audit_log.log(
        user_id=user_id,
        action="emergency_access_granted",
        reason=reason,
        approver_id=approver_id,
        timestamp=datetime.utcnow(),
        access_level="PHI"
    )

    # Grant temporary elevated access
    await grant_temporary_access(
        user_id=user_id,
        duration_hours=4,
        access_level="PHI"
    )

    # Alert security team
    await send_alert(
        channel="security",
        message=f"Emergency PHI access granted to {user_id}",
        severity="warning"
    )

# Automatic logoff
@app.middleware("http")
async def session_timeout(request: Request, call_next):
    """Implement automatic logoff after inactivity"""

    session_timeout = 900  # 15 minutes

    session = await get_session(request)
    if session:
        last_activity = session.get("last_activity")
        if last_activity:
            inactive_seconds = (datetime.utcnow() - last_activity).total_seconds()

            if inactive_seconds > session_timeout:
                # Terminate session
                await invalidate_session(session["id"])

                return JSONResponse(
                    status_code=401,
                    content={"detail": "Session expired due to inactivity"}
                )

        # Update last activity
        session["last_activity"] = datetime.utcnow()

    response = await call_next(request)
    return response
Audit Controls (§ 164.312(b)). Implementation:
class PHIAuditLog(BaseModel):
    """HIPAA-compliant audit log"""
    timestamp: datetime
    user_id: str
    action: str
    phi_accessed: bool
    patient_id: Optional[str] = None
    ip_address: str
    success: bool
    failure_reason: Optional[str] = None

async def log_phi_access(
    user_id: str,
    action: str,
    patient_id: str,
    ip_address: str,
    success: bool = True
):
    """Log PHI access (required by HIPAA)"""

    log_entry = PHIAuditLog(
        timestamp=datetime.utcnow(),
        user_id=user_id,
        action=action,
        phi_accessed=True,
        patient_id=patient_id,
        ip_address=ip_address,
        success=success
    )

    # Store in tamper-proof audit log
    await store_audit_log(log_entry)

    # Also send to SIEM
    await send_to_siem(log_entry)

# Audit log review
async def review_phi_access_logs():
    """Regular review of PHI access logs"""

    # Get unusual access patterns
    unusual_access = await detect_unusual_access(
        lookback_days=7
    )

    if unusual_access:
        await send_alert(
            channel="compliance",
            message=f"Unusual PHI access detected: {len(unusual_access)} events",
            severity="warning",
            details=unusual_access
        )
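The `detect_unusual_access` helper above is not defined in this guide. A simple heuristic sketch (threshold and event shape are assumptions) flags users whose PHI access volume in the review window exceeds a limit; real detection would also weigh time-of-day and patient diversity:

```python
from collections import Counter

def detect_unusual_access(events, threshold: int = 50):
    """Flag user_ids with more PHI accesses than `threshold` in the window.

    `events` is an iterable of audit entries shaped like
    {"user_id": ..., "phi_accessed": bool}.
    """
    counts = Counter(e["user_id"] for e in events if e.get("phi_accessed"))
    return [
        {"user_id": uid, "access_count": n}
        for uid, n in counts.items()
        if n > threshold
    ]
```

A per-user baseline (e.g. rolling 30-day mean plus a few standard deviations) usually works better than one global threshold, since access volume varies heavily by role.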
Encryption (§ 164.312(a)(2)(iv) and § 164.312(e)(2)(ii)). Implementation:
# Encryption at rest (required)
import os

from cryptography.hazmat.primitives.ciphers.aead import AESGCM

class PHIEncryption:
    """Encrypt PHI data"""

    def __init__(self, key: bytes):
        self.cipher = AESGCM(key)

    def encrypt(self, plaintext: str, associated_data: bytes = b"") -> bytes:
        """Encrypt PHI data"""
        nonce = os.urandom(12)
        ciphertext = self.cipher.encrypt(nonce, plaintext.encode(), associated_data)
        return nonce + ciphertext

    def decrypt(self, encrypted: bytes, associated_data: bytes = b"") -> str:
        """Decrypt PHI data"""
        nonce = encrypted[:12]
        ciphertext = encrypted[12:]
        plaintext = self.cipher.decrypt(nonce, ciphertext, associated_data)
        return plaintext.decode()

# TLS 1.2+ required for transmission
# (Configured in Kubernetes Ingress)

# Database encryption
# PostgreSQL: enable SSL
# REDIS_URL=rediss://...  (TLS enabled)
Integrity (§ 164.312(c)(1)). Implementation:
import hashlib
import hmac

class DataIntegrity:
    """Ensure PHI data integrity"""

    def __init__(self, secret_key: bytes):
        self.key = secret_key

    def generate_checksum(self, data: str) -> str:
        """Generate HMAC checksum"""
        return hmac.new(
            self.key,
            data.encode(),
            hashlib.sha256
        ).hexdigest()

    def verify_checksum(self, data: str, checksum: str) -> bool:
        """Verify data integrity"""
        expected = self.generate_checksum(data)
        return hmac.compare_digest(expected, checksum)

# Usage
integrity = DataIntegrity(settings.integrity_key)

# Before storage
data = "patient medical record..."
checksum = integrity.generate_checksum(data)
await db.execute(
    "INSERT INTO phi_data (data, checksum) VALUES (:data, :checksum)",
    {"data": data, "checksum": checksum}
)

# On retrieval
row = await db.fetch_one("SELECT data, checksum FROM phi_data WHERE id = :id")
if not integrity.verify_checksum(row["data"], row["checksum"]):
    raise IntegrityError("Data integrity check failed")

HIPAA Checklist

### HIPAA Compliance Checklist

#### Administrative Safeguards
- [ ] Security officer designated
- [ ] Workforce security training
- [ ] Access authorization process
- [ ] Workforce clearance procedures
- [ ] Termination procedures
- [ ] Business Associate Agreements (BAAs)

#### Physical Safeguards
- [ ] Facility access controls
- [ ] Workstation security
- [ ] Device and media controls
- [ ] Disposal procedures

#### Technical Safeguards
- [ ] Unique user identification (Required)
- [ ] Emergency access procedure (Required)
- [ ] Automatic logoff (Addressable)
- [ ] Encryption at rest (Addressable, but recommended)
- [ ] Encryption in transit (Addressable, but recommended)
- [ ] Audit controls (Required)
- [ ] Integrity controls (Required)
- [ ] Authentication (Required)

#### Breach Notification
- [ ] Breach detection procedures
- [ ] Notification within 60 days
- [ ] Breach log maintained
- [ ] Risk assessment process

Continuous Compliance

Automated Compliance Monitoring

from apscheduler.schedulers.asyncio import AsyncIOScheduler

scheduler = AsyncIOScheduler()

# Daily compliance checks
@scheduler.scheduled_job('cron', hour=6)
async def daily_compliance_check():
    """Run daily compliance checks"""

    checks = {
        "encryption": await verify_encryption_status(),
        "access_control": await verify_access_controls(),
        "audit_logs": await verify_audit_logging(),
        "backups": await verify_backup_status(),
        "certificates": await check_certificate_expiry(),
        "vulnerabilities": await scan_vulnerabilities()
    }

    # Generate compliance report
    report = generate_compliance_report(checks)

    # Alert on failures
    failures = [k for k, v in checks.items() if not v["passed"]]
    if failures:
        await send_alert(
            channel="compliance",
            message=f"Compliance check failures: {', '.join(failures)}",
            severity="high",
            details=report
        )

# Weekly access review
@scheduler.scheduled_job('cron', day_of_week='mon', hour=9)
async def weekly_access_review():
    """Review user access weekly"""

    review = await generate_access_review_report()

    await send_notification(
        channel="compliance",
        message="Weekly access review ready",
        attachment=review
    )

# Monthly compliance report
@scheduler.scheduled_job('cron', day=1, hour=9)
async def monthly_compliance_report():
    """Generate monthly compliance report"""

    report = {
        "gdpr": await generate_gdpr_report(),
        "soc2": await generate_soc2_report(),
        "hipaa": await generate_hipaa_report() if settings.hipaa_enabled else None
    }

    await send_compliance_report(report)
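The `generate_compliance_report` function used in the daily check is not defined above. A minimal sketch (field names are assumptions) summarizes the pass/fail results into an alert-ready report:

```python
from datetime import datetime, timezone

def generate_compliance_report(checks: dict) -> dict:
    """Summarize daily check results shaped like {name: {"passed": bool, ...}}."""
    failures = [name for name, result in checks.items() if not result["passed"]]
    return {
        "generated_at": datetime.now(timezone.utc).isoformat(),
        "total_checks": len(checks),
        "passed": len(checks) - len(failures),
        "failed": failures,
        "compliant": not failures,
    }
```

Keeping the report a plain dict makes it easy to attach to the alert payload and to archive as audit evidence.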

Best Practices

Documentation
  • Maintain compliance documentation
  • Document all controls
  • Keep evidence organized
  • Update policies regularly
  • Version control for policies

Training
  • Annual security training
  • Role-specific training
  • Compliance awareness
  • Phishing simulations
  • Document training completion

Testing
  • Regular penetration testing
  • Vulnerability assessments
  • Disaster recovery drills
  • Incident response testing
  • Compliance audits

Continuous Improvement
  • Regular risk assessments
  • Control effectiveness reviews
  • Incident lessons learned
  • Industry benchmark monitoring
  • Regulatory update tracking

Next Steps


Compliance Ready: technical controls supporting GDPR, SOC 2, and HIPAA are implemented. Pair them with the organizational policies, training, and audits described in the disclaimer above to achieve actual compliance.