Computer >> 컴퓨터 >  >> 프로그램 작성 >> Ruby

AppSignal 메트릭으로 대기열 작업자를 효율적으로 확장

대부분의 웹 앱은 오류가 발생하기 쉬우거나 시간이 많이 소요되는 부업을 처리하는 데 자주 사용되는 백그라운드 대기열의 이점을 누릴 수 있습니다. 이러한 백그라운드 작업은 이메일 전송, 캐시 업데이트, 핵심 비즈니스 로직 수행에 이르기까지 다양합니다.

백그라운드 대기열 시스템이 처리해야 하는 작업의 수를 확장함에 따라 해당 작업을 처리하는 작업자 풀도 확장되어야 합니다. 대기열에 추가되는 작업의 비율이 다양한 경우 대기열 작업자의 수를 확장하는 것이 핵심 측면이 됩니다. 처리 속도를 유지하는 데 있습니다. 또한 대기열 처리량이 낮은 동안 작업자를 축소하면 상당한 비용을 절감할 수 있습니다.

불행히도 많은 대기열 백엔드에는 작업자를 켜거나 끄는 확장 로직이 없습니다. 하지만 몇 가지 간단한 수학 및 성능 데이터를 사용하여 대기열에서 대기 중인 작업을 기반으로 최적의 작업자 수를 찾을 수 있습니다.

대기열 규칙

작업이 대기열 작업자가 처리하는 것보다 더 높은 속도로 대기열에 추가되면 대기열의 깊이가 증가하고 각 작업이 대기열에서 보내는 시간도 증가합니다. 일반적으로 우리는 대기 시간( 대기열) 각 작업에 대해 0초에서 허용 가능한 한도까지 가능한 한 낮게 설정합니다. 원하는 대기 시간을 충족하는 데 필요한 작업자 수를 추정하려면 QROT(Queueing Rule of Thumb)를 사용할 수 있습니다. 일반적으로, QROT는 작업 대기열을 처리하는 데 필요한 서버 수를 설명하는 부등식으로 표현되지만 하나의 형식은 다음과 같이 작성할 수 있습니다.

workers = (number_of_jobs * avg_service_time_per_job) / time_to_finish_queue

따라서 원하는 시간(예:30초) 동안 대기열에 서비스를 제공하는 데 필요한 작업자 수를 파악하려면 작업 수(대기열 크기)와 소요되는 평균 시간만 알면 됩니다. 각 작업 실행

성능 지표 액세스

대기열에 있는 작업의 평균 서비스 시간을 추정하려면 각 작업 클래스에 대한 성능 메트릭에 액세스해야 합니다. 다행히 AppSignal은 기본적으로 공통 대기열 백엔드에 대한 성능 데이터를 기록하여 각 시간에 대한 메트릭을 기록합니다. 작업이 실행되었습니다.

곧 출시될 AppSignal GraphQL API를 사용하여 지난 24시간 동안 각 작업 유형의 평균 기간을 얻을 수 있습니다. 이 API는 아직 완전히 공개되지 않았지만 현재 AppSignal의 성능 그래프 및 기타 데이터 표시에 사용됩니다. 다행히 GraphQL API는 자체 문서화를 목적으로 하며 GraphiQL과 같은 도구를 사용하여 API를 검사하고 API가 노출하는 데이터 개체를 찾을 수 있습니다.

GraphQL 쿼리를 작성하는 프로세스는 이 게시물의 범위를 벗어나지만 아래는 인기 있는 Faraday HTTP 클라이언트 라이브러리를 사용하여 AppSignal GraphQL API에 연결하여 기본 메트릭 집계를 쿼리하는 Ruby 클래스의 예입니다.

require 'json'
require 'faraday'
 
class AppsignalClient
  BASE_URL = 'https://appsignal.com/'
  DEFAULT_APP_ID = ENV['APPSIGNAL_APP_ID']
  DEFAULT_TOKEN = ENV['APPSIGNAL_API_TOKEN']
  # GraphQL query to fetch the "mean" metric for the selected app.
  METRICS_QUERY = <<~GRAPHQL.freeze
    query($appId: String!, $query: [MetricAggregation!]!, $timeframe: TimeframeEnum!) {
      app(id: $appId) {
        metrics {
          list(timeframe: $timeframe, query: $query) {
            start
            end
            rows {
              fields {
                key
                value
              }
            }
          }
        }
      }
    }
  GRAPHQL
 
  def initialize(app_id: DEFAULT_APP_ID, client_secret: DEFAULT_TOKEN)
    @app_id = app_id
    @client_secret = client_secret
  end
 
  # Fetch the average duration for a job class's perform action
  # Default timeframe is last 24 hours
  def average_job_duration(job_class, timeframe: 'R24H')
    response =
      connection.post(
        'graphql',
        JSON.dump(
          query: METRICS_QUERY,
          variables: {
            appId: @app_id,
            timeframe: timeframe,
            query: [
              name: 'transaction_duration',
              headerType: legacy
tags: [
                { key: 'namespace', value: 'background' },
                { key: 'action', value: "#{job_class.name}#perform" },
              ],
              fields: [{ field: 'MEAN', aggregate: 'AVG' }],
            ],
          }
        )
      )
    data = JSON.parse(response.body, symbolize_names: true)
    rows = data.dig(:data, :app, :metrics, :list, :rows)
    # There may be no metrics in the selected timeframe
    return 0.0 if rows.empty?
 
    rows.first[:fields].first[:value]
  end
 
  private
 
  def connection
    @connection ||= Faraday.new(
      url: BASE_URL,
      params: { token: @client_secret },
      headers: { 'Content-Type' => 'application/json' },
      request: { timeout: 10 }
    ) do |faraday|
      faraday.response :raise_error
      faraday.adapter Faraday.default_adapter
    end
  end
