Project

kube_queue

0.01
No release in over 3 years
Low commit activity in last 3 years
A background job processing library for Ruby that runs each job as a Kubernetes Job

KubeQueue

Installation

Add this line to your application's Gemfile:

gem 'kube_queue'

And then execute:

$ bundle

Or install it yourself as:

$ gem install kube_queue

Getting Started

Implement worker:

class TestWorker
  include KubeQueue::Worker

  job_name 'kube-queue-test'
  image "my-registry/my-image"
  container_name 'kube-queue-test'

  command 'bundle', 'exec', 'kube_queue', 'TestWorker', '-r', './test_worker.rb'

  def perform(payload)
    puts payload['message']
  end
end

Configure the Kubernetes client:

KubeQueue.kubernetes_configure do |client|
  client.url = ENV['K8S_URL']
  client.ssl_ca_file = ENV['K8S_CA_CERT_FILE']
  client.auth_token = File.read(ENV['K8S_TOKEN'])
end
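
The configuration above reads three environment variables, and File.read(ENV['K8S_TOKEN']) raises a TypeError if K8S_TOKEN is unset. A minimal sketch of a pre-flight check follows; missing_k8s_env and REQUIRED_K8S_ENV are hypothetical names for illustration and are not part of kube_queue.

```ruby
# Hypothetical helper, not part of kube_queue: lists the environment
# variables used by the configuration block that are unset or blank.
REQUIRED_K8S_ENV = %w[K8S_URL K8S_CA_CERT_FILE K8S_TOKEN].freeze

def missing_k8s_env(env = ENV)
  REQUIRED_K8S_ENV.select { |name| env[name].to_s.strip.empty? }
end

# Example with an explicit Hash instead of the real ENV.
missing = missing_k8s_env({ 'K8S_URL' => 'https://example.invalid' })
puts "Missing: #{missing.join(', ')}" unless missing.empty?
```

Calling missing_k8s_env with no argument checks the real environment, so it could be run just before KubeQueue.kubernetes_configure.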

and run:

TestWorker.enqueue(message: 'hello')

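# delayed execution (the payload hash needs explicit braces
# because it comes before a positional argument)
TestWorker.enqueue_at({ message: 'hello' }, Time.now + 100)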
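
The worker reads payload['message'] with a string key even though the job is enqueued with a symbol key. One plausible reason, assumed here rather than confirmed by the source, is that the payload is serialized (for example as JSON) on its way into the container. The sketch below shows that effect with a stand-in class that does not include KubeQueue::Worker; LocalTestWorker is a hypothetical name.

```ruby
require 'json'

# Stand-in for TestWorker without the KubeQueue::Worker mixin, so the
# perform logic can be exercised without a cluster.
class LocalTestWorker
  def perform(payload)
    puts payload['message']
  end
end

# A JSON round trip turns symbol keys into string keys.
payload = JSON.parse(JSON.generate({ message: 'hello' }))
LocalTestWorker.new.perform(payload) # prints "hello"
```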

ActiveJob Support

Add the following to config/application.rb:

Rails.application.config.active_job.queue_adapter = :kube_queue

Then put your job into app/jobs. Example:

# app/jobs/print_message_job.rb
class PrintMessageJob < ApplicationJob
  include KubeQueue::Worker

  worker_name 'print-message-job'
  image "your-registry/your-image"
  container_name 'your-container-name'

  def perform(payload)
    logger.info payload[:message]
  end
end

and run:

irb(main):001:0> job = PrintMessageJob.perform_later(message: 'hello, kubernetes!')
Enqueued PrintMessageJob (Job ID: 0bf15b35-62d8-4380-9173-99839ce735ff) to KubeQueue(default) with arguments: {:message=>"hello, kubernetes!"}
=> #<PrintMessageJob:0x00007fbfd00c7848 @arguments=[{:message=>"hello, kubernetes!"}], @job_id="0bf15b35-62d8-4380-9173-99839ce735ff", @queue_name="default", @priority=nil, @executions=0>
irb(main):002:0> job.status
=> #<K8s::Resource startTime="2019-08-12T15:56:37Z", active=1>
irb(main):003:0> job.status
=> #<K8s::Resource conditions=[{:type=>"Complete", :status=>"True", :lastProbeTime=>"2019-08-12T15:57:03Z", :lastTransitionTime=>"2019-08-12T15:57:03Z"}], startTime="2019-08-12T15:56:37Z", completionTime="2019-08-12T15:57:03Z", succeeded=1>

See more examples here.
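
The two status outputs above differ in shape: a running job reports active, while a finished job carries a conditions list. A minimal sketch of how such a status could be reduced to a single symbol follows. job_state is a hypothetical helper, and it takes a plain Hash shaped like the output above; converting the real K8s::Resource into a Hash is left outside this sketch.

```ruby
# Hypothetical helper, not part of kube_queue. `status` is a plain Hash
# with the same fields as the Kubernetes Job status printed above.
def job_state(status)
  conditions = status[:conditions] || []
  finished = conditions.find do |c|
    c[:status] == 'True' && %w[Complete Failed].include?(c[:type])
  end
  return finished[:type].downcase.to_sym if finished
  return :active if status[:active].to_i > 0

  :pending
end

running  = { startTime: '2019-08-12T15:56:37Z', active: 1 }
finished = { conditions: [{ type: 'Complete', status: 'True' }], succeeded: 1 }

puts job_state(running)  # prints "active"
puts job_state(finished) # prints "complete"
```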

Run a job locally

bundle exec kube_queue runner JOB_NAME [PAYLOAD]

For more information, run kube_queue help or read here.

Advanced Tips

Get a job status

job = ComputePiJob.perform_later
job.status

Getting the status of a scheduled job is not supported yet.

Check the generated manifest

# from class
puts ComputePiJob.manifest

# from instance
job = ComputePiJob.perform_later
puts job.manifest

Retry job

Kubernetes Jobs have their own retry mechanism. Set backoff_limit and/or restart_policy to use it.

class ComputePiJob
  include KubeQueue::Worker

  worker_name 'pi'
  image 'perl'
  container_name 'pi'
  command "perl", "-Mbignum=bpi", "-wle", "print bpi(2000)"

  backoff_limit 10
  restart_policy 'Never'
end

For more information, see the official documentation here.
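
For orientation, the retry settings correspond to two fields of a Kubernetes batch/v1 Job: spec.backoffLimit and spec.template.spec.restartPolicy. The sketch below builds a hand-written manifest with those fields and prints it as YAML. It is an approximation for illustration; the manifest kube_queue actually generates (see ComputePiJob.manifest) may contain more fields or differ in layout.

```ruby
require 'yaml'

# Hand-written approximation of a batch/v1 Job, not kube_queue output.
manifest = {
  'apiVersion' => 'batch/v1',
  'kind' => 'Job',
  'metadata' => { 'name' => 'pi' },
  'spec' => {
    'backoffLimit' => 10, # retries before the Job is marked failed
    'template' => {
      'spec' => {
        'restartPolicy' => 'Never', # failed Pods are replaced, not restarted
        'containers' => [
          { 'name' => 'pi',
            'image' => 'perl',
            'command' => ['perl', '-Mbignum=bpi', '-wle', 'print bpi(2000)'] }
        ]
      }
    }
  }
}

puts manifest.to_yaml
```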

Timeout

Kubernetes Jobs have their own timeout mechanism. Set active_deadline_seconds to use it.

class ComputePiJob
  include KubeQueue::Worker

  worker_name 'pi'
  image 'perl'
  container_name 'pi'
  command "perl", "-Mbignum=bpi", "-wle", "print bpi(2000)"

  active_deadline_seconds 300
end

For more information, see the official documentation here.
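
In Kubernetes, activeDeadlineSeconds is measured from the Job's start time. As a small worked example, the sketch below computes when a job would hit its deadline from a startTime string like the one shown in job.status; deadline_for is a hypothetical helper, not part of kube_queue.

```ruby
require 'time'

# Hypothetical helper: the instant at which a Job started at `start_time`
# (ISO 8601 string) exceeds its active deadline.
def deadline_for(start_time, active_deadline_seconds)
  Time.iso8601(start_time) + active_deadline_seconds
end

puts deadline_for('2019-08-12T15:56:37Z', 300).iso8601 # prints "2019-08-12T16:01:37Z"
```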

Managing container resources

When you specify a Pod, you can optionally specify how much CPU and memory each container needs.

class ComputePiJob
  include KubeQueue::Worker

  worker_name 'pi'
  image 'perl'
  container_name 'pi'
  command "perl", "-Mbignum=bpi", "-wle", "print bpi(2000)"

  cpu_limit '0.3'
  cpu_request '0.2'
  # 'Mi' means mebibytes; a lowercase 'm' suffix would mean milli-units
  memory_limit '100Mi'
  memory_request '50Mi'
end

For more information, see the official documentation here.
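
Kubernetes resource quantities are strings with optional suffixes. A lowercase m means milli (so a CPU value of 200m equals 0.2), while memory is normally written with suffixes such as Mi (mebibytes) or M (megabytes). The sketch below parses a small subset of that notation to make the difference concrete; parse_quantity and QUANTITY_SUFFIXES are hypothetical names, and the parser does not cover the full Kubernetes quantity grammar.

```ruby
# Hypothetical helper covering only a subset of Kubernetes quantity suffixes.
QUANTITY_SUFFIXES = {
  'm' => Rational(1, 1000),
  'k' => 1000, 'M' => 1000**2, 'G' => 1000**3,
  'Ki' => 1024, 'Mi' => 1024**2, 'Gi' => 1024**3
}.freeze

def parse_quantity(text)
  match = /\A(\d+(?:\.\d+)?)(m|k|M|G|Ki|Mi|Gi)?\z/.match(text)
  raise ArgumentError, "unsupported quantity: #{text}" unless match

  # A missing suffix means the plain base unit (cores or bytes).
  Rational(match[1]) * QUANTITY_SUFFIXES.fetch(match[2], 1)
end

puts parse_quantity('0.3').to_f   # CPU cores
puts parse_quantity('200m').to_f  # CPU cores, written in millicores
puts parse_quantity('100Mi').to_i # bytes
puts parse_quantity('100m').to_f  # a tenth of a unit: far too small for memory
```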

Use environment variable from ConfigMap/Secret

class ComputePiJob
  include KubeQueue::Worker

  worker_name 'pi'
  image 'perl'
  container_name 'pi'
  command "perl", "-Mbignum=bpi", "-wle", "print bpi(2000)"

  env_from_secret 'mysecret1', 'mysecret2'
  env_from_config_map 'myapp'
end
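
In a Kubernetes container spec, environment variables are pulled in from Secrets and ConfigMaps through the envFrom list, using secretRef and configMapRef entries. The sketch below builds such a list for the names used above. env_from_entries is a hypothetical helper, and whether kube_queue emits exactly this structure is an assumption; check ComputePiJob.manifest for the real output.

```ruby
require 'yaml'

# Hypothetical helper: builds the `envFrom` entries of a container spec.
def env_from_entries(secrets: [], config_maps: [])
  secrets.map { |name| { 'secretRef' => { 'name' => name } } } +
    config_maps.map { |name| { 'configMapRef' => { 'name' => name } } }
end

entries = env_from_entries(secrets: %w[mysecret1 mysecret2], config_maps: %w[myapp])
puts({ 'envFrom' => entries }.to_yaml)
```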

Features

  • Add tests.
  • Support multiple kubernetes client configuration.
  • Logging information.
  • Support to get CronJob status.

Development (on GCP/GKE)

setup:

# create service account and cluster role.
kubectl apply -f examples/k8s/service-account.yaml

# get ca.crt and token
kubectl get secret -n kube-system kube-queue-test-token-xxx -o jsonpath="{['data']['token']}" | base64 -d > secrets/token
kubectl get secret -n kube-system kube-queue-test-token-xxx -o jsonpath="{['data']['ca\.crt']}" | base64 -d > secrets/ca.crt

# build image
gcloud builds submit --config cloudbuild.yaml .

run:

K8S_URL=https://xx.xxx.xxx.xxx K8S_CA_CERT_FILE=$(pwd)/secrets/ca.crt K8S_TOKEN=$(pwd)/secrets/token IMAGE_NAME=gcr.io/your-project/kube-queue bin/console

irb(main):001:0> TestWorker.enqueue(message: 'hello, kubernetes!')