Technical Safeguards
1. Access Control (§164.312(a)(1))
Requirement: Implement technical policies and procedures for electronic information systems that maintain ePHI to allow access only to those persons or software programs that have been granted access rights.
MCP Server Implementation
- Authentication
- Authorization
- Session Management
JWT-Based Authentication:
Keycloak SSO Integration:
Copy
Ask AI
# config/auth.yaml
authentication:
jwt:
enabled: true
secret: ${JWT_SECRET} # Stored in Infisical
issuer: "mcp-server-prod"
audience: "healthcare-agents"
expiration: 3600 # 1 hour
refresh_enabled: true
refresh_expiration: 86400 # 24 hours
mfa:
enabled: true # REQUIRED for HIPAA
providers:
- totp
- webauthn
grace_period: 0 # No grace period
Copy
Ask AI
# config/keycloak.yaml
keycloak:
realm: healthcare
server_url: https://auth.healthcare.example.com
client_id: mcp-server
client_secret: ${KEYCLOAK_CLIENT_SECRET}
# HIPAA requirements
require_mfa: true
session_timeout: 900 # 15 minutes
idle_timeout: 300 # 5 minutes
max_sessions_per_user: 1 # Single session
OpenFGA (Zanzibar Model):
Role-Based Access Control:
Copy
Ask AI
# Fine-grained access control for PHI
from openfga import OpenFGAClient
# Define the authorization model (OpenFGA DSL, schema 1.1).
# - `patient` carries direct viewer / editor / owner relations to users.
# - `medical_record` links to its patient and grants viewer / editor either
#   directly to a user or by inheritance from that patient.
# - `can_view` / `can_edit` are the relations queried when checking access.
# NOTE(review): the DSL text below has no indentation — confirm it matches
# the layout the OpenFGA DSL parser expects before loading it.
model = """
model
schema 1.1
type user
type patient
relations
define viewer: [user]
define editor: [user]
define owner: [user]
type medical_record
relations
define patient: [patient]
define viewer: [user] or viewer from patient
define editor: [user] or editor from patient
define can_view: viewer
define can_edit: editor
"""
# Check access before serving PHI
async def check_phi_access(user_id: str, record_id: str, action: str):
    """Ask OpenFGA whether a user may perform ``action`` on a medical record.

    Args:
        user_id: Identifier of the user; sent as ``user:<user_id>``.
        record_id: Identifier of the record; sent as
            ``medical_record:<record_id>``.
        action: ``"view"`` or ``"edit"``; mapped to the ``can_view`` /
            ``can_edit`` relation.

    Returns:
        Whatever ``openfga_client.check`` returns for the query.

    Raises:
        ValueError: If ``action`` is not one of the supported actions.
    """
    # The authorization model only defines can_view and can_edit. Rejecting
    # anything else up front keeps arbitrary caller-supplied text out of the
    # relation name sent to the authorization server.
    if action not in ("view", "edit"):
        raise ValueError(f"Unsupported PHI action: {action!r}")

    return await openfga_client.check(
        user=f"user:{user_id}",
        relation=f"can_{action}",
        object=f"medical_record:{record_id}",
    )
Copy
Ask AI
# config/rbac.yaml
roles:
healthcare_provider:
permissions:
- phi:read
- phi:write
- patient:diagnose
nurse:
permissions:
- phi:read
- patient:update_vitals
administrative:
permissions:
- phi:read # Limited to billing PHI only
- patient:schedule
patient:
permissions:
- phi:read_own # Only their own records
Automatic Session Timeouts:
Copy
Ask AI
# src/middleware/session.py
from datetime import datetime, timedelta
class HIPAASessionMiddleware:
    """Enforce HIPAA session timeout requirements.

    A request is rejected with HTTP 401 when its session is older than
    ``MAX_SESSION_DURATION`` or has been idle longer than ``IDLE_TIMEOUT``.
    In both cases the session is revoked so it cannot be reused. Otherwise
    the session's last-activity time is refreshed and the request is passed
    on.

    ``revoke_session`` is not defined in this class body; it is expected to
    be provided elsewhere (e.g. by a subclass or mixin).
    """

    MAX_SESSION_DURATION = timedelta(minutes=15)
    IDLE_TIMEOUT = timedelta(minutes=5)

    async def __call__(self, request, call_next):
        """Validate the request's session, then forward the request.

        Args:
            request: Incoming request; the session is read from
                ``request.state.session``.
            call_next: Awaitable callable that handles the request further.

        Returns:
            A 401 ``JSONResponse`` if the session has expired, otherwise the
            result of ``call_next(request)``.
        """
        session = request.state.session

        # Read the clock once so both checks and the activity stamp agree.
        # Naive UTC is kept on purpose: the session timestamps are compared
        # with it directly (presumably they are naive UTC too — confirm).
        now = datetime.utcnow()

        # Check session age
        if session.created_at + self.MAX_SESSION_DURATION < now:
            return await self._expire(session, "Session expired (max duration)")

        # Check idle time
        if session.last_activity + self.IDLE_TIMEOUT < now:
            return await self._expire(session, "Session expired (idle timeout)")

        # Update last activity
        session.last_activity = now
        return await call_next(request)

    async def _expire(self, session, message):
        """Revoke ``session`` and build the 401 response carrying ``message``."""
        await self.revoke_session(session.id)
        return JSONResponse(status_code=401, content={"error": message})
