DataDrain 馃毎
DataDrain es un micro-framework de nivel empresarial dise帽ado para extraer, archivar y purgar datos hist贸ricos desde bases de datos PostgreSQL transaccionales, as铆 como para ingerir archivos crudos (CSV, JSON, Parquet), hacia un Data Lake anal铆tico.
Utiliza DuckDB en memoria para lograr velocidades de procesamiento y compresi贸n extremas. Garantiza la retenci贸n segura de datos mediante chequeos de integridad estrictos antes de purgar las bases de datos de origen, y automatiza la conversi贸n y subida de archivos pesados a la nube.
Caracter铆sticas Principales
- ETL de Alto Rendimiento: Transfiere millones de registros desde Postgres a Parquet utilizando DuckDB sin cargar los objetos en la memoria RAM de Ruby.
- File Ingestion: Convierte archivos crudos masivos (ej. logs de Netflow en CSV) a Parquet (ZSTD) y los sube directamente a S3 en milisegundos.
-
Hive Partitioning: Organiza autom谩ticamente los archivos en carpetas optimizadas para consultas (
year=X/month=Y/tenant_id=Z). - Storage Adapters: Soporte nativo y transparente para almacenamiento en Disco Local y AWS S3.
-
Integridad Garantizada: Verifica matem谩ticamente que los datos exportados coincidan exactamente con el origen antes de ejecutar sentencias
DELETE. -
ORM Anal铆tico Integrado: Incluye una clase base (
DataDrain::Record) compatible conActiveModelpara consultar y destruir particiones hist贸ricas de forma idiom谩tica.
Instalaci贸n
Agrega esta l铆nea al Gemfile de tu aplicaci贸n o microservicio:
gem 'data_drain', git: '[https://github.com/tu-organizacion/data_drain.git](https://github.com/tu-organizacion/data_drain.git)', branch: 'main'Y ejecuta:
$ bundle installConfiguraci贸n
Crea un inicializador en tu aplicaci贸n (ej. config/initializers/data_drain.rb) para configurar las credenciales y el comportamiento del motor:
DataDrain.configure do |config|
# Almacenamiento (:local o :s3)
config.storage_mode = ENV.fetch('STORAGE_MODE', 'local').to_sym
# AWS S3 (Requerido solo si storage_mode es :s3)
# config.aws_region = ENV['AWS_REGION']
# config.aws_access_key_id = ENV['AWS_ACCESS_KEY_ID']
# config.aws_secret_access_key = ENV['AWS_SECRET_ACCESS_KEY']
# Base de Datos PostgreSQL de Origen (Requerido solo para DataDrain::Engine)
config.db_host = ENV.fetch('DB_HOST', '127.0.0.1')
config.db_port = ENV.fetch('DB_PORT', '5432')
config.db_user = ENV.fetch('DB_USER', 'postgres')
config.db_pass = ENV.fetch('DB_PASS', '')
config.db_name = ENV.fetch('DB_NAME', 'core_production')
# Rendimiento y Tuning de Postgres
config.batch_size = 5000 # Registros a borrar por transacci贸n
config.throttle_delay = 0.5 # Segundos de pausa entre borrados
# Timeout de inactividad de transacciones en PostgreSQL (en milisegundos).
# 脷til establecerlo en 0 para evitar que la conexi贸n se cierre prematuramente
# durante el borrado de grandes vol煤menes de datos.
config.idle_in_transaction_session_timeout = 0
config.logger = Rails.logger
# Tuning de DuckDB
# L铆mite m谩ximo de RAM para las consultas en memoria de DuckDB (ej. '2GB', '512MB').
# Evita que el proceso OOM (Out Of Memory) si el contenedor o servidor tiene memoria limitada.
config.limit_ram = '2GB'
# Directorio temporal de DuckDB para desbordar memoria (spill to disk) durante
# transformaciones pesadas o creaci贸n de archivos Parquet masivos.
# Es muy recomendable que este directorio resida en un disco SSD/NVMe r谩pido.
config.tmp_directory = '/tmp/duckdb_work'
endUso
El framework provee tres herramientas principales: Ingestor de Archivos, Drenaje de Base de Datos, y el ORM Anal铆tico.
1. Ingesti贸n de Archivos Crudos (FileIngestor)
Ideal para servicios que generan grandes vol煤menes de datos (ej. m茅tricas de Netflow). Toma un archivo local, lo transforma, lo comprime a Parquet y lo sube particionado a S3.
# Un archivo generado temporalmente por tu servicio
archivo_temporal = "/tmp/netflow_metrics_1600.csv"
ingestor = DataDrain::FileIngestor.new(
bucket: 'my-bucket-store',
source_path: archivo_temporal,
folder_name: 'netflow',
# Particionamos din谩micamente seg煤n columnas extra铆das al vuelo
partition_keys: %w[year month isp_id],
# Transformaci贸n SQL ejecutada por DuckDB durante la lectura
select_sql: "*, EXTRACT(YEAR FROM timestamp) AS year, EXTRACT(MONTH FROM timestamp) AS month",
delete_after_upload: true # Limpia el archivo temporal al terminar
)
ingestor.call2. Extracci贸n y Purga de BD (Engine)
Ideal para crear Ventanas Rodantes de retenci贸n (ej. mantener solo 6 meses de datos vivos en Postgres y archivar el resto).
# lib/tasks/archive.rake
task versions: :environment do
target_date = 6.months.ago.beginning_of_month
select_sql = <<~SQL
id, item_type, item_id, event, whodunnit,
object::VARCHAR AS object,
object_changes::VARCHAR AS object_changes,
created_at,
EXTRACT(YEAR FROM created_at)::INT AS year,
EXTRACT(MONTH FROM created_at)::INT AS month,
isp_id
SQL
engine = DataDrain::Engine.new(
bucket: 'my-bucket-store',
start_date: target_date.beginning_of_month,
end_date: target_date.end_of_month,
table_name: 'versions',
select_sql: select_sql,
partition_keys: %w[year month isp_id],
where_clause: "event = 'update'"
)
# Cuenta, exporta a Parquet, verifica integridad y purga Postgres.
engine.call
end3. Consultar el Data Lake (Record)
Para consultar los datos archivados sin salir de Ruby, crea un modelo que herede de DataDrain::Record.
# app/models/archived_version.rb
class ArchivedVersion < DataDrain::Record
self.bucket = 'my-bucket-storage'
self.folder_name = 'versions'
self.partition_keys = [:year, :month, :isp_id]
attribute :id, :string
attribute :item_type, :string
attribute :item_id, :string
attribute :event, :string
attribute :whodunnit, :string
attribute :created_at, :datetime
# Utiliza el tipo :json provisto por la gema para hidratar Hashes
attribute :object, :json
attribute :object_changes, :json
endConsultas altamente optimizadas mediante Hive Partitioning:
# B煤squeda puntual hiper-r谩pida aislando las particiones
version = ArchivedVersion.find("un-uuid", year: 2026, month: 3, isp_id: 42)
puts version.object_changes # => {"status" => ["active", "suspended"]}
# Colecciones
history = ArchivedVersion.where(limit: 10, year: 2026, month: 3, isp_id: 42)4. Destrucci贸n de Datos (Retenci贸n y Cumplimiento)
El framework permite eliminar f铆sicamente carpetas completas en S3 o Local utilizando comodines.
# Elimina todo el historial de un cliente en espec铆fico a trav茅s de todos los a帽os
ArchivedVersion.destroy_all(isp_id: 42)
# Elimina todos los datos de marzo de 2024 globalmente
ArchivedVersion.destroy_all(year: 2024, month: 3)Arquitectura
DataDrain implementa el patr贸n Storage Adapter, lo que permite aislar completamente la l贸gica del sistema de archivos de los motores de procesamiento.
- DuckDB mantiene una conexi贸n persistente (
Thread-Safe) para maximizar el rendimiento de las consultas web. - El ORM Anal铆tico incluye sanitizaci贸n de par谩metros para prevenir Inyecci贸n SQL al consultar archivos Parquet.
Licencia
La gema est谩 disponible como c贸digo abierto bajo los t茅rminos de la Licencia MIT.