Computer >> 컴퓨터 >  >> 프로그램 작성 >> Ruby

Kafka를 사용한 Rails의 이벤트 스트리밍

기업은 인사이트를 얻고 보다 매력적인 고객 경험을 만들기 위해 실시간으로 대량의 데이터를 처리 및 공유해야 하는 필요성에 신속하게 대응하기를 원합니다. 따라서 오늘날의 세계에서는 기존 데이터 처리가 더 이상 실행 가능하지 않습니다.

이를 달성하려면 많은 데이터를 가능한 한 빨리 처리한 다음 추가 처리를 위해 다른 서비스로 보내야 합니다. 그러나 이러한 모든 빠른 작업의 중간에 이벤트가 발생하면 소비자에게 알리는 것이 필요하며 이벤트 스트리밍을 사용하여 이를 수행할 수 있습니다.

이것은 우리가 사용할 GitHub의 저장소입니다.

이벤트

이벤트 스트리밍에 대해 이야기하기 전에 이벤트가 무엇인지 이야기합시다. 애플리케이션 내에서 발생하는 이벤트는 사용자 프로세스 또는 단순히 비즈니스에 영향을 미치는 작업과 관련될 수 있습니다.

이벤트는 응용 프로그램을 수정하는 방법에 대한 질문이 아니라 상태 변경을 나타냅니다. 다음을 예로 고려하십시오.

  • 서비스에 로그인하는 사용자
  • 결제 거래
  • 블로그에 글을 게시하는 작가

대부분의 경우 이벤트는 더 많은 이벤트를 트리거합니다. 예를 들어 사용자가 서비스에 가입하면 앱이 기기에 알림을 보내고 데이터베이스에 레코드를 삽입한 다음 환영 이메일을 보냅니다.

이벤트 스트리밍

이벤트 스트리밍 데이터베이스와 같은 이벤트 소스에서 실시간으로 데이터를 캡처하기 위한 패턴입니다. 이벤트 스트리밍의 주요 부분은 다음과 같습니다.

  • 브로커 :이벤트 저장을 담당하는 시스템
  • 주제 :이벤트 카테고리
  • 프로듀서 :특정 주제에 대해 브로커에게 이벤트 전송
  • 소비자 :이벤트 읽기
  • 이벤트 :생산자가 소비자에게 전달하고 싶은 데이터

게시 및 구독 아키텍처 패턴(게시/구독 패턴)에 대해 이야기하는 것은 불가피합니다. 이 지점에서; 이벤트 스트리밍은 해당 패턴의 구현이지만 다음과 같이 변경되었습니다.

  • 메시지 대신 이벤트가 발생합니다.
  • 이벤트는 일반적으로 시간순으로 정렬됩니다.
  • 소비자는 주제의 특정 지점에서 이벤트를 읽을 수 있습니다.
  • 이벤트에는 일시적인 지속성이 있습니다.

흐름은 생산자가 새 이벤트 게시 주제로 (이전에 보았듯이 주제는 특정 유형의 이벤트에 대한 분류일 뿐입니다). 그런 다음 소비자 특정 카테고리의 이벤트에 관심이 있는 사용자는 해당 주제를 구독합니다. 마지막으로 브로커 주제의 소비자를 식별하고 원하는 이벤트를 사용할 수 있도록 합니다.

이벤트 스트리밍의 장점

  • 분리 게시자와 소비자는 서로를 알 필요가 없기 때문에 종속성이 없습니다. 또한 이벤트는 자신의 작업을 지정하지 않으므로 많은 소비자가 동일한 이벤트를 받고 다른 작업을 수행할 수 있습니다.

  • 낮은 지연 시간 이벤트는 분리되어 소비자가 언제든지 사용할 수 있습니다. 밀리초 내에 발생할 수 있습니다.

  • 독립성 알다시피 게시자와 소비자는 독립적이므로 다른 팀에서 다른 작업이나 목적을 위해 동일한 이벤트를 사용하여 함께 작업할 수 있습니다.

  • 내결함성 일부 이벤트 스트리밍 플랫폼은 소비자 실패를 처리하는 데 도움이 됩니다. 예를 들어 소비자는 자신의 위치를 ​​저장하고 오류가 발생하면 거기에서 다시 시작할 수 있습니다.

  • 실시간 처리 피드백은 실시간으로 수신되므로 사용자는 이벤트 응답을 보기 위해 몇 분 또는 몇 시간을 기다릴 필요가 없습니다.

  • 고성능 이벤트 플랫폼은 짧은 대기 시간으로 인해 많은 메시지를 처리할 수 있습니다(예:1초에 수천 개의 이벤트).

