Computer >> 컴퓨터 >  >> 프로그램 작성 >> Ruby

Ruby의 백그라운드 처리 시스템 구축을 통한 학습

오늘 포스팅에서는 재미삼아 순진한 백그라운드 처리 시스템을 구현해보도록 하겠습니다! Sidekiq와 같은 인기 있는 백그라운드 처리 시스템의 내부를 들여다보면서 몇 가지를 배울 수 있습니다. 이 재미의 제품은 결코 프로덕션 용도로 사용되지 않습니다.

하나 이상의 웹 사이트를 로드하고 제목을 추출하는 작업이 애플리케이션에 있다고 가정해 보겠습니다. 우리는 이러한 웹사이트의 성능에 영향을 미치지 않기 때문에 기본 스레드(또는 현재 요청—웹 애플리케이션을 구축하는 경우) 외부에서 작업을 수행하지만 백그라운드에서 수행하고 싶습니다.

작업 캡슐화

백그라운드 처리를 시작하기 전에 당면한 작업을 수행할 서비스 개체를 빌드해 보겠습니다. OpenURI 및 Nokogiri를 사용하여 제목 태그의 내용을 추출합니다.

require 'open-uri'
require 'nokogiri'
 
class TitleExtractorService
  def call(url)
    document = Nokogiri::HTML(open(url))
    title = document.css('html > head > title').first.content
    puts title.gsub(/[[:space:]]+/, ' ').strip
  rescue
    puts "Unable to find a title for #{url}"
  end
end

서비스를 호출하면 주어진 URL의 제목이 출력됩니다.

TitleExtractorService.new.call('https://appsignal.com')
# AppSignal: Application Performance Monitoring for Ruby on Rails and Elixir

이것은 예상대로 작동하지만 다른 백그라운드 처리 시스템처럼 보이고 느껴지도록 구문을 약간 개선할 수 있는지 봅시다. Magique::Worker 생성 모듈에서 서비스 개체에 구문 설탕을 추가할 수 있습니다.

module Magique
  module Worker
    def self.included(base)
      base.extend(ClassMethods)
    end
 
    module ClassMethods
      def perform_now(*args)
        new.perform(*args)
      end
    end
 
    def perform(*)
      raise NotImplementedError
    end
  end
end

모듈은 perform을 추가합니다. 작업자 인스턴스에 대한 메소드 및 perform_now 호출을 조금 더 좋게 만들기 위해 작업자 클래스에 메서드를 추가합니다.

모듈을 서비스 객체에 포함시키자. 그 동안 TitleExtractorWorker로 이름을 변경하겠습니다. call 변경 perform 방법 .

class TitleExtractorWorker
  include Magique::Worker
 
  def perform(url)
    document = Nokogiri::HTML(open(url))
    title = document.css('html > head > title').first.content
    puts title.gsub(/[[:space:]]+/, ' ').strip
  rescue
    puts "Unable to find a title for #{url}"
  end
end

호출은 여전히 ​​동일한 결과를 갖지만 무슨 일이 일어나고 있는지 조금 더 명확합니다.

TitleExtractorWorker.perform_now('https://appsignal.com')
# AppSignal: Application Performance Monitoring for Ruby on Rails and Elixir

비동기 처리 구현

이제 제목 추출이 작동하므로 과거 Ruby Magic 기사에서 모든 제목을 가져올 수 있습니다. 이를 위해 RUBYMAGIC가 있다고 가정하겠습니다. 과거 기사의 모든 URL 목록과 함께 상수입니다.

RUBYMAGIC.each do |url|
  TitleExtractorWorker.perform_now(url)
end
 
# Unraveling Classes, Instances and Metaclasses in Ruby | AppSignal Blog
# Bindings and Lexical Scope in Ruby | AppSignal Blog
# Building a Ruby C Extension From Scratch | AppSignal Blog
# Closures in Ruby: Blocks, Procs and Lambdas | AppSignal Blog
# ...

우리는 과거 기사의 제목을 얻었지만 모두 추출하는 데 시간이 걸립니다. 다음 요청으로 넘어가기 전에 각 요청이 완료될 때까지 기다리기 때문입니다.

perform_async를 도입하여 이를 개선하겠습니다. 메서드를 작업자 모듈에 추가합니다. 속도를 높이기 위해 각 URL에 대해 새 스레드를 만듭니다.

module Magique
  module Worker
    module ClassMethods
      def perform_async(*args)
        Thread.new { new.perform(*args) }
      end
    end
  end
end

호출을 TitleExtractorWorker.perform_async(url)로 변경한 후 , 우리는 거의 한 번에 모든 제목을 얻습니다. 그러나 이는 동시에 Ruby Magic 블로그에 대한 20개 이상의 연결을 시작한다는 의미이기도 합니다. (블로그를 엉망으로 만들어서 죄송합니다. 😂)