2. Audit Controls (§164.312(b))
Requirement: Implement hardware, software, and/or procedural mechanisms that record and examine activity in information systems that contain or use ePHI.
Comprehensive Audit Logging
- Audit Log Configuration
- Audit Log Format
- Audit Query Examples
Copy
Ask AI
# config/audit.yaml
audit_logging:
enabled: true
retention_days: 2555 # 7 years (exceeds HIPAA's 6-year documentation retention minimum)
events:
# Authentication events
- user_login
- user_logout
- failed_login_attempt
- mfa_challenge
- password_reset
# Authorization events
- phi_access_granted
- phi_access_denied
- permission_change
# Data events
- phi_read
- phi_create
- phi_update
- phi_delete
- phi_export
# System events
- configuration_change
- user_created
- user_deleted
- role_assigned
output:
- type: postgresql
table: audit_logs
encrypted: true
- type: siem
endpoint: https://siem.healthcare.example.com
protocol: syslog
- type: s3
bucket: hipaa-audit-logs
encryption: AES-256
lifecycle: 2555_days
Copy
Ask AI
{
"timestamp": "2025-11-02T14:32:15.123Z",
"event_id": "evt_7k9m2n4p",
"event_type": "phi_read",
"user": {
"id": "user_abc123",
"email": "dr.smith@hospital.example.com",
"role": "healthcare_provider",
"ip_address": "10.0.1.45",
"user_agent": "Mozilla/5.0...",
"mfa_verified": true
},
"resource": {
"type": "medical_record",
"id": "record_xyz789",
"patient_id": "patient_456",
"classification": "PHI"
},
"action": {
"operation": "read",
"result": "success",
"authorization_method": "openfga",
"access_granted_by": "viewer_relation"
},
"context": {
"agent_workflow": "diagnosis_assistant",
"tool_used": "ehr_integration",
"llm_provider": "gemini-2.5-pro",
"tokens_processed": 1250
},
"metadata": {
"request_id": "req_1a2b3c",
"session_id": "sess_x7y8z9",
"trace_id": "trace_lmn456"
}
}
Copy
Ask AI
-- NOTE: "user" is a reserved word in PostgreSQL, so the column must be
-- double-quoted wherever it is referenced.

-- Find all PHI access by user in last 30 days
SELECT
    timestamp,
    event_type,
    resource->>'patient_id' AS patient_id,
    action->>'result' AS result
FROM audit_logs
WHERE "user"->>'id' = 'user_abc123'
  AND event_type IN ('phi_read', 'phi_update', 'phi_delete')
  AND timestamp >= NOW() - INTERVAL '30 days'
ORDER BY timestamp DESC;

-- Detect unauthorized access attempts
SELECT
    "user"->>'email' AS user_email,
    resource->>'id' AS resource_id,
    COUNT(*) AS failed_attempts
FROM audit_logs
WHERE event_type = 'phi_access_denied'
  AND timestamp >= NOW() - INTERVAL '1 hour'
GROUP BY "user"->>'email', resource->>'id'
HAVING COUNT(*) >= 3;

-- Compliance report: All PHI access in date range
-- A half-open range is used so that the whole of October 31 is included;
-- BETWEEN '2025-10-01' AND '2025-10-31' would stop at midnight on the 31st.
SELECT
    DATE(timestamp) AS access_date,
    "user"->>'role' AS user_role,
    COUNT(*) AS access_count,
    COUNT(DISTINCT resource->>'patient_id') AS unique_patients