이벤트 스트리밍의 단점

  • 모니터링 일부 이벤트 스트리밍 도구에는 완전한 모니터링 도구가 없습니다. Datadog 또는 New Relic과 같은 추가 도구를 구현해야 합니다.

  • 구성 일부 도구의 구성은 숙련된 사람에게도 압도적일 수 있습니다. 많은 매개변수가 있으며 때로는 이를 구현하기 위해 주제에 대해 깊이 알아야 할 필요가 있습니다.

  • 클라이언트 라이브러리 Java 이외의 언어로 Kafka를 구현하는 것은 쉽지 않습니다. 경우에 따라 클라이언트 라이브러리가 최신 상태가 아니거나 불안정하거나 선택할 수 있는 대안이 많지 않습니다.

이벤트 스트리밍을 위한 가장 인기 있는 도구 중 하나는 Apache Kafka입니다. . 이 도구를 사용하면 사용자가 필요할 때마다 데이터를 보내고, 저장하고, 요청할 수 있습니다. 그것에 대해 이야기합시다.

아파치 카프카

"Apache Kafka는 고성능 데이터 파이프라인, 스트리밍 분석, 데이터 통합 ​​및 미션 크리티컬 애플리케이션을 위해 수천 개의 회사에서 사용하는 오픈 소스 분산 이벤트 스트리밍 플랫폼입니다."

실시간 로그 전송을 위해 특별히 설계된 Apache Kafka는 다음이 필요한 애플리케이션에 이상적입니다.

  • 서로 다른 구성 요소 간의 안정적인 데이터 교환
  • 애플리케이션 요구 사항의 변화에 ​​따라 메시징 워크로드를 분할하는 기능
  • 데이터 처리를 위한 실시간 전송

Rails 애플리케이션에서 Kafka를 사용합시다!

레일과 함께 Kafka 사용

Ruby에서 Kafka를 사용하는 가장 유명한 보석은 Zendesk에서 ruby-kafka라고 하며 훌륭합니다! 그래도 모든 구현을 수동으로 수행해야 하므로 ruby-kafka로 빌드된 "프레임워크"가 있습니다. 또한 모든 구성 및 실행 단계를 도와줍니다.

Karafka는 Apache Kafka 기반 Ruby 애플리케이션 개발을 단순화하는 데 사용되는 프레임워크입니다.

Kafka를 사용하려면 Java를 설치해야 합니다. Kafka는 Scala 및 Java 애플리케이션이기도 하므로 Zookeeper를 설치해야 합니다.

설치에 앞서 Zookeeper에 대해 간략히 설명하겠습니다. Zookeeper는 Kafka에 필수적인 중앙 집중식 서비스입니다. 새 토픽 생성, 브로커 크래시, 브로커 제거, 토픽 삭제 등과 같은 변경 사항이 있는 경우 알림을 보냅니다.

주요 임무는 Kafka 브로커를 관리하고 해당 메타데이터가 포함된 목록을 유지하며 상태 확인 메커니즘을 용이하게 하는 것입니다. 또한 주제의 다양한 파티션에 대한 주요 중개인을 선택하는 데 도움이 됩니다.

요구사항

MacOS의 경우:

이제 다음 명령으로 Java와 Zookeeper를 설치해 보겠습니다.

brew install java
brew install zookeeper

그런 다음 다음을 실행하여 Kafka를 계속 설치할 수 있습니다.

brew install kafka

Kafka와 Zookeeper를 설치했으면 다음과 같이 서비스를 시작해야 합니다.

brew services start zookeeper
brew services start kafka