end

이 클래스를 사용하면 주어진 ActiveJob 클래스의 평균 작업 시간을 밀리초 단위로 반환할 수 있습니다.

AppsignalClient.new.average_job_duration(MyMailerJob)
# => 233.1

기본적으로 이것은 지난 24시간 동안의 데이터에 대한 작업의 평균 트랜잭션 지속 시간을 요구합니다. 작업이 그보다 훨씬 더 자주 실행되는 경우 해당 창을 단축하여 최근 실행에 더 많은 가중치를 부여할 수 있습니다. 평균입니다. 예를 들어 한 시간에 수백 번 실행되는 작업이 있는 경우 timeframe을 변경할 수 있습니다. 최대 1시간(R1H ) 이러한 작업을 지금 실행하는 경우 해당 작업의 기간을 더 잘 추정할 수 있습니다.

이 성능 데이터는 서버 사용률 데이터와 별개입니다. 이 데이터는 각 작업에 필요한 작업을 실제로 수행하는 데 걸리는 시간을 알려줍니다. 이것은 사용률 측정항목과 같은 외부 측정보다 작업자를 확장하는 데 더 유용합니다. .

대기열 검사

다음으로, 대기열을 검사하여 서비스할 작업을 결정해야 합니다. 일반적인 Ruby 대기열 백엔드는 Resque이며, ActiveJob과도 잘 통합됩니다. Resque에서 주어진 대기열의 대기열에 있는 작업에 액세스한 다음 AppsignalClient를 사용하여 클래스를 기반으로 하는 각 작업 위에서 클래스.

require 'resque'
 
class ResqueEstimator
  def initialize(queue: 'default')
    @queue = queue
    @cache = {}
    @appsignal_client = AppsignalClient.new
  end
 
  def enqueued_duration_estimate
    Resque.data_store.everything_in_queue(queue).map do |job|
      estimate_job_duration decode_activejob_args(job)
    end.sum
  end
 
  def estimate_job_duration(job)
    @cache[job['job_class']] ||= @appsignal_client
                                 .average_job_duration job['job_class']
  end
 
  private
 
  # ActiveJob-specific method for parsing job arguments
  # for ActiveJob+Resque integration
  def decode_activejob_args(job)
    decoded_job = job
    decoded_job = Resque.decode(job) if job.is_a? String
    decoded_job['args'].first
  end
end

이 클래스를 사용하는 것은 다음과 같이 간단합니다.

ResqueEstimator.new(queue: 'my_queue').enqueued_duration_estimate
# => 23000 (ms)

estimate_job_duration에서 작업 기간에 대한 간단한 메모를 사용합니다. AppSignal API에 대한 중복 호출을 피하기 위한 메서드입니다. 대부분의 경우 대기열에는 동일한 클래스의 많은 작업이 포함될 것이며 각 클래스의 실행을 한 번만 추정하여 오버헤드를 줄일 수 있습니다.

성능 데이터를 사용하여 확장

이 모든 것을 종합하면 이제 최근 성능 데이터를 사용하여 대기열의 내용에 따라 대기열 작업자를 확장하거나 축소할 수 있습니다! 언제든지 대기열의 작업을 보고 필요한 작업자를 추정할 수 있습니다. 원하는 시간 제한에 서비스를 제공합니다.

원하는 대기열 시간 제한(모든 작업이 대기열에서 기다려야 하는 최대 시간)을 결정해야 합니다. 30초. 또한 최소 및 최대 작업자 수를 지정해야 합니다. 대기열이 잠시 비워진 후 대기열에 추가된 첫 번째 작업을 처리하려면 대기열에 대해 최소한 한 명의 작업자를 계속 실행하는 것이 좋습니다. 또한 너무 많은 작업자로 인해 데이터베이스 연결 및/또는 서버 사용 비용이 과도하게 확장되는 것을 방지하기 위해 최대 작업자 수를 원합니다.

이 로직을 처리할 클래스를 만들 수 있습니다. 이는 기본적으로 이전의 Thumb 대기열 규칙을 구현한 것일 뿐입니다.