자신의 구현을 따르고 장기 실행 프로세스(예:웹 서버) 외부에서 이것을 테스트하는 경우 loop { sleep 1 }과 같은 것을 추가하는 것을 잊지 마십시오. 프로세스가 즉시 종료되지 않도록 스크립트 끝에 추가합니다.

작업 대기

모든 호출에 대해 새 스레드를 만드는 접근 방식을 사용하면 결국 리소스 제한에 도달하게 됩니다(당사 측과 액세스하는 웹 사이트 모두). 좋은 시민이 되고 싶기 때문에 구현을 비동기적이지만 서비스 거부 공격처럼 느껴지지 않는 것으로 변경해 보겠습니다.

이 문제를 해결하는 일반적인 방법은 생산자/소비자 패턴을 사용하는 것입니다. 하나 이상의 생산자가 작업을 대기열에 푸시하는 동안 한 명 이상의 소비자가 대기열에서 작업을 가져와 처리합니다.

큐는 기본적으로 요소 목록입니다. 이론적으로 간단한 배열이 작업을 수행합니다. 그러나 동시성을 다룰 때 한 번에 하나의 생산자 또는 소비자만 대기열에 액세스할 수 있도록 해야 합니다. 이에 주의하지 않으면 두 사람이 한 번에 문을 밀고 들어가려는 것처럼 혼란에 빠지게 됩니다.

이 문제는 생산자-소비자 문제로 알려져 있으며 이에 대한 여러 솔루션이 있습니다. 운 좋게도 이것은 매우 일반적인 문제이며 Ruby는 적절한 Queue와 함께 제공됩니다. 스레드 동기화에 대해 걱정할 필요 없이 사용할 수 있는 구현입니다.

그것을 사용하기 위해 생산자와 소비자 모두가 대기열에 액세스할 수 있는지 확인합시다. Magique에 클래스 메서드를 추가하여 이 작업을 수행합니다. 모듈 및 Queue 인스턴스 할당 그것에.

module Magique
  def self.backend
    @backend
  end
 
  def self.backend=(backend)
    @backend = backend
  end
end
 
Magique.backend = Queue.new

다음으로 perform_async를 변경합니다. 자체 새 스레드를 만드는 대신 대기열에 작업을 푸시하는 구현입니다. 작업은 작업자 클래스에 대한 참조와 perform_async에 전달된 인수를 포함하는 해시로 표시됩니다. 방법.

module Magique
  module Worker
    module ClassMethods
      def perform_async(*args)
        Magique.backend.push(worker: self, args: args)
      end
    end
  end
end

그것으로 우리는 생산자 측 작업을 마쳤습니다. 다음으로 소비자 측면을 살펴보겠습니다.

각 소비자는 대기열에서 작업을 가져와 수행하는 별도의 스레드입니다. 스레드와 같이 한 작업 후에 중지하는 대신 소비자는 대기열에서 다른 작업을 가져와 수행하는 식입니다. 다음은 Magique::Processor라는 소비자의 기본 구현입니다. . 각 프로세서는 무한 루프하는 새 스레드를 만듭니다. 반복할 때마다 대기열에서 새 작업을 가져오고 작업자 클래스의 새 인스턴스를 만들고 perform을 호출합니다. 주어진 인수로 메소드.

module Magique
  class Processor
    def self.start(concurrency = 1)
      concurrency.times { |n| new("Processor #{n}") }
    end
 
    def initialize(name)
      thread = Thread.new do
        loop do
          payload = Magique.backend.pop
          worker_class = payload[:worker]
          worker_class.new.perform(*payload[:args])
        end
      end
 
      thread.name = name
    end
  end
end

처리 루프 외에 Magique::Processor.start라는 편리한 메서드를 추가합니다. . 이를 통해 한 번에 여러 프로세서를 회전할 수 있습니다. 스레드 이름을 지정할 필요는 없지만 실제로 예상대로 작동하는지 확인할 수 있습니다.

TitleExtractorWorker의 출력을 조정해 보겠습니다. 현재 스레드의 이름을 포함합니다.

puts "[#{Thread.current.name}] #{title.gsub(/[[:space:]]+/, ' ').strip}"

백그라운드 처리 설정을 테스트하려면 먼저 작업을 대기열에 추가하기 전에 프로세서 세트를 가동해야 합니다.

Magique.backend = Queue.new
Magique::Processor.start(5)
 
RUBYMAGIC.each do |url|
  TitleExtractorWorker.perform_async(url)
end
 
# [Processor 3] Bindings and Lexical Scope in Ruby | AppSignal Blog
# [Processor 4] Building a Ruby C Extension From Scratch | AppSignal Blog
# [Processor 1] Unraveling Classes, Instances and Metaclasses in Ruby | AppSignal Blog
# [Processor 0] Ruby's Hidden Gems, StringScanner | AppSignal Blog
# [Processor 2] Fibers and Enumerators in Ruby: Turning Blocks Inside Out | AppSignal Blog
# [Processor 4] Closures in Ruby: Blocks, Procs and Lambdas | AppSignal Blog
# ...

