Computer >> 컴퓨터 >  >> 프로그래밍 >> Redis

Redis 목록 및 TypeScript를 사용하여 사용자 정의 메시지 대기열 구축

자신만의 Message Queue를 만들려고 했지만 문제에 직면한 적이 있습니까? 그렇다면 혼자가 아닙니다. 이 자습서에서는 Redis 목록을 사용하여 처음부터 메시지 대기열을 구축하겠습니다. Redis를 사용하여 메시지 대기열을 구성하는 방법에는 스트림, 목록, 게시/구독 등 여러 가지가 있지만 가장 간단하고 직접적인 접근 방식인 목록에 중점을 둘 것입니다. 이 실용적인 가이드를 자세히 알아보는 데 참여해 보세요.

사용할 항목

  • 업스태시
  • 한 쌍의 손

필요한 것

  • 한 쌍의 손

Upstash Redis 설정

먼저 Redis 인스턴스를 설정해 보겠습니다. 이렇게 하려면 Upstash로 이동하여 데이터베이스 만들기를 클릭하세요. .그런 다음 아래로 스크롤하여 클라이언트를 연결하는 데 사용할 연결 문자열을 찾으세요. 여기서 자세히 설명하지는 않겠지만 기본적으로 시작하는 데 필요한 내용입니다.

연결 문자열 예시:

redis://XXXXe@social-XXX-39281.upstash.io:39281

프로젝트 시작

Bun을 사용하여 TypeScript 프로젝트를 시작해 보겠습니다. Node보다 빠르기 때문에 선택하는 것이 아니라 설정하기도 훨씬 쉽습니다. 그리고 네, 그것도 인상적으로 빠릅니다! 🚀

mkdir upstash-mq
cd upstash-mq
 
bun init
> package name (upstash-mq-tutorial): upstash-mq
> entry point (index.ts):
> Done!
 
bun add ioredis

프로젝트 구조

 ┣ 📂src
 ┃ ┣ 📂lua-scripts
 ┃ ┃ ┣ 📜add-job.lua
 ┃ ┃ ┗ 📜remove-job.lua
 ┃ ┣ 📜index.ts
 ┃ ┣ 📜job.ts
 ┃ ┣ 📜queue.ts
 ┃ ┗ 📜utils.ts
 ┣ 📜.env
 ┣ 📜.gitignore
 ┣ 📜README.md
 ┣ 📜bun.lockb
 ┣ 📜index.ts
 ┣ 📜package.json
 ┗ 📜tsconfig.json

Queue의 시각적 표현 다음과 같을 것입니다:

Redis 목록 및 TypeScript를 사용하여 사용자 정의 메시지 대기열 구축

직업

Job 클래스에는 몇 가지 핵심 사항이 필요합니다. 먼저, 각 작업의 상태를 추적해야 합니다. 이는 해당 작업을 처리할지, 다시 시도할지 또는 이미 완료된 경우 다른 곳으로 이동할지 결정하는 데 도움이 됩니다. 각 작업에는 ID와 일부 데이터도 있는데, 이는 훌륭한 사용자 경험을 제공할 수 있도록 일반적이어야 합니다. 마지막으로, 쉬운 관리를 위해 각 작업을 해당 대기열에 연결하고 대기열 이름을 포함해야 합니다.

Job의 백본은 다음과 같습니다. 수업:

type OwnerQueue = {
 redis: Redis;
 queueName: string;
};
export type JobStatuses =
 | "created"
 | "waiting"
 | "active"
 | "succeeded"
 | "failed";
 
export class Job<T> {
 id: string;
 status: JobStatuses;
 config: OwnerQueue;
 data: T;
 
 constructor(ownerConfig: OwnerQueue, data: T, jobId = randomUUID()) {
 this.id = jobId;
 this.status = "created";
 this.data = data;
 this.config = ownerConfig;
 }
}

