Computer >> 컴퓨터 >  >> 프로그램 작성 >> Ruby

중복 Sidekiq 작업을 피하는 세 가지 방법

Ruby 코드를 작성하는 경우 Sidekiq를 사용하여 백그라운드 처리를 처리할 가능성이 있습니다. ActiveJob에서 오는 경우 또는 다른 배경 지식이 있는 경우 계속 지켜봐 주시기 바랍니다. 여기에서 다루는 팁 중 일부도 적용할 수 있습니다.

사람들은 다양한 경우에 (Sidekiq) 백그라운드 작업을 활용합니다. 일부 crunchnumbers, 일부는 사용자에게 환영 이메일을 발송하고 일부는 데이터 동기화를 예약합니다. 귀하의 경우가 무엇이든 결국에는 중복 작업을 피하기 위한 요구 사항에 부딪힐 수 있습니다. 중복 작업으로 정확히 동일한 작업을 수행하는 두 개의 작업을 상상합니다. 조금 더 자세히 살펴보겠습니다.

중복 작업을 제거해야 하는 이유

작업이 다음과 같은 시나리오를 상상해 보십시오.

class BookSalesWorker
  include Sidekiq::Worker
 
  def perform(book_id)
    crunch_some_numbers(book_id)
 
    upload_to_s3
  end
 
  ...
end

BookSalesWorker 항상 같은 작업을 수행합니다 — book_id를 기반으로 DB에 책을 쿼리합니다. 최신 판매 데이터를 가져와서 일부 숫자를 계산합니다. 그런 다음 스토리지 서비스에 업로드합니다. 웹사이트에서 책이 판매될 때마다 이 작업이 대기열에 추가됩니다.

자, 한 번에 100개의 매출이 생긴다면? 똑같은 일을 하는 작업이 100개 있습니다. 어쩌면 당신은 괜찮습니다. S3writes는 그다지 신경쓰지 않고 대기열이 혼잡하지 않으므로 로드를 처리할 수 있습니다. 하지만 "크기가 조정되나요?"™️

글쎄, 확실히 아니다. 더 많은 책에 대해 더 많은 판매를 받기 시작하면 대기열이 불필요한 작업으로 빠르게 쌓일 것입니다. 한 권의 책에 대해 동일한 작업을 수행하는 100개의 작업이 있고 병렬로 판매되는 10개의 책이 있는 경우 이제 대기열에 1000개의 작업이 깊숙이 들어갑니다. 실제로는 각 책에 대해 10개의 작업이 있을 수 있습니다.

이제 중복 작업이 대기열에 쌓이는 것을 방지할 수 있는 몇 가지 옵션을 살펴보겠습니다.

1. DIY 방법

외부 종속성 및 복잡한 논리의 팬이 아닌 경우 코드베이스에 몇 가지 사용자 지정 솔루션을 추가할 수 있습니다. 예제를 직접 사용해 보기 위해 샘플 리포지토리를 만들었습니다. 예제에 대한 각 접근 방식에 대한 링크가 있습니다.

1.1 단일 플래그 접근 방식

작업을 대기열에 넣을지 여부를 결정하는 하나의 플래그를 추가할 수 있습니다. 하나는 sales_enqueued_at를 추가할 수 있습니다. Book 테이블에서 유지 관리합니다. 예:

module BookSalesService
  def schedule_with_one_flag(book)
    # Check if the job was enqueued more than 10 minutes ago
    if book.sales_enqueued_at < 10.minutes.ago
      book.update(sales_enqueued_at: Time.current)
 
      BookSalesWorker.perform_async(book.id)
    end
  end
end

즉, 마지막 작업이 대기열에 추가된 후 10분이 지날 때까지 새 작업이 대기열에 추가되지 않습니다. 10분이 지나면 sales_enqueued_at를 업데이트합니다. 새 작업을 대기열에 추가합니다.

