Computer >> 컴퓨터 >  >> 프로그램 작성 >> Ruby

Ruby 동시성 도구 상자 열기

동시성과 병렬성은 Ruby 개발자에게 그 어느 때보다 중요합니다. 그들은 최대한의 잠재력을 발휘할 수 있는 하드웨어를 활용하여 우리의 애플리케이션을 더 빠르게 만들 수 있습니다. 이 기사에서는 현재 모든 Rubyist가 사용할 수 있는 도구와 Ruby가 이 부서에서 곧 제공할 예정인 도구를 살펴보겠습니다.

모든 사람이 동시성을 직접 사용하는 것은 아니지만 우리는 모두 Sidekiq와 같은 도구를 통해 간접적으로 동시성을 사용합니다. Ruby 동시성을 이해하는 것은 자신만의 솔루션을 구축하는 데 도움이 되지 않습니다. 기존 항목을 이해하고 문제를 해결하는 데 도움이 됩니다.

하지만 먼저 한 발 물러나서 큰 그림을 살펴보겠습니다.

동시성 대 병렬성

이러한 용어는 느슨하게 사용되지만 뚜렷한 의미가 있습니다.

  • 동시성: 한 번에 하나씩 많은 작업을 수행하는 기술. 이들 사이를 빠르게 전환함으로써 사용자에게 동시에 발생하는 것처럼 보일 수 있습니다.
  • 병렬성: 말 그대로 동시에 많은 작업을 수행합니다. 동시에 나타나는 대신 동시에 나타납니다.

동시성은 IO가 많은 애플리케이션에 가장 자주 사용됩니다. 예를 들어 웹 앱은 정기적으로 데이터베이스와 상호 작용하거나 많은 네트워크 요청을 할 수 있습니다. 동시성을 사용하면 데이터베이스가 쿼리에 응답하기를 기다리는 동안에도 애플리케이션의 응답성을 유지할 수 있습니다.

이것은 Ruby VM이 IO 중에 하나가 대기하는 동안 다른 스레드가 실행되도록 허용하기 때문에 가능합니다. 프로그램이 수십 개의 요청을 해야 하는 경우에도 동시성을 사용하면 요청이 거의 동시에 이루어집니다.

반면에 병렬 처리는 현재 Ruby에서 지원되지 않습니다.

<블록 인용>

루비에 병렬 처리가 없는 이유는 무엇입니까?

오늘날에는 기본 Ruby 구현(일반적으로 MRI 또는 ​​Cruby라고 함)을 사용하여 단일 Ruby 프로세스 내에서 병렬 처리를 달성할 수 있는 방법이 없습니다. Ruby VM은 여러 스레드가 동시에 Ruby 코드를 실행하지 못하도록 하는 잠금(GVM 또는 전역 VM 잠금)을 시행합니다. 이 잠금은 가상 머신의 내부 상태를 보호하고 VM 충돌을 초래할 수 있는 시나리오를 방지하기 위해 존재합니다. 이것은 좋은 위치는 아니지만 모든 희망을 잃지는 않았습니다. Ruby 3가 곧 출시될 예정이며 코드명 Guild(이 기사의 마지막 섹션에서 설명)라는 개념을 도입하여 이 핸디캡을 해결할 것을 약속합니다.

스레드

스레드는 Ruby의 동시성 작업 도구입니다. 그것들을 사용하는 방법과 알아야 할 함정을 더 잘 이해하기 위해 예를 들어보겠습니다. API를 사용하고 그 결과를 동시성을 사용하여 데이터 저장소에 저장하는 작은 프로그램을 빌드합니다.

API 클라이언트를 빌드하기 전에 API가 필요합니다. 아래는 숫자를 받아들이고 제공된 숫자가 홀수인 경우 일반 텍스트로 응답하는 작은 API의 구현입니다. 구문이 이상해 보이더라도 걱정하지 마십시오. 이것은 동시성과 관련이 없습니다. 이것은 우리가 사용할 도구일 뿐입니다.

app =
  Proc.new do |env|
    sleep 0.05
    qs = env['QUERY_STRING']
    number = Integer(qs.match(/number=(\d+)/)[1])
    [
      '200',
      { 'Content-Type' => 'text/plain' },
      [number.even? ? 'even' : 'odd']
    ]
  end

run app

이 웹 앱을 실행하려면 rack gem을 설치하고 rackup config.ru를 실행해야 합니다. .