data를 만들려면 일반적으로 먼저 Job을 만들어야 합니다. 그 자체는 일반적입니다. 나머지는 간단한 방식으로 이어집니다. 각 Job에 대해 별도의 Redis 인스턴스를 생성할 수 있습니다. 요소이지만 이를 관리하는 것은 복잡할 것입니다.

다행스럽게도 우리의 접근 방식을 사용하면 대기열 내에서 Redis 인스턴스를 쉽게 구성할 수 있으며 필요에 따라 이 인스턴스를 간단히 전달할 수 있습니다. queueName에도 동일한 원칙이 적용됩니다. . 작업을 대기열에 저장하는 데 자주 사용하므로 작업은 상위 대기열을 인식해야 합니다. 작업을 대기열에 저장하려면 두 가지가 필요합니다. Redis와 상호 작용하는 Lua 스크립트 및 일부 유틸리티입니다.

먼저 유틸리티를 만들어 보겠습니다:

import { JobStatuses } from "./job";
 
const MQ_PREFIX = "UpstashMQ";
 
export const formatMessageQueueKey = (queueName: string, key: string) => {
 return `${MQ_PREFIX}:${queueName}:${key}`;
};
 
export const convertToJSONString = <T>(
 data: T,
 status: JobStatuses,
): string => {
 return JSON.stringify({
 data,
 status,
 });
};

Redis를 사용할 때마다 수동으로 대기열 이름을 만드는 것은 이상적이지 않으므로 formatMessageQueueKey이라는 유틸리티를 만들었습니다. .이 유틸리티는 단순히 문자열을 연결합니다. 또한 Redis에 저장할 데이터를 직렬화해야 합니다. JS 객체를 데이터 소스로 전달할 수는 없습니다. 먼저 문자열로 변환해야 합니다. 데이터가 일반적인 경우 convertToJSONString라는 일반 함수를 구현했습니다. , 이를 위해.

이제 첫 번째 Lua 스크립트를 추가해 보겠습니다.

추가 작업.lua

--[[
key 1 -> [prefix]:name:jobs
key 2 -> [prefix]:name:waiting
arg 1 -> job id
arg 2 -> job data
]]
 
 
local jobId = ARGV[1]
local payload = ARGV[2]
 
if redis.call("hexists", KEYS[1], jobId) == 1 then return nil end
redis.call("hset", KEYS[1], jobId, payload)
redis.call("lpush", KEYS[2], jobId)
 
return jobId

다음과 같이 Redis 인스턴스를 사용하여 이러한 호출을 개별적으로 수행할 수 있습니다.

  • redis.hexists(jobId)
  • redis.hset(jobId,payload)
  • redis.lpush(jobId,payload)

그러나 이 접근 방식을 사용하면 세 번의 개별 호출이 발생합니다. Redis 서버로의 왕복을 최소화하기 위해 전체 프로세스를 단일 호출로 통합하는 것을 목표로 합니다.

save()를 추가해 보겠습니다. 방법

 private createQueueKey(key: string) {
 return formatMessageQueueKey(this.config.queueName, key);
 }
 
 async save(): Promise<string | null> {
 const addJobToQueueScript = await Bun.file("./src/lua-scripts/add-job.lua").text();
 const resJobId = (await this.config.redis.eval(
 addJobToQueueScript,
 2,
 this.createQueueKey("jobs"),
 this.createQueueKey("waiting"),
 this.id,
 convertToJSONString(this.data, this.status)
 )) as string | null;
 
 if (resJobId) {
 this.id = resJobId;
 return resJobId;
 }
 return null;
 }

코드는 간단하지만 좀 더 자세히 설명하겠습니다. Lua 스크립트를 생성한 후 redis.eval를 사용하여 호출합니다. , Lua 스크립트를 실행하는 데 필요합니다. redis.eval의 매개변수 다음과 같습니다:

  • 첫 번째 매개변수에는 스크립트가 필요합니다.
  • 두 번째 매개변수는 인수 개수를 지정합니다.
  • 세 번째와 네 번째 매개변수는 키용입니다.
  • 마지막으로 실제 인수를 전달합니다.