할 수 있는 또 다른 일은 부울인 플래그 하나를 설정하는 것입니다(예:crunching_sales). . crunching_sales를 설정했습니다. 첫 번째 작업이 대기열에 추가되기 전에 true로 설정됩니다. 그런 다음 작업이 완료되면 false로 설정합니다. 예약을 시도하는 다른 모든 작업은 crunching_sales까지 거부됩니다. 거짓입니다.

내가 만든 예제 저장소에서 이 접근 방식을 시도할 수 있습니다.

1.2 두 개의 플래그 접근 방식

작업을 10분 동안 대기열에 넣지 못하도록 "잠금"하는 것이 너무 무섭게 들리지만 코드에 추가 플래그가 있으면 여전히 문제가 없는 경우 다음 제안이 도움이 될 수 있습니다.

기존 sales_enqueued_at에 다른 플래그를 추가할 수 있습니다. — sales_calculated_at .그러면 코드는 다음과 같을 것입니다.

module BookSalesService
  def schedule_with_two_flags(book)
    # Check if sales are being calculated right now
    if book.sales_enqueued_at <= book.sales_calculated_at
      book.update(sales_enqueued_at: Time.current)
 
      BookSalesWorker.perform_async(book.id)
    end
  end
end
 
class BookSalesWorker
  include Sidekiq::Worker
 
  def perform(book_id)
    crunch_some_numbers(book_id)
 
    upload_to_s3
 
    # New adition
    book.update(sales_calculated_at: Time.current)
  end
 
  ...
end

사용해 보려면 예제 리포지토리의 지침을 확인하세요.

이제 작업이 대기열에 추가되고 완료되는 시간 사이의 일부를 제어합니다. 이 시간 동안에는 대기열에 넣을 수 있는 작업이 없습니다. 작업이 실행되는 동안 sales_enqueued_at sales_calculated_at보다 큽니다. . 작업 실행이 완료되면 sales_calculated_at sales_enqueued_at보다 더 큽니다(최근). 새 작업이 대기열에 추가됩니다.

두 개의 플래그를 사용하는 것이 흥미로울 수 있으므로 UI에서 판매 번호가 업데이트된 마지막 시간을 표시할 수 있습니다. 그러면 그것을 읽는 사용자는 데이터가 얼마나 최신인지 알 수 있습니다. 윈-윈 상황입니다.

플래그 요약

필요할 때 이와 같은 솔루션을 만들고 싶은 생각이 들 수도 있지만 제게는 그것들이 약간 서툴고 약간의 오버헤드가 추가됩니다. 사용 사례가 간단한 경우 이 방법을 사용하는 것이 좋지만 복잡하거나 충분하지 않은 것으로 판명되면 다른 옵션을 시도해 보시기 바랍니다.

플래그 접근 방식의 큰 단점은 10분 동안 대기열에 넣으려는 모든 작업을 잃게 된다는 것입니다. 큰 장점은 종속성을 가져오지 않고 대기열의 작업 수를 상당히 빠르게 줄일 수 있다는 것입니다.

1.3 대기열 순회

취할 수 있는 또 다른 접근 방식은 동일한 작업이 대기열에 들어가는 것을 방지하기 위해 사용자 지정 잠금 메커니즘을 만드는 것입니다. 관심 있는 Sidekiq 대기열 착용을 확인하고 작업(작업자)이 이미 있는지 확인합니다. 코드는 다음과 같습니다.

module BookSalesService
  def schedule_unique_across_queue(book)
    queue = Sidekiq::Queue.new('default')
 
    queue.each do |job|
      return if job.klass == BookSalesWorker.to_s &&
        job.args == [book.id]
    end
 
    BookSalesWorker.perform_async(book.id)
  end
end
 
class BookSalesWorker
  include Sidekiq::Worker
 
  def perform(book_id)
    crunch_some_numbers(book_id)
 
    upload_to_s3
  end
 
  ...
end