또한 모의 데이터 저장소가 필요합니다. 다음은 키-값 데이터베이스를 시뮬레이션하는 클래스입니다.

class Datastore
  # ... accessors and initialization omitted ...
  def read(key)
    data[key]
  end

  def write(key, value)
    data[key] = value
  end
end

이제 동시 솔루션의 구현을 살펴보겠습니다. run 메소드가 있습니다. , 동시에 1,000개의 레코드를 가져와 데이터 저장소에 저장합니다.

class ThreadPoweredIntegration
  # ... accessors and initialization ...
  def run
    threads = []
    (1..1000).each_slice(250) do |subset|
      threads << Thread.new do
        subset.each do |number|
          uri = 'https://localhost:9292/' \
            "even_or_odd?number=#{number}"
          status, body = AdHocHTTP.new(uri).blocking_get
          handle_response(status, body)
        rescue Errno::ETIMEDOUT
          retry # Try again if the server times out.
        end
      end
    end
    threads.each(&:join)
  end
  # ...
end

각각 250개의 레코드를 처리하는 4개의 스레드를 만듭니다. 타사 API 또는 자체 시스템을 압도하지 않기 위해 이 전략을 사용합니다.

여러 스레드를 사용하여 요청을 동시에 수행함으로써 전체 실행은 순차 구현에 소요되는 시간의 일부를 차지합니다. 각 스레드는 HTTP 요청을 통해 설정하고 통신하는 데 필요한 모든 단계에서 비활성 순간이 있지만 Ruby VM에서는 다른 스레드가 실행을 시작할 수 있습니다. 이것이 이 구현이 순차 구현보다 훨씬 빠른 이유입니다.

<블록 인용>

AdHocHTTP 클래스는 스레드로 구동되는 코드와 섬유로 구동되는 코드 간의 차이점에만 집중할 수 있도록 이 기사를 위해 특별히 구현된 간단한 HTTP 클라이언트입니다. 구현에 대해 논의하는 것은 이 문서의 범위를 벗어나지만, 궁금한 경우 여기에서 확인할 수 있습니다.

마지막으로 내부 루프가 끝날 때까지 서버의 응답을 처리합니다. handle_response 메소드는 다음과 같습니다. 외모:

# ... inside the ThreadPoweredIntegration class ...

attr_reader :ds

def initialize
  @ds = Datastore.new(even: 0, odd: 0)
end

# ...

def handle_response(status, body)
  return if status != '200'
  key = body.to_sym
  curr_count = ds.read(key)
  ds.write(key, curr_count + 1)
end

이 방법이 괜찮아 보이죠? 실행하여 데이터 저장소에서 어떤 결과가 나오는지 살펴보겠습니다.

{ even: 497, odd: 489 }

1과 1000 사이에 500개의 짝수와 500개의 홀수가 있다고 확신하기 때문에 이것은 꽤 이상합니다. 다음 섹션에서는 무슨 일이 일어나고 있는지 이해하고 이 버그를 해결하는 방법 중 하나를 간략하게 살펴보겠습니다.

스레드 및 데이터 경쟁:악마는 세부 사항에 있습니다

스레드를 사용하면 IO가 많은 프로그램을 훨씬 더 빠르게 실행할 수 있지만 제대로 하기도 어렵습니다. 위 결과의 오류는 handle_response의 경쟁 조건으로 인해 발생합니다. 방법. 경쟁 조건은 두 스레드가 동일한 데이터를 조작할 때 발생합니다.

공유 리소스(ds datastore 객체), 우리는 원자가 아닌 작업에 특히 주의해야 합니다. 먼저 데이터 저장소에서 읽고 두 번째 명령문에서 1씩 증가된 개수를 기록합니다. 이는 읽기 후 쓰기 전에 스레드 실행이 중지될 수 있기 때문에 문제가 됩니다. 그런 다음 다른 스레드가 실행되고 관심 있는 키 값을 증가시키면 원래 스레드가 재개될 때 오래된 카운트를 작성합니다.

<블록 인용>

스레드 사용의 위험을 완화하는 한 가지 방법은 상위 수준 추상화를 사용하여 동시 구현을 구조화하는 것입니다. 사용할 다양한 패턴과 보다 안전한 스레드 기반 프로그램에 대해서는 동시 루비 보석을 확인하십시오.