Job을 마치기 전에 앞으로는 몇 가지 메소드를 더 추가해 보겠습니다.

fromId = async <T>(jobId: string): Promise<Job<T> | null> => {
 const jobData = await this.config.redis.hget(this.createQueueKey("jobs"), jobId);
 if (jobData) {
 return this.fromData<T>(jobId, jobData);
 }
 return null;
 };
 
private fromData = <T>(jobId: string, stringifiedJobData: string): Job<T> => {
 const parsedData = JSON.parse(stringifiedJobData) as Job<T>;
 const job = new Job<T>(this.config, parsedData.data, jobId);
 job.status = parsedData.status;
 return job;
};

지금 당장은 이러한 기능이 필요하지 않을 수도 있지만 앞으로 작업 처리를 시작할 때 중요한 기능이 될 것입니다. 이 시점에서는 작업 ID (jobId)만 갖게 됩니다. , 처음부터 작업을 재구성하는 방법이 필요합니다. 이것이 바로 fromId입니다. 성취하다. Redis에서 작업 데이터를 검색하고 이를 Job 인스턴스로 변환한 후 반환하므로 대기열이 나중에 이 작업을 처리할 수 있습니다.

큐로 이동

save() 완료 부분에서는 Queue의 세부 사항을 살펴보겠습니다. 우리의 목표는 다음과 같습니다:

  • 나중에 재처리가 필요할 수 있으므로 성공 또는 실패 시 대기열에서 데이터를 유지하거나 제거하는 것을 목표로 합니다.
  • 우리는 동시성을 위한 대기열을 설계하여 여러 작업이 동시에 실행될 수 있도록 할 예정입니다.
  • 우리의 목표는 데이터 처리를 위한 콜백 함수 전달을 허용하는 것입니다. 이 함수는 더 나은 개발자 경험을 위해 작업 유형을 추론해야 합니다.
  • Job.save()에 전화를 걸 수 있도록 할 계획입니다. 대기열 내에서 Redis 인스턴스와 queueName을 전달할 수 있습니다. .
  • 마지막으로, 필요한 경우 대기열을 삭제하고 대기열에서 작업을 제거할 수 있는 기능을 보장하려고 합니다.

대기열 정의부터 시작하겠습니다

export type QueueConfig = {
 redis: Redis;
 queueName: string;
 keepOnSuccess?: boolean;
 keepOnFailure?: boolean;
};
 
export class Queue extends EventEmitter {
 config: QueueConfig;
 concurrency = 0;
 worker: any;
 running = 0;
 queued = 0;
 
 constructor(config: QueueConfig) {
 super();
 this.config = {
 redis: config.redis,
 queueName: config.queueName,
 keepOnFailure: config.keepOnFailure ?? true,
 keepOnSuccess: config.keepOnSuccess ?? true,
 };
 }
 
 createQueueKey(key: string) {
 return formatMessageQueueKey(this.config.queueName, key);
 }
}

Queue 클래스에는 대기열 이름, Redis 인스턴스, 데이터 유지 또는 제거 설정과 같은 외부 정보를 허용하는 구성이 포함되어 있습니다. Upstash를 특별히 선호하지만 😌 사용자가 선호하는 Redis 구현을 환영합니다. 이러한 유연성을 통해 사용자는 대기열을 기존 시스템에 쉽게 통합할 수 있습니다.

그리고 우리 클래스는 이벤트 이미터(Event Emitter)를 확장하여 어떤 일이 발생할 때 이를 알립니다.

다음은 초기화 예시입니다:

const queue = new Queue({
 redis: new Redis(process.env.UPSTASH_REDIS_URL),
 queueName: "upstash-rocks",
 keepOnFailure: true,
 keepOnSuccess: true,
});

추가

async add<T>(payload: T) {
 return new Job<T>(this.config, payload).save();
 }

우리 작업은 상위 대기열의 구성 세부 정보와 Redis에 저장될 데이터인 페이로드를 사용합니다. 그런 다음 저장하면 됩니다.