위의 예에서 'default' 대기열에 클래스 이름이 BookSalesWorker인 작업이 있습니다. . 또한 작업 인수가 책 ID와 일치하는지 확인하고 있습니다. BookSalesWorker 동일한 도서 ID를 가진 작업이 대기열에 있는 경우 다른 작업을 예약하지 않고 일찍 반환합니다.

대기열이 비어 있기 때문에 작업을 너무 빨리 예약하면 일부 작업이 예약될 수 있습니다. 다음을 사용하여 로컬에서 테스트할 때 정확한 일이 발생했습니다.

100.times { BookSalesService.schedule_unique_across_queue(book) }

예제 리포지토리에서 사용해 볼 수 있습니다.

이 접근 방식의 좋은 점은 필요한 경우 기존 작업을 검색하기 위해 모든 대기열을 탐색할 수 있다는 것입니다. 단점은 대기열이 비어 있고 한 번에 많은 작업을 예약하는 경우 여전히 중복 작업을 가질 수 있다는 것입니다. 대기열입니다.

2. Sidekiq Enterprise로 업그레이드

귀하 또는 귀하의 조직에 돈이 있다면 Sidekiq의 Enterprise 버전으로 업그레이드할 수 있습니다. 한 달에 $179부터 시작하며 중복 작업을 방지하는 데 도움이 되는 멋진 기능이 있습니다. 안타깝게도 저는 SidekiqEnterprise가 없지만 설명서가 충분하다고 생각합니다. 다음 코드를 사용하여 고유한(중복되지 않은) 작업을 쉽게 가질 수 있습니다.

class BookSalesWorker
  include Sidekiq::Worker
  sidekiq_options unique_for: 10.minutes
 
  def perform(book_id)
    crunch_some_numbers(book_id)
 
    upload_to_s3
  end
 
  ...
end

그리고 그게 다야. 'One Flag Approach' 섹션에서 설명한 것과 유사한 작업 구현이 있습니다. 작업은 10분 동안 고유합니다. 즉, 동일한 인수를 가진 다른 작업은 해당 기간에 예약할 수 없습니다.

꽤 멋진 한 라이너, 응? Enterprise Sidekiq이 있고 이 기능에 대해 방금 알게 된 경우 도움이 된 것 같아 정말 기쁩니다. 우리 대부분은 그것을 사용하지 않을 것이므로 다음 솔루션으로 넘어가겠습니다.

3. sidekiq-unique-jobs 구조

예, 우리가 보석에 대해 언급하려고 한다는 것을 압니다. 그리고 예, 일부 Lua 파일이 있습니다. 하지만 저를 참아주세요, 당신이 그것을 얻는 것은 정말 달콤한 거래입니다. sidekiq-unique-jobgem에는 필요한 것보다 더 많은 잠금 및 기타 구성 옵션이 있습니다.

빠르게 시작하려면 sidekiq-unique-jobs를 입력하세요. gem을 Gemfile에 넣고 bundle 다음과 같이 작업자를 구성합니다.

class UniqueBookSalesWorker
  include Sidekiq::Worker
 
  sidekiq_options lock: :until_executed,
                  on_conflict: :reject
 
  def perform(book_id)
    book = Book.find(book_id)
 
    logger.info "I am a Sidekiq Book Sales worker - I started"
    sleep 2
    logger.info "I am a Sidekiq Book Sales worker - I finished"
 
    book.update(sales_calculated_at: Time.current)
    book.update(crunching_sales: false)
  end
end

많은 옵션이 있지만 저는 이것을 단순화하여 사용하기로 결정했습니다.

sidekiq_options lock: :until_executed, on_conflict: :reject

lock: :until_executed 첫 번째 UniqueBookSalesWorker를 잠급니다. 작업이 실행될 때까지. on_conflict: :reject 사용 , 우리는 실행을 시도하는 다른 모든 작업이 데드 큐로 거부되기를 원한다고 말하고 있습니다. 여기에서 얻은 것은 위 주제의 DIY 예제에서 수행한 것과 유사합니다.