데이터 경쟁을 해결하는 방법에는 여러 가지가 있습니다. 간단한 해결책은 뮤텍스를 사용하는 것입니다. 이 동기화 메커니즘은 주어진 코드 세그먼트에 한 번에 하나씩 액세스하도록 합니다. 다음은 뮤텍스를 사용하여 수정된 이전 구현입니다.

# ... inside ThreadPoweredIntegration class ...
def initialize
  # ...
  @semaphore = Mutex.new
end
# ...
def handle_response(status, body)
  return if status != '200'
  key = body.to_sym
  semaphore.synchronize do
    curr_count = ds.read(key)
    ds.write(key, curr_count + 1)
  end
end

<블록 인용>

Rails 애플리케이션 내에서 스레드를 사용하려는 경우 공식 가이드 Rails의 스레딩 및 코드 실행 필독서이다. 이 지침을 따르지 않으면 데이터베이스 연결 누출과 같은 매우 불쾌한 결과가 발생할 수 있습니다.

수정된 구현을 실행한 후 예상 결과를 얻습니다.

{ even: 500, odd: 500 }

뮤텍스를 사용하는 대신 스레드를 완전히 삭제하고 Ruby에서 사용할 수 있는 다른 동시성 도구에 도달하여 데이터 경쟁을 제거할 수도 있습니다. 다음 섹션에서는 IO가 많은 앱의 성능을 개선하기 위한 메커니즘으로 Fiber를 살펴보겠습니다.

Fiber:동시성을 위한 슬림한 도구

Ruby Fibers를 사용하면 단일 스레드 내에서 협력적 동시성을 달성할 수 있습니다. 이것은 광섬유가 선점되지 않고 프로그램 자체에서 스케줄링을 수행해야 함을 의미합니다. 광섬유가 시작하고 멈출 때 프로그래머가 제어하기 때문에 경쟁 조건을 피하기가 훨씬 쉽습니다.

스레드와 달리 파이버는 IO가 발생할 때 더 나은 성능을 제공하지 않습니다. 다행히 Ruby는 IO 클래스를 통해 비동기식 읽기 및 쓰기를 제공합니다. 이러한 비동기 방식을 사용하여 IO 작업이 광섬유 기반 코드를 차단하는 것을 방지할 수 있습니다.

동일한 시나리오, 이제 Fiber 사용

같은 예를 살펴보겠습니다. 하지만 이제 Ruby IO 클래스의 비동기 기능과 결합된 파이버를 사용합니다. Ruby의 비동기 IO에 대한 모든 세부 정보를 설명하는 것은 이 기사의 범위를 벗어납니다. 그래도 작동의 필수 부분을 다룰 것이며 궁금한 점이 있으면 AdHocHTTP의 관련 메서드(방금 탐색한 스레드 솔루션에 나타나는 동일한 클라이언트)의 구현을 살펴볼 수 있습니다.

run부터 살펴보겠습니다. 광섬유 구동 방식:

class FiberPoweredIntegration
  # ... accessors and initialization ...
  def run
    (1..1000).each_slice(250) do |subset|
      Fiber.new do
        subset.each do |number|
          uri = 'https://127.0.0.1:9292/' \
            "even_or_odd?number=#{number}"
          client = AdHocHTTP.new(uri)
          socket = client.init_non_blocking_get
          yield_if_waiting(client,
                           socket,
                           :connect_non_blocking_get)
          yield_if_waiting(client,
                           socket,
                           :write_non_blocking_get)
          status, body =
            yield_if_waiting(client,
                             socket,
                             :read_non_blocking_get)
          handle_response(status, body)
        ensure
          client&.close_non_blocking_get
        end
      end.resume
    end

    wait_all_requests
  end
  # ...
end

먼저 짝수인지 홀수인지 확인하려는 숫자의 각 하위 집합에 대해 섬유를 만듭니다.

그런 다음 yield_if_waiting을 호출하여 숫자를 반복합니다. . 이 방법은 현재 광섬유를 중지하고 다른 광섬유가 다시 시작되도록 합니다.

또한 Fiber를 생성한 후 resume을 호출합니다. . 이로 인해 광섬유가 실행되기 시작합니다. resume 호출 생성 직후 1에서 1000으로 가는 메인 루프가 끝나기도 전에 HTTP 요청을 하기 시작합니다.

run이 끝나면 메소드, wait_all_requests에 대한 호출이 있습니다. . 이 방법은 실행할 준비가 된 광섬유를 선택하고 의도한 모든 요청을 보장합니다. 이 섹션의 마지막 부분에서 살펴보겠습니다.