Windows 및 Linux의 경우:

지침:

  1. 자바 설치
  2. Zookeeper 다운로드

레일 설정

평소와 같이 간단한 Rails 애플리케이션을 생성하면 됩니다.

rails new karafka_example

Gemfile 내에 karafka gem을 추가합니다:

gem 'karafka'

그런 다음 bundle install를 실행합니다. 최근에 추가된 gem을 설치하고 모든 Karafka 항목을 가져오려면 다음 명령을 실행하는 것을 잊지 마십시오.

bundle exec karafka install

이 명령은 몇 가지 흥미로운 파일을 생성해야 합니다. 첫 번째 파일은 karafka.rb입니다. 루트 디렉토리, app/consumers/application_consumer.rbapp/responders/application_responder.rb .

카라프카 이니셜라이저

karafka.rb 파일은 Rails 구성에서 분리된 초기화 응용 프로그램과 같습니다. 이를 통해 Karafka 애플리케이션을 구성하고 API 측면에서 Rails 애플리케이션 경로와 유사한 일부 경로를 그릴 수 있습니다. 하지만 여기서는 주제와 소비자를 위한 것입니다.

프로듀서

제작자 이벤트 생성을 담당하며 app/responders에 추가할 수 있습니다. 폴더. 이제 사용자를 위한 간단한 생산자를 만들어 보겠습니다.

# app/responders/users_responder.rb

class UsersResponder < ApplicationResponder
  topic :users

  def respond(event_payload)
    respond_to :users, event_payload
  end
end

소비자

소비자 생산자가 보낸 모든 이벤트/메시지를 읽을 책임이 있습니다. 이것은 수신된 메시지를 기록하는 소비자일 뿐입니다.

# app/consumers/users_consumer.rb

class UsersConsumer < ApplicationConsumer
  def consume
    Karafka.logger.info "New [User] event: #{params}"
  end
end

params를 사용합니다. 이벤트를 받기 위해. 그러나 일괄 처리로 이벤트를 읽고 구성 config.batch_fetching이 있는 경우 true인 경우 params_batch를 사용해야 합니다. .

테스트

Karafka 서비스(이벤트를 수신할 서비스)를 실행하려면 콘솔로 이동하여 새 탭을 열고 Rails 프로젝트로 이동하여 다음을 실행합니다.

bundle exec karafka server

성공적인 이벤트

이제 다른 콘솔 탭을 열고 Rails 프로젝트로 이동하여 다음을 입력합니다.

rails c

이제 응답자와 함께 이벤트를 생성해 보겠습니다.

> UsersResponder.call({ event_name: "user_created", payload: { user_id: 1 } })

Rails 콘솔을 확인하면 이벤트가 생성된 후 다음 메시지를 받게 됩니다.

Successfully appended 1 messages to users/0 on 192.168.1.77:9092 (node_id=0)
=> {"users"=>[["{\"event_name\":\"user_created\",\"payload\":{\"user_id\":1}}", {:topic=>"users"}]]}

Karafka 서비스 탭에 다음과 같은 내용이 표시됩니다.

New [User] event: #<Karafka::Params::Params:0x00007fa76f0316c8>
Inline processing of topic users with 1 messages took 0 ms
1 message on users topic delegated to UsersConsumer
[[karafka_example] {}:] Marking users/0:1 as processed
[[karafka_example] {}:] Committing offsets: users/0:2
[[karafka_example] {}:] [offset_commit] Sending offset_commit API request 28 to 192.168.1.77:9092

그러나 메시지 페이로드만 원하면 params.payload를 추가할 수 있습니다. 소비자에 다음과 같은 내용이 표시됩니다.

Params deserialization for users topic successful in 0 ms
New [User] event: {"event_name"=>"user_created", "payload"=>{"user_id"=>1}}
Inline processing of topic users with 1 messages took 1 ms
1 message on users topic delegated to UsersConsumer

실패한 이벤트

email과 같은 속성으로 사용자 모델을 생성할 수 있습니다. , first_namelast_name 다음 명령 실행:

rails g model User email first_name last_name

