SOC 2 Trust Service Criteria
SOC 2 is based on five Trust Service Criteria (TSC):
| Criterion | Focus | Required for |
|---|---|---|
| Security (CC) | Protection against unauthorized access | All SOC 2 audits |
| Availability (A) | System uptime and accessibility | Optional |
| Processing Integrity (PI) | Complete, valid, accurate, timely processing | Optional |
| Confidentiality (C) | Protection of confidential information | Optional |
| Privacy (P) | Collection, use, retention, disclosure of personal information | Optional |
Common Criteria (CC) = Security controls (always required)
Additional Criteria: Select based on your service commitments (A, PI, C, P)
Security (Common Criteria)
CC1: Control Environment
Requirement: Organization demonstrates commitment to integrity and ethical values, oversight, organizational structure, and accountability.
Implementation
- Governance Structure
- Code of Conduct
# Information Security Program
## 1. Security Organization
**Chief Information Security Officer (CISO)**: [Name]
- Reports to: CEO/CTO
- Responsibilities: Overall security strategy and compliance
**Security Team Structure**:
- Security Engineering: [Team Size]
- Security Operations (SOC): [Team Size]
- Compliance: [Team Size]
## 2. Policies and Standards
**Information Security Policy** (Last Updated: YYYY-MM-DD)
- Acceptable Use Policy
- Access Control Policy
- Incident Response Policy
- Change Management Policy
- Backup and Recovery Policy
**Review Frequency**: Annually or when significant changes occur
## 3. Risk Management Process
**Risk Assessment Frequency**: Quarterly
**Risk Register**: Maintained in [Tool/Location]
**Risk Treatment**: Accept, Mitigate, Transfer, Avoid
## 4. Security Awareness Training
**Frequency**: Onboarding + Annual refresher
**Topics Covered**:
- Information security basics
- Phishing and social engineering
- Data classification and handling
- Incident reporting
- Acceptable use of systems
**Tracking**: Training completion tracked in [System]
CC2: Communication and Information
Requirement: Organization communicates necessary information to support the functioning of internal controls.
# config/communication_channels.yaml
communication:
  security_updates:
    channel: "slack_security"
    frequency: "as_needed"
    audience: "all_employees"
  policy_updates:
    channel: "email"
    frequency: "when_updated"
    audience: "all_employees"
    acknowledgment_required: true
  incident_alerts:
    channel: "pagerduty"
    escalation: "immediate"
    audience: "security_team"
  compliance_reporting:
    channel: "executive_dashboard"
    frequency: "monthly"
    audience: "executives"
CC3: Risk Assessment
Requirement: Organization identifies and assesses risks.
# src/compliance/risk_assessment.py
from datetime import datetime
from enum import Enum
from typing import List

from pydantic import BaseModel


class RiskLikelihood(str, Enum):
    RARE = "rare"                      # < 5%
    UNLIKELY = "unlikely"              # 5-20%
    POSSIBLE = "possible"              # 20-50%
    LIKELY = "likely"                  # 50-80%
    ALMOST_CERTAIN = "almost_certain"  # > 80%


class RiskImpact(str, Enum):
    NEGLIGIBLE = "negligible"
    MINOR = "minor"
    MODERATE = "moderate"
    MAJOR = "major"
    CATASTROPHIC = "catastrophic"


class RiskAssessment(BaseModel):
    """SOC 2 risk assessment framework."""
    risk_id: str
    category: str             # e.g., "data_breach", "system_outage"
    description: str
    likelihood: RiskLikelihood
    impact: RiskImpact
    inherent_risk_score: int  # 1-25, before controls
    controls: List[str]       # Mitigating controls
    residual_risk_score: int  # 1-25, after controls
    risk_owner: str
    review_date: datetime
    status: str               # "open", "mitigated", "accepted"


# Risk matrix (Likelihood × Impact)
RISK_MATRIX = {
    ("rare", "negligible"): 1,
    ("rare", "minor"): 2,
    # ... (25 combinations)
    ("almost_certain", "catastrophic"): 25,
}
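The matrix above elides most of its 25 entries. One way to fill it in, assuming each score is the product of a 1-5 likelihood rank and a 1-5 impact rank, is sketched below. That scoring rule is an assumption: it agrees with the three entries shown, but the source does not state it. `build_risk_matrix` is a hypothetical helper name.

```python
from itertools import product

# Ordered from least to most severe; list position + 1 is the assumed 1-5 rank.
LIKELIHOODS = ["rare", "unlikely", "possible", "likely", "almost_certain"]
IMPACTS = ["negligible", "minor", "moderate", "major", "catastrophic"]


def build_risk_matrix() -> dict[tuple[str, str], int]:
    """Return all 25 (likelihood, impact) pairs scored as rank x rank."""
    return {
        (likelihood, impact): (l_rank + 1) * (i_rank + 1)
        for (l_rank, likelihood), (i_rank, impact) in product(
            enumerate(LIKELIHOODS), enumerate(IMPACTS)
        )
    }


RISK_MATRIX = build_risk_matrix()
print(RISK_MATRIX[("possible", "major")])  # 3 * 4 = 12
```

Generating the table from the two ordered lists keeps the matrix in step with the enum values; if your organization scores risk differently, replace the product with your own rule.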
CC4: Monitoring Activities
Requirement: Organization monitors the system and implements corrective actions.
# prometheus/alerts/soc2_monitoring.yaml
groups:
  - name: soc2_security_monitoring
    interval: 60s
    rules:
      # Unauthorized access detection
      - alert: UnauthorizedAccessAttempt
        expr: rate(authentication_failures_total[5m]) > 10
        for: 2m
        labels:
          severity: warning
          soc2_control: "CC6.1"
        annotations:
          summary: "High rate of authentication failures"
      # System availability
      - alert: ServiceDowntime
        expr: up{job="mcp-server"} == 0
        for: 1m
        labels:
          severity: critical
          soc2_control: "A1.2"
        annotations:
          summary: "MCP Server is down"
      # Data integrity
      - alert: DataIntegrityFailure
        expr: rate(integrity_check_failures_total[5m]) > 0
        for: 1m
        labels:
          severity: critical
          soc2_control: "PI1.3"
      # Change management
      - alert: UnauthorizedConfigChange
        expr: rate(unauthorized_config_changes_total[5m]) > 0
        for: 1m
        labels:
          severity: critical
          soc2_control: "CC8.1"
CC5: Control Activities
Requirement: Organization implements control activities through policies and procedures.
Access Control Matrix
| Role | Read Data | Write Data | Deploy Code | Admin Access |
|---|---|---|---|---|
| Developer | ✅ | ✅ (dev/staging) | ❌ | ❌ |
| DevOps | ✅ | ✅ (all environments) | ✅ (via CI/CD) | ✅ (production) |
| Security | ✅ | ❌ | ❌ | ✅ (audit only) |
| Support | ✅ (customer data) | ❌ | ❌ | ❌ |
| Executive | ✅ (reports) | ❌ | ❌ | ❌ |
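The matrix can also be kept as data so that application code checks the same table auditors read. The sketch below is one hypothetical encoding: `ACCESS_MATRIX` and `is_allowed` are illustrative names, an unqualified ✅ is recorded as `"any"`, and the scope notes from the table are stored as plain strings rather than enforced.

```python
from typing import Optional

# Hypothetical encoding of the access control matrix.
# None means denied; a string records the scope noted in the table.
ACCESS_MATRIX: dict[str, dict[str, Optional[str]]] = {
    "developer": {"read_data": "any", "write_data": "dev/staging",
                  "deploy_code": None, "admin_access": None},
    "devops": {"read_data": "any", "write_data": "all environments",
               "deploy_code": "via CI/CD", "admin_access": "production"},
    "security": {"read_data": "any", "write_data": None,
                 "deploy_code": None, "admin_access": "audit only"},
    "support": {"read_data": "customer data", "write_data": None,
                "deploy_code": None, "admin_access": None},
    "executive": {"read_data": "reports", "write_data": None,
                  "deploy_code": None, "admin_access": None},
}


def is_allowed(role: str, permission: str) -> bool:
    """Deny by default: unknown roles or permissions are refused."""
    return ACCESS_MATRIX.get(role, {}).get(permission) is not None


print(is_allowed("developer", "write_data"))   # True (scope: dev/staging)
print(is_allowed("developer", "deploy_code"))  # False
print(is_allowed("contractor", "read_data"))   # False: role not in the matrix
```

A real system would also have to enforce the scope (for example, rejecting a developer write against production); this sketch only answers whether the cell is a ✅ or a ❌.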
Separation of Duties
# src/compliance/separation_of_duties.py
from datetime import datetime, timedelta, timezone


class SeparationOfDuties:
    """Enforce SOC 2 separation of duties (SOD).

    Assumes self.db, self.audit_log, get_user_roles(), and verify_approval()
    are provided by the surrounding application.
    """

    # Conflicting roles (cannot be assigned to the same person)
    CONFLICTING_ROLES = [
        ("code_author", "code_approver"),   # Can't approve own code
        ("deployer", "security_reviewer"),  # Can't review own deployment
        ("admin", "auditor"),               # Auditors can't have admin access
    ]

    async def check_role_conflict(self, user_id: str, new_role: str) -> bool:
        """Return True if assigning new_role does not violate SOD."""
        current_roles = await self.get_user_roles(user_id)
        for role1, role2 in self.CONFLICTING_ROLES:
            if new_role == role1 and role2 in current_roles:
                return False  # Conflict detected
            if new_role == role2 and role1 in current_roles:
                return False  # Conflict detected
        return True  # No conflict

    async def assign_role(self, user_id: str, role: str, approved_by: str):
        """Assign role with SOD check and audit trail."""
        # Check for conflicts
        if not await self.check_role_conflict(user_id, role):
            raise ValueError(
                f"Role conflict: cannot assign {role} to a user with an existing conflicting role"
            )
        # Require approval for privileged roles
        if role in ["admin", "deployer", "security_reviewer"]:
            if not await self.verify_approval(user_id, role, approved_by):
                raise ValueError("Role assignment requires approval")
        # Assign role (timezone-aware UTC; datetime.utcnow() is deprecated in Python 3.12+)
        now = datetime.now(timezone.utc)
        await self.db.role_assignments.insert_one({
            "user_id": user_id,
            "role": role,
            "assigned_at": now,
            "approved_by": approved_by,
            "expires_at": now + timedelta(days=90),  # Periodic review
        })
        await self.audit_log.info(
            event="role_assigned",
            user_id=user_id,
            role=role,
            approved_by=approved_by,
        )
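The pairwise check in `check_role_conflict` can be exercised without a database. The sketch below is a synchronous variant over an in-memory set of roles; `find_conflicting_roles` is a hypothetical helper, and it returns the conflicting roles instead of a boolean so that an error message could name them.

```python
CONFLICTING_ROLES = [
    ("code_author", "code_approver"),
    ("deployer", "security_reviewer"),
    ("admin", "auditor"),
]


def find_conflicting_roles(current_roles: set[str], new_role: str) -> set[str]:
    """Return the roles already held that conflict with new_role."""
    conflicts = set()
    for role_a, role_b in CONFLICTING_ROLES:
        # Each pair is symmetric: holding either side blocks the other.
        if new_role == role_a and role_b in current_roles:
            conflicts.add(role_b)
        if new_role == role_b and role_a in current_roles:
            conflicts.add(role_a)
    return conflicts


print(find_conflicting_roles({"code_author", "deployer"}, "security_reviewer"))  # {'deployer'}
print(find_conflicting_roles({"code_author"}, "admin"))  # set()
```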
CC6: Logical and Physical Access Controls
Requirement: Organization implements logical and physical access controls.
Multi-Factor Authentication (MFA)
# src/auth/mfa.py
from pyotp import TOTP


class MFAEnforcement:
    """SOC 2-compliant MFA enforcement.

    Assumes self.db, self.audit_log, get_current_session(), and
    update_session() are provided by the surrounding application.
    """

    # MFA required for all privileged access
    MFA_REQUIRED_ROLES = [
        "admin",
        "developer",
        "devops",
        "security",
    ]

    async def enforce_mfa(self, user_id: str) -> bool:
        """Verify MFA is enabled and validated."""
        user = await self.db.users.find_one({"id": user_id})
        if user is None:
            raise ValueError("Unknown user")
        # Check if any of the user's roles requires MFA
        requires_mfa = any(
            role in self.MFA_REQUIRED_ROLES
            for role in user.get("roles", [])
        )
        if requires_mfa:
            # Verify MFA is enabled
            if not user.get("mfa_enabled"):
                raise ValueError("MFA required for this role")
            # Verify MFA token was validated in this session
            session = await self.get_current_session(user_id)
            if not session.get("mfa_verified"):
                raise ValueError("MFA verification required")
        return True

    async def verify_mfa_token(self, user_id: str, token: str) -> bool:
        """Verify TOTP token."""
        user = await self.db.users.find_one({"id": user_id})
        if user is None:
            raise ValueError("Unknown user")
        secret = user["mfa_secret"]
        totp = TOTP(secret)
        # valid_window=1 accepts the current 30-second step plus one step on either side
        valid = totp.verify(token, valid_window=1)
        if valid:
            # Mark session as MFA-verified
            await self.update_session(user_id, {"mfa_verified": True})
            await self.audit_log.info(
                event="mfa_verified",
                user_id=user_id,
            )
        return valid
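To make the effect of `valid_window=1` concrete, the following is a standard-library sketch of time-based one-time passwords as specified in RFC 6238 (HMAC-SHA1, 30-second steps). It is not pyotp's implementation: the `totp` and `verify` functions are illustrative, and the secret is passed as raw bytes, whereas pyotp expects a base32 string.

```python
import hashlib
import hmac
import struct


def totp(secret: bytes, timestamp: float, step: int = 30, digits: int = 6) -> str:
    """Compute an RFC 6238 TOTP code (HMAC-SHA1) for the given Unix time."""
    counter = int(timestamp // step)
    digest = hmac.new(secret, struct.pack(">Q", counter), hashlib.sha1).digest()
    offset = digest[-1] & 0x0F  # dynamic truncation from RFC 4226
    code = struct.unpack(">I", digest[offset:offset + 4])[0] & 0x7FFFFFFF
    return str(code % 10**digits).zfill(digits)


def verify(secret: bytes, token: str, timestamp: float,
           window: int = 1, step: int = 30) -> bool:
    """Accept the token if it matches any time step within +/- window of now."""
    return any(
        hmac.compare_digest(totp(secret, timestamp + drift * step, step), token)
        for drift in range(-window, window + 1)
    )


secret = b"12345678901234567890"  # test secret from the RFC 6238 appendix
code = totp(secret, 59)
print(code)                           # 287082
print(verify(secret, code, 59 + 30))  # True: token is one step old
print(verify(secret, code, 59 + 60))  # False: token is two steps old
```

A window of one step tolerates modest clock drift between the server and the authenticator; widening it trades security for convenience because each code stays usable for longer.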
CC7: System Operations
Requirement: Organization manages system operations.
Change Management Process
# src/compliance/change_management.py
import uuid
from datetime import datetime, timezone
from typing import List, Optional


class ChangeManagement:
    """SOC 2 change management process.

    Assumes self.db, self.audit_log, and notify_approvers() are provided
    by the surrounding application.
    """

    async def submit_change_request(
        self,
        title: str,
        description: str,
        category: str,  # "standard", "normal", "emergency"
        affected_systems: List[str],
        implementation_plan: str,
        rollback_plan: str,
        submitted_by: str,
    ) -> str:
        """Submit change request with required documentation."""
        change = {
            "id": str(uuid.uuid4()),
            "title": title,
            "description": description,
            "category": category,
            "affected_systems": affected_systems,
            "implementation_plan": implementation_plan,
            "rollback_plan": rollback_plan,
            "submitted_by": submitted_by,
            "submitted_at": datetime.now(timezone.utc),
            "status": "pending_approval",
            "approvals": [],
        }
        # Determine required approvals based on category
        required_approvals = self.get_required_approvals(category)
        change["required_approvals"] = required_approvals
        await self.db.change_requests.insert_one(change)
        # Notify approvers
        await self.notify_approvers(change["id"], required_approvals)
        return change["id"]

    def get_required_approvals(self, category: str) -> List[str]:
        """Determine required approvals based on change category."""
        if category == "standard":
            return ["technical_lead"]  # Pre-approved standard changes
        elif category == "normal":
            return ["technical_lead", "security_reviewer"]
        elif category == "emergency":
            return ["on_call_manager"]  # Post-implementation review required
        return []

    async def approve_change(
        self,
        change_id: str,
        approved_by: str,
        comments: Optional[str] = None,
    ):
        """Approve change request."""
        change = await self.db.change_requests.find_one({"id": change_id})
        if change is None:
            raise ValueError(f"Unknown change request: {change_id}")
        # Verify approver has authority. approved_by is compared against the
        # role names in required_approvals, so it must identify an approver role.
        if approved_by not in change["required_approvals"]:
            raise ValueError("Approver not authorized")
        # Record approval
        await self.db.change_requests.update_one(
            {"id": change_id},
            {
                "$push": {
                    "approvals": {
                        "approved_by": approved_by,
                        "approved_at": datetime.now(timezone.utc),
                        "comments": comments,
                    }
                }
            },
        )
        # Check if all required approvals were received. Compare distinct
        # approvers so a repeated approval from one role is not counted twice.
        change = await self.db.change_requests.find_one({"id": change_id})
        received = {approval["approved_by"] for approval in change["approvals"]}
        if received >= set(change["required_approvals"]):
            await self.db.change_requests.update_one(
                {"id": change_id},
                {"$set": {"status": "approved"}},
            )
        await self.audit_log.info(
            event="change_approved",
            change_id=change_id,
            approved_by=approved_by,
        )
CC8: Change Management
Requirement: Organization identifies and manages changes to the system.
CI/CD Pipeline with Controls
# .github/workflows/soc2_compliant_deployment.yml
name: SOC 2 Compliant Deployment
on:
  push:
    branches: [main]
jobs:
  deploy:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      # 1. Code Review Enforcement (CC8.1)
      - name: Verify Pull Request Approval
        env:
          GH_TOKEN: ${{ github.token }}
        run: |
          # Require at least 2 approvals for production changes.
          # A push event has no pull_request context, so look up the PR from the pushed commit.
          pr_number=$(gh api "repos/${{ github.repository }}/commits/${{ github.sha }}/pulls" --jq '.[0].number')
          approvals=$(gh pr view "$pr_number" --repo "${{ github.repository }}" --json reviews \
            --jq '[.reviews[] | select(.state == "APPROVED") | .author.login] | unique | length')
          # Exit if not approved by 2+ reviewers
          test "$approvals" -ge 2
      # 2. Security Scanning (CC6.6)
      - name: Security Scan
        run: |
          trivy image --exit-code 1 ${{ env.IMAGE_NAME }}
          snyk test
      # 3. Automated Testing (PI1.1)
      - name: Run Test Suite
        run: |
          # Require 80%+ coverage
          pytest tests/ --cov=src --cov-report=xml --cov-fail-under=80
      # 4. Change Documentation (CC8.1)
      - name: Verify Change Request
        run: |
          # Verify approved change request exists
          python scripts/verify_change_request.py
      # 5. Deployment with Approval (CC8.1)
      - name: Deploy to Production
        run: |
          kubectl apply -f deployments/production/
        env:
          KUBECONFIG: ${{ secrets.PROD_KUBECONFIG }}
      # 6. Post-Deployment Verification (A1.2)
      - name: Health Check
        run: |
          # Verify deployment successful; --fail makes curl exit non-zero on HTTP errors
          curl --fail --silent --show-error https://api.example.com/health
      # 7. Audit Trail (CC4.1)
      - name: Record Deployment
        run: |
          python scripts/record_deployment.py \
            --change-id "${{ env.CHANGE_ID }}" \
            --deployed-by "${{ github.actor }}" \
            --timestamp "$(date -u +"%Y-%m-%dT%H:%M:%SZ")"
Availability (A)
A1.2: System Availability Monitoring
Requirement: Organization monitors system availability and takes corrective action.
# SLA Definitions
sla:
  uptime_target: "99.9%"       # 43.8 minutes/month downtime
  response_time_p95: "2000ms"  # 95th percentile
  response_time_p99: "5000ms"  # 99th percentile
  # Incident response SLAs
  incident_response:
    critical: "15_minutes"     # Page on-call immediately
    high: "1_hour"
    medium: "4_hours"
    low: "24_hours"
# Monitoring configuration
monitoring:
  health_checks:
    interval: "30s"
    timeout: "10s"
    endpoints:
      - /health
      - /ready
  synthetic_monitoring:
    frequency: "5_minutes"
    locations:
      - "us-east"
      - "eu-west"
      - "asia-southeast"
  uptime_tracking:
    provider: "StatusPage.io"
    public_status_page: "https://status.example.com"
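The downtime figure attached to the uptime target follows from simple arithmetic, sketched below. The 43.8-minute figure corresponds to an average month of 365.25 / 12 days; a 30-day month gives 43.2 minutes. `downtime_budget_minutes` is a hypothetical helper name.

```python
MINUTES_PER_DAY = 24 * 60
AVERAGE_DAYS_PER_MONTH = 365.25 / 12  # about 30.44 days


def downtime_budget_minutes(uptime_percent: float,
                            days: float = AVERAGE_DAYS_PER_MONTH) -> float:
    """Minutes of downtime allowed per period at the given uptime target."""
    return (1 - uptime_percent / 100) * days * MINUTES_PER_DAY


print(round(downtime_budget_minutes(99.9), 1))      # 43.8 (average month)
print(round(downtime_budget_minutes(99.9, 30), 1))  # 43.2 (30-day month)
print(round(downtime_budget_minutes(99.99), 1))     # 4.4
```

Whichever month length you choose, state it in the SLA itself so that the reported budget and the measured downtime use the same period.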
Processing Integrity (PI)
PI1.1: Data Processing Controls
Requirement: Processing is complete, valid, accurate, timely, and authorized.
# src/processing/integrity.py
from datetime import datetime, timezone
from typing import Awaitable, Callable


class ProcessingIntegrity:
    """SOC 2 processing integrity controls.

    Assumes MAX_PROCESSING_TIME (seconds), self.audit_log, and the
    validate_input, validate_output, is_authorized, alert_slow_processing,
    and compute_integrity_hash helpers are defined elsewhere.
    """

    async def process_with_validation(
        self,
        data: dict,
        processing_function: Callable[[dict], Awaitable[dict]],
    ) -> dict:
        """Process data with integrity checks."""
        # 1. Input Validation (Completeness)
        validated_input = self.validate_input(data)
        # 2. Processing Authorization
        if not await self.is_authorized(validated_input):
            raise ValueError("Processing not authorized")
        # 3. Execute Processing
        start_time = datetime.now(timezone.utc)
        result = await processing_function(validated_input)
        end_time = datetime.now(timezone.utc)
        # 4. Output Validation (Accuracy)
        validated_output = self.validate_output(result)
        # 5. Timeliness Check
        processing_time = (end_time - start_time).total_seconds()
        if processing_time > self.MAX_PROCESSING_TIME:
            await self.alert_slow_processing(processing_time)
        # 6. Integrity Verification
        integrity_hash = self.compute_integrity_hash(validated_output)
        # 7. Audit Trail
        await self.audit_log.info(
            event="processing_completed",
            input_hash=self.compute_integrity_hash(validated_input),
            output_hash=integrity_hash,
            processing_time_seconds=processing_time,
            status="success",
        )
        return {
            **validated_output,
            "_integrity_hash": integrity_hash,
            "_processed_at": end_time.isoformat(),
        }
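The source does not show `compute_integrity_hash`. A minimal sketch, assuming the payload is JSON-serializable and that SHA-256 over a canonical JSON encoding is acceptable, could look like the following. `verify_integrity` is a hypothetical companion that assumes all underscore-prefixed keys are metadata added after hashing.

```python
import hashlib
import json


def compute_integrity_hash(payload: dict) -> str:
    """Return a SHA-256 hex digest of the payload's canonical JSON form."""
    # Sorted keys and fixed separators make the encoding independent of key order.
    canonical = json.dumps(payload, sort_keys=True, separators=(",", ":"),
                           ensure_ascii=False)
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


def verify_integrity(record: dict) -> bool:
    """Recompute the hash over the record minus its underscore-prefixed metadata."""
    body = {key: value for key, value in record.items() if not key.startswith("_")}
    return compute_integrity_hash(body) == record.get("_integrity_hash")


output = {"total": 42, "items": ["a", "b"]}
record = {
    **output,
    "_integrity_hash": compute_integrity_hash(output),
    "_processed_at": "2024-01-01T00:00:00+00:00",
}
print(verify_integrity(record))  # True
record["total"] = 43             # simulate tampering after processing
print(verify_integrity(record))  # False
```

A plain hash detects accidental corruption but not deliberate tampering by someone who can also rewrite the hash; a keyed HMAC or a signature would be needed for that.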