class ResqueWorkerScaler
  def initialize(queue: 'default', workers_range: 1..100, desired_wait_ms: 300_000)
    @queue = queue
    @workers_range = workers_range
    @desired_wait_ms = desired_wait_ms
    @estimator = ResqueEstimator.new(queue: @queue)
  end
 
  def desired_workers
    total_time_ms = @estimator.enqueued_duration_estimate
    workers_required = [(total_time_ms / desired_wait_ms).ceil, workers_range.last].min
    [workers_required, workers_range.first].max
  end
 
  def scale
    # using platform-specific scaling interface, scale to desired_workers
  end
end

수요에 따라 확장 및 축소할 수 있도록 정기적으로 작업자를 확장하려고 합니다. ResqueWorkerScaler를 호출하는 Rake 작업을 만들 수 있습니다. 작업자를 확장하는 클래스:

# inside lib/tasks/resque_workers.rake
 
namespace :resque_workers do
  desc 'Scale worker pool based on enqueued jobs'
  task :scale, [:queue] => [:environment] do |_t, args|
    queue = args[:queue] || 'default'
    ResqueWorkerScaler.new(queue: queue).scale
  end
end

그런 다음 이 Rake 작업을 정기적으로 실행하도록 cron 작업을 설정할 수 있습니다.

*/5 * * * * /path/to/our/rake resque_workers:scale
# scale a non-default queue:
*/5 * * * * /path/to/our/rake resque_workers:scale['my_queue']

스케일링 작업을 5분마다 실행하도록 설정했습니다. 각 새 작업자는 온라인 상태가 되어 작업 처리를 시작하는 데 어느 정도의 시간이 걸립니다. 코드베이스의 크기와 gem의 수에 따라 10-40초 정도 소요됩니다. 따라서 1분마다 작업자를 확장하려고 하면 원하는 변경 사항이 적용되기 전에 다시 확장하거나 축소할 수 있습니다. 앱에서 하루 중 다른 시간에 대기열 사용량이 변동하는 것을 볼 수만 있다면 다음을 수행할 수 있습니다. 1시간 간격으로 Rake 작업을 호출할 가능성이 높습니다. 그러나 대기열 크기가 한 시간 내에 변동하는 경우 위의 5분과 같이 더 자주 대기열을 검사하는 것이 좋습니다.

다음 단계

실제 성능 데이터를 사용하여 인프라를 확장하는 이러한 시스템은 수요에 매우 민감하고 다양한 사용량에 탄력적으로 대처할 수 있습니다. 특히 백그라운드 처리와 같은 환경에서 메모리 사용량 및 로드 평균과 같은 호스트 메트릭이 변하지 않을 가능성이 있는 환경에서는 성능 메트릭을 사용하여 다음을 수행합니다. 규모가 훨씬 더 적절합니다.

대체 대기열 확장 구현은 전체 대기열을 검사하는 대신 작업당 평균 대기 시간을 측정할 수 있지만 대기열 내용과 크기가 급격하게 변경되면 해당 측정항목이 대표성이 없을 수 있습니다. 시스템에서 많은 가변 로드가 발생하고 한 번에 많은 작업이 대기열에 들어가는 경우, 또는 작업 실행 시간이 광범위하게 변하는 경우 대기열 내부 검사가 응답하는 데 훨씬 빠르고 안정적으로 수정됩니다.

그러나 대기열 자체 검사 시스템에서 고려해야 할 몇 가지 제한 사항이 있습니다. 대기열이 충분히 크면 각 작업에서 실행 추정치를 확인하는 것이 엄청나게 느릴 것입니다. 이러한 경우 총 작업 수를 찾은 다음 다음을 선택하는 것이 좋습니다. 대기열에서 작업을 무작위로 샘플링하고 해당 샘플에서 평균 실행을 계산합니다. 또는 작업 클래스에 아직 연결된 성능 데이터가 없는 경우 실행 및 기록될 때까지 가정된 실행 시간을 사용해야 합니다. 몇 번.

위에 설명된 시스템은 또한 몇 가지 조정으로 크게 개선할 수 있습니다. 각 추정이 격리되고 멱등성이 있으므로 각 작업 클래스의 실행 시간을 병렬로 추정하는 것을 고려하십시오. 또한 현재 실행 중인 작업을 포함하도록 대기열 내부 검사를 업데이트할 수 있습니다. 총 서비스 시간 추정치의 정확도를 개선하기 위해 작업자를 추가합니다. 여러 대기열이 있는 백그라운드 처리 아키텍처의 경우 대기열 우선순위를 기반으로 각 대기열에 원하는 대기 시간을 할당하고 작업자를 적절하게 확장할 수 있습니다.

대기열 시스템은 모든 프로젝트에서 매우 가변적인 작업을 많이 수집하는 경향이 있습니다. 대기열에서 작업 실행에 대한 성능 데이터를 사용하면 리소스를 효율적으로 확장하여 응답하고 효율적인 방식으로 모든 작업을 처리할 수 있습니다.

즐거운 스케일링!

추신 Ruby Magic 게시물이 언론에 공개되는 즉시 읽고 싶다면 Ruby Magic 뉴스레터를 구독하고 게시물을 놓치지 마세요!