Overview
MCP Server with LangGraph provides technical controls and architecture to support compliance with data protection regulations and industry standards. This guide covers implementing controls for GDPR, SOC 2, HIPAA, and other compliance frameworks.
Important Compliance Disclaimer: This codebase provides technical controls that support compliance requirements. However:
- Compliance ≠ Technical Controls Alone: Achieving actual compliance requires organizational policies, procedures, legal review, employee training, and often third-party audits
- No Certification Included: This software does not come with compliance certification or attestation
- Legal Review Required: Always consult with legal counsel and compliance experts for your specific regulatory requirements
- Shared Responsibility: You are responsible for configuring, deploying, and operating the system in a compliant manner
See Sources & References for links to official compliance documentation.
Compliance Frameworks
GDPR Compliance
GDPR Compliance Data Flow
The following diagram illustrates the complete GDPR compliance data flow, including data subject requests (DSR), data classification, consent management, retention policies, right to erasure, and audit logging.
Key GDPR Data Flow Components:
- Data Subject Requests (DSR) - Turquoise: Entry points for user rights (access, rectification, erasure, portability)
- Data Classification - Light Yellow: Automated PII identification and classification
- Consent Management - Coral Red: User consent capture, storage, and audit trail
- Data Retention - Purple: Automated cleanup based on retention policies (365 days default)
- Right to Erasure - Gold: Complete data deletion workflow with anonymization
- Audit Logging - Green: Comprehensive logging of all GDPR-related actions
- Data Storage - Blue: Encrypted storage with backup capabilities
All flows integrate with the audit logging system to maintain compliance evidence and detect anomalies.
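To make these components concrete, here is a minimal sketch of how a data subject request might be dispatched to the matching workflow while writing an audit entry. The `DSRType` enum, the handler registry, and the `handle_dsr` function are illustrative assumptions, not part of a published API; the concrete endpoints appear in the sections below.

```python
# Illustrative sketch: route a data subject request (DSR) to its handler and
# record the action as compliance evidence. Handlers and audit_log are supplied
# by the caller; the real endpoint implementations are shown later in this guide.
from datetime import datetime
from enum import Enum
from typing import Awaitable, Callable, Dict

class DSRType(str, Enum):
    ACCESS = "access"                # GDPR Article 15
    RECTIFICATION = "rectification"  # GDPR Article 16
    ERASURE = "erasure"              # GDPR Article 17
    PORTABILITY = "portability"      # GDPR Article 20

Handler = Callable[[str], Awaitable[dict]]

async def handle_dsr(user_id: str, request_type: DSRType,
                     handlers: Dict[DSRType, Handler], audit_log) -> dict:
    """Dispatch a DSR and keep an audit trail entry for it."""
    result = await handlers[request_type](user_id)
    await audit_log.log(
        user_id=user_id,
        action=f"dsr_{request_type.value}",
        timestamp=datetime.utcnow(),
    )
    return result
```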
Data Protection Principles
Right to Access (GDPR Article 15): Users can request their data.

Implementation:

```python
from datetime import datetime

from fastapi import APIRouter, Depends
from src.auth.middleware import get_current_user
router = APIRouter()
@router.get ( "/api/v1/users/me/data" )
async def export_user_data ( user = Depends(get_current_user)):
"""Export all user data (GDPR Article 15)"""
# Gather all user data
data = {
"profile" : await get_user_profile(user.id),
"conversations" : await get_user_conversations(user.id),
"sessions" : await get_user_sessions(user.id),
"preferences" : await get_user_preferences(user.id),
"audit_log" : await get_user_audit_log(user.id)
}
# Log the access request
await audit_log.log(
user_id = user.id,
action = "data_export" ,
timestamp = datetime.utcnow()
)
return data
```

Testing:

```bash
# Test data export
curl -X GET https://api.yourdomain.com/api/v1/users/me/data \
  -H "Authorization: Bearer $TOKEN"
```
Right to Erasure (GDPR Article 17): Users can request data deletion.

Implementation:

```python
from fastapi import HTTPException, Query

@router.delete("/api/v1/users/me")
async def delete_user_account (
user = Depends(get_current_user),
confirm : bool = Query( ... )
):
"""Delete user account and all data (GDPR Article 17)"""
if not confirm:
raise HTTPException(
status_code = 400 ,
detail = "Confirmation required for account deletion"
)
# Log deletion request
await audit_log.log(
user_id = user.id,
action = "account_deletion_requested" ,
timestamp = datetime.utcnow()
)
# Delete user data
await delete_user_conversations(user.id)
await delete_user_sessions(user.id)
await delete_user_preferences(user.id)
await anonymize_audit_logs(user.id) # Keep for compliance
# Delete OpenFGA tuples
await openfga_client.delete_tuples([
{ "user" : f "user: { user.id } " , "relation" : "*" , "object" : "*" }
])
# Delete user account
await delete_user(user.id)
# Final audit log (anonymized)
await audit_log.log(
user_id = "deleted" ,
action = "account_deleted" ,
original_user_id = user.id,
timestamp = datetime.utcnow()
)
return { "message" : "Account deleted successfully" }
```

Right to Rectification (GDPR Article 16): Users can correct their data.

Implementation:

```python
@router.patch("/api/v1/users/me")
async def update_user_profile (
profile_update : UserProfileUpdate,
user = Depends(get_current_user)
):
"""Update user profile (GDPR Article 16)"""
# Validate update
validated_data = profile_update.dict( exclude_unset = True )
# Log the update
await audit_log.log(
user_id = user.id,
action = "profile_updated" ,
changes = validated_data,
timestamp = datetime.utcnow()
)
# Update profile
updated_user = await update_user(user.id, validated_data)
return updated_user
```

Right to Data Portability (GDPR Article 20): Export data in a machine-readable format.

Implementation:

```python
from fastapi import Query
from fastapi.responses import JSONResponse, Response

@router.get("/api/v1/users/me/export")
async def export_user_data_portable (
format : str = Query( "json" , regex = "^(json|csv)$" ),
user = Depends(get_current_user)
):
"""Export data in portable format (GDPR Article 20)"""
data = await gather_user_data(user.id)
if format == "json" :
return JSONResponse( content = data)
elif format == "csv" :
csv_data = convert_to_csv(data)
return Response(
content = csv_data,
media_type = "text/csv" ,
headers = {
"Content-Disposition" : f "attachment; filename=user_data_ { user.id } .csv"
}
)
```

Consent Management: Track and manage user consent.

Implementation:

```python
from datetime import datetime

from fastapi import Request
from pydantic import BaseModel
from enum import Enum
class ConsentType ( str , Enum ):
ANALYTICS = "analytics"
MARKETING = "marketing"
THIRD_PARTY = "third_party"
class ConsentRecord ( BaseModel ):
user_id: str
consent_type: ConsentType
granted: bool
timestamp: datetime
ip_address: str
user_agent: str
@router.post ( "/api/v1/users/me/consent" )
async def update_consent (
consent : ConsentRecord,
request : Request,
user = Depends(get_current_user)
):
"""Update user consent preferences"""
# Record consent with metadata
consent_record = ConsentRecord(
user_id = user.id,
consent_type = consent.consent_type,
granted = consent.granted,
timestamp = datetime.utcnow(),
ip_address = request.client.host,
user_agent = request.headers.get( "user-agent" )
)
await store_consent(consent_record)
# Log consent change
await audit_log.log(
user_id = user.id,
action = "consent_updated" ,
consent_type = consent.consent_type,
granted = consent.granted
)
return { "status" : "consent_recorded" }
@router.get ( "/api/v1/users/me/consent" )
async def get_consent_status ( user = Depends(get_current_user)):
"""Get current consent status"""
return await get_user_consents(user.id)
```

Data Minimization: Collect only necessary data, and delete it once the retention period expires.

Implementation:

```python
from datetime import datetime, timedelta
from typing import Optional

from pydantic import BaseModel, EmailStr, Field
class UserRegistration ( BaseModel ):
"""Minimal user data collection"""
email: EmailStr
# Password handled by Keycloak
# Optional fields
name: Optional[ str ] = None
class Config :
# Reject extra fields
extra = "forbid"
# Auto-delete old data
async def cleanup_old_data ():
"""Delete data older than retention period"""
retention_days = 365 # 1 year
cutoff_date = datetime.utcnow() - timedelta( days = retention_days)
# Delete old conversations
await db.execute(
"DELETE FROM conversations WHERE created_at < :cutoff AND archived = true" ,
{ "cutoff" : cutoff_date}
)
# Delete old sessions
await db.execute(
"DELETE FROM sessions WHERE last_accessed < :cutoff" ,
{ "cutoff" : cutoff_date}
)
logger.info( "Deleted data older than retention period" )
# Schedule cleanup
from apscheduler.schedulers.asyncio import AsyncIOScheduler
scheduler = AsyncIOScheduler()
scheduler.add_job(cleanup_old_data, 'cron' , hour = 3 ) # Daily at 3 AM
```

Privacy by Design: Build privacy into the system.

Implementation:

```python
# Pseudonymization
import hashlib
def pseudonymize ( user_id : str ) -> str :
"""Pseudonymize user ID for analytics"""
salt = settings.pseudonymization_salt
    return hashlib.sha256(f"{user_id}{salt}".encode()).hexdigest()
# Anonymization for logs
class AuditLog ( BaseModel ):
user_id: str
action: str
timestamp: datetime
@ classmethod
def create_anonymized ( cls , user_id : str , action : str ):
"""Create anonymized audit log"""
return cls (
user_id = pseudonymize(user_id),
action = action,
timestamp = datetime.utcnow()
)
# Data encryption at rest
from cryptography.fernet import Fernet
class EncryptedField :
"""Encrypt sensitive fields"""
def __init__ ( self , key : bytes ):
self .cipher = Fernet(key)
def encrypt ( self , value : str ) -> bytes :
return self .cipher.encrypt(value.encode())
def decrypt ( self , encrypted : bytes ) -> str :
return self .cipher.decrypt(encrypted).decode()
# Usage
encryption_key = settings.encryption_key
encryptor = EncryptedField(encryption_key)
# Encrypt before storage
encrypted_data = encryptor.encrypt(sensitive_data)
await db.execute( "INSERT INTO secrets (data) VALUES (:data)" ,
{ "data" : encrypted_data})
```

GDPR Checklist
### GDPR Compliance Checklist
#### Lawful Basis
- [ ] Consent mechanism implemented
- [ ] Consent records stored with metadata
- [ ] Easy consent withdrawal
#### Data Subject Rights
- [ ] Right to access (data export)
- [ ] Right to rectification (profile updates)
- [ ] Right to erasure (account deletion)
- [ ] Right to data portability (JSON/CSV export)
- [ ] Right to object (opt-out mechanisms)
#### Data Protection
- [ ] Data minimization practiced
- [ ] Purpose limitation enforced
- [ ] Storage limitation (retention policy)
- [ ] Encryption at rest and in transit
- [ ] Pseudonymization where applicable
#### Accountability
- [ ] Privacy policy published
- [ ] Data Processing Agreement (DPA) available
- [ ] Privacy impact assessment completed
- [ ] Data breach notification process
- [ ] DPO appointed (if required)
#### Cross-Border Transfers
- [ ] Standard Contractual Clauses (SCCs)
- [ ] Adequacy decision verified
- [ ] Data localization requirements met
SOC 2 Compliance
Trust Services Criteria
Common Criteria - Security Controls:

```
# Access Control
- Multi-factor authentication required
- Role-based access control (RBAC)
- Principle of least privilege
- Regular access reviews
# Network Security
- Firewall rules configured
- Network segmentation
- TLS 1.2+ enforced
- DDoS protection enabled
# Encryption
- Data encrypted at rest (AES-256)
- Data encrypted in transit (TLS 1.3)
- Key management via KMS/Vault
- Certificate management automated
# Monitoring
- Security event logging
- Intrusion detection
- Vulnerability scanning
- Security alerts configured
```
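As one way to make the access-control items above enforceable in code, an API dependency can reject tokens that do not show an MFA-backed login. This is a hedged sketch: the `amr` claim is a standard OIDC claim but is only present if your identity provider (for example Keycloak) is configured to issue it, and `get_token_claims` plus the example route are illustrative placeholders.

```python
# Hedged sketch: gate sensitive routes on an MFA-backed session.
from fastapi import APIRouter, Depends, HTTPException, status

router = APIRouter()

async def get_token_claims() -> dict:
    """Placeholder: decode and verify the bearer token here in the real app."""
    return {"amr": ["pwd"]}

async def require_mfa(claims: dict = Depends(get_token_claims)) -> dict:
    """Reject requests whose access token does not include an MFA factor."""
    methods = claims.get("amr", [])
    if not any(m in methods for m in ("mfa", "otp", "hwk")):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Multi-factor authentication required",
        )
    return claims

@router.get("/api/v1/admin/reports", dependencies=[Depends(require_mfa)])
async def list_admin_reports():
    """Example of a sensitive endpoint gated on MFA."""
    return {"status": "ok"}
```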
Evidence Collection:

```python
# Automated evidence collection
async def collect_security_evidence ():
"""Collect SOC 2 security evidence"""
evidence = {
"access_control" : {
"mfa_enabled_users" : await count_mfa_users(),
"rbac_roles" : await get_rbac_roles(),
"last_access_review" : await get_last_access_review_date()
},
"encryption" : {
"tls_version" : "1.3" ,
"cipher_suites" : await get_cipher_suites(),
"certificate_expiry" : await get_cert_expiry_dates()
},
"monitoring" : {
"security_events_logged" : await count_security_events(),
"alerts_configured" : await get_alert_count(),
"incidents_this_month" : await count_incidents()
}
}
return evidence
```

System Availability Controls:

```
# High Availability
- Multi-zone deployment
- Auto-scaling configured
- Load balancing
- Health checks
# Backup & Recovery
- Automated daily backups
- Point-in-time recovery
- Disaster recovery plan
- RTO: 4 hours, RPO: 1 hour
# Monitoring
- Uptime monitoring
- Performance metrics
- Capacity planning
- SLA tracking (99.9% uptime)
```
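The health-check item above maps directly onto Kubernetes liveness and readiness probes. Below is a minimal sketch; the endpoint paths and the `check_database` / `check_redis` helpers are illustrative assumptions rather than existing project APIs.

```python
# Hedged sketch: liveness and readiness endpoints for Kubernetes probes.
from fastapi import APIRouter, Response, status

health_router = APIRouter()

async def check_database() -> bool:
    """Placeholder: replace with a real connectivity check (e.g. SELECT 1)."""
    return True

async def check_redis() -> bool:
    """Placeholder: replace with a real PING against Redis."""
    return True

@health_router.get("/healthz")
async def liveness():
    """Liveness probe: the process is up and able to serve requests."""
    return {"status": "alive"}

@health_router.get("/readyz")
async def readiness(response: Response):
    """Readiness probe: critical dependencies are reachable."""
    checks = {"database": await check_database(), "redis": await check_redis()}
    if not all(checks.values()):
        response.status_code = status.HTTP_503_SERVICE_UNAVAILABLE
    return {"ready": all(checks.values()), "checks": checks}
```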
SLA Monitoring:

```python
from prometheus_client import Gauge
uptime_percentage = Gauge(
'sla_uptime_percentage' ,
'SLA uptime percentage'
)
async def calculate_sla ():
"""Calculate SLA uptime"""
# Get uptime data
total_time = 30 * 24 * 60 * 60 # 30 days in seconds
downtime = await get_total_downtime_seconds()
uptime_pct = ((total_time - downtime) / total_time) * 100
uptime_percentage.set(uptime_pct)
# Alert if below SLA
if uptime_pct < 99.9 :
await send_alert(
severity = "critical" ,
message = f "SLA breach: { uptime_pct :.2f} % uptime"
)
return uptime_pct
```

Data Confidentiality Controls:

```
# Access Restrictions
- Need-to-know basis
- Data classification
- Confidentiality agreements
- Secure data disposal
# Data Protection
- Encryption at rest
- Encryption in transit
- Secure key storage
- Data masking in logs
# Monitoring
- Data access logging
- Anomaly detection
- DLP (Data Loss Prevention)
```

Data Classification:

```python
from datetime import datetime
from enum import Enum

from pydantic import BaseModel
class DataClassification ( str , Enum ):
PUBLIC = "public"
INTERNAL = "internal"
CONFIDENTIAL = "confidential"
RESTRICTED = "restricted"
class SecureData ( BaseModel ):
classification: DataClassification
data: str
def log_access ( self , user_id : str ):
"""Log access to confidential data"""
if self .classification in [
DataClassification. CONFIDENTIAL ,
DataClassification. RESTRICTED
]:
audit_log.log(
user_id = user_id,
action = "confidential_data_access" ,
classification = self .classification,
timestamp = datetime.utcnow()
)
def mask_for_logs ( self ) -> str :
"""Mask confidential data in logs"""
if self .classification == DataClassification. RESTRICTED :
return "***REDACTED***"
elif self .classification == DataClassification. CONFIDENTIAL :
# Show first 4 characters
return self .data[: 4 ] + "****"
return self .data
```

Processing Integrity (PI)
Data Processing Integrity Controls:

```
# Quality Assurance
- Input validation
- Output verification
- Error handling
- Transaction logging
# Change Management
- Code reviews required
- Testing before deployment
- Rollback procedures
- Change approval process
# Monitoring
- Processing error rates
- Data quality metrics
- Reconciliation processes
```

Input Validation:

```python
from typing import Optional

from pydantic import BaseModel, Field, validator
class ChatRequest ( BaseModel ):
query: str = Field( ... , min_length = 1 , max_length = 10000 )
conversation_id: Optional[ str ] = None
@validator ( 'query' )
def validate_query ( cls , v ):
"""Validate query content"""
# Check for malicious content
        if any(keyword in v.lower() for keyword in ['<script>', 'javascript:']):
raise ValueError ( "Invalid query content" )
# Log validation
logger.info( "Query validated" , query_length = len (v))
return v
@validator ( 'conversation_id' )
def validate_conversation_id ( cls , v ):
"""Validate conversation ID format"""
if v and not v.startswith( 'conv_' ):
raise ValueError ( "Invalid conversation ID format" )
return v
# Transaction integrity
from sqlalchemy.orm import Session
async def create_conversation_with_integrity (
user_id : str ,
initial_message : str ,
db : Session
):
"""Create conversation with transaction integrity"""
try :
# Begin transaction
async with db.begin():
# Create conversation
conversation = Conversation(
user_id = user_id,
created_at = datetime.utcnow()
)
db.add(conversation)
await db.flush()
# Create message
message = Message(
conversation_id = conversation.id,
content = initial_message,
role = "user" ,
created_at = datetime.utcnow()
)
db.add(message)
# Audit log
await audit_log.log(
user_id = user_id,
action = "conversation_created" ,
conversation_id = conversation.id
)
# Commit transaction
await db.commit()
return conversation
except Exception as e:
# Rollback on error
await db.rollback()
logger.error(
"Failed to create conversation" ,
user_id = user_id,
error = str (e)
)
raise
```

Privacy Protection Controls:

```
# Privacy Practices
- Privacy policy published
- Consent management
- Data minimization
- Purpose limitation
# Data Protection
- PII encryption
- Data anonymization
- Secure deletion
- Privacy impact assessments
# User Rights
- Access to personal data
- Data portability
- Right to deletion
- Opt-out mechanisms
```

See GDPR section above for implementation details.
SOC 2 Checklist
### SOC 2 Type II Readiness
#### Organizational Controls
- [ ] Security policies documented
- [ ] Risk assessment completed
- [ ] Vendor management program
- [ ] Background checks for employees
- [ ] Security training program
#### Technical Controls
- [ ] Multi-factor authentication
- [ ] Encryption at rest and in transit
- [ ] Network segmentation
- [ ] Intrusion detection
- [ ] Vulnerability management
- [ ] Patch management
- [ ] Backup and recovery
- [ ] Incident response plan
#### Monitoring Controls
- [ ] Security event logging
- [ ] Log retention (1 year minimum)
- [ ] Security monitoring 24/7
- [ ] Performance monitoring
- [ ] Uptime monitoring
- [ ] Alert management
#### Evidence Collection
- [ ] Automated evidence gathering
- [ ] Quarterly access reviews
- [ ] Annual penetration testing
- [ ] Vulnerability scans
- [ ] Change management logs
- [ ] Incident reports
#### Audit Preparation
- [ ] Control documentation
- [ ] Evidence repository
- [ ] Auditor access provisioned
- [ ] Pre-audit assessment
HIPAA Compliance
Technical Safeguards
164.312(a)(1) Access Control Implementation:

```python
from datetime import datetime

from pydantic import BaseModel

# Unique user identification
class HIPAAUser ( BaseModel ):
user_id: str # Unique identifier
username: str
role: str
last_login: datetime
mfa_enabled: bool = True # Required
# Emergency access procedure
async def emergency_access (
user_id : str ,
reason : str ,
approver_id : str
):
"""Grant emergency access to PHI"""
# Log emergency access
await audit_log.log(
user_id = user_id,
action = "emergency_access_granted" ,
reason = reason,
approver_id = approver_id,
timestamp = datetime.utcnow(),
access_level = "PHI"
)
# Grant temporary elevated access
await grant_temporary_access(
user_id = user_id,
duration_hours = 4 ,
access_level = "PHI"
)
# Alert security team
await send_alert(
channel = "security" ,
message = f "Emergency PHI access granted to { user_id } " ,
severity = "warning"
)
# Automatic logoff
@app.middleware ( "http" )
async def session_timeout ( request : Request, call_next ):
"""Implement automatic logoff after inactivity"""
session_timeout = 900 # 15 minutes
session = await get_session(request)
if session:
last_activity = session.get( "last_activity" )
if last_activity:
inactive_seconds = (datetime.utcnow() - last_activity).total_seconds()
if inactive_seconds > session_timeout:
# Terminate session
await invalidate_session(session.id)
return JSONResponse(
status_code = 401 ,
content = { "detail" : "Session expired due to inactivity" }
)
# Update last activity
session[ "last_activity" ] = datetime.utcnow()
response = await call_next(request)
return response
```

164.312(b) Audit Controls Implementation:

```python
from datetime import datetime
from typing import Optional

from pydantic import BaseModel

class PHIAuditLog(BaseModel):
"""HIPAA-compliant audit log"""
timestamp: datetime
user_id: str
action: str
phi_accessed: bool
patient_id: Optional[ str ] = None
ip_address: str
success: bool
failure_reason: Optional[ str ] = None
async def log_phi_access (
user_id : str ,
action : str ,
patient_id : str ,
ip_address : str ,
success : bool = True
):
"""Log PHI access (required by HIPAA)"""
log_entry = PHIAuditLog(
timestamp = datetime.utcnow(),
user_id = user_id,
action = action,
phi_accessed = True ,
patient_id = patient_id,
ip_address = ip_address,
success = success
)
# Store in tamper-proof audit log
await store_audit_log(log_entry)
# Also send to SIEM
await send_to_siem(log_entry)
# Audit log review
async def review_phi_access_logs ():
"""Regular review of PHI access logs"""
# Get unusual access patterns
unusual_access = await detect_unusual_access(
lookback_days = 7
)
if unusual_access:
await send_alert(
channel = "compliance" ,
message = f "Unusual PHI access detected: { len (unusual_access) } events" ,
severity = "warning" ,
details = unusual_access
)
```

164.312(a)(2)(iv) and 164.312(e)(2)(ii) Encryption Implementation:

```python
# Encryption at rest (addressable under HIPAA, strongly recommended)
import os
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
class PHIEncryption :
"""Encrypt PHI data"""
def __init__ ( self , key : bytes ):
self .cipher = AESGCM(key)
def encrypt ( self , plaintext : str , associated_data : bytes = b "" ) -> bytes :
"""Encrypt PHI data"""
nonce = os.urandom( 12 )
ciphertext = self .cipher.encrypt(nonce, plaintext.encode(), associated_data)
return nonce + ciphertext
def decrypt ( self , encrypted : bytes , associated_data : bytes = b "" ) -> str :
"""Decrypt PHI data"""
nonce = encrypted[: 12 ]
ciphertext = encrypted[ 12 :]
plaintext = self .cipher.decrypt(nonce, ciphertext, associated_data)
return plaintext.decode()
# TLS 1.2+ required for transmission
# (Configured in Kubernetes Ingress)
# Database encryption
# PostgreSQL: enable SSL
# REDIS_URL=rediss://... (TLS enabled)
```
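As one hedged illustration of what those comments look like in application code, the snippet below enables TLS for the PostgreSQL and Redis connections. The URLs, credentials, and the `ssl="require"` connect argument are assumptions to adapt to your deployment; asyncpg also accepts a full `SSLContext` when you need to pin certificates.

```python
# Hedged sketch: TLS-enabled database and Redis connections.
import redis.asyncio as redis
from sqlalchemy.ext.asyncio import create_async_engine

# PostgreSQL over TLS: asyncpg accepts ssl="require" (or an SSLContext).
engine = create_async_engine(
    "postgresql+asyncpg://app:CHANGE_ME@db.internal:5432/appdb",
    connect_args={"ssl": "require"},
)

# Redis over TLS: the rediss:// scheme turns on TLS in redis-py.
redis_client = redis.from_url("rediss://redis.internal:6379/0")
```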
164.312(c)(1) Integrity Implementation:

```python
import hashlib
import hmac
class DataIntegrity :
"""Ensure PHI data integrity"""
def __init__ ( self , secret_key : bytes ):
self .key = secret_key
def generate_checksum ( self , data : str ) -> str :
"""Generate HMAC checksum"""
return hmac.new(
self .key,
data.encode(),
hashlib.sha256
).hexdigest()
def verify_checksum ( self , data : str , checksum : str ) -> bool :
"""Verify data integrity"""
expected = self .generate_checksum(data)
return hmac.compare_digest(expected, checksum)
# Usage
integrity = DataIntegrity(settings.integrity_key)
# Before storage
data = "patient medical record..."
checksum = integrity.generate_checksum(data)
await db.execute(
"INSERT INTO phi_data (data, checksum) VALUES (:data, :checksum)" ,
{ "data" : data, "checksum" : checksum}
)
# On retrieval
row = await db.fetch_one( "SELECT data, checksum FROM phi_data WHERE id = :id" )
if not integrity.verify_checksum(row[ "data" ], row[ "checksum" ]):
raise IntegrityError( "Data integrity check failed" )
```

HIPAA Checklist
### HIPAA Compliance Checklist
#### Administrative Safeguards
- [ ] Security officer designated
- [ ] Workforce security training
- [ ] Access authorization process
- [ ] Workforce clearance procedures
- [ ] Termination procedures
- [ ] Business Associate Agreements (BAAs)
#### Physical Safeguards
- [ ] Facility access controls
- [ ] Workstation security
- [ ] Device and media controls
- [ ] Disposal procedures
#### Technical Safeguards
- [ ] Unique user identification (Required)
- [ ] Emergency access procedure (Required)
- [ ] Automatic logoff (Addressable)
- [ ] Encryption at rest (Addressable, but recommended)
- [ ] Encryption in transit (Addressable, but recommended)
- [ ] Audit controls (Required)
- [ ] Integrity controls (Required)
- [ ] Authentication (Required)
#### Breach Notification
- [ ] Breach detection procedures
- [ ] Notification without unreasonable delay, and no later than 60 days after discovery (see the sketch after this checklist)
- [ ] Breach log maintained
- [ ] Risk assessment process
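A minimal sketch of how breach handling could plug into the audit and alerting plumbing used elsewhere in this guide follows. The 60-day ceiling comes from the HIPAA Breach Notification Rule (notification without unreasonable delay, and no later than 60 days after discovery); the `BreachRecord` fields and the `store_breach_record` helper are illustrative assumptions.

```python
# Hedged sketch: record a suspected breach and track its notification deadline.
from datetime import datetime, timedelta
from typing import Optional

from pydantic import BaseModel

class BreachRecord(BaseModel):
    breach_id: str
    discovered_at: datetime
    description: str
    phi_involved: bool
    individuals_affected: int
    risk_assessment: Optional[str] = None
    notification_deadline: datetime  # no later than 60 days after discovery

async def report_breach(breach_id: str, description: str,
                        phi_involved: bool, individuals_affected: int) -> BreachRecord:
    """Log a suspected breach, set the notification deadline, alert security."""
    discovered_at = datetime.utcnow()
    record = BreachRecord(
        breach_id=breach_id,
        discovered_at=discovered_at,
        description=description,
        phi_involved=phi_involved,
        individuals_affected=individuals_affected,
        notification_deadline=discovered_at + timedelta(days=60),
    )
    await store_breach_record(record)  # hypothetical persistence helper
    await send_alert(                  # alerting helper used elsewhere in this guide
        channel="security",
        message=f"Suspected breach {breach_id}: {individuals_affected} individuals affected",
        severity="critical",
    )
    return record
```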
Continuous Compliance
Automated Compliance Monitoring
```python
from apscheduler.schedulers.asyncio import AsyncIOScheduler
scheduler = AsyncIOScheduler()
# Daily compliance checks
@scheduler.scheduled_job ( 'cron' , hour = 6 )
async def daily_compliance_check ():
"""Run daily compliance checks"""
checks = {
"encryption" : await verify_encryption_status(),
"access_control" : await verify_access_controls(),
"audit_logs" : await verify_audit_logging(),
"backups" : await verify_backup_status(),
"certificates" : await check_certificate_expiry(),
"vulnerabilities" : await scan_vulnerabilities()
}
# Generate compliance report
report = generate_compliance_report(checks)
# Alert on failures
failures = [k for k, v in checks.items() if not v[ "passed" ]]
if failures:
await send_alert(
channel = "compliance" ,
message = f "Compliance check failures: { ', ' .join(failures) } " ,
severity = "high" ,
details = report
)
# Weekly access review
@scheduler.scheduled_job ( 'cron' , day_of_week = 'mon' , hour = 9 )
async def weekly_access_review ():
"""Review user access weekly"""
review = await generate_access_review_report()
await send_notification(
channel = "compliance" ,
message = "Weekly access review ready" ,
attachment = review
)
# Monthly compliance report
@scheduler.scheduled_job ( 'cron' , day = 1 , hour = 9 )
async def monthly_compliance_report ():
"""Generate monthly compliance report"""
report = {
"gdpr" : await generate_gdpr_report(),
"soc2" : await generate_soc2_report(),
"hipaa" : await generate_hipaa_report() if settings.hipaa_enabled else None
}
await send_compliance_report(report)
```

Best Practices
- Maintain compliance documentation
- Document all controls
- Keep evidence organized
- Update policies regularly
- Version control for policies
- Annual security training
- Role-specific training
- Compliance awareness
- Phishing simulations
- Document training completion
- Regular penetration testing
- Vulnerability assessments
- Disaster recovery drills
- Incident response testing
- Compliance audits
- Regular risk assessments
- Control effectiveness reviews
- Incident lessons learned
- Industry benchmark monitoring
- Regulatory update tracking
Next Steps
Compliance Ready: Technical controls supporting GDPR, SOC 2, and HIPAA are in place. Pair them with the organizational measures described in the disclaimer above to work toward actual compliance.