오늘 포스팅에서는 재미삼아 순진한 백그라운드 처리 시스템을 구현해보도록 하겠습니다! Sidekiq와 같은 인기 있는 백그라운드 처리 시스템의 내부를 들여다보면서 몇 가지를 배울 수 있습니다. 이 재미의 제품은 결코 프로덕션 용도로 사용되지 않습니다.
하나 이상의 웹 사이트를 로드하고 제목을 추출하는 작업이 애플리케이션에 있다고 가정해 보겠습니다. 우리는 이러한 웹사이트의 성능에 영향을 미치지 않기 때문에 기본 스레드(또는 현재 요청—웹 애플리케이션을 구축하는 경우) 외부에서 작업을 수행하지만 백그라운드에서 수행하고 싶습니다.피>
작업 캡슐화
백그라운드 처리를 시작하기 전에 당면한 작업을 수행할 서비스 개체를 빌드해 보겠습니다. OpenURI 및 Nokogiri를 사용하여 제목 태그의 내용을 추출합니다.
require 'open-uri'
require 'nokogiri'
class TitleExtractorService
def call(url)
document = Nokogiri::HTML(open(url))
title = document.css('html > head > title').first.content
puts title.gsub(/[[:space:]]+/, ' ').strip
rescue
puts "Unable to find a title for #{url}"
end
end
서비스를 호출하면 주어진 URL의 제목이 출력됩니다.
TitleExtractorService.new.call('https://appsignal.com')
# AppSignal: Application Performance Monitoring for Ruby on Rails and Elixir
이것은 예상대로 작동하지만 다른 백그라운드 처리 시스템처럼 보이고 느껴지도록 구문을 약간 개선할 수 있는지 봅시다. Magique::Worker
생성 모듈에서 서비스 개체에 구문 설탕을 추가할 수 있습니다.
module Magique
module Worker
def self.included(base)
base.extend(ClassMethods)
end
module ClassMethods
def perform_now(*args)
new.perform(*args)
end
end
def perform(*)
raise NotImplementedError
end
end
end
모듈은 perform
을 추가합니다. 작업자 인스턴스에 대한 메소드 및 perform_now
호출을 조금 더 좋게 만들기 위해 작업자 클래스에 메서드를 추가합니다.
모듈을 서비스 객체에 포함시키자. 그 동안 TitleExtractorWorker
로 이름을 변경하겠습니다. call
변경 perform
방법 .
class TitleExtractorWorker
include Magique::Worker
def perform(url)
document = Nokogiri::HTML(open(url))
title = document.css('html > head > title').first.content
puts title.gsub(/[[:space:]]+/, ' ').strip
rescue
puts "Unable to find a title for #{url}"
end
end
호출은 여전히 동일한 결과를 갖지만 무슨 일이 일어나고 있는지 조금 더 명확합니다.
TitleExtractorWorker.perform_now('https://appsignal.com')
# AppSignal: Application Performance Monitoring for Ruby on Rails and Elixir
비동기 처리 구현
이제 제목 추출이 작동하므로 과거 Ruby Magic 기사에서 모든 제목을 가져올 수 있습니다. 이를 위해 RUBYMAGIC
가 있다고 가정하겠습니다. 과거 기사의 모든 URL 목록과 함께 상수입니다.
RUBYMAGIC.each do |url|
TitleExtractorWorker.perform_now(url)
end
# Unraveling Classes, Instances and Metaclasses in Ruby | AppSignal Blog
# Bindings and Lexical Scope in Ruby | AppSignal Blog
# Building a Ruby C Extension From Scratch | AppSignal Blog
# Closures in Ruby: Blocks, Procs and Lambdas | AppSignal Blog
# ...
우리는 과거 기사의 제목을 얻었지만 모두 추출하는 데 시간이 걸립니다. 다음 요청으로 넘어가기 전에 각 요청이 완료될 때까지 기다리기 때문입니다.
perform_async
를 도입하여 이를 개선하겠습니다. 메서드를 작업자 모듈에 추가합니다. 속도를 높이기 위해 각 URL에 대해 새 스레드를 만듭니다.
module Magique
module Worker
module ClassMethods
def perform_async(*args)
Thread.new { new.perform(*args) }
end
end
end
end
호출을 TitleExtractorWorker.perform_async(url)
로 변경한 후 , 우리는 거의 한 번에 모든 제목을 얻습니다. 그러나 이는 동시에 Ruby Magic 블로그에 대한 20개 이상의 연결을 시작한다는 의미이기도 합니다. (블로그를 엉망으로 만들어서 죄송합니다. 😂)
자신의 구현을 따르고 장기 실행 프로세스(예:웹 서버) 외부에서 이것을 테스트하는 경우 loop { sleep 1 }
과 같은 것을 추가하는 것을 잊지 마십시오. 프로세스가 즉시 종료되지 않도록 스크립트 끝에 추가합니다.
작업 대기
모든 호출에 대해 새 스레드를 만드는 접근 방식을 사용하면 결국 리소스 제한에 도달하게 됩니다(당사 측과 액세스하는 웹 사이트 모두). 좋은 시민이 되고 싶기 때문에 구현을 비동기적이지만 서비스 거부 공격처럼 느껴지지 않는 것으로 변경해 보겠습니다.
이 문제를 해결하는 일반적인 방법은 생산자/소비자 패턴을 사용하는 것입니다. 하나 이상의 생산자가 작업을 대기열에 푸시하는 동안 한 명 이상의 소비자가 대기열에서 작업을 가져와 처리합니다.
큐는 기본적으로 요소 목록입니다. 이론적으로 간단한 배열이 작업을 수행합니다. 그러나 동시성을 다룰 때 한 번에 하나의 생산자 또는 소비자만 대기열에 액세스할 수 있도록 해야 합니다. 이에 주의하지 않으면 두 사람이 한 번에 문을 밀고 들어가려는 것처럼 혼란에 빠지게 됩니다.
이 문제는 생산자-소비자 문제로 알려져 있으며 이에 대한 여러 솔루션이 있습니다. 운 좋게도 이것은 매우 일반적인 문제이며 Ruby는 적절한 Queue
와 함께 제공됩니다. 스레드 동기화에 대해 걱정할 필요 없이 사용할 수 있는 구현입니다.
그것을 사용하기 위해 생산자와 소비자 모두가 대기열에 액세스할 수 있는지 확인합시다. Magique
에 클래스 메서드를 추가하여 이 작업을 수행합니다. 모듈 및 Queue
인스턴스 할당 그것에.
module Magique
def self.backend
@backend
end
def self.backend=(backend)
@backend = backend
end
end
Magique.backend = Queue.new
다음으로 perform_async
를 변경합니다. 자체 새 스레드를 만드는 대신 대기열에 작업을 푸시하는 구현입니다. 작업은 작업자 클래스에 대한 참조와 perform_async
에 전달된 인수를 포함하는 해시로 표시됩니다. 방법.
module Magique
module Worker
module ClassMethods
def perform_async(*args)
Magique.backend.push(worker: self, args: args)
end
end
end
end
그것으로 우리는 생산자 측 작업을 마쳤습니다. 다음으로 소비자 측면을 살펴보겠습니다.
각 소비자는 대기열에서 작업을 가져와 수행하는 별도의 스레드입니다. 스레드와 같이 한 작업 후에 중지하는 대신 소비자는 대기열에서 다른 작업을 가져와 수행하는 식입니다. 다음은 Magique::Processor
라는 소비자의 기본 구현입니다. . 각 프로세서는 무한 루프하는 새 스레드를 만듭니다. 반복할 때마다 대기열에서 새 작업을 가져오고 작업자 클래스의 새 인스턴스를 만들고 perform
을 호출합니다. 주어진 인수로 메소드.
module Magique
class Processor
def self.start(concurrency = 1)
concurrency.times { |n| new("Processor #{n}") }
end
def initialize(name)
thread = Thread.new do
loop do
payload = Magique.backend.pop
worker_class = payload[:worker]
worker_class.new.perform(*payload[:args])
end
end
thread.name = name
end
end
end
처리 루프 외에 Magique::Processor.start
라는 편리한 메서드를 추가합니다. . 이를 통해 한 번에 여러 프로세서를 회전할 수 있습니다. 스레드 이름을 지정할 필요는 없지만 실제로 예상대로 작동하는지 확인할 수 있습니다.
TitleExtractorWorker
의 출력을 조정해 보겠습니다. 현재 스레드의 이름을 포함합니다.
puts "[#{Thread.current.name}] #{title.gsub(/[[:space:]]+/, ' ').strip}"
백그라운드 처리 설정을 테스트하려면 먼저 작업을 대기열에 추가하기 전에 프로세서 세트를 가동해야 합니다.
Magique.backend = Queue.new
Magique::Processor.start(5)
RUBYMAGIC.each do |url|
TitleExtractorWorker.perform_async(url)
end
# [Processor 3] Bindings and Lexical Scope in Ruby | AppSignal Blog
# [Processor 4] Building a Ruby C Extension From Scratch | AppSignal Blog
# [Processor 1] Unraveling Classes, Instances and Metaclasses in Ruby | AppSignal Blog
# [Processor 0] Ruby's Hidden Gems, StringScanner | AppSignal Blog
# [Processor 2] Fibers and Enumerators in Ruby: Turning Blocks Inside Out | AppSignal Blog
# [Processor 4] Closures in Ruby: Blocks, Procs and Lambdas | AppSignal Blog
# ...
이것이 실행될 때 우리는 여전히 모든 기사의 제목을 얻습니다. 모든 작업에 별도의 스레드를 사용하는 것만큼 빠르지는 않지만 백그라운드 처리가 없는 초기 구현보다 여전히 빠릅니다. 추가된 프로세서 이름 덕분에 모든 프로세서가 대기열을 통해 작업하고 있음을 확인할 수도 있습니다. 동시 프로세서 수를 조정하여 처리 속도와 기존 리소스 제한 간의 균형을 찾을 수 있습니다.
여러 프로세스 및 시스템으로 확장
지금까지 백그라운드 처리 시스템의 현재 구현은 충분히 잘 작동합니다. 그러나 여전히 동일한 프로세스로 제한됩니다. 리소스를 많이 사용하는 작업은 여전히 전체 프로세스의 성능에 영향을 미칩니다. 마지막 단계로 여러 프로세스와 여러 시스템에 워크로드를 분산하는 방법을 살펴보겠습니다.
대기열은 생산자와 소비자 간의 유일한 연결입니다. 지금은 인메모리 구현을 사용하고 있습니다. Sidekiq에서 더 많은 영감을 얻고 Redis를 사용하여 대기열을 구현해 보겠습니다.
Redis는 작업을 푸시하고 가져올 수 있는 목록을 지원합니다. 또한 Redis Ruby gem은 스레드로부터 안전하며 목록을 수정하는 Redis 명령은 원자적입니다. 이러한 속성을 사용하면 동기화 문제 없이 비동기 백그라운드 처리 시스템에 사용할 수 있습니다.
push
를 구현하는 Redis 지원 대기열을 만들어 보겠습니다. 및 shift
Queue
와 같은 메소드 이전에 사용했습니다.
require 'json'
require 'redis'
module Magique
module Backend
class Redis
def initialize(connection = ::Redis.new)
@connection = connection
end
def push(job)
@connection.lpush('magique:queue', JSON.dump(job))
end
def shift
_queue, job = @connection.brpop('magique:queue')
payload = JSON.parse(job, symbolize_names: true)
payload[:worker] = Object.const_get(payload[:worker])
payload
end
end
end
end
Redis는 Ruby 객체에 대해 아무것도 모르기 때문에 lpush
를 사용하여 데이터베이스에 저장하기 전에 작업을 JSON으로 직렬화해야 합니다. 목록의 맨 앞에 요소를 추가하는 명령입니다.
대기열에서 작업을 가져오기 위해 brpop
목록에서 마지막 요소를 가져오는 명령입니다. 목록이 비어 있으면 새 요소를 사용할 수 있을 때까지 차단됩니다. 이것은 사용 가능한 작업이 없을 때 프로세서를 일시 중지하는 좋은 방법입니다. 마지막으로 Redis에서 작업을 가져온 후 Object.const_get
을 사용하여 작업자 이름을 기반으로 실제 Ruby 클래스를 조회해야 합니다. .
마지막 단계로 여러 프로세스로 나누어 보겠습니다. 생산자 측에서는 백엔드를 새로 구현된 Redis 대기열로 변경하기만 하면 됩니다.
# ...
Magique.backend = Magique::Backend::Redis.new
RUBYMAGIC.each do |url|
TitleExtractorWorker.perform_async(url)
end
소비자 측면에서 다음과 같은 몇 줄로 해결할 수 있습니다.
# ...
Magique.backend = Magique::Backend::Redis.new
Magique::Processor.start(5)
loop { sleep 1 }
실행될 때 소비자 프로세스는 새 작업이 대기열에 도착할 때까지 기다립니다. 작업을 대기열로 푸시하는 생산자 프로세스를 시작하면 즉시 처리되는 것을 볼 수 있습니다.
책임감 있게 즐기며 프로덕션에 사용하지 마십시오.
프로덕션에서 사용하는 실제 설정과 거리를 유지하면서(그렇지 않습니다!) 백그라운드 프로세서를 구축하는 몇 가지 단계를 수행했습니다. 프로세스를 백그라운드 서비스로 실행하는 것으로 시작했습니다. 그런 다음 비동기로 만들고 Queue
를 사용했습니다. 생산자-소비자 문제를 해결하기 위해 그런 다음 인메모리 구현 대신 Redis를 사용하여 프로세스를 여러 프로세스 또는 시스템으로 확장했습니다.
앞서 언급했듯이 이것은 백그라운드 처리 시스템의 단순화된 구현입니다. 명시적으로 다루지 않고 누락된 부분이 많습니다. 여기에는 오류 처리, 다중 대기열, 일정, 연결 풀링 및 신호 처리가 포함되지만 이에 국한되지 않습니다.
그럼에도 불구하고, 우리는 이 글을 작성하는 것이 즐거웠고 백그라운드 처리 시스템의 내부를 엿볼 수 있기를 바랍니다. 어쩌면 한두 가지를 가져갔을 수도 있습니다.