Project

data_drain

0.0
The project is in a healthy, maintained state
Extrae datos transaccionales, los archiva en un Data Lake (S3/Local) en formato Parquet usando Hive Partitioning, y purga el origen de forma segura.
2005
2006
2007
2008
2009
2010
2011
2012
2013
2014
2015
2016
2017
2018
2019
2020
2021
2022
2023
2024
2025
2026
 Dependencies

Runtime

~> 1.114
~> 1.4
>= 1.2
 Project Readme

DataDrain 馃毎

DataDrain es un micro-framework de nivel empresarial dise帽ado para extraer, archivar y purgar datos hist贸ricos desde bases de datos PostgreSQL transaccionales, as铆 como para ingerir archivos crudos (CSV, JSON, Parquet), hacia un Data Lake anal铆tico.

Utiliza DuckDB en memoria para lograr velocidades de procesamiento y compresi贸n extremas. Garantiza la retenci贸n segura de datos mediante chequeos de integridad estrictos antes de purgar las bases de datos de origen, y automatiza la conversi贸n y subida de archivos pesados a la nube.

Caracter铆sticas Principales

  • ETL de Alto Rendimiento: Transfiere millones de registros desde Postgres a Parquet utilizando DuckDB sin cargar los objetos en la memoria RAM de Ruby.
  • File Ingestion: Convierte archivos crudos masivos (ej. logs de Netflow en CSV) a Parquet (ZSTD) y los sube directamente a S3 en milisegundos.
  • Hive Partitioning: Organiza autom谩ticamente los archivos en carpetas optimizadas para consultas (year=X/month=Y/tenant_id=Z).
  • Storage Adapters: Soporte nativo y transparente para almacenamiento en Disco Local y AWS S3.
  • Integridad Garantizada: Verifica matem谩ticamente que los datos exportados coincidan exactamente con el origen antes de ejecutar sentencias DELETE.
  • ORM Anal铆tico Integrado: Incluye una clase base (DataDrain::Record) compatible con ActiveModel para consultar y destruir particiones hist贸ricas de forma idiom谩tica.

Instalaci贸n

Agrega esta l铆nea al Gemfile de tu aplicaci贸n o microservicio:

gem 'data_drain', git: '[https://github.com/tu-organizacion/data_drain.git](https://github.com/tu-organizacion/data_drain.git)', branch: 'main'

Y ejecuta:

$ bundle install

Configuraci贸n

Crea un inicializador en tu aplicaci贸n (ej. config/initializers/data_drain.rb) para configurar las credenciales y el comportamiento del motor:

DataDrain.configure do |config|
  # Almacenamiento (:local o :s3)
  config.storage_mode = ENV.fetch('STORAGE_MODE', 'local').to_sym

  # AWS S3 (Requerido solo si storage_mode es :s3)
  # config.aws_region = ENV['AWS_REGION']
  # config.aws_access_key_id = ENV['AWS_ACCESS_KEY_ID']
  # config.aws_secret_access_key = ENV['AWS_SECRET_ACCESS_KEY']

  # Base de Datos PostgreSQL de Origen (Requerido solo para DataDrain::Engine)
  config.db_host = ENV.fetch('DB_HOST', '127.0.0.1')
  config.db_port = ENV.fetch('DB_PORT', '5432')
  config.db_user = ENV.fetch('DB_USER', 'postgres')
  config.db_pass = ENV.fetch('DB_PASS', '')
  config.db_name = ENV.fetch('DB_NAME', 'core_production')

  # Rendimiento y Tuning de Postgres
  config.batch_size     = 5000 # Registros a borrar por transacci贸n
  config.throttle_delay = 0.5  # Segundos de pausa entre borrados
  
  # Timeout de inactividad de transacciones en PostgreSQL (en milisegundos).
  # 脷til establecerlo en 0 para evitar que la conexi贸n se cierre prematuramente 
  # durante el borrado de grandes vol煤menes de datos.
  config.idle_in_transaction_session_timeout = 0
  
  config.logger         = Rails.logger

  # Tuning de DuckDB
  # L铆mite m谩ximo de RAM para las consultas en memoria de DuckDB (ej. '2GB', '512MB').
  # Evita que el proceso OOM (Out Of Memory) si el contenedor o servidor tiene memoria limitada.
  config.limit_ram      = '2GB'
  
  # Directorio temporal de DuckDB para desbordar memoria (spill to disk) durante
  # transformaciones pesadas o creaci贸n de archivos Parquet masivos.
  # Es muy recomendable que este directorio resida en un disco SSD/NVMe r谩pido.
  config.tmp_directory  = '/tmp/duckdb_work'
