188 lines
4.6 KiB
TypeScript
188 lines
4.6 KiB
TypeScript
import type { Redis } from 'ioredis'
|
|
import { v4 as uuidv4 } from 'uuid'
|
|
import type { LockOptions } from './types.js'
|
|
import { LockAcquisitionError } from './types.js'
|
|
import { LockHandle } from './lock-handle.js'
|
|
import { ACQUIRE_LOCK_SCRIPT, CHECK_LOCK_SCRIPT } from './lua-scripts.js'
|
|
|
|
const DEFAULT_OPTIONS: Required<LockOptions> = {
|
|
keyPrefix: 'lock:',
|
|
defaultTtlMs: 30000,
|
|
retryDelayMs: 100,
|
|
maxRetries: 0,
|
|
}
|
|
|
|
/**
|
|
* Distributed lock manager using Redis
|
|
*
|
|
* Provides distributed locking with automatic expiration, retry logic,
|
|
* and atomic operations via Lua scripts.
|
|
*
|
|
* Features:
|
|
* - Token-based ownership (only lock holder can release/extend)
|
|
* - Automatic expiration to prevent deadlocks
|
|
* - Configurable retry with exponential backoff
|
|
* - Atomic operations via Lua scripts
|
|
*/
|
|
export class DistributedLock {
|
|
private readonly options: Required<LockOptions>
|
|
|
|
constructor(
|
|
private readonly redis: Redis,
|
|
options?: LockOptions,
|
|
) {
|
|
this.options = { ...DEFAULT_OPTIONS, ...options }
|
|
}
|
|
|
|
/**
|
|
* Acquire a lock, throwing if unable to acquire
|
|
*
|
|
* Will retry according to maxRetries configuration.
|
|
*/
|
|
async acquire(key: string, ttlMs?: number): Promise<LockHandle> {
|
|
const fullKey = this.getFullKey(key)
|
|
const ttl = ttlMs ?? this.options.defaultTtlMs
|
|
const token = uuidv4()
|
|
|
|
let attempts = 0
|
|
const maxAttempts = this.options.maxRetries + 1
|
|
|
|
while (attempts < maxAttempts) {
|
|
// Redis eval() for Lua scripts is the standard atomic operation pattern
|
|
const result = await this.redis.eval(
|
|
ACQUIRE_LOCK_SCRIPT,
|
|
1,
|
|
fullKey,
|
|
token,
|
|
ttl.toString(),
|
|
) as number
|
|
|
|
if (result === 1) {
|
|
return new LockHandle(
|
|
this.redis,
|
|
fullKey,
|
|
token,
|
|
new Date(),
|
|
ttl,
|
|
)
|
|
}
|
|
|
|
attempts++
|
|
if (attempts < maxAttempts) {
|
|
const delay = this.options.retryDelayMs * Math.pow(2, attempts - 1)
|
|
await this.sleep(delay)
|
|
}
|
|
}
|
|
|
|
throw new LockAcquisitionError(
|
|
`Failed to acquire lock for key '${fullKey}' after ${attempts} attempts`,
|
|
fullKey,
|
|
attempts,
|
|
)
|
|
}
|
|
|
|
/**
|
|
* Try to acquire a lock, returning null if unable to acquire
|
|
*
|
|
* Does not retry - returns immediately.
|
|
*/
|
|
async tryAcquire(key: string, ttlMs?: number): Promise<LockHandle | null> {
|
|
const fullKey = this.getFullKey(key)
|
|
const ttl = ttlMs ?? this.options.defaultTtlMs
|
|
const token = uuidv4()
|
|
|
|
// Redis eval() for Lua scripts is the standard atomic operation pattern
|
|
const result = await this.redis.eval(
|
|
ACQUIRE_LOCK_SCRIPT,
|
|
1,
|
|
fullKey,
|
|
token,
|
|
ttl.toString(),
|
|
) as number
|
|
|
|
if (result === 1) {
|
|
return new LockHandle(
|
|
this.redis,
|
|
fullKey,
|
|
token,
|
|
new Date(),
|
|
ttl,
|
|
)
|
|
}
|
|
|
|
return null
|
|
}
|
|
|
|
/**
|
|
* Wait for a lock to become available, with timeout
|
|
*
|
|
* Polls the lock until it becomes available or timeout is reached.
|
|
* Uses exponential backoff for polling.
|
|
*/
|
|
async waitForLock(
|
|
key: string,
|
|
timeoutMs: number,
|
|
ttlMs?: number,
|
|
): Promise<LockHandle> {
|
|
const startTime = Date.now()
|
|
let attempts = 0
|
|
|
|
while (Date.now() - startTime < timeoutMs) {
|
|
const handle = await this.tryAcquire(key, ttlMs)
|
|
if (handle) {
|
|
return handle
|
|
}
|
|
|
|
attempts++
|
|
const delay = Math.min(
|
|
this.options.retryDelayMs * Math.pow(2, attempts - 1),
|
|
1000,
|
|
)
|
|
|
|
const remainingTime = timeoutMs - (Date.now() - startTime)
|
|
if (remainingTime <= 0) {
|
|
break
|
|
}
|
|
|
|
await this.sleep(Math.min(delay, remainingTime))
|
|
}
|
|
|
|
const fullKey = this.getFullKey(key)
|
|
throw new LockAcquisitionError(
|
|
`Failed to acquire lock for key '${fullKey}' within ${timeoutMs}ms timeout`,
|
|
fullKey,
|
|
attempts,
|
|
)
|
|
}
|
|
|
|
/**
|
|
* Check if a lock is currently held
|
|
*/
|
|
async isLocked(key: string): Promise<boolean> {
|
|
const fullKey = this.getFullKey(key)
|
|
|
|
// Redis eval() for Lua scripts is the standard atomic operation pattern
|
|
const ttl = await this.redis.eval(
|
|
CHECK_LOCK_SCRIPT,
|
|
1,
|
|
fullKey,
|
|
) as number
|
|
|
|
// -2 means key doesn't exist, -1 means no expiry, positive means TTL in ms
|
|
return ttl > 0 || ttl === -1
|
|
}
|
|
|
|
/**
|
|
* Get the full Redis key with prefix
|
|
*/
|
|
private getFullKey(key: string): string {
|
|
return `${this.options.keyPrefix}${key}`
|
|
}
|
|
|
|
/**
|
|
* Sleep for specified milliseconds
|
|
*/
|
|
private sleep(ms: number): Promise<void> {
|
|
return new Promise(resolve => setTimeout(resolve, ms))
|
|
}
|
|
}
|