이제 yield_if_waiting을 살펴보겠습니다. 자세히:

# ... inside FiberPoweredIntegration ...
def initialize
  @ds = Datastore.new(even: 0, odd: 0)
  @waiting = { wait_readable: {}, wait_writable: {} }
end
# ...
def yield_if_waiting(client, socket, operation)
  res_or_status = client.send(operation)
  is_waiting =
    [:wait_readable,
     :wait_writable].include?(res_or_status)
  return res_or_status unless is_waiting

  waiting[res_or_status][socket] = Fiber.current
  Fiber.yield
  waiting[res_or_status].delete(socket)
  yield_if_waiting(client, socket, operation)
rescue Errno::ETIMEDOUT
  retry # Try again if the server times out.
end

먼저 클라이언트를 사용하여 작업(연결, 읽기 또는 쓰기)을 수행하려고 합니다. 두 가지 주요 결과가 가능합니다.

  • 성공: 그럴 때 우리는 돌아옵니다.
  • 심볼을 받을 수 있습니다. 즉, 기다려야 합니다.

어떻게 "대기"합니까?

  1. 현재 광섬유와 결합된 소켓을 인스턴스 변수 waiting에 추가하여 일종의 체크포인트를 생성합니다. (이것은 Hash ).
  2. 우리는 이 쌍을 클라이언트로부터 받은 결과에 따라 읽기 또는 쓰기를 기다리는 IO를 보유하는 컬렉션에 저장합니다(잠시 이것이 중요한 이유를 알게 될 것입니다).
  3. 현재 광섬유의 실행을 중지하고 다른 광섬유가 실행되도록 합니다. 일시 중지된 광섬유는 연결된 네트워크 소켓이 준비된 후 어느 시점에서 작업을 재개할 기회를 얻습니다. 그런 다음 IO 작업이 다시 시도됩니다(이번에는 성공함).
<블록 인용>

모든 Ruby 프로그램은 그 자체가 스레드(프로세스 내부의 모든 것)의 일부인 섬유 내부에서 실행됩니다. 결과적으로 첫 번째 파이버를 만들고 실행한 다음 어느 시점에서 생성하면 프로그램의 중앙 부분 실행을 다시 시작합니다.

이제 광섬유가 IO를 기다리고 있을 때 실행을 생성하는 데 사용되는 메커니즘을 이해했으므로 이 광섬유 구동 구현을 이해하는 데 필요한 마지막 부분을 살펴보겠습니다.

def wait_all_requests
  while(waiting[:wait_readable].any? ||
        waiting[:wait_writable].any?)

    ready_to_read, ready_to_write =
      IO.select(waiting[:wait_readable].keys,
                waiting[:wait_writable].keys)

    ready_to_read.each do |socket|
      waiting[:wait_readable][socket].resume
    end

    ready_to_write.each do |socket|
      waiting[:wait_writable][socket].resume
    end
  end
end

여기서 주요 아이디어는 보류 중인 모든 IO 작업이 완료될 때까지 대기(즉, 루프)하는 것입니다.

이를 위해 IO.select를 사용합니다. . 보류 중인 IO 개체의 두 가지 컬렉션을 허용합니다. 하나는 읽기용이고 다른 하나는 쓰기용입니다. 작업을 마친 IO 개체를 반환합니다. 이러한 IO 개체를 실행을 담당하는 파이버와 연결했기 때문에 해당 파이버를 다시 시작하는 것이 간단합니다.

모든 요청이 실행되고 완료될 때까지 이 단계를 계속 반복합니다.

그랜드 피날레:비교할 수 있는 성능, 잠금 장치가 필요 없음

handle_response 메소드는 스레드를 사용하는 코드(뮤텍스가 없는 버전)에서 처음 사용된 것과 정확히 동일합니다. 그러나 모든 섬유가 동일한 스레드 내에서 실행되기 때문에 데이터 경합이 발생하지 않습니다. 코드를 실행하면 예상 결과를 얻습니다.

{ even: 500, odd: 500 }
<블록 인용>

비동기 IO를 활용할 때마다 모든 파이버 스위칭 비즈니스를 처리하고 싶지는 않을 것입니다. 다행히도 일부 보석은 이 모든 작업을 추상화하고 개발자가 생각할 필요가 없는 섬유 사용을 만듭니다. async 프로젝트를 시작하는 것이 좋습니다.

높은 확장성이 필수인 경우 섬유는 빛을 발합니다.