그런 다음 다음을 사용하여 마이그레이션을 실행할 수 있습니다.

rails db:migrate

이제 다음과 같은 유효성 검사를 추가합니다.

class User < ApplicationRecord
  validates :email, uniqueness: true
end

마지막으로 소비자를 변경할 수 있습니다.

class UsersConsumer < ApplicationConsumer
  def consume
    Karafka.logger.info "New [User] event: #{params.payload}"
    User.create!(params.payload['user'])
  end
end

동일한 이메일로 두 개의 이벤트를 생성해 보겠습니다.

UsersResponder.call({ event_name: "user_created", user: { user_id: 1, email: '[email protected]', first_name: 'Bruce', last_name: 'Wayne' } } )

UsersResponder.call({ event_name: "user_created", user: { user_id: 2, email: '[email protected]', first_name: 'Bruce', last_name: 'Wayne' } } )

이를 통해 데이터베이스에 첫 번째 이벤트가 생성됩니다.

New [User] event: {"event_name"=>"user_created", "user"=>{"user_id"=>1, "email"=>"[email protected]", "first_name"=>"Bruce", "last_name"=>"Wayne"}}
[[karafka_example] {users: 0}:] [fetch] Received response 2 from 192.168.1.77:9092
[[karafka_example] {users: 0}:] Fetching batches
[[karafka_example] {users: 0}:] [fetch] Sending fetch API request 3 to 192.168.1.77:9092
[[karafka_example] {users: 0}:] [fetch] Waiting for response 3 from 192.168.1.77:9092
  TRANSACTION (0.1ms)  BEGIN
  ↳ app/consumers/users_consumer.rb:14:in `consume'
  User Create (9.6ms)  INSERT INTO "users" ("user_id", "email", "first_name", "last_name", "created_at", "updated_at") VALUES ($1, $2, $3, $4, $5, $6) RETURNING "id"  [["user_id", "1"], ["email", "[email protected]"], ["first_name", "Bruce"], ["last_name", "Wayne"], ["created_at", "2021-03-10 04:29:14.827778"], ["updated_at", "2021-03-10 04:29:14.827778"]]
  ↳ app/consumers/users_consumer.rb:14:in `consume'
  TRANSACTION (5.0ms)  COMMIT
  ↳ app/consumers/users_consumer.rb:14:in `consume'
Inline processing of topic users with 1 messages took 70 ms
1 message on users topic delegated to UsersConsumer

그러나 이메일이 고유하다는 유효성 검사가 있기 때문에 두 번째 이메일은 실패합니다. 기존 이메일로 다른 레코드를 추가하려고 하면 다음과 같이 표시됩니다.

New [User] event: {"event_name"=>"user_created", "user"=>{"user_id"=>2, "email"=>"[email protected]", "first_name"=>"Bruce", "last_name"=>"Wayne"}}
[[karafka_example] {users: 0}:] [fetch] Received response 2 from 192.168.1.77:9092
[[karafka_example] {users: 0}:] Fetching batches
[[karafka_example] {users: 0}:] [fetch] Sending fetch API request 3 to 192.168.1.77:9092
[[karafka_example] {users: 0}:] [fetch] Waiting for response 3 from 192.168.1.77:9092
  TRANSACTION (0.2ms)  BEGIN
  ↳ app/consumers/users_consumer.rb:14:in `consume'
  User Exists? (0.3ms)  SELECT 1 AS one FROM "users" WHERE "users"."email" = $1 LIMIT $2  [["email", "[email protected]"], ["LIMIT", 1]]
  ↳ app/consumers/users_consumer.rb:14:in `consume'
  TRANSACTION (0.2ms)  ROLLBACK
  ↳ app/consumers/users_consumer.rb:14:in `consume'
[[karafka_example] {users: 0}:] Exception raised when processing users/0 at offset 42 -- ActiveRecord::RecordInvalid: Validation failed: Email has already been taken