이제 그렇게 할 수 있습니다:

const queue = new Queue({
 redis: new Redis(process.env.UPSTASH_REDIS_URL!),
 queueName: "mytest-queue",
 keepOnFailure: true,
 keepOnSuccess: true,
});
 
const payload = {
 upstash: "best-redis-ever",
};
 
await queue.add(payload);

이제 이를 처리할 방법이 필요합니다.

처리 중

이는 대기열 구현에서 가장 어려운 부분입니다. 사용자는 동시 프로세스 수를 지정하고 작업 유형을 추론하는 작업을 처리하는 콜백 함수인 작업자를 제공해야 합니다. 또한 현재 실행 중이거나 대기열에 있는 작업 수를 추적하여 대기열에서 다음 작업을 안전하게 선택할 수 있는 메커니즘이 필요합니다.

 async process<TJobPayload>(
 worker: (job: TJobPayload) => void,
 concurrency: number
 ): Promise<void> {
 this.concurrency = concurrency;
 this.worker = worker;
 this.running = 0;
 this.queued = 1;
 
 this.jobTick();
 }

일반 TJobPayload를 허용하는 주요 목적 사용자를 위한 개발자 경험을 향상시키는 것입니다. 우리는 대기열을 사용할 때 Intellisense의 이점을 누릴 수 있도록 하는 것을 목표로 합니다. 사용자는 {hello: "world"}과 같은 데이터를 저장했다는 것을 알고 있습니다. 하지만 TypeScript는 정확한 intellisense를 제공하기 위해 도움이 필요합니다. 이것이 바로 우리가 TypeScript에 정보를 제공하고 더 나은 개발자 경험을 추론하도록 강제하는 메커니즘을 마련한 이유입니다.

jobTick()로 진행하기 전에 , 프로세스를 신중하게 고려해 보겠습니다.

  • 우리 대기열은 FIFO(선입선출) 방식으로 작동하므로 대기열 오른쪽에서 작업을 팝하는 것부터 시작해야 합니다.
  • 다음으로 이 작업에 대해 작업자 기능을 실행합니다.
  • 작업이 완료된 후 결과를 사용자에게 내보냅니다.
  • 마지막으로 jobTick()을 호출합니다. 다시 다음 작업을 처리합니다.

따라서 jobTick() 이 세 가지 중요한 부분으로 구성됩니다."

private jobTick() {
 this.getNextJob()
 .then(async (jobId) => {
 this.running += 1;
 this.queued -= 1;
 if (this.running + this.queued < this.concurrency) {
 this.queued += 1;
 setImmediate(this.jobTick);
 }
 
 if (!jobId) {
 return;
 }
 
 const jobCreatedById = await new Job(this.config, null).fromId(jobId);
 if (jobCreatedById) {
 await this.executeJob(jobCreatedById);
 } else {
 console.error(`Job not found with ID: ${jobId}`);
 }
 })
 .catch((error) => {
 console.error("Error in jobTick:", error);
 })
 .finally(() => {
 setImmediate(() => this.jobTick());
 });
 }

기능별로 설명해 보도록 하겠습니다. getNextJob()부터 시작해 보겠습니다.

 private async getNextJob() {
 try {
 const jobId = await this.config.redis.brpoplpush(
 this.createQueueKey("waiting"),
 this.createQueueKey("active"),
 0
 );
 return jobId;
 } catch (error) {
 console.error("Error fetching the next job:", error);
 throw error;
 }
 }
 

우리는 단순히 Redis를 호출하고 있지만 전략적 접근 방식을 사용하여 lpush와 결합된 차단 호출을 사용합니다. 왕복 횟수를 최소화하기 위해. 차단 호출을 사용하는 것은 의도적인 것입니다. 우리는 다른 작업자가 동일한 작업을 동시에 처리하는 것을 방지하여 경쟁 조건을 방지하고 싶습니다. 또한 작업을 '대기' 상태에서 '활성' 상태로 전환하여 프로세스의 다음 단계를 효과적으로 준비합니다.