FROM audit_logs
WHERE event_type IN ('phi_read', 'phi_update')
  AND timestamp >= '2025-10-01'
  AND timestamp < '2025-11-01'
GROUP BY DATE(timestamp), "user"->>'role'
ORDER BY access_date, user_role;
3. Integrity Controls (§164.312(c)(1))
Requirement: Implement policies and procedures to protect ePHI from improper alteration or destruction.
Data Integrity Implementation
Copy
Ask AI
# src/middleware/integrity.py
import hashlib
import hmac
from datetime import datetime
class DataIntegrityMiddleware:
    """Ensure PHI integrity with cryptographic hashing.

    The methods rely on ``self.get_secret_key()``, ``self.db``,
    ``self.current_user`` and ``self.audit_log``, none of which are defined
    in this class body; they are expected to be provided elsewhere.
    """

    def compute_integrity_hash(self, data: dict, secret: bytes) -> str:
        """Compute HMAC-SHA256 hash of PHI data.

        Args:
            data: JSON-serializable PHI payload.
            secret: HMAC key.

        Returns:
            Hex digest of the HMAC over the payload serialized as JSON with
            sorted keys, so key order in ``data`` does not affect the result.
        """
        # Imported here because the snippet's import block does not bring
        # json into scope.
        import json

        canonical = json.dumps(data, sort_keys=True)
        return hmac.new(secret, canonical.encode(), hashlib.sha256).hexdigest()

    async def store_phi(self, patient_id: str, data: dict):
        """Store PHI with integrity hash.

        Args:
            patient_id: Identifier of the patient the data belongs to.
            data: JSON-serializable PHI payload.

        Returns:
            The record dict that was passed to ``insert_one``.
        """
        # Compute integrity hash
        integrity_hash = self.compute_integrity_hash(
            data,
            secret=self.get_secret_key(),
        )

        # Store with metadata
        record = {
            "patient_id": patient_id,
            "data": data,
            "integrity_hash": integrity_hash,
            "created_at": datetime.utcnow().isoformat(),
            "created_by": self.current_user.id,
            "version": 1,
        }
        await self.db.medical_records.insert_one(record)
        return record

    async def verify_phi_integrity(self, record: dict) -> bool:
        """Verify PHI has not been tampered with.

        Args:
            record: Stored record containing ``data`` and, normally,
                ``integrity_hash``.

        Returns:
            True if the stored hash matches a freshly computed one. False
            otherwise, after a critical ``integrity_violation`` audit event
            has been logged.
        """
        stored_hash = record.get("integrity_hash")
        computed_hash = self.compute_integrity_hash(
            record["data"],
            secret=self.get_secret_key(),
        )

        # Constant-time comparison avoids leaking how many leading characters
        # of the digest matched. A missing or non-string stored hash counts
        # as a mismatch. Bytes are compared so that non-ASCII content in a
        # tampered hash cannot make compare_digest raise.
        matches = isinstance(stored_hash, str) and hmac.compare_digest(
            stored_hash.encode(), computed_hash.encode()
        )
        if matches:
            return True

        # Log integrity violation. .get() is used so that a record lacking
        # "_id" or "patient_id" still gets reported instead of raising.
        await self.audit_log.critical(
            event="integrity_violation",
            record_id=record.get("_id"),
            patient_id=record.get("patient_id"),
        )
        return False
