Project 8: Just-In-Time (JIT) Access Broker
Project 8: Just-In-Time (JIT) Access Broker
The Core Question: âWhy should developers ever have permanent database passwords? What if every credential had a heartbeat?â
Project Overview
| Attribute | Value |
|---|---|
| Difficulty | Level 3: Advanced |
| Time Estimate | 1 Week (40-60 hours) |
| Primary Language | Go or Python |
| Alternative Languages | Rust, Node.js |
| Prerequisites | REST API development, SQL basics, understanding of authentication |
| Main Book | âFoundations of Information Securityâ by Jason Andress |
| Software/Tool | HashiCorp Vault (concepts), AWS IAM (optional) |
| Knowledge Area | Identity Management / Ephemeral Credentials |
Learning Objectives
By completing this project, you will:
- Understand the danger of static credentials - Why permanent passwords are Zero Trustâs worst enemy
- Implement ephemeral credential generation - Create database users that exist only for minutes
- Master TTL-based revocation - Build reliable automatic cleanup systems
- Design approval workflows - Implement human-in-the-loop access decisions
- Create compliance-ready audit logs - Log every access request, approval, and revocation
- Apply Least Privilege at the database level - Generate credentials with minimal required permissions
- Understand HashiCorp Vaultâs Database Secrets Engine - Conceptually and potentially integrate with it
Deep Theoretical Foundation
The Problem with Static Credentials
Traditional database access looks like this:
+------------------------------------------------------------------+
| The Static Credential Problem |
+------------------------------------------------------------------+
| |
| Developer Config File Production Database |
| | | | |
| | 1. Store once | | |
| |---------------->| DB_PASSWORD= | |
| | | "supersecret123" | |
| | | (never rotated) | |
| | | | |
| | 2. Use forever | | |
| |----------------------------------------->| |
| | | | |
| | 3. Developer leaves company... | |
| | | | |
| | 4. Password still works! | |
| | X----------------------------------------->| |
| | (credential never revoked) | |
| | | | |
| +----------------------------------------------------------+ |
| | Problems: | |
| | - Password shared in git history, chat, wikis | |
| | - No expiration date | |
| | - No audit trail of who used it when | |
| | - Same password used by multiple developers | |
| | - Rotation requires coordinated app redeployment | |
| +----------------------------------------------------------+ |
| |
+------------------------------------------------------------------+
Real-World Impact: The 2019 Capital One breach involved credentials that should have been rotated months earlier. The 2021 Codecov breach exposed thousands of companiesâ static secrets.
Ephemeral vs Static Credentials
| Aspect | Static Credentials | Ephemeral Credentials |
|---|---|---|
| Lifetime | Months to years | Minutes to hours |
| Rotation | Manual, painful | Automatic, every use |
| Sharing | Often shared among team | Unique per request |
| Revocation | Forget to do it | Automatic on expiry |
| Audit Trail | âSomeone used the prod passwordâ | âalice@corp requested DB access at 14:32 for incident #1234â |
| Blast Radius | If leaked, valid forever | If leaked, valid for 30 minutes max |
| Compliance | Hard to prove access control | Every access logged and justified |
The Principle of Least Privilege
+------------------------------------------------------------------+
| Least Privilege in Action |
+------------------------------------------------------------------+
| |
| Traditional: "Give developers the 'db_admin' role" |
| +----------------------------------------------------------+ |
| | Permissions granted: | |
| | - SELECT on all tables <- Actually needed | |
| | - INSERT on all tables <- Sometimes needed | |
| | - UPDATE on all tables <- Rarely needed | |
| | - DELETE on all tables <- Never needed for debugging | |
| | - DROP TABLE <- NEVER needed | |
| | - CREATE USER <- NEVER needed | |
| | - GRANT <- NEVER needed | |
| +----------------------------------------------------------+ |
| |
| JIT Approach: "Grant exactly what's needed, when needed" |
| +----------------------------------------------------------+ |
| | Request: "Debug user lookup for ticket PROD-1234" | |
| | Generated permissions: | |
| | - SELECT on users table ONLY | |
| | - WHERE clause: user_id = 12345 ONLY (row-level) | |
| | - TTL: 30 minutes | |
| | - Read replica only (can't affect production writes) | |
| +----------------------------------------------------------+ |
| |
+------------------------------------------------------------------+
Just-In-Time Access Patterns
JIT access follows a request-approve-grant-revoke lifecycle:
+------------------------------------------------------------------+
| JIT Access Lifecycle |
+------------------------------------------------------------------+
| |
| Developer JIT Broker Manager Database |
| | | | | |
| | 1. Request Access | | | |
| | - Why: ticket | | | |
| | - What: SELECT | | | |
| | - How long: 30m | | | |
| |------------------>| | | |
| | | | | |
| | | 2. Route for | | |
| | | approval | | |
| | |---------------->| | |
| | | | | |
| | | 3. Approved | | |
| | |<----------------| | |
| | | | | |
| | | 4. Create temp user | |
| | | (random password) | |
| | |-------------------------------->| |
| | | | | |
| | 5. Credentials | | | |
| | (30 min TTL) | | | |
| |<------------------| | | |
| | | | | |
| | 6. Use database | | | |
| |-----------------------------------------------> | |
| | | | | |
| | | 7. TTL expires | | |
| | | (30 min later) | | |
| | |-------------------------------->| |
| | | DROP USER... | | |
| | | | | |
| +----------------------------------------------------------+ |
| | Result: Access granted for exactly 30 minutes, fully | |
| | logged, with specific permissions, automatically revoked | |
| +----------------------------------------------------------+ |
| |
+------------------------------------------------------------------+
Database User/Role Management
PostgreSQL User Management
PostgreSQL provides rich access control through roles:
-- PostgreSQL Role Hierarchy
+------------------------------------------------------------------+
| PostgreSQL Access Control |
+------------------------------------------------------------------+
| |
| SUPERUSER (postgres) |
| | |
| +-- db_owner (role that owns the database) |
| | | |
| | +-- app_service (application's permanent role) |
| | | |
| | +-- jit_creator (role that creates temp users) |
| | | |
| | +-- jit_user_a1b2c3 (temp, 30 min) |
| | +-- jit_user_d4e5f6 (temp, 30 min) |
| | +-- jit_user_g7h8i9 (temp, 30 min) |
| | |
+------------------------------------------------------------------+
-- Creating a JIT creator role (one-time setup)
CREATE ROLE jit_creator WITH LOGIN PASSWORD 'broker-secret';
GRANT CONNECT ON DATABASE myapp TO jit_creator;
GRANT CREATE ON SCHEMA public TO jit_creator;
-- The JIT creator can create temporary users
GRANT pg_read_all_data TO jit_creator WITH ADMIN OPTION;
-- (PostgreSQL 14+, or grant SELECT on specific tables)
-- Creating an ephemeral user (what the broker does)
CREATE ROLE jit_user_a1b2c3
WITH LOGIN
PASSWORD 'randomly-generated-32-char-password'
VALID UNTIL (NOW() + INTERVAL '30 minutes')
CONNECTION LIMIT 5;
-- Grant specific permissions
GRANT CONNECT ON DATABASE myapp TO jit_user_a1b2c3;
GRANT USAGE ON SCHEMA public TO jit_user_a1b2c3;
GRANT SELECT ON users, orders TO jit_user_a1b2c3;
-- Revocation (automatic or explicit)
DROP ROLE jit_user_a1b2c3;
-- Note: Must first terminate active connections
SELECT pg_terminate_backend(pid)
FROM pg_stat_activity
WHERE usename = 'jit_user_a1b2c3';
MySQL User Management
MySQLâs approach is similar but with different syntax:
-- MySQL User Management for JIT
+------------------------------------------------------------------+
| MySQL Access Control |
+------------------------------------------------------------------+
-- Creating an ephemeral user
CREATE USER 'jit_user_a1b2c3'@'%'
IDENTIFIED BY 'randomly-generated-password'
PASSWORD EXPIRE INTERVAL 30 MINUTE
ACCOUNT LOCK; -- Lock until ready
-- Grant specific permissions
GRANT SELECT ON myapp.users TO 'jit_user_a1b2c3'@'%';
GRANT SELECT ON myapp.orders TO 'jit_user_a1b2c3'@'%';
-- Unlock the account (activate credentials)
ALTER USER 'jit_user_a1b2c3'@'%' ACCOUNT UNLOCK;
-- Revocation
DROP USER 'jit_user_a1b2c3'@'%';
-- Kill active sessions during revocation
SELECT CONCAT('KILL ', id, ';')
FROM information_schema.processlist
WHERE user = 'jit_user_a1b2c3';
TTL Management and Reliable Scheduling
The most challenging aspect of JIT access is reliable revocation. Missing a revocation means the credential lives beyond its intended lifetime.
+------------------------------------------------------------------+
| TTL Management Strategies |
+------------------------------------------------------------------+
| |
| Strategy 1: Database-Level Expiry (PostgreSQL VALID UNTIL) |
| +----------------------------------------------------------+ |
| | Pros: | |
| | - Enforced by database, not our code | |
| | - Works even if broker crashes | |
| | - Simple to implement | |
| | | |
| | Cons: | |
| | - User role remains in database (clutter) | |
| | - No cleanup of granted permissions | |
| | - Connection might stay active after expiry | |
| +----------------------------------------------------------+ |
| |
| Strategy 2: Task Queue (Celery, RQ, etc.) |
| +----------------------------------------------------------+ |
| | Pros: | |
| | - Full cleanup (DROP USER, kill connections) | |
| | - Can retry on failure | |
| | - Visible queue for monitoring | |
| | | |
| | Cons: | |
| | - Another service to maintain | |
| | - Queue failures delay revocation | |
| | - Requires persistent job storage (Redis, DB) | |
| +----------------------------------------------------------+ |
| |
| Strategy 3: Periodic Sweeper + Database Expiry (Recommended) |
| +----------------------------------------------------------+ |
| | Best of both worlds: | |
| | - Database VALID UNTIL provides hard guarantee | |
| | - Background sweeper runs every 5 min | |
| | - Sweeper cleans up expired users (DROP USER) | |
| | - Even if sweeper is late, access is still revoked | |
| +----------------------------------------------------------+ |
| |
+------------------------------------------------------------------+
Audit Logging for Compliance
SOC2, PCI-DSS, HIPAA, and GDPR all require demonstrable access controls:
+------------------------------------------------------------------+
| Compliance Audit Requirements |
+------------------------------------------------------------------+
| |
| SOC2 Control CC6.1: |
| "The entity implements logical access security software, |
| infrastructure, and architectures over protected information |
| assets to protect them from security events." |
| |
| What auditors want to see: |
| +----------------------------------------------------------+ |
| | 1. WHO requested access? | |
| | - User identity (email, employee ID) | |
| | - Authentication method (SSO, MFA used?) | |
| | | |
| | 2. WHAT did they request? | |
| | - Database and tables requested | |
| | - Permission level (SELECT, UPDATE, etc.) | |
| | - Row-level restrictions if any | |
| | | |
| | 3. WHY did they need it? | |
| | - Ticket number / incident reference | |
| | - Business justification | |
| | | |
| | 4. WHO approved it? | |
| | - Approver identity | |
| | - Timestamp of approval | |
| | - Or: auto-approval policy that was applied | |
| | | |
| | 5. WHEN was access active? | |
| | - Credential creation timestamp | |
| | - Credential revocation timestamp | |
| | - Actual usage timestamps (from DB logs) | |
| | | |
| | 6. Was access PROPERLY revoked? | |
| | - Revocation timestamp | |
| | - Confirmation that user was dropped | |
| | - Any sessions terminated | |
| +----------------------------------------------------------+ |
| |
| Example Audit Log Entry: |
| { |
| "event_id": "uuid-here", |
| "event_type": "credential_granted", |
| "timestamp": "2024-01-15T14:32:00Z", |
| "requester": { |
| "email": "alice@company.com", |
| "employee_id": "EMP-1234", |
| "ip_address": "10.0.1.50", |
| "mfa_verified": true |
| }, |
| "request": { |
| "database": "production", |
| "tables": ["users", "orders"], |
| "permissions": ["SELECT"], |
| "justification": "Debugging PROD-5678", |
| "requested_ttl_minutes": 30 |
| }, |
| "approval": { |
| "approved_by": "bob@company.com", |
| "approved_at": "2024-01-15T14:35:00Z", |
| "granted_ttl_minutes": 30 |
| }, |
| "credential": { |
| "temp_username": "jit_user_a1b2c3", |
| "expires_at": "2024-01-15T15:05:00Z", |
| "password_hash": "sha256:abc...(for audit, not auth)" |
| } |
| } |
| |
+------------------------------------------------------------------+
HashiCorp Vaultâs Database Secrets Engine (Conceptual)
Vaultâs database secrets engine is the production-grade implementation of what youâre building:
+------------------------------------------------------------------+
| Vault Database Secrets Engine Concepts |
+------------------------------------------------------------------+
| |
| Application Vault Database |
| | | | |
| | 1. Authenticate | | |
| |---------------->| | |
| | (using JWT, | | |
| | AppRole, etc)| | |
| | | | |
| | 2. Request creds| | |
| | for "mydb" | | |
| |---------------->| | |
| | | | |
| | | 3. Create user | |
| | | (uses template) | |
| | |------------------>| |
| | | | |
| | 4. Credentials | | |
| | + lease_id | | |
| |<----------------| | |
| | | | |
| | 5. Use database | | |
| |------------------------------------->| |
| | | | |
| | | 6. Lease expires | |
| | | or is revoked | |
| | |------------------>| |
| | | DROP USER | |
| |
| Key Vault Concepts: |
| +----------------------------------------------------------+ |
| | - Leases: Every secret has an expiration (TTL) | |
| | - Renewal: Apps can extend lease before expiry | |
| | - Revocation: Explicit or automatic on lease expiry | |
| | - Templates: SQL statements for user creation/deletion | |
| | - Rotation: Root credentials rotated without downtime | |
| +----------------------------------------------------------+ |
| |
+------------------------------------------------------------------+
Complete Project Specification
What Youâre Building
A self-service portal and API where developers can request temporary database access. Instead of permanent passwords stored in config files, the broker:
- Accepts access requests with justification
- Routes requests for approval (or auto-approves based on policy)
- Generates a temporary database user with a random password
- Sets the credential to expire in N minutes (configurable TTL)
- Provides credentials to the requester
- Automatically revokes the user when the TTL expires
- Logs everything for compliance audits
Functional Requirements
- Access Request Submission
- Developer submits request via web UI or CLI
- Request includes: target database, required permissions, justification, requested TTL
- System validates the requesterâs identity (SSO/OAuth)
- Approval Workflow
- Requests routed to appropriate approver based on policy
- Approvers receive notification (email, Slack, etc.)
- Approvers can approve, deny, or modify (reduce permissions/TTL)
- Auto-approval policies for low-risk requests (e.g., read-only on staging)
- Credential Generation
- Generate unique username (e.g.,
jit_alice_2024011514) - Generate cryptographically random password (32+ characters)
- Create database user with minimal required permissions
- Set database-level expiry (VALID UNTIL)
- Generate unique username (e.g.,
- Credential Delivery
- Display credentials once (never store plaintext after)
- Provide connection string / psql command
- Option to auto-configure client tool (e.g., inject into psql session)
- Automatic Revocation
- Background job runs every minute
- Identifies expired credentials
- Terminates active database connections
- Drops the temporary user
- Logs revocation event
- Audit Logging
- Every API call logged with full context
- Audit log is append-only, tamper-evident
- Supports export to SIEM (JSON, Syslog)
Technical Requirements
- API: REST with JSON, authenticated via OAuth2/OIDC
- Database Support: PostgreSQL (primary), MySQL (secondary)
- Task Queue: Celery with Redis, or goroutine-based scheduler (Go)
- Persistence: PostgreSQL for broker metadata
- Authentication: OAuth2/OIDC (e.g., Google, Okta, Auth0)
Acceptance Criteria
# Scenario 1: Request and receive credentials
$ jit-cli request \
--database production-pg \
--permissions SELECT \
--tables users,orders \
--justification "Debugging PROD-1234" \
--ttl 30m
Request submitted. Awaiting approval...
Approved by manager@company.com at 14:35:00
Your credentials (valid for 30 minutes):
Username: jit_alice_20240115_a1b2c3
Password: Kj8mN2pQ5rT7vX9yB3dF6gH8jL0n
Expires: 2024-01-15 15:05:00 UTC
Connect with:
psql "postgresql://jit_alice_20240115_a1b2c3:Kj8mN2pQ5rT7vX9yB3dF6gH8jL0n@db.company.com:5432/production"
# Scenario 2: Credential expires automatically
# At 15:05:00, connection fails:
$ psql "postgresql://jit_alice_20240115_a1b2c3:..."
psql: error: FATAL: role "jit_alice_20240115_a1b2c3" does not exist
# Scenario 3: Audit query
$ jit-cli audit --user alice@company.com --since 2024-01-01
[
{
"event": "access_requested",
"time": "2024-01-15T14:32:00Z",
"database": "production-pg",
"permissions": ["SELECT"],
"justification": "Debugging PROD-1234"
},
{
"event": "access_approved",
"time": "2024-01-15T14:35:00Z",
"approved_by": "manager@company.com"
},
{
"event": "credential_created",
"time": "2024-01-15T14:35:05Z",
"temp_user": "jit_alice_20240115_a1b2c3",
"expires": "2024-01-15T15:05:00Z"
},
{
"event": "credential_revoked",
"time": "2024-01-15T15:05:01Z",
"reason": "TTL expired"
}
]
Real World Outcome
After completing this project, you can connect to production databases like this:
# Before: Static credentials in .env files, shared among team
$ cat .env
DATABASE_URL=postgresql://app_user:supersecret@prod:5432/myapp
# This password was set 2 years ago, known by 15 people who've left...
# After: JIT credentials, unique per request, auto-expiring
$ jit request --db prod --reason "PROD-5678" --ttl 30m
Approved! Credentials expire in 30 minutes.
$ psql "postgresql://jit_alice_a1b2c3:Kj8m...@prod:5432/myapp"
psql (15.2)
SSL connection (protocol: TLSv1.3)
myapp=> SELECT * FROM users WHERE id = 12345;
id | email | created_at
------+-------------------+------------
12345 | user@example.com | 2024-01-10
(1 row)
myapp=> \q
# 30 minutes later...
$ psql "postgresql://jit_alice_a1b2c3:Kj8m...@prod:5432/myapp"
psql: error: connection to server failed: FATAL:
role "jit_alice_a1b2c3" does not exist
# The credential is dead. As it should be.
The Core Question Youâre Answering
âHow can I eliminate standing privileges and grant access only when needed, for only as long as needed, with full audit trails?â
Traditional access control grants permanent or long-lived credentials that accumulate over time. An engineer joins the team and receives database access. Years later, theyâve changed roles three times but still have that original access. When they leave the company, nobody remembers all the systems they could reach.
This âstanding privilegeâ model is dangerous for several reasons:
- Credential sprawl - Over time, users accumulate more access than they need
- Forgotten access - Nobody tracks who has access to what after initial provisioning
- Stale credentials - Passwords set years ago remain valid indefinitely
- No context - You cannot answer âwhy did Alice access the database at 3am?â
- Shared secrets - The same password ends up in config files, wikis, and chat logs
- Blast radius - A leaked credential provides unlimited access until manually revoked
JIT access inverts this model: no standing access exists. Every access request requires justification, approval (human or automated), and generates a unique, time-limited credential. When the credential expires, access is automatically revoked. Every access is logged with who, what, why, and when.
Concepts You Must Understand First
Before implementing a JIT access broker, you need to understand these foundational concepts:
1. Just-In-Time (JIT) vs Just-Enough Access (JEA)
These are complementary but distinct principles:
-
JIT (Just-In-Time): Access is granted only when needed and automatically expires. The temporal dimension. âYou get access for 30 minutes, starting now.â
-
JEA (Just-Enough Access): Access is scoped to the minimum necessary permissions. The permission dimension. âYou get SELECT on the users table only, not full database admin.â
A complete Zero Trust implementation requires both: access that is granted only when needed (JIT) with only the permissions required (JEA).
2. Privileged Access Management (PAM)
PAM is the discipline of controlling and monitoring privileged access to critical systems. Key PAM concepts include:
- Privileged accounts: Administrative accounts with elevated permissions (database admins, root access, cloud admin roles)
- Session recording: Capturing what users do during privileged sessions
- Credential checkout: Users âcheck outâ credentials for a defined period, then return them
- Separation of duties: Ensuring no single user can complete a sensitive action alone
Your JIT broker is essentially a lightweight PAM solution focused on database access.
3. Credential Vaulting and Rotation
Static credentials stored in application configs are security liabilities. Modern approaches include:
- Secret vaulting: Storing credentials in dedicated systems like HashiCorp Vault, AWS Secrets Manager, or Azure Key Vault
- Dynamic secrets: Instead of storing credentials, generate them on-demand (what youâre building)
- Automatic rotation: Regularly changing credentials without human intervention
- Zero-knowledge access: Applications receive credentials without developers ever seeing them
4. Time-Bound Access Tokens
Several mechanisms enable time-limited access:
- Database-level expiry: PostgreSQLâs
VALID UNTILclause, MySQLâsPASSWORD EXPIRE - Token TTLs: JWT tokens with expiration claims
- Lease-based access: HashiCorp Vaultâs lease system where every secret has a lifetime
- Session timeouts: Terminating idle or long-running database sessions
The key insight: use multiple layers. Database expiry is your last line of defense even if application-level revocation fails.
5. Approval Workflows
JIT access requires a decision-making process:
- Manual approval: A human reviews and approves each request
- Policy-based auto-approval: Predefined rules automatically approve low-risk requests
- Multi-party approval: Sensitive access requires multiple approvers
- Break-glass procedures: Emergency access that bypasses normal approval but triggers alerts
Design your workflow to balance security (approval) with developer productivity (not waiting hours for read-only staging access).
6. Audit Logging and Compliance
Compliance frameworks (SOC2, PCI-DSS, HIPAA, GDPR) require demonstrable access controls:
- Completeness: Every access event must be logged
- Immutability: Logs cannot be modified or deleted
- Traceability: Each action links to an identity and justification
- Retention: Logs kept for required periods (often 1-7 years)
- Searchability: Auditors must be able to query historical access
Your audit log should answer: âShow me every time anyone accessed the payments table in production last month, who approved it, and why.â
Questions to Guide Your Design
Before writing code, think through these design questions:
Request Flow
- How does a developer initiate an access request? CLI tool? Web UI? Slack bot?
- What information must they provide? Database name, tables, permissions, justification, requested duration?
- How do you validate the requesterâs identity? OAuth? SSO? API keys?
- Where are pending requests stored? How long before they expire unapproved?
Approval Process
- Who can approve requests? Direct managers? Database owners? Security team?
- How do approvers receive notifications? Email? Slack? In-app?
- Whatâs the SLA for approvals? Should low-risk requests auto-approve?
- Can approvers modify requests (reduce permissions, shorten TTL)?
Credential Lifecycle
- How are temporary usernames generated? Must be unique, identifiable, compliant with database constraints.
- How are passwords generated? Cryptographically random, sufficient entropy, no special characters that break connection strings?
- How are credentials delivered to the requester? Display once? Inject into CLI? Store in their vault?
- What happens if credential creation fails partway through?
Revocation Reliability
- How do you ensure revocation happens even if the broker crashes?
- What if the target database is temporarily unreachable?
- How do you handle active sessions when revoking?
- Whatâs your monitoring for missed revocations?
Audit and Compliance
- What events must be logged? Requests, approvals, denials, credential creation, usage, revocation?
- How do you prevent tampering with audit logs?
- How long must logs be retained?
- Can you prove to an auditor exactly who had database access at any point in time?
Thinking Exercise
Before implementing, design an access request workflow on paper:
Scenario: Developer Alice needs to debug a production issue. She needs SELECT access to the orders and payments tables for 30 minutes.
Draw the sequence diagram covering:
- Alice submits request (what information does she provide?)
- Request is validated (what checks happen?)
- Approval routing (who needs to approve? how are they notified?)
- Approval decision (what can the approver modify?)
- Credential generation (what database commands run?)
- Credential delivery (how does Alice receive the credentials?)
- Alice uses the database (what gets logged?)
- TTL expiry approaches (any warnings?)
- Revocation executes (what commands run? what if Alice is mid-query?)
- Audit query (how would a compliance auditor review this access?)
Consider edge cases:
- What if the approver doesnât respond in 2 hours?
- What if Alice requests access to a table that doesnât exist?
- What if the database is unreachable when creating the credential?
- What if revocation fails because the database is unreachable?
- What if Alice requests access again while her first credential is still active?
Hints in Layers
If you get stuck during implementation, use these hints progressively:
Hint 1: Credential Uniqueness
Problem: How do I generate unique temporary usernames that wonât collide?
Direction: Combine deterministic elements (requester identity, timestamp) with random elements (short hex suffix). Ensure the result fits within PostgreSQLâs 63-character identifier limit.
More detail
Format: jit_{username_prefix}_{YYYYMMDDHHMM}_{random_hex}
Example: jit_alice_202401151432_a1b2c3
The timestamp provides debuggability (when was this created?), the random suffix prevents collisions if the same user requests access twice in the same minute.
Hint 2: Reliable Revocation
Problem: How do I ensure credentials are always revoked, even if my broker crashes?
Direction: Use defense in depth. The databaseâs VALID UNTIL provides a hard guarantee. Your applicationâs revocation job provides cleanup (dropping the user role). Even if your job never runs, the credential stops working at the database level.
More detail
Three-layer approach:
- Database expiry:
VALID UNTILprevents authentication after TTL - Periodic sweeper: Background job runs every minute, cleans up expired users
- Monitoring: Alert if any credential is more than 5 minutes past expiry without revocation
The sweeper handles cleanup even if itâs delayed. The database expiry means a delayed sweeper doesnât extend access.
Hint 3: SQL Injection Prevention
Problem: Iâm building SQL statements with user-provided data (usernames, table names). How do I prevent injection?
Direction: Use your database driverâs identifier quoting functionality. Never use string formatting for SQL. Validate that usernames and table names match expected patterns before using them.
More detail
Python with psycopg2:
from psycopg2 import sql
# For identifiers (table names, usernames)
query = sql.SQL("GRANT SELECT ON {} TO {}").format(
sql.Identifier(table_name),
sql.Identifier(username)
)
# For values (passwords)
cursor.execute(
sql.SQL("CREATE ROLE {} WITH PASSWORD %s").format(
sql.Identifier(username)
),
(password,)
)
Also validate: if not re.match(r'^[a-z][a-z0-9_]{0,62}$', username): raise ValueError()
Hint 4: Handling Active Sessions During Revocation
Problem: When I drop a database user, their active sessions might continue. How do I terminate them?
Direction: Before dropping the role, query pg_stat_activity to find active connections for that user, then use pg_terminate_backend() to terminate them.
More detail
-- First, terminate all active connections
SELECT pg_terminate_backend(pid)
FROM pg_stat_activity
WHERE usename = 'jit_alice_a1b2c3';
-- Then drop the role
DROP ROLE IF EXISTS jit_alice_a1b2c3;
Note: pg_terminate_backend() requires appropriate privileges. Your JIT brokerâs admin connection needs the pg_signal_backend role or superuser access.
Hint 5: Tamper-Evident Audit Logs
Problem: How do I prove audit logs havenât been modified?
Direction: Use hash chaining. Each log entry includes a hash of the previous entry. To tamper with any entry, an attacker would need to recompute all subsequent hashes.
More detail
import hashlib
import json
def log_event(event_type, data, prev_hash):
entry = {
"event_type": event_type,
"timestamp": datetime.utcnow().isoformat(),
"data": data,
"previous_hash": prev_hash
}
entry_json = json.dumps(entry, sort_keys=True)
entry_hash = hashlib.sha256(entry_json.encode()).hexdigest()
# Store entry with its hash
db.insert(event_type=event_type, data=entry, entry_hash=entry_hash)
return entry_hash # Use as previous_hash for next entry
Verification: Iterate through all entries, recompute each hash, verify it matches stored hash and previous_hash chain is unbroken.
Solution Architecture
System Architecture
+------------------------------------------------------------------+
| JIT Access Broker Architecture |
+------------------------------------------------------------------+
| |
| +-----------+ +------------------------------------------+ |
| | Web UI |--->| API Gateway | |
| | (React) | | (rate limiting, OAuth2 verification) | |
| +-----------+ +------------------------------------------+ |
| | |
| +-----------+ v |
| | CLI Tool |--->+------------------------------------------+ |
| | (jit-cli) | | JIT Broker Core | |
| +-----------+ | | |
| | +-------------+ +------------------+ | |
| | | Request | | Credential | | |
| | | Manager | | Generator | | |
| | +-------------+ +------------------+ | |
| | | |
| | +-------------+ +------------------+ | |
| | | Approval | | Revocation | | |
| | | Workflow | | Scheduler | | |
| | +-------------+ +------------------+ | |
| | | |
| | +-------------+ +------------------+ | |
| | | Audit | | Policy | | |
| | | Logger | | Engine | | |
| | +-------------+ +------------------+ | |
| +------------------------------------------+ |
| | | | |
| +----------------+ | | |
| | | | |
| v v v |
| +-------------+ +-------------+ +--------+ |
| | Broker DB | | Target DB | | Redis | |
| | (metadata, | | (PostgreSQL | | (task | |
| | audit logs) | | or MySQL) | | queue) | |
| +-------------+ +-------------+ +--------+ |
| |
+------------------------------------------------------------------+
Data Model
-- Core tables for the JIT Broker
-- Registered target databases
CREATE TABLE target_databases (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
name VARCHAR(100) NOT NULL UNIQUE,
db_type VARCHAR(20) NOT NULL, -- 'postgresql' or 'mysql'
host VARCHAR(255) NOT NULL,
port INTEGER NOT NULL DEFAULT 5432,
database_name VARCHAR(100) NOT NULL,
admin_username VARCHAR(100) NOT NULL,
admin_password_encrypted BYTEA NOT NULL, -- encrypted with broker's key
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
-- Access requests
CREATE TABLE access_requests (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
requester_email VARCHAR(255) NOT NULL,
requester_ip INET,
target_database_id UUID REFERENCES target_databases(id),
requested_permissions TEXT[] NOT NULL, -- ['SELECT', 'INSERT']
requested_tables TEXT[], -- null means all allowed tables
justification TEXT NOT NULL,
requested_ttl_minutes INTEGER NOT NULL DEFAULT 30,
status VARCHAR(20) NOT NULL DEFAULT 'pending', -- pending, approved, denied, expired
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
-- Approvals
CREATE TABLE approvals (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
request_id UUID REFERENCES access_requests(id),
approver_email VARCHAR(255) NOT NULL,
decision VARCHAR(20) NOT NULL, -- 'approved' or 'denied'
granted_ttl_minutes INTEGER, -- may differ from requested
denied_reason TEXT,
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
-- Generated credentials
CREATE TABLE credentials (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
request_id UUID REFERENCES access_requests(id),
temp_username VARCHAR(100) NOT NULL,
password_hash VARCHAR(100) NOT NULL, -- for audit, not auth
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
expires_at TIMESTAMP WITH TIME ZONE NOT NULL,
revoked_at TIMESTAMP WITH TIME ZONE,
revocation_reason VARCHAR(50) -- 'ttl_expired', 'manual', 'emergency'
);
-- Audit log (append-only)
CREATE TABLE audit_log (
id BIGSERIAL PRIMARY KEY,
event_type VARCHAR(50) NOT NULL,
event_data JSONB NOT NULL,
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
-- Prevent updates/deletes on audit log
CREATE RULE audit_log_no_update AS ON UPDATE TO audit_log DO INSTEAD NOTHING;
CREATE RULE audit_log_no_delete AS ON DELETE TO audit_log DO INSTEAD NOTHING;
API Endpoints
# JIT Broker API Specification
POST /api/v1/requests
# Create a new access request
Request:
database_id: uuid
permissions: ["SELECT"]
tables: ["users", "orders"] # optional, null = all allowed
justification: "Debugging PROD-1234"
ttl_minutes: 30
Response:
request_id: uuid
status: "pending" | "auto_approved"
credential: { ... } | null # if auto-approved
GET /api/v1/requests/{id}
# Get request status
GET /api/v1/requests
# List my requests
Query params: status, since, until
POST /api/v1/requests/{id}/approve
# Approve a request (manager only)
Request:
ttl_minutes: 30 # can reduce from requested
Response:
credential: { username, password, expires_at, connection_string }
POST /api/v1/requests/{id}/deny
# Deny a request (manager only)
Request:
reason: "Too broad permissions requested"
DELETE /api/v1/credentials/{id}
# Emergency revocation
GET /api/v1/audit
# Query audit log
Query params: user, event_type, since, until
Phased Implementation Guide
Phase 1: Manual SQL User Creation/Deletion (Day 1)
Goal: Understand database user management mechanics before automating.
Deliverables:
- Script that creates a PostgreSQL user with specific permissions
- Script that revokes the user and terminates connections
- Understanding of VALID UNTIL behavior
Steps:
- Set up a local PostgreSQL instance:
```bash
Using Docker
docker run -d
âname jit-postgres
-e POSTGRES_PASSWORD=postgres
-p 5432:5432
postgres:15
Connect as superuser
psql -h localhost -U postgres
2. **Create a temporary user manually**:
```sql
-- Create user with 5-minute expiry
CREATE ROLE jit_test_user
WITH LOGIN
PASSWORD 'test123'
VALID UNTIL (NOW() + INTERVAL '5 minutes')
CONNECTION LIMIT 2;
-- Grant permissions
GRANT CONNECT ON DATABASE postgres TO jit_test_user;
GRANT USAGE ON SCHEMA public TO jit_test_user;
GRANT SELECT ON ALL TABLES IN SCHEMA public TO jit_test_user;
- Test the credential lifecycle:
```bash
Connect as the temporary user
psql -h localhost -U jit_test_user -d postgres
Run a query
postgres=> SELECT current_user, now();
Wait 5 minutes, try again - should fail
psql -h localhost -U jit_test_user -d postgres
FATAL: password authentication failed (user expired)
4. **Implement cleanup script**:
```sql
-- Terminate connections and drop user
SELECT pg_terminate_backend(pid)
FROM pg_stat_activity
WHERE usename = 'jit_test_user';
DROP ROLE IF EXISTS jit_test_user;
- Write a Python/Go wrapper:
```python
Python example using psycopg2
import psycopg2 import secrets from datetime import datetime, timedelta
def create_temp_user(conn, username: str, tables: list[str], ttl_minutes: int) -> str: password = secrets.token_urlsafe(24) expires_at = datetime.utcnow() + timedelta(minutes=ttl_minutes)
with conn.cursor() as cur:
# Create role with expiry
cur.execute(f"""
CREATE ROLE {username}
WITH LOGIN
PASSWORD %s
VALID UNTIL %s
CONNECTION LIMIT 5
""", (password, expires_at))
# Grant permissions
cur.execute(f"GRANT CONNECT ON DATABASE mydb TO {username}")
cur.execute(f"GRANT USAGE ON SCHEMA public TO {username}")
for table in tables:
cur.execute(f"GRANT SELECT ON {table} TO {username}")
conn.commit()
return password
def revoke_user(conn, username: str): with conn.cursor() as cur: # Terminate connections cur.execute(âââ SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE usename = %s âââ, (username,))
# Drop role
cur.execute(f"DROP ROLE IF EXISTS {username}")
conn.commit() ```
Checkpoint: You can manually create and revoke temporary database users with specific permissions and TTLs.
Phase 2: REST API for Access Requests (Days 2-3)
Goal: Build the API that accepts and stores access requests.
Deliverables:
- REST API with request submission endpoint
- Database schema for storing requests
- Basic authentication (hardcoded tokens initially)
Steps:
- Set up the project structure:
jit-broker/
âââ cmd/
â âââ server/
â âââ main.go # Entry point
âââ internal/
â âââ api/
â â âââ handlers.go # HTTP handlers
â â âââ middleware.go # Auth, logging
â â âââ routes.go # Route definitions
â âââ models/
â â âââ models.go # Data structures
â âââ store/
â â âââ postgres.go # Database operations
â âââ credentials/
â âââ generator.go # User creation logic
âââ migrations/
â âââ 001_initial.sql # Schema
âââ go.mod
âââ go.sum
Or for Python:
jit-broker/
âââ app/
â âââ __init__.py
â âââ main.py # FastAPI app
â âââ api/
â â âââ routes.py # Endpoints
â â âââ deps.py # Dependencies
â âââ models/
â â âââ request.py # Pydantic models
â â âââ database.py # SQLAlchemy models
â âââ services/
â â âââ credentials.py # Credential generator
â â âââ audit.py # Audit logging
â âââ db/
â âââ session.py # DB connection
âââ alembic/ # Migrations
âââ requirements.txt
âââ Dockerfile
- Implement the request submission endpoint:
# FastAPI example
from fastapi import FastAPI, Depends, HTTPException
from pydantic import BaseModel
from typing import List, Optional
class AccessRequest(BaseModel):
database_id: str
permissions: List[str]
tables: Optional[List[str]] = None
justification: str
ttl_minutes: int = 30
class AccessRequestResponse(BaseModel):
request_id: str
status: str
created_at: datetime
@app.post("/api/v1/requests", response_model=AccessRequestResponse)
async def create_request(
request: AccessRequest,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
# Validate database exists
target_db = db.query(TargetDatabase).filter_by(id=request.database_id).first()
if not target_db:
raise HTTPException(404, "Database not found")
# Create request record
db_request = AccessRequestModel(
requester_email=current_user.email,
target_database_id=request.database_id,
requested_permissions=request.permissions,
requested_tables=request.tables,
justification=request.justification,
requested_ttl_minutes=request.ttl_minutes,
status="pending"
)
db.add(db_request)
db.commit()
# Audit log
audit_log(db, "access_requested", {
"request_id": str(db_request.id),
"requester": current_user.email,
"database": target_db.name,
"permissions": request.permissions
})
return AccessRequestResponse(
request_id=str(db_request.id),
status="pending",
created_at=db_request.created_at
)
- Add request listing and status endpoints:
@app.get("/api/v1/requests")
async def list_requests(
status: Optional[str] = None,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
query = db.query(AccessRequestModel).filter_by(
requester_email=current_user.email
)
if status:
query = query.filter_by(status=status)
return query.order_by(AccessRequestModel.created_at.desc()).all()
@app.get("/api/v1/requests/{request_id}")
async def get_request(
request_id: str,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
request = db.query(AccessRequestModel).filter_by(
id=request_id,
requester_email=current_user.email
).first()
if not request:
raise HTTPException(404, "Request not found")
return request
Checkpoint: You can submit access requests via the API and see them in the database.
Phase 3: Automatic Credential Generation (Days 3-4)
Goal: When a request is approved, automatically generate database credentials.
Deliverables:
- Credential generator service
- Approval endpoint that triggers generation
- Secure credential delivery
Steps:
- Implement the credential generator:
import secrets
import hashlib
from datetime import datetime, timedelta
class CredentialGenerator:
def __init__(self, admin_conn_string: str):
self.admin_conn = psycopg2.connect(admin_conn_string)
def generate_username(self, requester_email: str) -> str:
"""Generate unique temporary username"""
prefix = requester_email.split("@")[0].replace(".", "_")[:20]
timestamp = datetime.utcnow().strftime("%Y%m%d%H%M")
random_suffix = secrets.token_hex(3)
return f"jit_{prefix}_{timestamp}_{random_suffix}"
def generate_password(self) -> str:
"""Generate cryptographically secure password"""
return secrets.token_urlsafe(32)
def create_credential(
self,
username: str,
password: str,
permissions: List[str],
tables: Optional[List[str]],
ttl_minutes: int
) -> datetime:
"""Create the database user with specified permissions"""
expires_at = datetime.utcnow() + timedelta(minutes=ttl_minutes)
with self.admin_conn.cursor() as cur:
# Create role
cur.execute(f"""
CREATE ROLE {username}
WITH LOGIN
PASSWORD %s
VALID UNTIL %s
CONNECTION LIMIT 5
""", (password, expires_at))
# Grant base permissions
cur.execute(f"GRANT CONNECT ON DATABASE mydb TO {username}")
cur.execute(f"GRANT USAGE ON SCHEMA public TO {username}")
# Grant table permissions
if tables:
for table in tables:
for perm in permissions:
cur.execute(f"GRANT {perm} ON {table} TO {username}")
else:
# Grant on all tables (if allowed by policy)
for perm in permissions:
cur.execute(f"""
GRANT {perm} ON ALL TABLES IN SCHEMA public TO {username}
""")
self.admin_conn.commit()
return expires_at
- Implement the approval endpoint:
@app.post("/api/v1/requests/{request_id}/approve")
async def approve_request(
request_id: str,
approval: ApprovalRequest,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
# Verify approver has permission
if not current_user.is_approver:
raise HTTPException(403, "Not authorized to approve requests")
# Get the request
request = db.query(AccessRequestModel).filter_by(id=request_id).first()
if not request:
raise HTTPException(404, "Request not found")
if request.status != "pending":
raise HTTPException(400, f"Request already {request.status}")
# Get target database config
target_db = db.query(TargetDatabase).filter_by(
id=request.target_database_id
).first()
# Generate credentials
generator = CredentialGenerator(target_db.admin_conn_string)
username = generator.generate_username(request.requester_email)
password = generator.generate_password()
ttl = approval.ttl_minutes or request.requested_ttl_minutes
expires_at = generator.create_credential(
username=username,
password=password,
permissions=request.requested_permissions,
tables=request.requested_tables,
ttl_minutes=ttl
)
# Store credential record (hash password for audit)
credential = CredentialModel(
request_id=request.id,
temp_username=username,
password_hash=hashlib.sha256(password.encode()).hexdigest()[:16],
expires_at=expires_at
)
db.add(credential)
# Update request status
request.status = "approved"
# Record approval
db.add(ApprovalModel(
request_id=request.id,
approver_email=current_user.email,
decision="approved",
granted_ttl_minutes=ttl
))
db.commit()
# Audit log
audit_log(db, "credential_created", {
"request_id": str(request.id),
"temp_username": username,
"expires_at": expires_at.isoformat(),
"approved_by": current_user.email
})
# Build connection string
conn_string = (
f"postgresql://{username}:{password}@"
f"{target_db.host}:{target_db.port}/{target_db.database_name}"
)
return {
"username": username,
"password": password, # Only time this is exposed!
"expires_at": expires_at,
"connection_string": conn_string,
"psql_command": f'psql "{conn_string}"'
}
- Add security measures:
# Never log the actual password
import logging
class PasswordFilter(logging.Filter):
def filter(self, record):
record.msg = re.sub(
r'password["\']?\s*[:=]\s*["\']?[^"\'}\s]+',
'password=***REDACTED***',
str(record.msg)
)
return True
# Rate limiting
from slowapi import Limiter
limiter = Limiter(key_func=get_remote_address)
@app.post("/api/v1/requests/{request_id}/approve")
@limiter.limit("10/minute")
async def approve_request(...):
...
Checkpoint: Approving a request creates a working database credential that expires.
Phase 4: TTL-based Revocation with Task Queue (Days 4-5)
Goal: Automatically revoke credentials when they expire.
Deliverables:
- Background scheduler/worker for revocation
- Revocation logic that handles edge cases
- Monitoring for revocation failures
Steps:
- Set up the task queue (Celery for Python, or goroutines for Go):
# Python with Celery
from celery import Celery
from celery.schedules import crontab
celery_app = Celery(
"jit_broker",
broker="redis://localhost:6379/0",
backend="redis://localhost:6379/0"
)
celery_app.conf.beat_schedule = {
"revoke-expired-credentials": {
"task": "tasks.revoke_expired_credentials",
"schedule": 60.0, # Every minute
},
}
- Implement the revocation task:
@celery_app.task(bind=True, max_retries=3)
def revoke_expired_credentials(self):
"""Find and revoke all expired credentials"""
db = get_db_session()
# Find expired but not yet revoked credentials
expired = db.query(CredentialModel).filter(
CredentialModel.expires_at < datetime.utcnow(),
CredentialModel.revoked_at.is_(None)
).all()
for cred in expired:
try:
revoke_single_credential.delay(str(cred.id))
except Exception as e:
logger.error(f"Failed to queue revocation for {cred.id}: {e}")
@celery_app.task(bind=True, max_retries=5)
def revoke_single_credential(self, credential_id: str):
"""Revoke a single credential"""
db = get_db_session()
cred = db.query(CredentialModel).filter_by(id=credential_id).first()
if not cred or cred.revoked_at:
return # Already revoked or doesn't exist
request = db.query(AccessRequestModel).filter_by(id=cred.request_id).first()
target_db = db.query(TargetDatabase).filter_by(
id=request.target_database_id
).first()
try:
# Connect as admin
conn = psycopg2.connect(target_db.admin_conn_string)
with conn.cursor() as cur:
# First, terminate any active connections
cur.execute("""
SELECT pg_terminate_backend(pid)
FROM pg_stat_activity
WHERE usename = %s
""", (cred.temp_username,))
terminated = cur.fetchall()
# Drop the role
cur.execute(f"DROP ROLE IF EXISTS {cred.temp_username}")
conn.commit()
conn.close()
# Update credential record
cred.revoked_at = datetime.utcnow()
cred.revocation_reason = "ttl_expired"
db.commit()
# Audit log
audit_log(db, "credential_revoked", {
"credential_id": str(cred.id),
"temp_username": cred.temp_username,
"reason": "ttl_expired",
"connections_terminated": len(terminated)
})
logger.info(f"Revoked credential {cred.temp_username}")
except Exception as e:
logger.error(f"Failed to revoke {cred.temp_username}: {e}")
# Retry with exponential backoff
raise self.retry(exc=e, countdown=2 ** self.request.retries)
- Add emergency revocation endpoint:
@app.delete("/api/v1/credentials/{credential_id}")
async def emergency_revoke(
credential_id: str,
reason: str,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
"""Emergency revocation - immediate, bypasses queue"""
if not current_user.is_admin:
raise HTTPException(403, "Admin access required")
cred = db.query(CredentialModel).filter_by(id=credential_id).first()
if not cred:
raise HTTPException(404, "Credential not found")
# Synchronous revocation - don't queue, do it now
revoke_single_credential(credential_id)
# Update with manual reason
cred.revocation_reason = f"emergency: {reason}"
db.commit()
audit_log(db, "credential_emergency_revoked", {
"credential_id": credential_id,
"revoked_by": current_user.email,
"reason": reason
})
return {"status": "revoked", "credential_id": credential_id}
- Add monitoring for revocation health:
@app.get("/api/v1/health/revocation")
async def revocation_health(db: Session = Depends(get_db)):
"""Check for overdue revocations"""
# Find credentials that should have been revoked but weren't
grace_period = timedelta(minutes=5)
overdue = db.query(CredentialModel).filter(
CredentialModel.expires_at < datetime.utcnow() - grace_period,
CredentialModel.revoked_at.is_(None)
).count()
if overdue > 0:
return {
"status": "unhealthy",
"overdue_revocations": overdue,
"message": f"{overdue} credentials past expiry + grace period"
}
return {
"status": "healthy",
"overdue_revocations": 0
}
Checkpoint: Credentials are automatically revoked within 1-2 minutes of expiry.
Phase 5: Audit Logging and Approval Workflows (Days 5-7)
Goal: Complete the compliance story with comprehensive audit logging and flexible approval workflows.
Deliverables:
- Append-only audit log with tamper detection
- Policy-based auto-approval
- Notification integration (Slack, email)
- Audit query API
Steps:
- Implement comprehensive audit logging:
import hashlib
import json
from datetime import datetime
class AuditLogger:
def __init__(self, db_session):
self.db = db_session
self._last_hash = None
def log(self, event_type: str, data: dict, actor: str = None):
"""Log an audit event with chained hashing for tamper detection"""
event = {
"event_type": event_type,
"timestamp": datetime.utcnow().isoformat(),
"actor": actor,
"data": data
}
# Get previous entry's hash for chaining
prev = self.db.query(AuditLogModel).order_by(
AuditLogModel.id.desc()
).first()
prev_hash = prev.entry_hash if prev else "genesis"
# Compute hash including previous hash (blockchain-style chain)
entry_json = json.dumps(event, sort_keys=True)
entry_hash = hashlib.sha256(
f"{prev_hash}|{entry_json}".encode()
).hexdigest()
# Store
log_entry = AuditLogModel(
event_type=event_type,
event_data=event,
actor=actor,
previous_hash=prev_hash,
entry_hash=entry_hash
)
self.db.add(log_entry)
self.db.commit()
return entry_hash
def verify_chain(self) -> bool:
"""Verify audit log hasn't been tampered with"""
entries = self.db.query(AuditLogModel).order_by(
AuditLogModel.id.asc()
).all()
prev_hash = "genesis"
for entry in entries:
# Recompute hash
entry_json = json.dumps(entry.event_data, sort_keys=True)
expected_hash = hashlib.sha256(
f"{prev_hash}|{entry_json}".encode()
).hexdigest()
if entry.entry_hash != expected_hash:
return False
if entry.previous_hash != prev_hash:
return False
prev_hash = entry.entry_hash
return True
- Implement policy-based auto-approval:
class ApprovalPolicy:
"""Define when requests can be auto-approved"""
policies = [
{
"name": "staging_readonly",
"conditions": {
"database_name_pattern": ".*staging.*",
"permissions": ["SELECT"],
"max_ttl_minutes": 60
},
"action": "auto_approve"
},
{
"name": "production_readonly",
"conditions": {
"database_name_pattern": ".*prod.*",
"permissions": ["SELECT"],
"max_ttl_minutes": 30,
"requester_groups": ["senior_engineers", "sre"]
},
"action": "auto_approve"
},
{
"name": "production_write",
"conditions": {
"database_name_pattern": ".*prod.*",
"permissions": ["INSERT", "UPDATE", "DELETE"]
},
"action": "require_approval",
"approvers": ["db_admins", "manager"]
}
]
def evaluate(self, request: AccessRequest, user: User) -> dict:
"""Evaluate request against policies"""
for policy in self.policies:
if self._matches(request, user, policy["conditions"]):
return {
"policy_name": policy["name"],
"action": policy["action"],
"approvers": policy.get("approvers", [])
}
# Default: require approval
return {
"policy_name": "default",
"action": "require_approval",
"approvers": ["manager"]
}
def _matches(self, request, user, conditions) -> bool:
# Pattern matching logic
...
- Add notification integration:
import httpx
from abc import ABC, abstractmethod
class Notifier(ABC):
@abstractmethod
async def notify_approval_needed(self, request, approvers): ...
@abstractmethod
async def notify_approved(self, request, credential): ...
@abstractmethod
async def notify_denied(self, request, reason): ...
class SlackNotifier(Notifier):
def __init__(self, webhook_url: str):
self.webhook_url = webhook_url
async def notify_approval_needed(self, request, approvers):
message = {
"blocks": [
{
"type": "header",
"text": {
"type": "plain_text",
"text": "Database Access Request"
}
},
{
"type": "section",
"fields": [
{"type": "mrkdwn", "text": f"*Requester:*\n{request.requester_email}"},
{"type": "mrkdwn", "text": f"*Database:*\n{request.database_name}"},
{"type": "mrkdwn", "text": f"*Permissions:*\n{', '.join(request.permissions)}"},
{"type": "mrkdwn", "text": f"*Justification:*\n{request.justification}"},
]
},
{
"type": "actions",
"elements": [
{
"type": "button",
"text": {"type": "plain_text", "text": "Approve"},
"style": "primary",
"url": f"https://jit.company.com/approve/{request.id}"
},
{
"type": "button",
"text": {"type": "plain_text", "text": "Deny"},
"style": "danger",
"url": f"https://jit.company.com/deny/{request.id}"
}
]
}
]
}
async with httpx.AsyncClient() as client:
await client.post(self.webhook_url, json=message)
- Implement audit query API:
@app.get("/api/v1/audit")
async def query_audit_log(
user: Optional[str] = None,
event_type: Optional[str] = None,
since: Optional[datetime] = None,
until: Optional[datetime] = None,
limit: int = 100,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
"""Query audit log for compliance reporting"""
if not current_user.is_auditor:
raise HTTPException(403, "Auditor access required")
query = db.query(AuditLogModel)
if user:
query = query.filter(AuditLogModel.actor == user)
if event_type:
query = query.filter(AuditLogModel.event_type == event_type)
if since:
query = query.filter(AuditLogModel.created_at >= since)
if until:
query = query.filter(AuditLogModel.created_at <= until)
entries = query.order_by(
AuditLogModel.created_at.desc()
).limit(limit).all()
return {
"entries": [
{
"id": e.id,
"event_type": e.event_type,
"timestamp": e.created_at,
"actor": e.actor,
"data": e.event_data
}
for e in entries
],
"chain_verified": audit_logger.verify_chain()
}
Checkpoint: Complete JIT broker with policy-based approval, notifications, and auditable access logs.
Testing Strategy
Unit Tests
# test_credential_generator.py
import pytest
from unittest.mock import MagicMock, patch
class TestCredentialGenerator:
def test_generate_username_format(self):
gen = CredentialGenerator(None)
username = gen.generate_username("alice.smith@company.com")
assert username.startswith("jit_alice_smith_")
assert len(username) <= 63 # PostgreSQL limit
def test_generate_password_strength(self):
gen = CredentialGenerator(None)
password = gen.generate_password()
assert len(password) >= 32
# URL-safe base64 characters only
assert all(c.isalnum() or c in "-_" for c in password)
def test_generate_unique_passwords(self):
gen = CredentialGenerator(None)
passwords = [gen.generate_password() for _ in range(100)]
assert len(set(passwords)) == 100 # All unique
class TestApprovalPolicy:
def test_staging_auto_approve(self):
policy = ApprovalPolicy()
request = MagicMock(
database_name="staging-db",
permissions=["SELECT"],
ttl_minutes=30
)
user = MagicMock(groups=["developers"])
result = policy.evaluate(request, user)
assert result["action"] == "auto_approve"
def test_production_write_requires_approval(self):
policy = ApprovalPolicy()
request = MagicMock(
database_name="production-db",
permissions=["UPDATE"],
ttl_minutes=30
)
user = MagicMock(groups=["developers"])
result = policy.evaluate(request, user)
assert result["action"] == "require_approval"
assert "db_admins" in result["approvers"]
Integration Tests
# test_integration.py
import pytest
import psycopg2
@pytest.fixture
def test_db():
"""Create a test database with the broker schema"""
conn = psycopg2.connect("postgresql://postgres:postgres@localhost/test_jit")
yield conn
conn.close()
@pytest.fixture
def target_db():
"""Create a target database for credential testing"""
conn = psycopg2.connect("postgresql://postgres:postgres@localhost/target_test")
yield conn
conn.close()
class TestCredentialLifecycle:
def test_create_and_use_credential(self, target_db):
generator = CredentialGenerator(target_db)
# Create credential
username = generator.generate_username("test@example.com")
password = generator.generate_password()
expires_at = generator.create_credential(
username=username,
password=password,
permissions=["SELECT"],
tables=["users"],
ttl_minutes=5
)
# Verify we can connect
conn = psycopg2.connect(
f"postgresql://{username}:{password}@localhost/target_test"
)
cur = conn.cursor()
cur.execute("SELECT 1")
assert cur.fetchone() == (1,)
conn.close()
# Verify permission restrictions
conn = psycopg2.connect(
f"postgresql://{username}:{password}@localhost/target_test"
)
cur = conn.cursor()
with pytest.raises(psycopg2.errors.InsufficientPrivilege):
cur.execute("INSERT INTO users (name) VALUES ('test')")
conn.close()
def test_revocation_terminates_connection(self, target_db):
generator = CredentialGenerator(target_db)
# Create credential
username = generator.generate_username("test@example.com")
password = generator.generate_password()
generator.create_credential(
username=username,
password=password,
permissions=["SELECT"],
tables=["users"],
ttl_minutes=5
)
# Connect
conn = psycopg2.connect(
f"postgresql://{username}:{password}@localhost/target_test"
)
# Revoke
revoker = CredentialRevoker(target_db)
revoker.revoke(username)
# Verify connection is dead
with pytest.raises(psycopg2.OperationalError):
conn.cursor().execute("SELECT 1")
End-to-End Tests
#!/bin/bash
# e2e_test.sh - Full workflow test
set -e
echo "=== E2E Test: JIT Access Broker ==="
# 1. Submit access request
REQUEST_ID=$(curl -s -X POST http://localhost:8000/api/v1/requests \
-H "Authorization: Bearer $USER_TOKEN" \
-H "Content-Type: application/json" \
-d '{
"database_id": "'$TEST_DB_ID'",
"permissions": ["SELECT"],
"tables": ["users"],
"justification": "E2E test",
"ttl_minutes": 2
}' | jq -r '.request_id')
echo "Created request: $REQUEST_ID"
# 2. Approve the request
CREDS=$(curl -s -X POST "http://localhost:8000/api/v1/requests/$REQUEST_ID/approve" \
-H "Authorization: Bearer $ADMIN_TOKEN" \
-H "Content-Type: application/json" \
-d '{"ttl_minutes": 2}')
USERNAME=$(echo $CREDS | jq -r '.username')
PASSWORD=$(echo $CREDS | jq -r '.password')
echo "Got credentials: $USERNAME"
# 3. Test database access
psql "postgresql://$USERNAME:$PASSWORD@localhost/testdb" -c "SELECT 1" || {
echo "FAIL: Could not connect with credentials"
exit 1
}
echo "Successfully connected to database"
# 4. Wait for expiry
echo "Waiting 3 minutes for credential expiry..."
sleep 180
# 5. Verify access revoked
if psql "postgresql://$USERNAME:$PASSWORD@localhost/testdb" -c "SELECT 1" 2>/dev/null; then
echo "FAIL: Credential still works after expiry!"
exit 1
fi
echo "Credential correctly revoked"
# 6. Verify audit log
AUDIT=$(curl -s "http://localhost:8000/api/v1/audit?user=test@example.com" \
-H "Authorization: Bearer $AUDITOR_TOKEN")
if ! echo $AUDIT | jq -e '.entries[] | select(.event_type == "credential_revoked")' > /dev/null; then
echo "FAIL: Revocation not in audit log"
exit 1
fi
echo "=== E2E Test PASSED ==="
Common Pitfalls and Debugging
Pitfall 1: SQL Injection in Username Generation
Symptom: Database errors or security vulnerabilities when creating users.
Cause: Using string formatting instead of proper escaping for user-provided data.
Solution:
# WRONG - SQL injection risk
cur.execute(f"CREATE ROLE {username} WITH PASSWORD '{password}'")
# CORRECT - Use identifier quoting
from psycopg2 import sql
cur.execute(
sql.SQL("CREATE ROLE {} WITH LOGIN PASSWORD %s").format(
sql.Identifier(username)
),
(password,)
)
# Also validate username format
import re
if not re.match(r'^jit_[a-z0-9_]{1,50}$', username):
raise ValueError("Invalid username format")
Pitfall 2: Race Condition in Revocation
Symptom: Credential used after supposed revocation, or duplicate revocation attempts.
Cause: No locking around revocation check and execution.
Solution:
# Use SELECT FOR UPDATE to lock the row
with db.begin():
cred = db.query(CredentialModel).filter_by(
id=credential_id
).with_for_update().first()
if cred.revoked_at:
return # Already revoked by another worker
# Do revocation
revoke_in_database(cred.temp_username)
cred.revoked_at = datetime.utcnow()
db.commit()
Pitfall 3: Credential Leakage in Logs
Symptom: Passwords appearing in application logs, error messages, or APM tools.
Cause: Logging request/response objects that contain credentials.
Solution:
# Create a response model that never serializes the password after creation
class CredentialResponse(BaseModel):
username: str
password: str = Field(exclude=True) # Never serialize
expires_at: datetime
class Config:
json_encoders = {
# Custom encoder that redacts password
}
# Or use a dedicated delivery mechanism
class SecureCredentialDelivery:
def deliver(self, credential: Credential, channel: str):
if channel == "api_response":
# One-time display, immediately cleared from memory
password = credential.get_password_once()
return {"username": credential.username, "password": password}
elif channel == "vault_transit":
# Encrypt with user's public key
...
Pitfall 4: TTL Drift Due to Clock Skew
Symptom: Credentials expire too early or too late on different systems.
Cause: Different clocks on broker server, database server, and client machines.
Solution:
# Always use UTC
from datetime import datetime, timezone
expires_at = datetime.now(timezone.utc) + timedelta(minutes=ttl)
# Store timezone-aware timestamps
# PostgreSQL: TIMESTAMP WITH TIME ZONE
# MySQL: TIMESTAMP (automatically UTC)
# Sync clocks with NTP
# On all servers: systemctl enable chronyd
Pitfall 5: Connection Pool Exhaustion
Symptom: Broker becomes unresponsive; âtoo many connectionsâ errors.
Cause: Each credential creation uses a database connection that isnât properly released.
Solution:
# Use connection pooling
from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool
engine = create_engine(
DATABASE_URL,
poolclass=QueuePool,
pool_size=10,
max_overflow=20,
pool_timeout=30,
pool_recycle=1800
)
# Always use context managers
with engine.connect() as conn:
conn.execute(...)
# Connection automatically returned to pool
Extensions and Challenges
Extension 1: Multi-Database Support
Extend the broker to support MySQL, Microsoft SQL Server, and MongoDB:
class DatabaseAdapter(ABC):
@abstractmethod
def create_user(self, username, password, permissions, ttl): ...
@abstractmethod
def revoke_user(self, username): ...
class PostgreSQLAdapter(DatabaseAdapter):
...
class MySQLAdapter(DatabaseAdapter):
def create_user(self, username, password, permissions, ttl):
self.conn.execute(f"""
CREATE USER '{username}'@'%'
IDENTIFIED BY '{password}'
PASSWORD EXPIRE INTERVAL {ttl} MINUTE
""")
...
class MongoDBAdapter(DatabaseAdapter):
def create_user(self, username, password, permissions, ttl):
self.db.command("createUser", username, pwd=password, roles=...)
Extension 2: Break-Glass Emergency Access
Implement emergency access for on-call engineers that bypasses normal approval:
@app.post("/api/v1/emergency-access")
async def break_glass_access(
request: EmergencyAccessRequest,
current_user: User = Depends(get_current_user)
):
"""
Immediate access with:
- No approval required
- Shorter TTL (15 min max)
- Alerts sent to security team
- Requires incident ticket
- Recorded specially in audit
"""
if not current_user.is_oncall:
raise HTTPException(403, "Only on-call engineers can break glass")
# Verify incident exists
incident = verify_incident_exists(request.incident_id)
# Generate credential immediately
credential = generate_emergency_credential(
request.database,
max_ttl=15 # Hard cap
)
# Alert security
await notify_security_team(
f"BREAK GLASS: {current_user.email} accessed {request.database} "
f"for incident {request.incident_id}"
)
# Special audit entry
audit_log("emergency_access", {
"user": current_user.email,
"database": request.database,
"incident": request.incident_id,
"credential_id": credential.id
})
return credential
Extension 3: Vault Integration
Replace the custom credential generator with HashiCorp Vault:
import hvac
class VaultCredentialGenerator:
def __init__(self, vault_addr: str, token: str):
self.client = hvac.Client(url=vault_addr, token=token)
def create_credential(self, database_role: str) -> dict:
"""Get dynamic credentials from Vault's database secrets engine"""
creds = self.client.secrets.database.generate_credentials(
name=database_role
)
return {
"username": creds["data"]["username"],
"password": creds["data"]["password"],
"lease_id": creds["lease_id"],
"lease_duration": creds["lease_duration"]
}
def revoke_credential(self, lease_id: str):
"""Explicitly revoke before TTL"""
self.client.sys.revoke_lease(lease_id)
Extension 4: Row-Level Security Integration
Generate credentials with PostgreSQL row-level security policies:
-- Enable RLS on the table
ALTER TABLE orders ENABLE ROW LEVEL SECURITY;
-- Policy: users can only see their team's orders
CREATE POLICY team_orders ON orders
FOR SELECT
USING (team_id = current_setting('app.team_id')::int);
-- When creating the temp user, set their team context
CREATE FUNCTION set_jit_context(team_id int) RETURNS void AS $$
BEGIN
PERFORM set_config('app.team_id', team_id::text, false);
END;
$$ LANGUAGE plpgsql;
-- Temp user can only see their team's data
GRANT EXECUTE ON FUNCTION set_jit_context TO jit_user_xxx;
Books That Will Help
Primary Reading
- âFoundations of Information Securityâ by Jason Andress
- Chapter 5: Access Control
- Chapter 11: Security Assessment and Auditing
- Excellent foundation for understanding access control models
- âZero Trust Networksâ by Evan Gilman and Doug Barth
- Chapter 4: Making Authorization Decisions
- Chapter 5: Trusting Users
- The definitive book on Zero Trust architecture
Supporting Material
- âSecurity in Computingâ by Charles Pfleeger et al.
- Chapter 2: Access Control
- Chapter 5: Database Security
- Deep academic treatment of access control theory
- âPostgreSQL 14 Administration Cookbookâ by Simon Riggs
- Chapter on Role-Based Access Control
- Practical PostgreSQL security implementation
- âPractical Cloud Securityâ by Chris Dotson
- Chapter on Identity and Access Management
- Modern cloud-native approaches to access control
- âVault: A Practical Guide to Managing Secretsâ (HashiCorp Documentation)
- Database Secrets Engine documentation
- Production-grade reference implementation
Interview Questions You Can Now Answer
- âWhat is Just-In-Time access, and why is it important for Zero Trust?â
- JIT access grants permissions only when needed, for a limited time, with audit trail
- Eliminates standing privileges that are Zero Trustâs worst enemy
- Reduces blast radius of compromised credentials
- âHow would you implement automatic credential revocation with high reliability?â
- Combine database-level expiry (VALID UNTIL) with application-level cleanup
- Use task queues with retries for revocation jobs
- Implement monitoring for overdue revocations
- Have emergency revocation endpoint for manual intervention
- âWhat should be logged for compliance with SOC2 access control requirements?â
- Who requested access (identity, authentication method, source IP)
- What was requested (database, tables, permissions)
- Why (justification, ticket reference)
- Who approved (approver identity, timestamp)
- When was access active (creation time, revocation time)
- Logs must be tamper-evident (hash chaining)
- âHow does HashiCorp Vaultâs database secrets engine work?â
- Applications authenticate to Vault
- Request credentials for a named role
- Vault connects to DB as admin, creates temp user
- Returns credentials with a lease (TTL)
- On lease expiry or revocation, Vault drops the user
- âWhatâs the difference between RBAC and ABAC, and when would you use each?â
- RBAC: Role-Based - simple, works for static organizational structures
- ABAC: Attribute-Based - dynamic, considers context (time, location, risk score)
- JIT often uses ABAC policies to determine auto-approval
- âHow would you handle a security incident where you need to revoke all active JIT credentials?â
- Emergency revocation endpoint with admin auth
- Query all active credentials (revoked_at IS NULL)
- Parallel revocation across all databases
- Force-terminate all active sessions
- Audit log with incident reference
Self-Assessment Checklist
Before moving to the next project, verify you can:
Understanding
- Explain why static credentials violate Zero Trust principles
- Describe the JIT access lifecycle (request, approve, grant, use, revoke)
- Explain the difference between database-level expiry and application-level revocation
- Describe what makes an audit log âcompliance-readyâ for SOC2
Implementation
- Create a PostgreSQL user with VALID UNTIL expiry
- Terminate active database connections for a specific user
- Generate cryptographically secure random passwords
- Implement a background job that runs reliably every minute
Security
- Prevent SQL injection when creating dynamic usernames
- Ensure passwords never appear in logs
- Implement rate limiting on credential generation
- Create tamper-evident audit logs
Operations
- Monitor for failed or overdue revocations
- Implement emergency credential revocation
- Export audit logs in SIEM-compatible format
Integration
- Connect your broker to an OAuth2/OIDC provider
- Send notifications to Slack/email for approval requests
- Explain how HashiCorp Vault could replace your custom implementation
Final Milestone: A production-ready JIT access broker where developers never touch permanent database passwords, every access is logged and justified, and credentials die automatically.
Whatâs Next
After completing this project, you have mastered ephemeral credentials and the âLeast Privilegeâ pillar of Zero Trust. Consider these next steps:
- Project 9: ZTNA App Tunnel - Build secure, identity-aware tunnels for application access
- Vault Deep Dive - Implement your broker using Vaultâs database secrets engine
- Integrate with Project 1 - Add your JIT broker behind the Identity-Aware Reverse Proxy
Remember: Static credentials are technical debt with security interest. Every permanent password in your infrastructure is a liability waiting to become an incident.