이러한 DIY 예제에 비해 약간 개선된 점은 발생한 일에 대해 일종의 로그오프가 있다는 것입니다. 어떻게 보이는지 이해하기 위해 다음을 시도해 보겠습니다.

5.times { UniqueBookSalesWorker.perform_async(Book.last.id) }

하나의 작업만 완전히 실행되고 나머지 4개의 작업은 다시 시도할 수 있는 데드 대기열로 보내집니다. 이 접근 방식은 중복 작업이 무시된 예와 다릅니다.

잠금 및 충돌 해결과 관련하여 선택할 수 있는 옵션이 많이 있습니다. 특정 사용 사례에 대해서는 gem 설명서를 참조하는 것이 좋습니다.

훌륭한 통찰력

이 보석의 가장 큰 장점은 잠금과 대기열에서 다운된 내역을 볼 수 있다는 것입니다. config/routes.rb에 다음 줄을 추가하기만 하면 됩니다. :

# config/routes.rb
require 'sidekiq_unique_jobs/web'

Rails.application.routes.draw do
  mount Sidekiq::Web, at: '/sidekiq'
end

여기에는 원래 Sidekiq 클라이언트가 포함되지만 작업 잠금용 페이지와 변경 로그용 페이지 등 두 페이지가 더 제공됩니다. 이렇게 생겼습니다:

"잠금"과 "변경 로그"라는 두 개의 새 페이지가 어떻게 생겼는지 확인하세요. 아주 멋진 기능입니다.

gem이 설치되고 사용할 준비가 된 exampleproject에서 이 모든 것을 시도할 수 있습니다.

왜 루아인가?

우선, 나는 보석의 저자가 아니므로 여기에서 가정합니다. 보석을 처음 봤을 때 루비 보석 내부에 Lua를 사용하는 이유가 무엇인지 궁금했습니다. 처음에는 이상하게 보일 수 있지만 Redis는 Lua 스크립트 실행을 지원합니다. 보석 작성자가 이 점을 염두에 두고 Lua에서 더 민첩한 논리를 만들고 싶었을 것입니다.

gem의 repo에 있는 Lua 파일을 보면 그렇게 복잡하지 않습니다. 모든 Lua 스크립트는 나중에 SidekiqUniqueJobs::Script::Caller의 Ruby 코드에서 호출됩니다. 여기. 소스 코드를 살펴보고 어떻게 작동하는지 알아내는 것이 재미있습니다.

대체 보석

ActiveJob을 사용하는 경우 광범위하게 active-job-uniqueness를 시도할 수 있습니다. gem right here. 아이디어는 비슷하지만 사용자 지정 Lua 스크립트 대신 [Redlock]을 사용하여 Redis에서 항목을 잠급니다.

이 보석을 사용하여 고유한 작업을 수행하려면 다음과 같은 작업을 상상할 수 있습니다.

class BookSalesJob < ActiveJob::Base
  unique :until_executed
 
  def perform
    ...
  end
end

구문은 덜 장황하지만 sidekiq-unique-jobs와 매우 유사합니다. 보석. ActiveJob에 크게 의존하는 경우 문제를 해결할 수 있습니다. .

최종 생각

앱에서 중복 작업을 처리하는 방법에 대한 지식을 얻으셨기를 바랍니다. 나는 확실히 다른 솔루션을 연구하고 가지고 노는 것이 재미있었습니다. 원하는 것을 찾지 못했다면 몇 가지 예를 통해 자신만의 것을 만들 수 있는 영감을 얻으셨기를 바랍니다.

다음은 모든 코드 조각이 포함된 예제 프로젝트입니다.

다음 편에서 뵙겠습니다, 건배.

추신 Ruby Magic 게시물이 언론에 공개되는 즉시 읽고 싶다면 Ruby Magic 뉴스레터를 구독하고 게시물을 놓치지 마세요!