이것이 실행될 때 우리는 여전히 모든 기사의 제목을 얻습니다. 모든 작업에 별도의 스레드를 사용하는 것만큼 빠르지는 않지만 백그라운드 처리가 없는 초기 구현보다 여전히 빠릅니다. 추가된 프로세서 이름 덕분에 모든 프로세서가 대기열을 통해 작업하고 있음을 확인할 수도 있습니다. 동시 프로세서 수를 조정하여 처리 속도와 기존 리소스 제한 간의 균형을 찾을 수 있습니다.

여러 프로세스 및 시스템으로 확장

지금까지 백그라운드 처리 시스템의 현재 구현은 충분히 잘 작동합니다. 그러나 여전히 동일한 프로세스로 제한됩니다. 리소스를 많이 사용하는 작업은 여전히 ​​전체 프로세스의 성능에 영향을 미칩니다. 마지막 단계로 여러 프로세스와 여러 시스템에 워크로드를 분산하는 방법을 살펴보겠습니다.

대기열은 생산자와 소비자 간의 유일한 연결입니다. 지금은 인메모리 구현을 사용하고 있습니다. Sidekiq에서 더 많은 영감을 얻고 Redis를 사용하여 대기열을 구현해 보겠습니다.

Redis는 작업을 푸시하고 가져올 수 있는 목록을 지원합니다. 또한 Redis Ruby gem은 스레드로부터 안전하며 목록을 수정하는 Redis 명령은 원자적입니다. 이러한 속성을 사용하면 동기화 문제 없이 비동기 백그라운드 처리 시스템에 사용할 수 있습니다.

push를 구현하는 Redis 지원 대기열을 만들어 보겠습니다. 및 shift Queue와 같은 메소드 이전에 사용했습니다.

require 'json'
require 'redis'
 
module Magique
  module Backend
    class Redis
      def initialize(connection = ::Redis.new)
        @connection = connection
      end
 
      def push(job)
        @connection.lpush('magique:queue', JSON.dump(job))
      end
 
      def shift
        _queue, job = @connection.brpop('magique:queue')
        payload = JSON.parse(job, symbolize_names: true)
        payload[:worker] = Object.const_get(payload[:worker])
        payload
      end
    end
  end
end

Redis는 Ruby 객체에 대해 아무것도 모르기 때문에 lpush를 사용하여 데이터베이스에 저장하기 전에 작업을 JSON으로 직렬화해야 합니다. 목록의 맨 앞에 요소를 추가하는 명령입니다.

대기열에서 작업을 가져오기 위해 brpop 목록에서 마지막 요소를 가져오는 명령입니다. 목록이 비어 있으면 새 요소를 사용할 수 있을 때까지 차단됩니다. 이것은 사용 가능한 작업이 없을 때 프로세서를 일시 중지하는 좋은 방법입니다. 마지막으로 Redis에서 작업을 가져온 후 Object.const_get을 사용하여 작업자 이름을 기반으로 실제 Ruby 클래스를 조회해야 합니다. .

마지막 단계로 여러 프로세스로 나누어 보겠습니다. 생산자 측에서는 백엔드를 새로 구현된 Redis 대기열로 변경하기만 하면 됩니다.

# ...
 
Magique.backend = Magique::Backend::Redis.new
 
RUBYMAGIC.each do |url|
  TitleExtractorWorker.perform_async(url)
end

소비자 측면에서 다음과 같은 몇 줄로 해결할 수 있습니다.

# ...
 
Magique.backend = Magique::Backend::Redis.new
Magique::Processor.start(5)
 
loop { sleep 1 }

실행될 때 소비자 프로세스는 새 작업이 대기열에 도착할 때까지 기다립니다. 작업을 대기열로 푸시하는 생산자 프로세스를 시작하면 즉시 처리되는 것을 볼 수 있습니다.

책임감 있게 즐기며 프로덕션에 사용하지 마십시오.

프로덕션에서 사용하는 실제 설정과 거리를 유지하면서(그렇지 않습니다!) 백그라운드 프로세서를 구축하는 몇 가지 단계를 수행했습니다. 프로세스를 백그라운드 서비스로 실행하는 것으로 시작했습니다. 그런 다음 비동기로 만들고 Queue를 사용했습니다. 생산자-소비자 문제를 해결하기 위해 그런 다음 인메모리 구현 대신 Redis를 사용하여 프로세스를 여러 프로세스 또는 시스템으로 확장했습니다.

앞서 언급했듯이 이것은 백그라운드 처리 시스템의 단순화된 구현입니다. 명시적으로 다루지 않고 누락된 부분이 많습니다. 여기에는 오류 처리, 다중 대기열, 일정, 연결 풀링 및 신호 처리가 포함되지만 이에 국한되지 않습니다.

그럼에도 불구하고, 우리는 이 글을 작성하는 것이 즐거웠고 백그라운드 처리 시스템의 내부를 엿볼 수 있기를 바랍니다. 어쩌면 한두 가지를 가져갔을 수도 있습니다.