Computer >> 컴퓨터 >  >> 프로그램 작성 >> Ruby

동시성 심층 분석:이벤트 루프

동시성에 관한 시리즈의 마지막 Ruby Magic 기사에 오신 것을 환영합니다. 이전 버전에서는 다중 프로세스와 다중 스레드를 사용하여 채팅 서버를 구현했습니다. 이번에는 이벤트 루프를 사용하여 동일한 작업을 수행합니다.

요약

우리는 이전 기사에서 사용한 것과 동일한 클라이언트와 동일한 서버 설정을 사용할 것입니다. 우리의 목표는 다음과 같은 채팅 시스템을 구축하는 것입니다.

기본 설정에 대한 자세한 내용은 이전 기사를 참조하십시오. 이 문서의 예제에 사용된 전체 소스 코드는 GitHub에서 사용할 수 있으므로 직접 실험해 볼 수 있습니다.

이벤트 루프를 사용하는 채팅 서버

채팅 서버에 이벤트 루프를 사용하려면 스레드나 프로세스를 사용하는 것과는 다른 멘탈 모델이 필요합니다. 고전적인 접근 방식에서 스레드 또는 프로세스는 단일 연결을 처리합니다. 이벤트 루프를 사용하면 여러 연결을 처리하는 단일 프로세스에 단일 스레드가 있습니다. 분해하여 이것이 어떻게 작동하는지 봅시다.

이벤트 루프

예를 들어 EventMachine 또는 NodeJS에서 사용하는 이벤트 루프는 다음과 같이 작동합니다. 우리는 특정 이벤트에 관심이 있는 운영 체제에 알리는 것으로 시작합니다. 예를 들어 소켓에 대한 연결이 열린 경우입니다. 연결 또는 소켓과 같은 일부 IO 개체에 대한 관심을 등록하는 함수를 호출하여 이를 수행합니다.

이 IO 개체에서 어떤 일이 발생하면 운영 체제는 이벤트를 프로그램에 보냅니다. 우리는 이러한 이벤트를 대기열에 넣습니다. 이벤트 루프는 목록에서 이벤트를 계속 팝하고 하나씩 처리합니다.

어떤 의미에서 이벤트 루프는 진정한 동시성이 아닙니다. 효과를 시뮬레이션하기 위해 매우 작은 배치로 순차적으로 작동합니다.

관심을 등록하고 운영 체제가 IO 이벤트를 전달하도록 하려면 Ruby 표준 라이브러리에 이에 대한 API가 없기 때문에 C 확장을 작성해야 합니다. 이에 대해 자세히 알아보는 것은 이 기사의 범위를 벗어나므로 IO.select를 사용하겠습니다. 대신 이벤트를 생성합니다. IO.select IO 배열을 취합니다. 모니터링할 개체. 배열에서 하나 이상의 개체가 읽거나 쓸 준비가 될 때까지 기다렸다가 해당 IO만 포함하는 배열을 반환합니다. 개체.

연결과 관련된 모든 것을 처리하는 코드는 Fiber로 구현됩니다. :이제부터 이 코드를 "핸들러"라고 부를 것입니다. Fiber 일시 중지 및 재개할 수 있는 코드 블록입니다. Ruby VM은 이 작업을 자동으로 수행하지 않으므로 수동으로 재개하고 양보해야 합니다. IO.select의 입력을 사용합니다. 연결이 읽거나 쓸 준비가 되면 핸들러에게 알립니다.

이전 게시물의 스레드 및 다중 프로세스 예제와 마찬가지로 전송된 메시지와 클라이언트를 추적하기 위해 약간의 저장소가 필요합니다. Mutex가 필요하지 않습니다. 이 시간. 이벤트 루프는 단일 스레드에서 실행되므로 다른 스레드에 의해 동시에 개체가 변경될 위험이 없습니다.

client_handlers = {}
messages = []

클라이언트 핸들러는 다음 Fiber에서 구현됩니다. . 소켓에서 읽거나 쓸 수 있는 경우 Fiber 응답합니다. 상태가 :readable인 경우 소켓에서 한 줄을 읽고 이를 messages에 푸시합니다. 정렬. 상태가 :writable인 경우 클라이언트에 대한 마지막 쓰기 이후 다른 클라이언트로부터 받은 모든 메시지를 씁니다. 이벤트를 처리한 후 Fiber.yield를 호출합니다. , 그래서 일시 중지하고 다음 이벤트를 기다립니다.