this.getNextJob().then(async (jobId) => {
 this.running += 1;
 this.queued -= 1;
 if (this.running + this.queued < this.concurrency) {
 this.queued += 1;
 setImmediate(this.jobTick);
 }
 
 if (!jobId) {
 return;
 }
 
 const jobCreatedById = await new Job(this.config, null).fromId(jobId);
 if (jobCreatedById) {
 await this.executeJob(jobCreatedById);
 } else {
 console.error(`Job not found with ID: ${jobId}`);
 }
});

이제 jobId가 생겼습니다. , 실행 중인 작업 수를 1개 늘리고 대기 중인 작업 수를 1개 줄입니다. 또한 동시성 제한을 준수하면서 가능한 한 많은 새 작업을 시작하려고 노력합니다.

if (this.running + this.queued < this.concurrency) {
 this.queued += 1;
 setImmediate(this.jobTick);
}

모든 것이 순조롭게 진행되면 Job 구성을 진행합니다. fromId 사용 .ID로 작업을 성공적으로 재구성한 후에는 작업자 기능을 사용하여 작업 실행으로 넘어갑니다.

executeJob로 넘어가겠습니다.

private async executeJob<TJobPayload>(jobCreatedById: Job<TJobPayload>) {
 let hasError = false;
 try {
 await this.worker(jobCreatedById.data);
 this.running -= 1;
 this.queued += 1;
 } catch (error) {
 hasError = true;
 } finally {
 const [jobStatus, job] = await this.finishJob<TJobPayload>(jobCreatedById, hasError);
 this.emit(jobStatus, job.id);
 return;
 }
 }

이제 Job 데이터가 있으므로 이 데이터를 worker에 전달합니다. . 성공적으로 실행되면 대기 중인 작업 수를 1만큼 늘리고 실행 중인 작업 수를 1만큼 줄입니다. 이 단계는 매우 중요합니다. 올바르게 처리하지 않으면 새 작업을 동시에 시작하는 능력에 영향을 미칠 수 있습니다. 작업자 실행 중 오류가 발생하는 경우 간단히 hasError를 전환하면 됩니다. 깃발. 마지막으로 finishJob을 호출합니다. 우리 jobCreatedById와 함께 그리고 hasError jobId으로 상태를 플래그 지정하고 내보냅니다. .

참고:이제 사용자는 이와 같이 내보낸 업데이트를 들을 수 있습니다.

queue.on("succeeded", (jobId) => console.log("Succeeded jobId", jobId));

finishJob로 넘어가겠습니다.

private async finishJob<TJobPayload>(
 job: Job<TJobPayload>,
 hasFailed?: boolean
 ): Promise<[JobStatuses, Job<TJobPayload>]> {
 const multi = this.config.redis.multi();
 
 multi.lrem(this.createQueueKey("active"), 0, job.id);
 
 if (hasFailed) {
 if (this.config.keepOnFailure) {
 multi.hset(this.createQueueKey("jobs"), job.id, convertToJSONString(job.data, job.status));
 multi.sadd(this.createQueueKey("failed"), job.id);
 } else {
 multi.hdel(this.createQueueKey("jobs"), job.id);
 }
 job.status = "failed";
 } else {
 if (this.config.keepOnSuccess) {
 multi.hset(this.createQueueKey("jobs"), job.id, convertToJSONString(job.data, job.status));
 multi.sadd(this.createQueueKey("succeeded"), job.id);
 } else {
 multi.hdel(this.createQueueKey("jobs"), job.id);
 }
 job.status = "succeeded";
 }
 
 await multi.exec();
 return [job.status, job];
 }