4. Transmission Security (§164.312(e)(1))
Requirement: Implement technical security measures to guard against unauthorized access to ePHI that is being transmitted over an electronic communications network.
Encryption in Transit
Copy
Ask AI
# config/tls.yaml
tls:
# Require TLS 1.3 (HIPAA best practice)
min_version: "1.3"
cipher_suites:
- TLS_AES_256_GCM_SHA384
- TLS_CHACHA20_POLY1305_SHA256
- TLS_AES_128_GCM_SHA256
# Certificate management
cert_file: /secrets/tls/cert.pem
key_file: /secrets/tls/key.pem
ca_file: /secrets/tls/ca.pem
# mTLS for service-to-service
mutual_tls:
enabled: true
verify_client_cert: true
allowed_dns_names:
- "*.healthcare.example.com"
# Kubernetes Ingress
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
name: mcp-server-hipaa
annotations:
nginx.ingress.kubernetes.io/ssl-protocols: "TLSv1.3"
nginx.ingress.kubernetes.io/ssl-ciphers: "TLS_AES_256_GCM_SHA384:TLS_CHACHA20_POLY1305_SHA256"
nginx.ingress.kubernetes.io/force-ssl-redirect: "true"
spec:
tls:
- hosts:
- agents.healthcare.example.com
secretName: tls-cert
5. Encryption at Rest (§164.312(a)(2)(iv))
Requirement: Implement a mechanism to encrypt and decrypt ePHI (addressable).
Database Encryption
Copy
Ask AI
# PostgreSQL encryption configuration
# postgresql.conf
ssl = on
ssl_cert_file = '/var/lib/postgresql/server.crt'
ssl_key_file = '/var/lib/postgresql/server.key'
ssl_ca_file = '/var/lib/postgresql/root.crt'
# Enable transparent data encryption (TDE)
# For Google Cloud SQL
resource "google_sql_database_instance" "hipaa_db" {
  name = "mcp-server-hipaa"

  # Required by the provider; set this to the PostgreSQL version you run.
  database_version = "POSTGRES_15"

  # Must be the same location as the KMS key below.
  region = "us-central1"

  # Encryption at rest with a customer-managed key (CMEK). This is a
  # top-level argument of the instance, not a block inside `settings`.
  encryption_key_name = "projects/PROJECT_ID/locations/us-central1/keyRings/hipaa/cryptoKeys/phi-db"

  settings {
    tier = "db-n1-standard-4"

    # Audit logging (pgAudit) -- this flag is not an encryption setting.
    database_flags {
      name  = "cloudsql.enable_pgaudit"
      value = "on"
    }

    backup_configuration {
      enabled                        = true
      binary_log_enabled             = false # PostgreSQL
      point_in_time_recovery_enabled = true
    }
  }
}
Application-Level Encryption
Copy
Ask AI
# src/encryption/phi_encryption.py
from cryptography.fernet import Fernet
from typing import Any, Dict
class PHIEncryption:
    """Application-level encryption for sensitive PHI fields.

    Sensitive values are JSON-serialized, encrypted with the Fernet cipher
    and stored as ``{"_encrypted": True, "_value": <token>}`` in place of the
    plaintext value.
    """

    # Record keys whose values are encrypted before storage.
    SENSITIVE_FIELDS = (
        "ssn",
        "date_of_birth",
        "medical_record_number",
        "diagnosis",
        "medications",
        "test_results",
    )

    def __init__(self, encryption_key: bytes):
        """Create the cipher from a Fernet key."""
        self.cipher = Fernet(encryption_key)

    def encrypt_sensitive_fields(self, record: Dict[str, Any]) -> Dict[str, Any]:
        """Encrypt sensitive PHI fields before storage.

        Args:
            record: Record to protect; it is not modified.

        Returns:
            A shallow copy of ``record`` in which every sensitive field that
            is present and not ``None`` is replaced by its encrypted wrapper.
        """
        # Imported here because the snippet's import block does not bring
        # json into scope.
        import json

        encrypted_record = record.copy()
        for field in self.SENSITIVE_FIELDS:
            value = record.get(field)
            # Only absent / None values are skipped. A truthiness test would
            # leave falsy but meaningful values (0, False, "") in plaintext.
            if value is None:
                continue
            token = self.cipher.encrypt(json.dumps(value).encode())
            encrypted_record[field] = {
                "_encrypted": True,
                "_value": token.decode(),
            }
        return encrypted_record

    def decrypt_sensitive_fields(self, record: Dict[str, Any]) -> Dict[str, Any]:
        """Decrypt sensitive PHI fields after retrieval.

        Args:
            record: Record as stored; it is not modified.

        Returns:
            A shallow copy of ``record`` in which every value shaped like an
            encrypted wrapper (a dict with a truthy ``_encrypted`` key) is
            replaced by its decrypted, JSON-decoded value.
        """
        import json

        decrypted_record = record.copy()
        for field, value in record.items():
            if isinstance(value, dict) and value.get("_encrypted"):
                plaintext = self.cipher.decrypt(value["_value"].encode())
                decrypted_record[field] = json.loads(plaintext)
        return decrypted_record