def create_client_handler(nickname, socket)
  Fiber.new do
    last_write = Time.now
    loop do
      state = Fiber.yield
 
      if state == :readable
        # Read a message from the socket
        incoming = read_line_from(socket)
        # All good, add it to the list to write
        $messages.push(
          :time => Time.now,
          :nickname => nickname,
          :text => incoming
        )
      elsif state == :writable
        # Write messages to the socket
        get_messages_to_send(last_write, nickname, $messages).each do |message|
          socket.puts "#{message[:nickname]}: #{message[:text]}"
        end
        last_write = Time.now
      end
    end
  end
end

그렇다면 Fiber Socket 준비되었다? 4단계로 구성된 이벤트 루프를 사용합니다.

loop do
  # Step 1: Accept incoming connections
  accept_incoming_connections
 
  # Step 2: Get connections that are ready for reading or writing
  get_ready_connections
 
  # Step 3: Read from readable connections
  read_from_readable_connections
 
  # Step 4: Write to writable connections
  write_to_writable_connections
end

여기에는 마법이 없습니다. 이것은 일반적인 Ruby 루프입니다.

1단계:들어오는 연결 수락

새로운 수신 연결이 있는지 확인하십시오. accept_nonblock을 사용합니다. , 클라이언트가 연결할 때까지 기다리지 않습니다. 대신 새 클라이언트가 없으면 오류가 발생하고 해당 오류가 발생하면 이를 포착하고 다음 단계로 이동합니다. 새 클라이언트가 있으면 이에 대한 핸들러를 만들고 이를 clients에 배치합니다. 가게. 소켓 개체를 해당 Hash의 키로 사용합니다. 나중에 클라이언트 핸들러를 찾을 수 있도록 합니다.

begin
  socket = server.accept_nonblock
  nickname = socket.gets.chomp
  $client_handlers[socket] = create_client_handler(nickname, socket)
  puts "Accepted connection from #{nickname}"
rescue IO::WaitReadable, Errno::EINTR
  # No new incoming connections at the moment
end

2단계:읽거나 쓸 준비가 된 연결 가져오기

다음으로 연결이 준비되면 OS에 알려달라고 요청합니다. client_handlers의 키를 전달합니다. 읽기, 쓰기 및 오류 처리를 위한 저장소입니다. 이 키는 1단계에서 수락한 소켓 개체입니다. 이 일이 발생할 때까지 10밀리초를 기다립니다.

readable, writable = IO.select(
  $client_handlers.keys,
  $client_handlers.keys,
  $client_handlers.keys,
  0.01
)

3단계:읽을 수 있는 연결에서 읽기

읽을 수 있는 연결이 있으면 클라이언트 핸들러를 트리거하고 readable로 다시 시작합니다. 상태. Socket IO.select가 반환하는 객체 핸들러 저장소의 키로 사용됩니다.

if readable
  readable.each do |ready_socket|
    # Get the client from storage
    client = $client_handlers[ready_socket]
 
    client.resume(:readable)
  end
end

4단계:쓰기 ​​가능한 연결에 쓰기

쓰기 가능한 연결이 있으면 클라이언트 핸들러를 트리거하고 writable로 다시 시작합니다. 상태.

if writable
  writable.each do |ready_socket|
    # Get the client from storage
    client = $client_handlers[ready_socket]
    next unless client
 
    client.resume(:writable)
  end
end

핸들러를 생성하는 루프에서 이 네 단계를 사용하고 readablewritable 적시에 이 핸들러에서 완전한 기능의 이벤트 채팅 서버를 만들었습니다. 연결당 오버헤드가 거의 없으며 이를 많은 수의 동시 클라이언트로 확장할 수 있습니다.

이 접근 방식은 루프의 틱당 작업량을 작게 유지하는 한 매우 잘 작동합니다. 이는 이벤트 루프가 단일 스레드에서 실행되고 따라서 단일 CPU만 사용할 수 있기 때문에 계산과 관련된 작업에 특히 중요합니다. 프로덕션 시스템에는 이러한 제한을 해결하기 위해 이벤트 루프를 실행하는 여러 프로세스가 있는 경우가 많습니다.

결론

이 세 가지 방법 중 어떤 방법을 사용해야 합니까?

  • 대부분의 앱에서 스레딩은 의미가 있습니다. 가장 간단하게 작업할 수 있는 방법입니다.
  • 장시간 실행되는 스트림으로 동시성이 높은 앱을 실행하는 경우 이벤트 루프를 사용하여 확장할 수 있습니다.
  • 프로세스가 충돌할 것으로 예상되는 경우 가장 강력한 접근 방식인 오래된 다중 프로세스를 선택하세요.

이것으로 동시성에 대한 시리즈를 마칩니다. 전체 요약을 원하시면 원본 마스터링 동시성 기사와 다중 프로세스 및 다중 스레드 사용에 대한 자세한 기사를 확인하십시오.