여기서 중요한 측면은 multi()를 사용하는 것입니다. 우리의 목표는 항상 왕복 여행을 최소화하는 것이기 때문입니다. multi을 사용하여 , Redis는 exec()을 호출할 때까지 실행을 연기합니다. .사용자가 keepOnFailure를 설정한 경우 및 keepOnSuccess 데이터를 보존하기 위해 두 세트를 생성합니다. 하나는 작업 데이터가 있고 다른 하나는 이 데이터에 액세스하기 위한 작업 ID 목록이 포함되어 있습니다. 이 접근 방식은 성공 및 실패 시나리오 모두에 적용됩니다. 당연히 그에 따라 작업 상태를 조정합니다. 마지막으로 exec를 사용하여 multi 명령을 실행합니다. 이벤트 발생 목적으로 작업 상태와 작업 자체를 반환합니다.

마지막으로 남은 두 가지 방법이 있는데, 이 방법은 우리에게 이미 익숙한 개념을 활용하므로 자세히 설명하지 않겠습니다.

 async removeJob(jobId: string) {
 const addJobToQueueScript = await Bun.file("./src/lua-scripts/remove-job.lua").text();
 return await this.config.redis.eval(
 addJobToQueueScript,
 5,
 this.createQueueKey("succeeded"),
 this.createQueueKey("failed"),
 this.createQueueKey("waiting"),
 this.createQueueKey("active"),
 this.createQueueKey("jobs"),
 jobId
 );
 }
 
 async destroy() {
 const args = ["id", "jobs", "waiting", "active", "succeeded", "failed"].map((key) =>
 this.createQueueKey(key)
 );
 const res = await this.config.redis.del(...args);
 return res;
 }

제거-job.lua

--[[
key 1 -> [prefix]:test:succeeded
key 2 -> [prefix]:test:failed
key 3 -> [prefix]:test:waiting
key 4 -> [prefix]:test:active
key 5 -> [prefix]:test:jobs
arg 1 -> jobId
]]
 
local jobId = ARGV[1]
 
if (redis.call("sismember", KEYS[1], jobId) + redis.call("sismember", KEYS[2], jobId)) == 0 then
 redis.call("lrem", KEYS[3], 0, jobId)
 redis.call("lrem", KEYS[4], 0, jobId)
end
 
redis.call("srem", KEYS[1], jobId)
redis.call("srem", KEYS[2], jobId)
redis.call("hdel", KEYS[5], jobId)
 

destroy() 전체 대기열을 완전히 지우는 것과 다른 하나는 대기열에서 특정 작업을 삭제하는 것입니다.

모든 것이 실제로 작동하는 모습을 봅시다

import { sleep } from "bun";
import Redis from "ioredis";
 
import { Queue } from "./queue";
 
type Payload = {
 id: number;
 data: string;
};
 
const queue = new Queue({
 redis: new Redis(process.env.UPSTASH_REDIS_URL),
 queueName: "mytest-queue",
});
 
async function main() {
 await generateQueueItems(queue, 20);
 console.log("Sleep starting for 5 sec");
 await sleep(5000);
 
 queue.on("succeeded", (jobId) => console.log("Succeeded jobId", jobId));
 await queue.process<Payload>((job) => {
 console.log("Processing job:", job.data);
 sleep(1000);
 }, 3);
}
 
main();
 
async function generateQueueItems(queue: Queue, itemCount: number) {
 for (let i = 0; i < itemCount; i++) {
 const payload = {
 id: i,
 data: `dummy-data-${i}`,
 // Add more properties as needed for your testing
 };
 const jobId = await queue.add(payload);
 console.log(`Added item ${i} with jobId: ${jobId}`);
 }
}

보너스 챌린지

  • Redis 액세스 및 작업자 프로세스 모두에 대해 지수 백오프를 사용하여 재시도 논리를 구현합니다.
  • '최소 한 번' 보장 메커니즘을 개발합니다.
  • 더 나은 성능을 위해 서비스 워커에서 워커를 실행해 보세요
  • 예약된 작업 추가

마무리

무언가를 배우는 가장 좋은 방법은 그것을 구축하는 것이며, 더 나은 접근 방식은 Upstash Redis를 사용하는 것입니다. 계속 흔들어 보세요.

🔗 프로젝트 Github 주소