DeDupe
A Ruby gem for distributed deduplication and locking using Redis Sorted Sets. DeDupe provides a simple and efficient way to prevent duplicate execution of tasks across multiple processes or servers, with automatic TTL-based expiration and cleanup.
Features
- ๐ Distributed Locking: Prevent duplicate execution across multiple processes/servers
- โฑ๏ธ TTL-based Expiration: Automatic expiration of locks with configurable time-to-live
- ๐งน Automatic Cleanup: Expired entries are automatically removed
- ๐ฆ Multiple ID Management: Handle multiple locks/IDs within the same namespace
- ๐ Simple API: Easy-to-use interface with block support
- ๐ Redis-backed: Uses Redis Sorted Sets for efficient storage and querying
Installation
Add this line to your application's Gemfile:
gem "de-dupe"And then execute:
$ bundle installOr install it yourself as:
$ gem install de-dupeConfiguration
Configure DeDupe with your Redis connection:
require "de-dupe"
require "connection_pool"
DeDupe.configure do |config|
# Redis connection (supports ConnectionPool or Redis instance)
config.redis = ConnectionPool.new(size: 10, timeout: 1) do
Redis.new(url: ENV.fetch("REDIS_URL", "redis://localhost:6379"))
end
# Namespace for all keys (default: "de-dupe")
config.namespace = "my-app"
# Default TTL in seconds (default: 300 = 5 minutes)
config.expires_in = 3600 # 1 hour
endConfiguration Options
-
redis: Redis connection instance or ConnectionPool (required) -
namespace: Prefix for all Redis keys (default:"de-dupe") -
expires_in: Default TTL in seconds for locks (default:300)
Usage
Simple Locking with DeDupe.acquire
The simplest way to use DeDupe is with the acquire method:
# Prevent duplicate execution of a long-running job
DeDupe.acquire("import", "user-12345", ttl: 3600) do
# This block will only execute if the lock can be acquired
# If another process is already running this, the block won't execute
import_user_data("user-12345")
end
# With multiple namespace levels
DeDupe.acquire("import", "users", "batch-2024-01-01", ttl: 7200) do
process_batch("batch-2024-01-01")
endNote: The last argument is the lock ID, all previous arguments form the namespace.
Using the Lock Class
For more control, use the Lock class directly:
# Create a lock
lock = DeDupe::Lock.new(
lock_key: "import:users",
lock_id: "user-12345",
ttl: 3600 # optional, uses config default if not provided
)
# Check if locked
if lock.locked?
puts "Already processing"
else
# Acquire the lock
if lock.acquire
begin
# Do work
process_user("user-12345")
ensure
# Always release
lock.release
end
end
end
# Or use with_lock for automatic release
lock.with_lock do
# Block only executes if lock can be acquired
# Lock is automatically released after block execution (even on error)
process_user("user-12345")
endUsing the Dataset Class
The Dataset class allows you to manage multiple IDs within the same namespace:
# Create a dataset
dataset = DeDupe::Dataset.new("import:users", ttl: 3600)
# Acquire multiple IDs at once
if dataset.acquire("user-1", "user-2", "user-3")
puts "All IDs acquired"
end
# Check which IDs are locked
locked = dataset.locked_members("user-1", "user-2", "user-3")
# => ["user-1", "user-2", "user-3"]
# Check which IDs are unlocked
unlocked = dataset.unlocked_members("user-1", "user-2", "user-3")
# => []
# Release specific IDs
dataset.release("user-1", "user-2")
# Check lock status for multiple IDs
if dataset.locked?("user-1", "user-2", "user-3")
puts "At least one ID is locked"
end
# Flush all entries
dataset.flush
# Manually clean up expired entries
dataset.flush_expired_membersAPI Reference
DeDupe.acquire(*keys, ttl: nil, &block)
Convenience method for acquiring a lock and executing a block.
-
*keys: Namespace components (last one becomes the lock_id) -
ttl: Optional TTL in seconds (uses config default if not provided) -
&block: Block to execute if lock is acquired
Returns: Result of the block, or nil if lock couldn't be acquired
DeDupe::Lock
new(lock_key:, lock_id:, ttl: nil)
Create a new lock instance.
-
lock_key: Namespace/group for the lock -
lock_id: Unique identifier within the namespace -
ttl: Optional TTL in seconds
acquire / lock
Attempt to acquire the lock. Returns true if acquired, false if already exists.
release / unlock
Release the lock. Returns true if released, false if lock didn't exist.
locked?
Check if the lock is currently active (exists and not expired). Returns true or false.
with_lock(&block)
Acquire lock, execute block, and automatically release lock. Block only executes if lock can be acquired.
- Returns block result if lock acquired,
nilotherwise - Always releases lock, even if block raises an exception
- Propagates exceptions from the block
DeDupe::Dataset
new(lock_key, ttl: nil)
Create a new dataset instance.
-
lock_key: Namespace for the dataset -
ttl: Optional TTL in seconds
acquire(*ids) / lock(*ids)
Acquire locks for multiple IDs. Returns true if at least one ID was acquired.
release(*ids) / unlock(*ids)
Release locks for multiple IDs. Returns true if at least one ID was released.
locked?(*ids)
Check if any of the given IDs are locked. Returns true if at least one is locked.
locked_members(*ids)
Return array of IDs that are currently locked (not expired).
unlocked_members(*ids)
Return array of IDs that are unlocked (don't exist or expired).
flush
Remove all entries from the dataset.
flush_expired_members
Remove all expired entries from the dataset (automatically called during acquire).
Examples
Preventing Duplicate Job Execution
# In a background job processor
class ImportUserJob
def perform(user_id)
DeDupe.acquire("import", "user-#{user_id}", ttl: 3600) do
# Only one instance of this job will run per user_id
UserImporter.new(user_id).import
end
end
endBatch Processing with Deduplication
# Process a batch, ensuring each item is only processed once
dataset = DeDupe::Dataset.new("batch:process-#{batch_id}", ttl: 7200)
items_to_process.each do |item|
next if dataset.locked?(item.id) # Skip if already processing
if dataset.acquire(item.id)
begin
process_item(item)
ensure
dataset.release(item.id)
end
end
endRate Limiting with Locking
# Ensure only one API call per user per minute
user_id = current_user.id
lock = DeDupe::Lock.new(
lock_key: "api:rate-limit",
lock_id: "user-#{user_id}",
ttl: 60 # 1 minute
)
unless lock.locked?
lock.with_lock do
make_api_call(user_id)
end
else
puts "Rate limit: Please wait before making another request"
endParallel Processing Safety
# Ensure only one worker processes a task, even in parallel environments
task_id = "task-12345"
DeDupe.acquire("workers", "process", task_id, ttl: 300) do
# This will only execute in one worker, even if multiple workers
# try to process the same task simultaneously
process_task(task_id)
endHow It Works
DeDupe uses Redis Sorted Sets to store locks with expiration timestamps as scores:
-
Lock Acquisition: When you acquire a lock, it's stored in Redis with a score of
current_time + ttl - Lock Checking: A lock is considered active if its score (expiration time) is greater than the current time
- Automatic Cleanup: Expired entries (where score <= current_time) are automatically removed
- Distributed: Since it uses Redis, locks work across multiple processes and servers
Requirements
- Ruby >= 2.7
- Redis server
-
redisgem -
zeitwerkgem
Development
After checking out the repo, run:
bundle installRun tests:
bundle exec rspecRun RuboCop:
bundle exec rubocopContributing
Bug reports and pull requests are welcome on GitHub at https://github.com/marcosgz/de-dupe.
License
The gem is available as open source under the terms of the MIT License.