마지막 줄에서 오류를 볼 수 있습니다. ActiveRecord::RecordInvalid: Validation failed: Email has already been taken . 그러나 여기서 흥미로운 점은 Kafka가 계속해서 이벤트를 처리하려고 시도한다는 것입니다. Karafka 서버를 다시 시작해도 마지막 이벤트 처리를 시도합니다. Kafka는 어디서부터 시작해야 하는지 어떻게 압니까?

콘솔이 표시되면 오류 후에 다음이 표시됩니다.

[[karafka_example] {users: 0}:] Exception raised when processing users/0 at offset 42

어떤 오프셋이 처리되었는지 알려줍니다. 이 경우 오프셋 42였습니다. 따라서 Karafka 서비스를 다시 시작하면 해당 오프셋에서 시작됩니다.

[[karafka_example] {}:] Committing offsets with recommit: users/0:42
[[karafka_example] {users: 0}:] Fetching batches

사용자 모델에 이메일 유효성 검사가 있기 때문에 여전히 실패합니다. 이 시점에서 Karafka 서버를 중지하고 해당 유효성 검사를 제거하거나 주석을 달고 서버를 다시 시작하십시오. 이벤트가 성공적으로 처리되는 방법을 확인할 수 있습니다.

[[karafka_example] {}:] Committing offsets with recommit: users/0:42
[[karafka_example] {}:] [offset_commit] Sending offset_commit API request 5 to 192.168.1.77:9092
[[karafka_example] {}:] [offset_commit] Waiting for response 5 from 192.168.1.77:9092
[[karafka_example] {}:] [offset_commit] Received response 5 from 192.168.1.77:9092
Params deserialization for users topic successful in 0 ms
New [User] event: {"event_name"=>"user_created", "user"=>{"user_id"=>2, "email"=>"[email protected]", "first_name"=>"Bruce", "last_name"=>"Wayne"}}
  TRANSACTION (0.2ms)  BEGIN
  ↳ app/consumers/users_consumer.rb:14:in `consume'
  User Create (3.8ms)  INSERT INTO "users" ("user_id", "email", "first_name", "last_name", "created_at", "updated_at") VALUES ($1, $2, $3, $4, $5, $6) RETURNING "id"  [["user_id", "2"], ["email", "[email protected]"], ["first_name", "Bruce"], ["last_name", "Wayne"], ["created_at", "2021-03-10 04:49:37.832452"], ["updated_at", "2021-03-10 04:49:37.832452"]]
  ↳ app/consumers/users_consumer.rb:14:in `consume'
  TRANSACTION (5.5ms)  COMMIT
  ↳ app/consumers/users_consumer.rb:14:in `consume'
Inline processing of topic users with 1 messages took 69 ms
1 message on users topic delegated to UsersConsumer
[[karafka_example] {}:] Marking users/0:43 as processed

마지막으로 마지막 줄에서 다음 메시지를 볼 수 있습니다. Marking users/0:43 as processed .

콜백

이것은 Karafka가 제공하는 멋진 것입니다. 소비자에서 콜백을 사용할 수 있습니다. 그렇게 하려면 모듈을 가져와서 사용하기만 하면 됩니다. 그런 다음 UserConsumer를 엽니다. 다음을 추가하십시오:

class UsersConsumer < ApplicationConsumer
  include Karafka::Consumers::Callbacks

  before_poll do
    Karafka.logger.info "*** Checking something new for #{topic.name}"
  end

  after_poll do
    Karafka.logger.info '*** We just checked for new messages!'
  end

  def consume
    Karafka.logger.info "New [User] event: #{params.payload}"
    User.create!(params.payload['user'])
  end
end

Poll은 현재 파티션 오프셋을 기반으로 레코드를 가져오는 매체입니다. 따라서 이러한 콜백은 before_pollafter_poll , 이름에서 알 수 있듯이 그 순간에 실행됩니다. 우리는 메시지를 기록하고 있으며 Karafka 서버에서 메시지를 볼 수 있습니다. 하나는 가져오기 전이고 다른 하나는 그 이후입니다.

*** Checking something new for users
[[karafka_example] {}:] No batches to process
[[karafka_example] {users: 0}:] [fetch] Received response 325 from 192.168.1.77:9092
[[karafka_example] {users: 0}:] Fetching batches
[[karafka_example] {users: 0}:] [fetch] Sending fetch API request 326 to 192.168.1.77:9092
[[karafka_example] {users: 0}:] [fetch] Waiting for response 326 from 192.168.1.77:9092
*** We just checked for new messages!

하트비트

하트비트는 소비자인 우리가 Kafka에게 우리가 살아 있다고 말하는 방식일 뿐입니다. 그렇지 않으면 Kafka는 소비자가 죽었다고 가정합니다.

Karafka에는 일정 기간 동안 이 작업을 수행하는 기본 구성이 있습니다. kafka.heartbeat_interval입니다. 기본값은 10초입니다. Karafka 서버에서 이 하트비트를 볼 수 있습니다.

*** Checking something new for users
[[karafka_example_example] {}:] Sending heartbeat...
[[karafka_example_example] {}:] [heartbeat] Sending heartbeat API request 72 to 192.168.1.77:9092
[[karafka_example_example] {}:] [heartbeat] Waiting for response 72 from 192.168.1.77:9092
[[karafka_example_example] {}:] [heartbeat] Received response 72 from 192.168.1.77:9092
*** We just checked for new messages!

Sending heartbeat... , Kafka는 우리가 살아 있고 우리가 소비자 그룹의 유효한 구성원이라는 것을 알고 있습니다. 또한 더 많은 레코드를 사용할 수 있습니다.

커밋

오프셋을 소비된 것으로 표시하는 것을 오프셋 커밋이라고 합니다. Kafka에서는 오프셋 토픽이라는 내부 Kafka 토픽에 작성하여 오프셋 커밋을 기록합니다. 오프셋이 오프셋 주제에 커밋된 경우에만 메시지가 소비된 것으로 간주됩니다.

Karafka는 매번 이 커밋을 자동으로 수행하는 설정을 가지고 있습니다. 구성은 kafka.offset_commit_interval입니다. , 그 값은 기본적으로 10초입니다. 이를 통해 Karakfa는 10초마다 오프셋 커밋을 수행하고 Karafka 서버에서 해당 메시지를 볼 수 있습니다.

*** Checking something new for users
[[karafka_example] {}:] No batches to process
[[karafka_example] {users: 0}:] [fetch] Received response 307 from 192.168.1.77:9092
[[karafka_example] {users: 0}:] Fetching batches
[[karafka_example] {users: 0}:] [fetch] Sending fetch API request 308 to 192.168.1.77:9092
[[karafka_example] {users: 0}:] [fetch] Waiting for response 308 from 192.168.1.77:9092
[[karafka_example] {}:] Committing offsets: users/0:44
[[karafka_example] {}:] [offset_commit] Sending offset_commit API request 69 to 192.168.1.77:9092
[[karafka_example] {}:] [offset_commit] Waiting for response 69 from 192.168.1.77:9092
[[karafka_example] {}:] [offset_commit] Received response 69 from 192.168.1.77:9092
*** We just checked for new messages!

Committing offsets: users/0:44 커밋하는 오프셋을 알려주십시오. 제 경우에는 주제 0에서 오프셋 번호 44를 커밋할 수 있다고 Kafka에 알렸습니다. 이렇게 하면 서비스에 문제가 발생하면 Karafka가 해당 오프셋에서 이벤트를 처리하기 위해 다시 시작할 수 있습니다.

결론

이벤트 스트리밍은 더 빠르고 데이터를 더 잘 활용하며 더 나은 사용자 경험을 디자인하는 데 도움이 됩니다. 실제로 많은 회사에서 이 패턴을 사용하여 모든 서비스를 전달하고 다양한 이벤트에 실시간으로 대응할 수 있습니다. 앞서 언급했듯이, Karafka 외에 Rails와 함께 사용할 수 있는 다른 대안이 있습니다. 당신은 이미 기본을 가지고 있습니다. 이제 자유롭게 실험해 보세요.

참조

  • https://kafka.apache.org/
  • https://github.com/karafka/karafka
  • https://en.wikipedia.org/wiki/Publish%E2%80%93subscribe_pattern