소규모 시나리오에서도 데이터 경쟁의 위험을 사실상 제거하는 이점을 얻을 수 있지만 광섬유는 높은 확장성이 필요할 때 훌륭한 도구입니다. 섬유는 실보다 훨씬 가볍습니다. 사용 가능한 동일한 리소스가 주어지면 스레드를 만드는 것이 파이버보다 훨씬 빨리 시스템을 압도할 것입니다. 주제에 대한 훌륭한 탐구를 위해 백만을 향한 여정 프레젠테이션을 추천합니다. Ruby Core Team의 Samuel Williams 작성

길드 - Ruby의 병렬 프로그래밍

지금까지 Ruby에서 동시성을 위한 두 가지 유용한 도구를 보았습니다. 그러나 둘 다 순수 계산의 성능을 향상시킬 수 없습니다. 이를 위해서는 현재 Ruby에 없는 진정한 병렬 처리가 필요합니다(여기서는 기본 구현인 MRI를 고려하고 있습니다).

이것은 "길드"라는 새로운 기능이 추가되면서 Ruby 3에서 변경될 수 있습니다. 세부 사항은 아직 모호하지만 다음 섹션에서는 이 진행 중인 기능이 Ruby에서 병렬 처리를 허용하는 방법을 살펴보겠습니다.

길드의 작동 방식

동시/병렬 솔루션을 구현할 때 가장 큰 고통을 주는 원인은 공유 메모리입니다. 스레드 섹션에서 우리는 이미 실수를 하고 언뜻 보기에 무해해 보이지만 실제로는 미묘한 버그가 포함된 코드를 작성하는 것이 얼마나 쉬운지 보았습니다.

새로운 Guild 기능의 개발을 이끄는 Ruby Core Team 멤버인 Koichi Sasada는 여러 스레드 간에 메모리를 공유하는 위험에 맞서는 솔루션을 설계하기 위해 열심히 노력하고 있습니다. 2018 RubyConf에서의 프레젠테이션에서 그는 길드를 사용할 때 변경 가능한 개체를 단순히 공유할 수 없다고 설명합니다. 주요 아이디어는 다른 길드 간에 변경할 수 없는 개체만 공유하도록 허용하여 데이터 경쟁을 방지하는 것입니다.

길드 간의 공유 메모리를 어느 정도 허용하기 위해 Ruby에 특수 데이터 구조가 도입될 예정이지만 이것이 정확히 어떻게 작동하는지에 대한 세부 사항은 아직 완전히 구체화되지 않았습니다. 길드 간에 개체를 복사하거나 이동할 수 있는 API와 다른 길드로 이동한 개체가 참조되지 않도록 하는 보호 장치도 있습니다.

길드를 사용하여 공통 시나리오 탐색

병렬로 실행하여 계산 속도를 높이고 싶은 경우가 많이 있습니다. 동일한 데이터 세트의 평균과 평균을 계산해야 한다고 가정해 보겠습니다.

아래 예는 길드에서 이를 수행하는 방법을 보여줍니다. 이 코드는 현재 작동하지 않으며 길드가 해제된 후에도 작동하지 않을 수 있습니다.

# A frozen array of numeric values is an immutable object.
dataset = [88, 43, 37, 85, 84, 38, 13, 84, 17, 87].freeze
# The overhead of using guilds will probably be
# considerable, so it will only make sense to
# parallelize work when a dataset is large / when
# performing lots of operations.

g1 = Guild.new do
  mean = dataset.reduce(:+).fdiv(dataset.length)
  Guild.send_to(:mean, Guild.parent)
end

g2 = Guild.new do
  median = Median.calculate(dataset.sort)
  Guild.send_to(:median, Guild.parent)
end

results = {}
# Every Ruby program will be run inside a main guild;
# therefore, we can also receive messages in the main
# section of our program.
Guild.receive(:mean, :median) do |tag, result|
  results[tag] = result
end

요약

동시성과 병렬성은 Ruby의 주요 강점은 아니지만 이 부서에서도 언어는 대부분의 사용 사례를 처리하기에 충분한 도구를 제공합니다. Ruby 3가 출시되고 Guild primitive가 도입되면 상황이 상당히 좋아질 것 같습니다. 제 생각에 Ruby는 여전히 많은 상황에서 매우 적합한 선택이며 커뮤니티는 언어를 더욱 향상시키기 위해 열심히 노력하고 있습니다. 앞으로의 소식에 귀를 기울이십시오!