end

Uso

El framework provee tres herramientas principales: Ingestor de Archivos, Drenaje de Base de Datos, y el ORM Anal铆tico.

1. Ingesti贸n de Archivos Crudos (FileIngestor)

Ideal para servicios que generan grandes vol煤menes de datos (ej. m茅tricas de Netflow). Toma un archivo local, lo transforma, lo comprime a Parquet y lo sube particionado a S3.

# Un archivo generado temporalmente por tu servicio
archivo_temporal = "/tmp/netflow_metrics_1600.csv"

ingestor = DataDrain::FileIngestor.new(
  bucket: 'my-bucket-store',
  source_path: archivo_temporal,
  folder_name: 'netflow',
  # Particionamos din谩micamente seg煤n columnas extra铆das al vuelo
  partition_keys: %w[year month isp_id],
  # Transformaci贸n SQL ejecutada por DuckDB durante la lectura
  select_sql: "*, EXTRACT(YEAR FROM timestamp) AS year, EXTRACT(MONTH FROM timestamp) AS month",
  delete_after_upload: true # Limpia el archivo temporal al terminar
)

ingestor.call

2. Extracci贸n y Purga de BD (Engine)

Ideal para crear Ventanas Rodantes de retenci贸n (ej. mantener solo 6 meses de datos vivos en Postgres y archivar el resto).

# lib/tasks/archive.rake
task versions: :environment do
  target_date = 6.months.ago.beginning_of_month

  select_sql = <<~SQL
    id, item_type, item_id, event, whodunnit,
    object::VARCHAR AS object,
    object_changes::VARCHAR AS object_changes,
    created_at,
    EXTRACT(YEAR FROM created_at)::INT AS year,
    EXTRACT(MONTH FROM created_at)::INT AS month,
    isp_id
  SQL

  engine = DataDrain::Engine.new(
    bucket:         'my-bucket-store',
    start_date:     target_date.beginning_of_month,
    end_date:       target_date.end_of_month,
    table_name:     'versions',
    select_sql:     select_sql,
    partition_keys: %w[year month isp_id],
    where_clause:   "event = 'update'"
  )

  # Cuenta, exporta a Parquet, verifica integridad y purga Postgres.
  engine.call
end

3. Consultar el Data Lake (Record)

Para consultar los datos archivados sin salir de Ruby, crea un modelo que herede de DataDrain::Record.

# app/models/archived_version.rb
class ArchivedVersion < DataDrain::Record
  self.bucket = 'my-bucket-storage'
  self.folder_name = 'versions'
  self.partition_keys = [:year, :month, :isp_id]

  attribute :id, :string
  attribute :item_type, :string
  attribute :item_id, :string
  attribute :event, :string
  attribute :whodunnit, :string
  attribute :created_at, :datetime

  # Utiliza el tipo :json provisto por la gema para hidratar Hashes
  attribute :object, :json
  attribute :object_changes, :json
end

Consultas altamente optimizadas mediante Hive Partitioning:

# B煤squeda puntual hiper-r谩pida aislando las particiones
version = ArchivedVersion.find("un-uuid", year: 2026, month: 3, isp_id: 42)
puts version.object_changes # => {"status" => ["active", "suspended"]}

# Colecciones
history = ArchivedVersion.where(limit: 10, year: 2026, month: 3, isp_id: 42)

4. Destrucci贸n de Datos (Retenci贸n y Cumplimiento)

El framework permite eliminar f铆sicamente carpetas completas en S3 o Local utilizando comodines.

# Elimina todo el historial de un cliente en espec铆fico a trav茅s de todos los a帽os
ArchivedVersion.destroy_all(isp_id: 42)

# Elimina todos los datos de marzo de 2024 globalmente
ArchivedVersion.destroy_all(year: 2024, month: 3)

Arquitectura

DataDrain implementa el patr贸n Storage Adapter, lo que permite aislar completamente la l贸gica del sistema de archivos de los motores de procesamiento.

  • DuckDB mantiene una conexi贸n persistente (Thread-Safe) para maximizar el rendimiento de las consultas web.
  • El ORM Anal铆tico incluye sanitizaci贸n de par谩metros para prevenir Inyecci贸n SQL al consultar archivos Parquet.

Licencia

La gema est谩 disponible como c贸digo abierto bajo los t茅rminos de la Licencia MIT.