
SOC 2 Trust Service Criteria

SOC 2 is based on five Trust Service Criteria (TSC):

| Criterion | Focus | Required for |
|---|---|---|
| Security (CC) | Protection against unauthorized access | All SOC 2 audits |
| Availability (A) | System uptime and accessibility | Optional |
| Processing Integrity (PI) | Complete, valid, accurate, timely processing | Optional |
| Confidentiality (C) | Protection of confidential information | Optional |
| Privacy (P) | Collection, use, retention, disclosure of personal information | Optional |

Common Criteria (CC) = Security controls (always required)
Additional Criteria: Select based on your service commitments (A, PI, C, P)

Security (Common Criteria)

CC1: Control Environment

Requirement: Organization demonstrates commitment to integrity and ethical values, oversight, organizational structure, and accountability.

Implementation

  • Governance Structure
  • Code of Conduct
# Information Security Program

## 1. Security Organization

**Chief Information Security Officer (CISO)**: [Name]
- Reports to: CEO/CTO
- Responsibilities: Overall security strategy and compliance

**Security Team Structure**:
- Security Engineering: [Team Size]
- Security Operations (SOC): [Team Size]
- Compliance: [Team Size]

## 2. Policies and Standards

**Information Security Policy** (Last Updated: YYYY-MM-DD)
- Acceptable Use Policy
- Access Control Policy
- Incident Response Policy
- Change Management Policy
- Backup and Recovery Policy

**Review Frequency**: Annually or when significant changes occur

## 3. Risk Management Process

**Risk Assessment Frequency**: Quarterly
**Risk Register**: Maintained in [Tool/Location]
**Risk Treatment**: Accept, Mitigate, Transfer, Avoid

## 4. Security Awareness Training

**Frequency**: Onboarding + Annual refresher
**Topics Covered**:
- Information security basics
- Phishing and social engineering
- Data classification and handling
- Incident reporting
- Acceptable use of systems

**Tracking**: Training completion tracked in [System]

CC2: Communication and Information

Requirement: Organization communicates necessary information to support the functioning of internal controls.
# config/communication_channels.yaml
communication:
  security_updates:
    channel: "slack_security"
    frequency: "as_needed"
    audience: "all_employees"

  policy_updates:
    channel: "email"
    frequency: "when_updated"
    audience: "all_employees"
    acknowledgment_required: true

  incident_alerts:
    channel: "pagerduty"
    escalation: "immediate"
    audience: "security_team"

  compliance_reporting:
    channel: "executive_dashboard"
    frequency: "monthly"
    audience: "executives"

CC3: Risk Assessment

Requirement: Organization identifies and assesses risks.
# src/compliance/risk_assessment.py
from datetime import datetime
from enum import Enum
from typing import List

from pydantic import BaseModel

class RiskLikelihood(str, Enum):
    RARE = "rare"  # < 5%
    UNLIKELY = "unlikely"  # 5-20%
    POSSIBLE = "possible"  # 20-50%
    LIKELY = "likely"  # 50-80%
    ALMOST_CERTAIN = "almost_certain"  # > 80%

class RiskImpact(str, Enum):
    NEGLIGIBLE = "negligible"
    MINOR = "minor"
    MODERATE = "moderate"
    MAJOR = "major"
    CATASTROPHIC = "catastrophic"

class RiskAssessment(BaseModel):
    """SOC 2 risk assessment framework."""
    risk_id: str
    category: str  # e.g., "data_breach", "system_outage"
    description: str
    likelihood: RiskLikelihood
    impact: RiskImpact
    inherent_risk_score: int  # 1-25
    controls: List[str]  # Mitigating controls
    residual_risk_score: int  # After controls
    risk_owner: str
    review_date: datetime
    status: str  # "open", "mitigated", "accepted"

# Risk matrix (Likelihood × Impact)
RISK_MATRIX = {
    ("rare", "negligible"): 1,
    ("rare", "minor"): 2,
    # ... (25 combinations)
    ("almost_certain", "catastrophic"): 25
}
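The three entries shown in RISK_MATRIX are consistent with a score equal to the likelihood rank multiplied by the impact rank. The sketch below fills in the 5×5 grid under that assumption; the rank tables and the `build_risk_matrix` name are illustrative and not part of the original module.

```python
from itertools import product

# Ordinal ranks (1-5), following the order of the RiskLikelihood and RiskImpact enums.
LIKELIHOOD_RANK = {"rare": 1, "unlikely": 2, "possible": 3, "likely": 4, "almost_certain": 5}
IMPACT_RANK = {"negligible": 1, "minor": 2, "moderate": 3, "major": 4, "catastrophic": 5}


def build_risk_matrix() -> dict:
    """Build all 25 (likelihood, impact) scores, assuming score = rank product."""
    return {
        (likelihood, impact): LIKELIHOOD_RANK[likelihood] * IMPACT_RANK[impact]
        for likelihood, impact in product(LIKELIHOOD_RANK, IMPACT_RANK)
    }


matrix = build_risk_matrix()
```

If your organization scores risk differently (for example, by weighting impact more heavily), replace the product with your own lookup table.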

CC4: Monitoring Activities

Requirement: Organization monitors the system and implements corrective actions.
# prometheus/alerts/soc2_monitoring.yaml
groups:
  - name: soc2_security_monitoring
    interval: 60s
    rules:
      # Unauthorized access detection
      - alert: UnauthorizedAccessAttempt
        expr: rate(authentication_failures_total[5m]) > 10
        for: 2m
        labels:
          severity: warning
          soc2_control: "CC6.1"
        annotations:
          summary: "High rate of authentication failures"

      # System availability
      - alert: ServiceDowntime
        expr: up{job="mcp-server"} == 0
        for: 1m
        labels:
          severity: critical
          soc2_control: "A1.2"
        annotations:
          summary: "MCP Server is down"

      # Data integrity
      - alert: DataIntegrityFailure
        expr: rate(integrity_check_failures_total[5m]) > 0
        for: 1m
        labels:
          severity: critical
          soc2_control: "PI1.3"

      # Change management
      - alert: UnauthorizedConfigChange
        expr: rate(unauthorized_config_changes_total[5m]) > 0
        for: 1m
        labels:
          severity: critical
          soc2_control: "CC8.1"
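Because each rule carries a soc2_control label, alerts can be grouped by control when collecting audit evidence. The sketch below transcribes the four rules above into plain dictionaries to avoid a YAML dependency; `alerts_by_control` is a hypothetical helper, not part of Prometheus.

```python
# Rules transcribed from the alert group above (only the fields needed here).
RULES = [
    {"alert": "UnauthorizedAccessAttempt", "labels": {"severity": "warning", "soc2_control": "CC6.1"}},
    {"alert": "ServiceDowntime", "labels": {"severity": "critical", "soc2_control": "A1.2"}},
    {"alert": "DataIntegrityFailure", "labels": {"severity": "critical", "soc2_control": "PI1.3"}},
    {"alert": "UnauthorizedConfigChange", "labels": {"severity": "critical", "soc2_control": "CC8.1"}},
]


def alerts_by_control(rules: list) -> dict:
    """Group alert names by soc2_control; rules without the label land under None."""
    grouped = {}
    for rule in rules:
        control = rule.get("labels", {}).get("soc2_control")
        grouped.setdefault(control, []).append(rule["alert"])
    return grouped


coverage = alerts_by_control(RULES)
```

A `None` key in the result would flag an alert that is not mapped to any control.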

CC5: Control Activities

Requirement: Organization implements control activities through policies and procedures.

Access Control Matrix

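| Role | Read Data | Write Data | Deploy Code | Admin Access |
|---|---|---|---|---|
| Developer | ✅ (dev/staging) | | | |
| DevOps | ✅ (all environments) | ✅ (via CI/CD) | ✅ (production) | |
| Security | ✅ (audit only) | | | |
| Support | ✅ (customer data) | | | |
| Executive | ✅ (reports) | | | |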

Separation of Duties

# src/compliance/separation_of_duties.py
from datetime import datetime, timedelta

class SeparationOfDuties:
    """Enforce SOC 2 separation of duties (SOD)."""

    # Conflicting roles (cannot be assigned to same person)
    CONFLICTING_ROLES = [
        ("code_author", "code_approver"),  # Can't approve own code
        ("deployer", "security_reviewer"),  # Can't deploy without review
        ("admin", "auditor")  # Auditors can't have admin access
    ]

    async def check_role_conflict(self, user_id: str, new_role: str) -> bool:
        """Verify role assignment doesn't violate SOD."""
        current_roles = await self.get_user_roles(user_id)

        for role1, role2 in self.CONFLICTING_ROLES:
            if new_role == role1 and role2 in current_roles:
                return False  # Conflict detected
            if new_role == role2 and role1 in current_roles:
                return False  # Conflict detected

        return True  # No conflict

    async def assign_role(self, user_id: str, role: str, approved_by: str):
        """Assign role with SOD check and audit trail."""
        # Check for conflicts
        if not await self.check_role_conflict(user_id, role):
            raise ValueError(f"Role conflict: Cannot assign {role} to user with existing conflicting role")

        # Require approval for privileged roles
        if role in ["admin", "deployer", "security_reviewer"]:
            if not await self.verify_approval(user_id, role, approved_by):
                raise ValueError("Role assignment requires approval")

        # Assign role
        await self.db.role_assignments.insert_one({
            "user_id": user_id,
            "role": role,
            "assigned_at": datetime.utcnow(),
            "approved_by": approved_by,
            "expires_at": datetime.utcnow() + timedelta(days=90)  # Periodic review
        })

        await self.audit_log.info(
            event="role_assigned",
            user_id=user_id,
            role=role,
            approved_by=approved_by
        )
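The conflict rule can be exercised without a database. This minimal sketch restates the check as a pure function over a set of roles. Note that `has_role_conflict` is a hypothetical name, and it returns True when a conflict exists, which is the inverse of `check_role_conflict` above.

```python
CONFLICTING_ROLES = [
    ("code_author", "code_approver"),
    ("deployer", "security_reviewer"),
    ("admin", "auditor"),
]


def has_role_conflict(current_roles: set, new_role: str) -> bool:
    """Return True when new_role conflicts with a role the user already holds."""
    for role1, role2 in CONFLICTING_ROLES:
        if new_role == role1 and role2 in current_roles:
            return True
        if new_role == role2 and role1 in current_roles:
            return True
    return False


# An admin cannot also become an auditor; a code author may become a deployer.
admin_to_auditor = has_role_conflict({"admin"}, "auditor")
author_to_deployer = has_role_conflict({"code_author"}, "deployer")
```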

CC6: Logical and Physical Access Controls

Requirement: Organization implements logical and physical access controls.

Multi-Factor Authentication (MFA)

# src/auth/mfa.py
from pyotp import TOTP
from typing import Optional

class MFAEnforcement:
    """SOC 2-compliant MFA enforcement."""

    # MFA required for all privileged access
    MFA_REQUIRED_ROLES = [
        "admin",
        "developer",
        "devops",
        "security"
    ]

    async def enforce_mfa(self, user_id: str) -> bool:
        """Verify MFA is enabled and validated."""
        user = await self.db.users.find_one({"id": user_id})

        # Check if user role requires MFA
        requires_mfa = any(
            role in self.MFA_REQUIRED_ROLES
            for role in user.get("roles", [])
        )

        if requires_mfa:
            # Verify MFA is enabled
            if not user.get("mfa_enabled"):
                raise ValueError("MFA required for this role")

            # Verify MFA token was validated in this session
            session = await self.get_current_session(user_id)
            if not session.get("mfa_verified"):
                raise ValueError("MFA verification required")

        return True

    async def verify_mfa_token(self, user_id: str, token: str) -> bool:
        """Verify TOTP token."""
        user = await self.db.users.find_one({"id": user_id})
        secret = user["mfa_secret"]

        totp = TOTP(secret)
        valid = totp.verify(token, valid_window=1)  # Also accept the adjacent 30-second steps (±1) for clock drift

        if valid:
            # Mark session as MFA-verified
            await self.update_session(user_id, {"mfa_verified": True})

            await self.audit_log.info(
                event="mfa_verified",
                user_id=user_id
            )

        return valid
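To show what a validity window of 1 means in practice, here is a standard-library sketch of TOTP verification based on RFC 4226 and RFC 6238. It is not pyotp's implementation: it takes the secret as raw bytes, whereas pyotp expects a base32 string, and `hotp` and `verify_totp` are illustrative names.

```python
import hashlib
import hmac
import struct


def hotp(secret: bytes, counter: int, digits: int = 6) -> str:
    """HOTP (RFC 4226): HMAC-SHA1 over the 8-byte counter, then dynamic truncation."""
    digest = hmac.new(secret, struct.pack(">Q", counter), hashlib.sha1).digest()
    offset = digest[-1] & 0x0F
    code = struct.unpack(">I", digest[offset:offset + 4])[0] & 0x7FFFFFFF
    return str(code % 10 ** digits).zfill(digits)


def verify_totp(secret: bytes, token: str, now: float, step: int = 30, valid_window: int = 1) -> bool:
    """Accept the token if it matches any time step within +/- valid_window of now."""
    current = int(now) // step
    return any(
        hmac.compare_digest(hotp(secret, current + drift), token)
        for drift in range(-valid_window, valid_window + 1)
    )


# RFC 4226 test secret; at t=59s the current time step is 1.
SECRET = b"12345678901234567890"
accepted = verify_totp(SECRET, "287082", now=59)
```

With `valid_window=1`, codes from the previous, current, and next step are all accepted, so a code can remain usable for longer than a single 30-second step.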

CC7: System Operations

Requirement: Organization manages system operations.

Change Management Process

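# src/compliance/change_management.py
import uuid
from datetime import datetime
from typing import List, Optional


def generate_id() -> str:
    """Return a unique change ID (uuid4 is an assumption; the original helper is not shown)."""
    return str(uuid.uuid4())
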
class ChangeManagement:
    """SOC 2 change management process."""

    async def submit_change_request(
        self,
        title: str,
        description: str,
        category: str,  # "standard", "normal", "emergency"
        affected_systems: List[str],
        implementation_plan: str,
        rollback_plan: str,
        submitted_by: str
    ) -> str:
        """Submit change request with required documentation."""

        change = {
            "id": generate_id(),
            "title": title,
            "description": description,
            "category": category,
            "affected_systems": affected_systems,
            "implementation_plan": implementation_plan,
            "rollback_plan": rollback_plan,
            "submitted_by": submitted_by,
            "submitted_at": datetime.utcnow(),
            "status": "pending_approval",
            "approvals": []
        }

        # Determine required approvals based on category
        required_approvals = self.get_required_approvals(category)
        change["required_approvals"] = required_approvals

        await self.db.change_requests.insert_one(change)

        # Notify approvers
        await self.notify_approvers(change["id"], required_approvals)

        return change["id"]

    def get_required_approvals(self, category: str) -> List[str]:
        """Determine required approvals based on change category."""
        if category == "standard":
            return ["technical_lead"]  # Pre-approved standard changes
        elif category == "normal":
            return ["technical_lead", "security_reviewer"]
        elif category == "emergency":
            return ["on_call_manager"]  # Post-implementation review required
        return []

    async def approve_change(
        self,
        change_id: str,
        approved_by: str,
        comments: Optional[str] = None
    ):
        """Approve change request."""
        change = await self.db.change_requests.find_one({"id": change_id})

        # Verify approver has authority
        if approved_by not in change["required_approvals"]:
            raise ValueError("Approver not authorized")

        # Record approval
        await self.db.change_requests.update_one(
            {"id": change_id},
            {
                "$push": {
                    "approvals": {
                        "approved_by": approved_by,
                        "approved_at": datetime.utcnow(),
                        "comments": comments
                    }
                }
            }
        )

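        # Check if all approvals received (compare distinct approvers, so a
        # repeated approval cannot stand in for a missing one)
        change = await self.db.change_requests.find_one({"id": change_id})
        approvers = {a["approved_by"] for a in change["approvals"]}
        if approvers >= set(change["required_approvals"]):
            await self.db.change_requests.update_one(
                {"id": change_id},
                {"$set": {"status": "approved"}}
            )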

        await self.audit_log.info(
            event="change_approved",
            change_id=change_id,
            approved_by=approved_by
        )
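Approval completeness can be expressed as a set difference between the required approvers for a category and those who have signed off. The sketch below assumes, as the authorization check above does, that `approved_by` holds the same identifiers as `required_approvals`. `missing_approvals` is a hypothetical helper.

```python
REQUIRED_APPROVALS = {
    "standard": ["technical_lead"],
    "normal": ["technical_lead", "security_reviewer"],
    "emergency": ["on_call_manager"],
}


def missing_approvals(category: str, approvals: list) -> set:
    """Return the required approvers that have not yet signed off."""
    required = set(REQUIRED_APPROVALS.get(category, []))
    granted = {approval["approved_by"] for approval in approvals}
    return required - granted


# Two approvals from the same approver still leave the security review outstanding.
outstanding = missing_approvals(
    "normal",
    [{"approved_by": "technical_lead"}, {"approved_by": "technical_lead"}],
)
```

A change is ready to move to "approved" only when the returned set is empty.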

CC8: Change Management

Requirement: Organization identifies and manages changes to the system.

CI/CD Pipeline with Controls

# .github/workflows/soc2_compliant_deployment.yml
name: SOC 2 Compliant Deployment

on:
  push:
    branches: [main]

jobs:
  deploy:
    runs-on: ubuntu-latest

    steps:
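      # 1. Code Review Enforcement (CC8.1)
      - name: Verify Pull Request Approval
        env:
          GH_TOKEN: ${{ github.token }}
          GH_REPO: ${{ github.repository }}
        run: |
          # A push to main carries no pull_request payload, so look up the merged PR by commit SHA
          pr_number=$(gh pr list --state merged --search "${{ github.sha }}" --json number --jq '.[0].number')
          # Require at least 2 distinct approvals for production changes; exit otherwise
          approvals=$(gh pr view "$pr_number" --json reviews --jq '[.reviews[] | select(.state == "APPROVED") | .author.login] | unique | length')
          [ "$approvals" -ge 2 ]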

      # 2. Security Scanning (CC6.6)
      - name: Security Scan
        run: |
          trivy image ${{ env.IMAGE_NAME }}
          snyk test

      # 3. Automated Testing (PI1.1)
      - name: Run Test Suite
        run: |
          # Require 80%+ coverage (--cov-fail-under fails the step below the threshold)
          pytest tests/ --cov=src --cov-report=xml --cov-fail-under=80

      # 4. Change Documentation (CC8.1)
      - name: Verify Change Request
        run: |
          # Verify approved change request exists
          python scripts/verify_change_request.py

      # 5. Deployment with Approval (CC8.1)
      - name: Deploy to Production
        run: |
          kubectl apply -f deployments/production/
        env:
          KUBECONFIG: ${{ secrets.PROD_KUBECONFIG }}

      # 6. Post-Deployment Verification (A1.2)
      - name: Health Check
        run: |
          # Verify deployment successful: --fail makes curl exit non-zero on HTTP 4xx/5xx
          curl --fail --silent --show-error https://api.example.com/health

      # 7. Audit Trail (CC4.1)
      - name: Record Deployment
        run: |
          python scripts/record_deployment.py \
            --change-id ${{ env.CHANGE_ID }} \
            --deployed-by ${{ github.actor }} \
            --timestamp $(date -u +"%Y-%m-%dT%H:%M:%SZ")

Availability (A)

A1.2: System Availability Monitoring

Requirement: Organization monitors system availability and takes corrective action.
# SLA Definitions
sla:
  uptime_target: "99.9%"  # 43.8 minutes/month downtime
  response_time_p95: "2000ms"  # 95th percentile
  response_time_p99: "5000ms"  # 99th percentile

  # Incident response SLAs
  incident_response:
    critical: "15_minutes"  # Page on-call immediately
    high: "1_hour"
    medium: "4_hours"
    low: "24_hours"

# Monitoring configuration
monitoring:
  health_checks:
    interval: "30s"
    timeout: "10s"
    endpoints:
      - /health
      - /ready

  synthetic_monitoring:
    frequency: "5_minutes"
    locations:
      - "us-east"
      - "eu-west"
      - "asia-southeast"

  uptime_tracking:
    provider: "StatusPage.io"
    public_status_page: "https://status.example.com"
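The downtime figure in the uptime_target comment follows from the target percentage and the length of the period. The sketch below reproduces it, assuming an average month of 365.25 / 12 days; `downtime_budget_minutes` is an illustrative helper.

```python
def downtime_budget_minutes(uptime_target_percent: float, period_days: float = 365.25 / 12) -> float:
    """Allowed downtime in minutes for an uptime target over a period of period_days."""
    period_minutes = period_days * 24 * 60
    return period_minutes * (1 - uptime_target_percent / 100)


monthly_budget = downtime_budget_minutes(99.9)
```

Over a fixed 30-day window the same target allows slightly less downtime, so state the measurement period explicitly in the SLA.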

Processing Integrity (PI)

PI1.1: Data Processing Controls

Requirement: Processing is complete, valid, accurate, timely, and authorized.
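# src/processing/integrity.py
from datetime import datetime
from typing import Awaitable, Callable

class ProcessingIntegrity:
    """SOC 2 processing integrity controls."""

    # Timeliness threshold in seconds (assumed placeholder; tune per workload)
    MAX_PROCESSING_TIME = 30.0

    async def process_with_validation(
        self,
        data: dict,
        processing_function: Callable[[dict], Awaitable[dict]]
    ) -> dict: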
        """Process data with integrity checks."""

        # 1. Input Validation (Completeness)
        validated_input = self.validate_input(data)

        # 2. Processing Authorization
        if not await self.is_authorized(validated_input):
            raise ValueError("Processing not authorized")

        # 3. Execute Processing
        start_time = datetime.utcnow()
        result = await processing_function(validated_input)
        end_time = datetime.utcnow()

        # 4. Output Validation (Accuracy)
        validated_output = self.validate_output(result)

        # 5. Timeliness Check
        processing_time = (end_time - start_time).total_seconds()
        if processing_time > self.MAX_PROCESSING_TIME:
            await self.alert_slow_processing(processing_time)

        # 6. Integrity Verification
        integrity_hash = self.compute_integrity_hash(validated_output)

        # 7. Audit Trail
        await self.audit_log.info(
            event="processing_completed",
            input_hash=self.compute_integrity_hash(validated_input),
            output_hash=integrity_hash,
            processing_time_seconds=processing_time,
            status="success"
        )

        return {
            **validated_output,
            "_integrity_hash": integrity_hash,
            "_processed_at": end_time.isoformat()
        }
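The class above calls `compute_integrity_hash` without showing it. One possible approach, sketched here as an assumption, is SHA-256 over a canonical JSON encoding so that key order does not change the hash. `verify_integrity` is a hypothetical companion that rechecks a returned record.

```python
import hashlib
import hmac
import json


def compute_integrity_hash(payload: dict) -> str:
    """SHA-256 over canonical JSON (sorted keys, fixed separators)."""
    canonical = json.dumps(payload, sort_keys=True, separators=(",", ":"), default=str)
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


def verify_integrity(record: dict) -> bool:
    """Recompute the hash, ignoring the metadata fields added after hashing."""
    body = {k: v for k, v in record.items() if k not in ("_integrity_hash", "_processed_at")}
    return hmac.compare_digest(compute_integrity_hash(body), record.get("_integrity_hash", ""))


record = {"order_id": 42, "total": "19.99"}
record["_integrity_hash"] = compute_integrity_hash({"order_id": 42, "total": "19.99"})
record["_processed_at"] = "2024-01-01T00:00:00"
intact = verify_integrity(record)
```

A plain hash detects accidental corruption only. Detecting deliberate tampering would need a keyed construction such as HMAC, with the key held outside the data store.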

Next Steps