작성자:스티븐 산우(Stephen Sanwo)
작동하는 풀 스택 애플리케이션을 구축하려면 고려해야 할 움직이는 부분이 너무 많습니다. 그리고 앱의 성공을 위해서는 많은 결정을 내려야 합니다.
예를 들어, 어떤 언어를 사용하고 어떤 플랫폼에 배포할 예정인가요? 서버에 컨테이너화된 소프트웨어를 배포할 예정입니까, 아니면 서버리스 기능을 사용하여 백엔드를 처리할 예정입니까? 인증이나 결제 등 애플리케이션의 복잡한 부분을 처리하기 위해 타사 API를 사용할 계획인가요? 데이터를 어디에 저장하나요?
이 모든 것 외에도 사용자 인터페이스, 애플리케이션의 디자인 및 유용성 등에 대해서도 생각해야 합니다.
이것이 바로 복잡한 대규모 애플리케이션에는 앱 구축을 위해 협력하는 다기능 개발팀이 필요한 이유입니다.
풀 스택 애플리케이션을 개발하는 방법을 배우는 가장 좋은 방법 중 하나는 엔드투엔드 개발 프로세스를 포괄하는 프로젝트를 구축하는 것입니다. 아키텍처 설계, API 서비스 개발, 사용자 인터페이스 개발, 최종적으로 애플리케이션 배포 과정을 거치게 됩니다.
따라서 이 튜토리얼에서는 AI 챗봇을 구축하는 과정을 안내하여 이러한 개념을 심도 있게 배울 수 있습니다.
우리가 다룰 주제는 다음과 같습니다:
- Python, FastAPI 및 WebSocket을 사용하여 API를 구축하는 방법
- Redis를 사용하여 실시간 시스템을 구축하는 방법
- React를 사용하여 채팅 사용자 인터페이스를 구축하는 방법
중요 사항: 이는 기본적인 Python 및 JavaScript 지식이 필요한 중급 풀 스택 소프트웨어 개발 프로젝트입니다.
전체 애플리케이션을 코딩하고 싶지 않은 경우를 대비해 중요한 단계를 쉽게 선택할 수 있도록 프로젝트를 여러 섹션으로 세심하게 나누었습니다.
여기 My Github에서 전체 저장소를 다운로드할 수 있습니다.
목차
섹션 1
- 애플리케이션 아키텍처
- 개발 환경 설정 방법
섹션 2
- Python, FastAPI 및 WebSocket을 사용하여 채팅 서버를 구축하는 방법
- Python 환경 설정 방법
- FastAPI 서버 설정
- API에 경로를 추가하는 방법
- UUID를 사용하여 채팅 세션 토큰을 생성하는 방법
- Postman으로 API를 테스트하는 방법
- 웹소켓 및 연결 관리자
- FastAPI의 종속성 주입
섹션 3
- Redis를 사용하여 실시간 시스템을 구축하는 방법
- Redis 및 분산 메시징 대기열
- Redis 클라이언트를 사용하여 Python에서 Redis 클러스터에 연결하는 방법
- Redis 스트림으로 작업하는 방법
- 채팅 데이터를 모델링하는 방법
- Redis JSON으로 작업하는 방법
- 토큰 종속성을 업데이트하는 방법
섹션 4
- AI 모델을 사용하여 챗봇에 지능을 추가하는 방법
- Huggingface를 시작하는 방법
- 언어 모델과 상호작용하는 방법
- AI 모델의 단기 기억을 시뮬레이션하는 방법
- 메시지 대기열에서 스트림 소비자 및 실시간 데이터 가져오기
- AI 응답으로 채팅 클라이언트를 업데이트하는 방법
- 새로고침 토큰
- Postman에서 여러 클라이언트와의 채팅을 테스트하는 방법
애플리케이션 아키텍처
솔루션 아키텍처를 스케치하면 애플리케이션, 사용하려는 도구, 구성 요소가 서로 통신하는 방법에 대한 높은 수준의 개요를 얻을 수 있습니다.
draw.io를 사용하여 아래에 간단한 아키텍처를 작성했습니다.
풀스택 챗봇 아키텍처
아키텍처의 다양한 부분을 더 자세히 살펴보겠습니다.
클라이언트/사용자 인터페이스
우리는 React 버전 18을 사용하여 사용자 인터페이스를 구축할 것입니다. Chat UI는 WebSocket을 통해 백엔드와 통신합니다.
GPT-J-6B 및 Huggingface 추론 API
GPT-J-6B는 60억 개의 매개변수로 훈련되었으며 일부 작업에서 OpenAI의 GPT-3과 긴밀하게 작동하는 생성 언어 모델입니다.
GPT-J-6B는 오픈 소스 모델이고 간단한 사용 사례에는 유료 토큰이 필요하지 않기 때문에 사용하기로 선택했습니다.
Huggingface는 또한 이 모델에 거의 무료로 연결할 수 있는 주문형 API를 제공합니다. GPT-J-6B 및 포옹 얼굴 추론 API에 대해 자세히 알아볼 수 있습니다.
레디스
GPT에 메시지를 보낼 때 메시지를 저장하고 응답을 쉽게 검색할 수 있는 방법이 필요합니다. Redis JSON을 사용하여 채팅 데이터를 저장하고 Redis Streams를 사용하여 Huggingface 추론 API와의 실시간 통신을 처리합니다.
Redis는 JSON과 유사한 데이터를 초고속으로 가져오고 저장할 수 있는 인메모리 키-값 저장소입니다. 이 튜토리얼에서는 테스트 목적으로 Redis Enterprise에서 제공하는 관리형 무료 Redis 스토리지를 사용합니다.
웹 소켓 및 Chat API
클라이언트와 서버 간에 실시간으로 메시지를 보내려면 소켓 연결을 열어야 합니다. 이는 HTTP 연결만으로는 클라이언트와 서버 간의 실시간 양방향 통신을 보장하기에 충분하지 않기 때문입니다.
FastAPI는 우리가 사용할 수 있는 빠르고 현대적인 Python 서버를 제공하므로 채팅 서버에 FastAPI를 사용할 것입니다. WebSocket에 대해 자세히 알아보려면 FastAPI 문서를 확인하세요.
개발 환경 설정 방법
원하는 OS를 사용하여 이 앱을 빌드할 수 있습니다. 저는 현재 MacOS와 Visual Studio Code를 사용하고 있습니다. Python과 NodeJ가 설치되어 있는지 확인하세요.
프로젝트 구조를 설정하려면 fullstack-ai-chatbot라는 폴더를 생성하세요. . 그런 다음 프로젝트 내에 client라는 두 개의 폴더를 만듭니다. 그리고 server . 서버는 백엔드용 코드를 보유하고 클라이언트는 프런트엔드용 코드를 보유합니다.
다음으로 프로젝트 디렉터리 내에서 "git init" 명령을 사용하여 프로젝트 폴더 루트 내에서 Git 저장소를 초기화합니다. 그런 다음 "touch .gitignore"를 사용하여 .gitignore 파일을 만듭니다.
git init
touch .gitignore
다음 섹션에서는 FastAPI와 Python을 사용하여 채팅 웹 서버를 구축하겠습니다.
Python, FastAPI 및 WebSocket을 사용하여 채팅 서버를 구축하는 방법
이번 섹션에서는 사용자와 통신하기 위해 FastAPI를 사용하여 채팅 서버를 구축해 보겠습니다. WebSocket을 사용하여 클라이언트와 서버 간의 양방향 통신을 보장함으로써 사용자에게 실시간으로 응답을 보낼 수 있습니다.
Python 환경 설정 방법
서버를 시작하려면 Python 환경을 설정해야 합니다. VS Code 내에서 프로젝트 폴더를 열고 터미널을 엽니다.
프로젝트 루트에서 서버 디렉터리로 cd하고 python3.8 -m venv env을 실행합니다. . 그러면 가상 환경이 생성됩니다. env라는 Python 프로젝트의 경우 . 가상 환경을 활성화하려면 source env/bin/activate를 실행하세요.
다음으로 Python 환경에 몇 가지 라이브러리를 설치하세요.
pip install fastapi uuid uvicorn gunicorn WebSockets python-dotenv aioredis
다음으로 touch .env을 실행하여 환경 파일을 만듭니다. 터미널에서. .env 내에서 앱 변수와 비밀 변수를 정의하겠습니다. 파일.
앱 환경 변수를 추가하고 export APP_ENV=development와 같이 "개발"로 설정합니다. . 다음으로 FastAPI 서버로 개발 서버를 설정하겠습니다.
FastAPI 서버 설정
서버 디렉터리의 루트에 main.py이라는 새 파일을 만듭니다. 그런 다음 개발 서버에 아래 코드를 붙여넣으세요.
from fastapi import FastAPI, Request
import uvicorn
import os
from dotenv import load_dotenv
load_dotenv()
api = FastAPI()
@api.get("/test")
async def root():
return {"msg": "API is Online"}
if __name__ == "__main__":
if os.environ.get('APP_ENV') == "development":
uvicorn.run("main:api", host="0.0.0.0", port=3500,
workers=4, reload=True)
else:
pass
먼저 import FastAPI api로 초기화하세요. . 그럼 우리는 import load_dotenv python-dotenv에서 라이브러리를 초기화하고 .env에서 변수를 로드합니다. 파일
그런 다음 API를 테스트하기 위한 간단한 테스트 경로를 만듭니다. 테스트 경로는 API가 온라인 상태임을 알려주는 간단한 JSON 응답을 반환합니다.
마지막으로 uvicorn.run를 사용하여 개발 서버를 설정했습니다. 필요한 인수를 제공합니다. API는 포트 3500에서 실행됩니다. .
마지막으로 python main.py를 사용하여 터미널에서 서버를 실행합니다. . Application startup complete이 표시되면 터미널에서 브라우저의 URL http://localhost:3500/test로 이동하면 다음과 같은 웹페이지가 표시됩니다:
API 테스트 페이지
API에 경로를 추가하는 방법
이 섹션에서는 API에 경로를 추가합니다. src라는 새 폴더를 만듭니다. . 이는 모든 API 코드가 위치할 디렉터리입니다.
routes라는 하위 폴더를 만듭니다. , 폴더에 CD를 넣고 chat.py라는 새 파일을 만듭니다. 그런 다음 아래 코드를 추가하세요.
import os
from fastapi import APIRouter, FastAPI, WebSocket, Request
chat = APIRouter()
# @route POST /token
# @desc Route to generate chat token
# @access Public
@chat.post("/token")
async def token_generator(request: Request):
return None
# @route POST /refresh_token
# @desc Route to refresh token
# @access Public
@chat.post("/refresh_token")
async def refresh_token(request: Request):
return None
# @route Websocket /chat
# @desc Socket for chatbot
# @access Public
@chat.websocket("/chat")
async def websocket_endpoint(websocket: WebSocket = WebSocket):
return None
우리는 세 개의 엔드포인트를 만들었습니다:
/token채팅 세션에 액세스하기 위해 사용자에게 세션 토큰을 발급합니다. 채팅 앱이 공개적으로 공개되므로 인증에 대해 걱정하지 않고 단순하게 유지하고 싶지만 여전히 각 고유 사용자 세션을 식별할 수 있는 방법이 필요합니다./refresh_token연결이 끊어진 경우 토큰이 여전히 활성 상태이고 만료되지 않은 한 사용자의 세션 기록을 가져옵니다./chat클라이언트와 서버 간에 메시지를 보내기 위해 WebSocket을 엽니다.
다음으로 채팅 경로를 기본 API에 연결합니다. 먼저 import chat from src.chat이 필요합니다. 우리 main.py 내에서 파일. 그런 다음 문자 그대로 include_router를 호출하여 라우터를 포함시킵니다. 초기화된 FastAPI의 메서드 클래스를 선택하고 채팅을 인수로 전달합니다.
api.py 업데이트 아래와 같은 코드:
from fastapi import FastAPI, Request
import uvicorn
import os
from dotenv import load_dotenv
from routes.chat import chat
load_dotenv()
api = FastAPI()
api.include_router(chat)
@api.get("/test")
async def root():
return {"msg": "API is Online"}
if __name__ == "__main__":
if os.environ.get('APP_ENV') == "development":
uvicorn.run("main:api", host="0.0.0.0", port=3500,
workers=4, reload=True)
else:
pass
UUID를 사용하여 채팅 세션 토큰을 생성하는 방법
사용자 토큰을 생성하려면 uuid4을 사용합니다. 채팅 엔드포인트에 대한 동적 경로를 생성합니다. 이는 공개적으로 사용 가능한 엔드포인트이므로 JWT 및 인증에 대해 자세히 설명할 필요가 없습니다.
uuid을 설치하지 않은 경우 처음에는 pip install uuid을 실행하세요. . 다음으로 chat.py에서 UUID를 가져오고 /token를 업데이트하세요. 아래 코드를 사용하여 라우팅하세요:
from fastapi import APIRouter, FastAPI, WebSocket, Request, BackgroundTasks, HTTPException
import uuid
# @route POST /token
# @desc Route generating chat token
# @access Public
@chat.post("/token")
async def token_generator(name: str, request: Request):
if name == "":
raise HTTPException(status_code=400, detail={
"loc": "name", "msg": "Enter a valid name"})
token = str(uuid.uuid4())
data = {"name": name, "token": token}
return data
위 코드에서 클라이언트는 필수인 이름을 제공합니다. 이름 필드가 비어 있지 않은지 빠르게 확인한 다음 uuid4를 사용하여 토큰을 생성합니다.
세션 데이터는 이름과 토큰에 대한 간단한 사전입니다. 궁극적으로 우리는 이 세션 데이터를 유지하고 시간 제한을 설정해야 하지만 지금은 클라이언트에 반환하기만 하면 됩니다.
Postman으로 API를 테스트하는 방법
WebSocket 엔드포인트를 테스트할 것이기 때문에 이를 허용하는 Postman과 같은 도구를 사용해야 합니다(FastAPI의 기본 Swagger 문서는 WebSocket을 지원하지 않기 때문입니다).
Postman에서 개발 환경에 대한 컬렉션을 만들고 POST 요청을 localhost:3500/token로 보냅니다. 이름을 쿼리 매개변수로 지정하고 값을 전달합니다. 아래와 같은 응답을 받아야 합니다:
토큰 생성기 우편 배달부
웹소켓 및 연결 관리자
src 루트에서 socket이라는 새 폴더를 만듭니다. connection.py라는 파일을 추가하세요. . 이 파일에서는 WebSocket에 대한 연결을 제어하는 클래스와 연결 및 연결 해제를 위한 모든 도우미 메서드를 정의합니다.
connection.py에서 아래 코드를 추가하세요:
from fastapi import WebSocket
class ConnectionManager:
def __init__(self):
self.active_connections: List[WebSocket] = []
async def connect(self, websocket: WebSocket):
await websocket.accept()
self.active_connections.append(websocket)
def disconnect(self, websocket: WebSocket):
self.active_connections.remove(websocket)
async def send_personal_message(self, message: str, websocket: WebSocket):
await websocket.send_text(message)
ConnectionManager 클래스는 active_connections으로 초기화됩니다. 활성 연결 목록인 속성입니다.
그런 다음 비동기 connect 메소드는 WebSocket을 허용합니다. 활성 연결 목록에 추가하고 disconnect 메서드는 Websocket를 제거합니다. 활성 연결 목록에서.
마지막으로 send_personal_message 메소드는 메시지와 Websocket을 받습니다. 우리는 메시지를 비동기적으로 보내고 싶습니다.
WebSocket은 매우 광범위한 주제이므로 여기서는 표면적인 내용만 살펴보았습니다. 그러나 이는 여러 연결을 만들고 해당 연결에 대한 메시지를 비동기적으로 처리하는 데 충분해야 합니다.
FastAPI 웹소켓 및 소켓 프로그래밍에 대해 자세히 알아볼 수 있습니다.
ConnectionManager를 사용하려면 , src.routes.chat.py 내에서 가져오고 초기화하세요. , /chat을 업데이트하세요. 아래 코드를 사용한 WebSocket 경로:
from ..socket.connection import ConnectionManager
manager = ConnectionManager()
@chat.websocket("/chat")
async def websocket_endpoint(websocket: WebSocket):
await manager.connect(websocket)
try:
while True:
data = await websocket.receive_text()
print(data)
await manager.send_personal_message(f"Response: Simulating response from the GPT service", websocket)
except WebSocketDisconnect:
manager.disconnect(websocket)
websocket_endpoint에서 WebSocket을 사용하는 함수를 사용하여 연결 관리자에 새 WebSocket을 추가하고 while True을 실행합니다. 루프를 사용하여 소켓이 열려 있는지 확인합니다. 소켓이 끊어지는 경우를 제외하고.
연결이 열려 있는 동안 클라이언트가 websocket.receive_test()로 보낸 모든 메시지를 받습니다. 지금은 터미널에 인쇄하세요.
그런 다음 지금은 하드 코딩된 응답을 클라이언트에 다시 보냅니다. 최종적으로 클라이언트로부터 받은 메시지는 AI 모델로 전송되고, 클라이언트로 다시 전송되는 응답은 AI 모델의 응답이 됩니다.
Postman에서는 새 WebSocket 요청을 생성하고 WebSocket 엔드포인트 localhost:3500/chat에 연결하여 이 엔드포인트를 테스트할 수 있습니다. .
연결을 클릭하면 메시지 창에 API 클라이언트가 URL에 연결되고 소켓이 열려 있음이 표시됩니다.
이를 테스트하려면 채팅 서버에 "Hello Bot" 메시지를 보내면 아래와 같이 "응답:GPT 서비스의 응답 시뮬레이션"이라는 즉각적인 테스트 응답을 받아야 합니다.
우체부 채팅 테스트
FastAPI의 종속성 주입
두 개의 서로 다른 클라이언트 세션을 구별하고 채팅 세션을 제한할 수 있도록 WebSocket 연결에 쿼리 매개변수로 전달되는 시간 제한 토큰을 사용합니다.
소켓 폴더에 utils.py라는 파일을 생성합니다. 그런 다음 아래 코드를 추가하세요:
from fastapi import WebSocket, status, Query
from typing import Optional
async def get_token(
websocket: WebSocket,
token: Optional[str] = Query(None),
):
if token is None or token == "":
await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
return token
get_token 함수는 WebSocket과 토큰을 받은 다음 토큰이 None 또는 null인지 확인합니다.
이 경우 함수는 정책 위반 상태를 반환하고 가능한 경우 토큰만 반환합니다. 나중에 추가적인 토큰 검증을 통해 이 기능을 확장할 예정입니다.
이 함수를 사용하려면 /chat에 삽입합니다. 경로. FastAPI는 종속성을 쉽게 주입할 수 있는 종속 클래스를 제공하므로 데코레이터를 조작할 필요가 없습니다.
/chat 업데이트 다음으로 연결하세요:
from ..socket.utils import get_token
@chat.websocket("/chat")
async def websocket_endpoint(websocket: WebSocket, token: str = Depends(get_token)):
await manager.connect(websocket)
try:
while True:
data = await websocket.receive_text()
print(data)
await manager.send_personal_message(f"Response: Simulating response from the GPT service", websocket)
except WebSocketDisconnect:
manager.disconnect(websocket)
이제 /chat에 연결하려고 하면 Postman에서 엔드포인트를 사용하면 403 오류가 발생합니다. 지금은 쿼리 매개변수로 토큰을 제공하고 토큰에 값을 제공하세요. 그러면 이전처럼 연결할 수 있습니다. 이제 연결에는 토큰이 필요합니다.
토큰을 사용한 Postman 채팅 테스트
여기까지 도달한 것을 축하합니다! 당신의 chat.py 이제 파일은 다음과 같아야 합니다:
import os
from fastapi import APIRouter, FastAPI, WebSocket, WebSocketDisconnect, Request, Depends, HTTPException
import uuid
from ..socket.connection import ConnectionManager
from ..socket.utils import get_token
chat = APIRouter()
manager = ConnectionManager()
# @route POST /token
# @desc Route to generate chat token
# @access Public
@chat.post("/token")
async def token_generator(name: str, request: Request):
token = str(uuid.uuid4())
if name == "":
raise HTTPException(status_code=400, detail={
"loc": "name", "msg": "Enter a valid name"})
data = {"name": name, "token": token}
return data
# @route POST /refresh_token
# @desc Route to refresh token
# @access Public
@chat.post("/refresh_token")
async def refresh_token(request: Request):
return None
# @route Websocket /chat
# @desc Socket for chatbot
# @access Public
@chat.websocket("/chat")
async def websocket_endpoint(websocket: WebSocket, token: str = Depends(get_token)):
await manager.connect(websocket)
try:
while True:
data = await websocket.receive_text()
print(data)
await manager.send_personal_message(f"Response: Simulating response from the GPT service", websocket)
except WebSocketDisconnect:
manager.disconnect(websocket)
이 튜토리얼의 다음 부분에서는 애플리케이션의 상태를 처리하고 클라이언트와 서버 간에 데이터를 전달하는 데 중점을 둘 것입니다.
Redis로 실시간 시스템을 구축하는 방법
우리 애플리케이션은 현재 어떤 상태도 저장하지 않으며 사용자를 식별하거나 채팅 데이터를 저장하고 검색할 방법이 없습니다. 또한 채팅 세션 중에 클라이언트에 하드 코딩된 응답을 반환하고 있습니다.
튜토리얼의 이 부분에서는 다음 내용을 다룰 것입니다:
- Redis 클러스터에 연결하는 방법 Python으로 Redis 클라이언트를 설정합니다.
- Redis JSON을 사용하여 데이터를 저장하고 검색하는 방법
- Redis 스트림 설정 방법 웹 서버와 작업자 환경 간의 메시지 대기열로
Redis 및 분산 메시징 대기열
Redis는 데이터베이스, 캐시, 메시지 브로커 및 스트리밍 엔진으로 사용할 수 있는 오픈 소스 인 메모리 데이터 저장소입니다. 다양한 데이터 구조를 지원하며 실시간 기능을 갖춘 분산 애플리케이션을 위한 완벽한 솔루션입니다.
Redis 엔터프라이즈 클라우드 인프라에 대한 걱정 없이 Redis 클러스터를 무한한 규모로 배포하는 데 도움이 되는 Redis에서 제공하는 완전 관리형 클라우드 서비스입니다.
이 튜토리얼에서는 무료 Redis Enterprise Cloud 인스턴스를 사용합니다. 여기에서 무료로 Redis Cloud를 시작하고 이 튜토리얼에 따라 Redis 데이터베이스 및 Redis와 상호 작용하기 위한 GUI인 Redis Insight를 설정할 수 있습니다.
Redis 데이터베이스를 설정한 후 프로젝트 루트(서버 폴더 외부)에 worker이라는 새 폴더를 만듭니다. .
클라이언트가 WebSocket에 메시지를 보낼 때 웹 서버가 타사 서비스에 대한 요청을 처리할 필요가 없도록 작업자 환경을 웹 서버에서 격리하겠습니다. 또한 다른 사용자를 위해 리소스를 확보할 수 있습니다.
추론 API와의 백그라운드 통신은 Redis를 통해 이 작업자 서비스에 의해 처리됩니다.
연결된 모든 클라이언트의 요청은 메시지 대기열(생산자)에 추가되고, 작업자는 메시지를 소비하고 요청을 추론 API로 보내고 응답을 응답 대기열에 추가합니다.
API가 응답을 받으면 이를 클라이언트로 다시 보냅니다.
생산자와 소비자 사이의 이동 중에 클라이언트는 여러 메시지를 보낼 수 있으며 이러한 메시지는 대기열에 추가되어 순서대로 응답됩니다.
이상적으로는 완전히 다른 서버, 자체 환경에서 이 작업자를 실행하도록 할 수 있지만 지금은 로컬 시스템에 자체 Python 환경을 생성하겠습니다.
직원이 왜 필요한가요? 궁금하실 겁니다. 웹 서버가 타사 서비스에 대한 요청도 생성하는 시나리오를 상상해 보세요. 이는 소켓 연결 중에 타사 서비스의 응답을 기다리는 동안 API에서 응답을 얻을 때까지 서버가 차단되고 리소스가 묶여 있음을 의미합니다.
임의의 수면 time.sleep(10)을 생성하여 이를 시도해 볼 수 있습니다. 하드 코딩된 응답을 보내고 새 메시지를 보내기 전에. 그런 다음 새 우편 배달부 세션에서 다른 토큰으로 연결해 보십시오.
무작위 절전 시간이 초과될 때까지 채팅 세션이 연결되지 않는다는 것을 알 수 있습니다.
보다 생산 중심적인 서버 설정에서 비동기 기술과 작업자 풀을 사용할 수 있지만, 동시 사용자 수가 증가함에 따라 그것만으로는 충분하지 않습니다.
궁극적으로 우리는 Redis를 사용하여 채팅 API와 타사 API 간의 통신을 중개함으로써 웹 서버 리소스를 묶는 것을 피하고 싶습니다.
다음으로 새 터미널을 열고 작업자 폴더로 cd한 후 1부에서 수행한 것과 유사한 새 Python 가상 환경을 만들고 활성화합니다.
다음으로, 다음 종속성을 설치하십시오:
pip install aiohttp aioredis python-dotenv
Redis 클라이언트를 사용하여 Python에서 Redis 클러스터에 연결하는 방법
aioredis 클라이언트를 사용하여 Redis 데이터베이스에 연결하겠습니다. 또한 요청 라이브러리를 사용하여 Huggingface 추론 API에 요청을 보냅니다.
두 개의 파일 .env 만들기 및 main.py . 그런 다음 src이라는 폴더를 만듭니다. . 또한 redis이라는 폴더를 생성하세요. config.py이라는 새 파일을 추가하세요. .
.env에서 파일에 다음 코드를 추가하고 Redis 클러스터에 제공된 자격 증명으로 필드를 업데이트했는지 확인하세요.
export REDIS_URL=<REDIS URL PROVIDED IN REDIS CLOUD>
export REDIS_USER=<REDIS USER IN REDIS CLOUD>
export REDIS_PASSWORD=<DATABASE PASSWORD IN REDIS CLOUD>
export REDIS_HOST=<REDIS HOST IN REDIS CLOUD>
export REDIS_PORT=<REDIS PORT IN REDIS CLOUD>
config.py에서 아래 Redis 클래스를 추가하세요:
import os
from dotenv import load_dotenv
import aioredis
load_dotenv()
class Redis():
def __init__(self):
"""initialize connection """
self.REDIS_URL = os.environ['REDIS_URL']
self.REDIS_PASSWORD = os.environ['REDIS_PASSWORD']
self.REDIS_USER = os.environ['REDIS_USER']
self.connection_url = f"redis://{self.REDIS_USER}:{self.REDIS_PASSWORD}@{self.REDIS_URL}"
async def create_connection(self):
self.connection = aioredis.from_url(
self.connection_url, db=0)
return self.connection
Redis 객체를 생성하고 환경 변수에서 필수 매개변수를 초기화합니다. 그런 다음 비동기 메서드 create_connection를 만듭니다. Redis 연결을 생성하고 aioredis에서 얻은 연결 풀을 반환합니다. 메소드 from_url .
다음으로, 아래 코드를 실행하여 main.py에서 Redis 연결을 테스트합니다. 그러면 새로운 Redis 연결 풀이 생성되고, 단순 키 "key"가 설정되며, 여기에 문자열 "value"가 할당됩니다.
from src.redis.config import Redis
import asyncio
async def main():
redis = Redis()
redis = await redis.create_connection()
print(redis)
await redis.set("key", "value")
if __name__ == "__main__":
asyncio.run(main())
이제 Redis Insight를 엽니다(튜토리얼을 따라 다운로드하여 설치한 경우). 다음과 같은 내용이 표시됩니다:
Redis 인사이트 테스트
Redis 스트림으로 작업하는 방법
이제 작업자 환경이 설정되었으므로 웹 서버에 생산자를 만들고 작업자에 소비자를 만들 수 있습니다.
먼저 서버에서 Redis 클래스를 다시 생성해 보겠습니다. server.src에서 redis라는 폴더를 생성하세요 두 개의 파일 config.py을 추가합니다. 및 producer.py .
config.py에서 , 작업자 환경에서 했던 것처럼 아래 코드를 추가하세요:
import os
from dotenv import load_dotenv
import aioredis
load_dotenv()
class Redis():
def __init__(self):
"""initialize connection """
self.REDIS_URL = os.environ['REDIS_URL']
self.REDIS_PASSWORD = os.environ['REDIS_PASSWORD']
self.REDIS_USER = os.environ['REDIS_USER']
self.connection_url = f"redis://{self.REDIS_USER}:{self.REDIS_PASSWORD}@{self.REDIS_URL}"
async def create_connection(self):
self.connection = aioredis.from_url(
self.connection_url, db=0)
return self.connection
.env 파일에 Redis 자격 증명도 추가합니다:
export REDIS_URL=<REDIS URL PROVIDED IN REDIS CLOUD>
export REDIS_USER=<REDIS USER IN REDIS CLOUD>
export REDIS_PASSWORD=<DATABASE PASSWORD IN REDIS CLOUD>
export REDIS_HOST=<REDIS HOST IN REDIS CLOUD>
export REDIS_PORT=<REDIS PORT IN REDIS CLOUD>
마지막으로 server.src.redis.producer.py에서 다음 코드를 추가하세요:
from .config import Redis
class Producer:
def __init__(self, redis_client):
self.redis_client = redis_client
async def add_to_stream(self, data: dict, stream_channel):
try:
msg_id = await self.redis_client.xadd(name=stream_channel, id="*", fields=data)
print(f"Message id {msg_id} added to {stream_channel} stream")
return msg_id
except Exception as e:
print(f"Error sending msg to stream => {e}")
Redis 클라이언트로 초기화되는 생산자 클래스를 만들었습니다. 우리는 이 클라이언트를 사용하여 add_to_stream를 사용하여 스트림에 데이터를 추가합니다. 데이터와 Redis 채널 이름을 가져오는 메서드입니다.
스트림 채널에 데이터를 추가하기 위한 Redis 명령은 xadd입니다. aioredis에는 상위 수준 기능과 하위 수준 기능이 모두 있습니다.
다음으로 새로 생성된 생산자를 실행하려면 chat.py을 업데이트하세요. 및 WebSocket /chat 아래와 같은 끝점. 업데이트된 채널 이름 message_channel을 확인하세요. .
from ..redis.producer import Producer
from ..redis.config import Redis
chat = APIRouter()
manager = ConnectionManager()
redis = Redis()
@chat.websocket("/chat")
async def websocket_endpoint(websocket: WebSocket, token: str = Depends(get_token)):
await manager.connect(websocket)
redis_client = await redis.create_connection()
producer = Producer(redis_client)
try:
while True:
data = await websocket.receive_text()
print(data)
stream_data = {}
stream_data[token] = data
await producer.add_to_stream(stream_data, "message_channel")
await manager.send_personal_message(f"Response: Simulating response from the GPT service", websocket)
except WebSocketDisconnect:
manager.disconnect(websocket)
다음으로 Postman에서 연결을 만들고 Hello라는 메시지를 원하는 만큼 보냅니다. . 아래와 같이 스트림 메시지가 터미널에 인쇄되어야 합니다:
터미널 채널 메시지 테스트
Redis Insight에는 새로운 mesage_channel가 표시됩니다. 클라이언트에서 전송된 메시지로 채워지고 타임스탬프가 표시된 대기열이 생성됩니다. 타임스탬프가 표시된 이 대기열은 메시지 순서를 유지하는 데 중요합니다.
Redis 인사이트 채널
채팅 데이터를 모델링하는 방법
다음으로 채팅 메시지에 대한 모델을 생성하겠습니다. WebSocket을 통해 텍스트 데이터를 전송하지만 채팅 데이터에는 텍스트보다 더 많은 정보가 포함되어야 한다는 점을 기억하세요. 채팅이 전송된 시간을 타임스탬프하고, 각 메시지에 대한 ID를 생성하고, 채팅 세션에 대한 데이터를 수집한 다음 이 데이터를 JSON 형식으로 저장해야 합니다.
WebSocket은 상태를 저장하지 않기 때문에 연결이 끊어져도 채팅 기록이 손실되지 않도록 이 JSON 데이터를 Redis에 저장할 수 있습니다.
server.src에서 schema라는 새 폴더를 생성하세요 . 그런 다음 chat.py라는 파일을 만듭니다. server.src.schema에서 다음 코드를 추가하세요:
from datetime import datetime
from pydantic import BaseModel
from typing import List, Optional
import uuid
class Message(BaseModel):
id = uuid.uuid4()
msg: str
timestamp = str(datetime.now())
class Chat(BaseModel):
token: str
messages: List[Message]
name: str
session_start = str(datetime.now())
우리는 Pydantic의 BaseModel를 사용하고 있습니다. 채팅 데이터를 모델링하는 클래스입니다. Chat 클래스는 단일 Chat 세션에 대한 데이터를 보유합니다. datetime.now()를 사용하여 토큰, 사용자 이름, 채팅 세션 시작 시간에 대해 자동으로 생성된 타임스탬프를 저장합니다. .
이 채팅 세션 내에서 주고받은 메시지는 Message로 저장됩니다. uuid4를 사용하여 즉시 채팅 ID를 생성하는 클래스 . 이 Message를 초기화할 때 제공해야 하는 유일한 데이터입니다. 클래스는 메시지 텍스트입니다.
Redis JSON으로 작업하는 방법
Redis JSON의 채팅 기록 저장 기능을 사용하려면 Redis 연구소에서 제공하는 rejson을 설치해야 합니다.
터미널에서 server으로 cd하세요. pip install rejson로 rejson을 설치하세요. . 그런 다음 Redis을 업데이트하세요. server.src.redis.config.py의 수업 create_rejson_connection을 포함하려면 방법:
import os
from dotenv import load_dotenv
import aioredis
from rejson import Client
load_dotenv()
class Redis():
def __init__(self):
"""initialize connection """
self.REDIS_URL = os.environ['REDIS_URL']
self.REDIS_PASSWORD = os.environ['REDIS_PASSWORD']
self.REDIS_USER = os.environ['REDIS_USER']
self.connection_url = f"redis://{self.REDIS_USER}:{self.REDIS_PASSWORD}@{self.REDIS_URL}"
self.REDIS_HOST = os.environ['REDIS_HOST']
self.REDIS_PORT = os.environ['REDIS_PORT']
async def create_connection(self):
self.connection = aioredis.from_url(
self.connection_url, db=0)
return self.connection
def create_rejson_connection(self):
self.redisJson = Client(host=self.REDIS_HOST,
port=self.REDIS_PORT, decode_responses=True, username=self.REDIS_USER, password=self.REDIS_PASSWORD)
return self.redisJson
create_rejson_connection을 추가합니다 rejson Client를 사용하여 Redis에 연결하는 방법 . 이는 aioredis에서는 사용할 수 없는 Redis에서 JSON 데이터를 생성하고 조작하는 방법을 제공합니다.
다음으로 server.src.routes.chat.py에서 /token을 업데이트할 수 있습니다 새 Chat을 생성하는 엔드포인트 다음과 같이 인스턴스화하고 Redis JSON에 세션 데이터를 저장합니다.
@chat.post("/token")
async def token_generator(name: str, request: Request):
token = str(uuid.uuid4())
if name == "":
raise HTTPException(status_code=400, detail={
"loc": "name", "msg": "Enter a valid name"})
# Create new chat session
json_client = redis.create_rejson_connection()
chat_session = Chat(
token=token,
messages=[],
name=name
)
# Store chat session in redis JSON with the token as key
json_client.jsonset(str(token), Path.rootPath(), chat_session.dict())
# Set a timeout for redis data
redis_client = await redis.create_connection()
await redis_client.expire(str(token), 3600)
return chat_session.dict()
참고:이것은 데모 앱이기 때문에 Redis에 채팅 데이터를 너무 오랫동안 저장하고 싶지 않습니다. 그래서 aioredis 클라이언트를 사용하여 토큰에 60분 시간 초과를 추가했습니다(rejson은 시간 초과를 구현하지 않습니다). 즉, 60분 후에는 채팅 세션 데이터가 손실됩니다.
이는 사용자를 인증하지 않고 정의된 기간이 지나면 채팅 데이터를 덤프하려고 하기 때문에 필요합니다. 이 단계는 선택사항이므로 포함할 필요가 없습니다.
다음으로 Postman에서 새 토큰을 생성하기 위해 POST 요청을 보내면 아래와 같은 구조화된 응답을 받게 됩니다. Redis Insight를 확인하여 토큰과 함께 JSON 키로 저장된 채팅 데이터와 값으로 데이터를 확인할 수도 있습니다.
토큰 생성기 업데이트됨
토큰 종속성을 업데이트하는 방법
이제 토큰이 생성되고 저장되었으므로 get_token를 업데이트할 좋은 기회입니다. /chat의 종속성 웹소켓. 채팅 세션을 시작하기 전에 유효한 토큰을 확인하기 위해 이 작업을 수행합니다.
server.src.socket.utils.py에서 get_token 업데이트 Redis 인스턴스에 토큰이 존재하는지 확인하는 함수입니다. 그렇다면 토큰을 반환합니다. 이는 소켓 연결이 유효하다는 것을 의미합니다. 존재하지 않으면 연결을 닫습니다.
/token에 의해 생성된 토큰 60분 후에는 더 이상 존재하지 않습니다. 따라서 채팅을 시작하는 동안 오류 응답이 생성되면 새 토큰을 생성하도록 사용자를 리디렉션하는 몇 가지 간단한 로직을 프런트엔드에 적용할 수 있습니다.
from ..redis.config import Redis
async def get_token(
websocket: WebSocket,
token: Optional[str] = Query(None),
):
if token is None or token == "":
await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
redis_client = await redis.create_connection()
isexists = await redis_client.exists(token)
if isexists == 1:
return token
else:
await websocket.close(code=status.WS_1008_POLICY_VIOLATION, reason="Session not authenticated or expired token")
종속성을 테스트하려면 우리가 사용한 임의의 토큰을 사용하여 채팅 세션에 연결하면 403 오류가 발생합니다. (Redis Insight에서는 토큰을 수동으로 삭제해야 한다는 점에 유의하세요.)
이제 /token에 게시물 요청을 보낼 때 생성된 토큰을 복사하세요. 엔드포인트를 생성하거나 새 요청을 생성하여 /chat에 필요한 토큰 쿼리 매개변수에 값으로 붙여넣습니다. 웹소켓. 그런 다음 연결하십시오. 성공적으로 연결되어야 합니다.
토큰을 사용한 채팅 세션
이 모든 것을 종합하면 chat.py는 아래와 같아야 합니다.
import os
from fastapi import APIRouter, FastAPI, WebSocket, WebSocketDisconnect, Request, Depends
import uuid
from ..socket.connection import ConnectionManager
from ..socket.utils import get_token
import time
from ..redis.producer import Producer
from ..redis.config import Redis
from ..schema.chat import Chat
from rejson import Path
chat = APIRouter()
manager = ConnectionManager()
redis = Redis()
# @route POST /token
# @desc Route to generate chat token
# @access Public
@chat.post("/token")
async def token_generator(name: str, request: Request):
token = str(uuid.uuid4())
if name == "":
raise HTTPException(status_code=400, detail={
"loc": "name", "msg": "Enter a valid name"})
# Create nee chat session
json_client = redis.create_rejson_connection()
chat_session = Chat(
token=token,
messages=[],
name=name
)
print(chat_session.dict())
# Store chat session in redis JSON with the token as key
json_client.jsonset(str(token), Path.rootPath(), chat_session.dict())
# Set a timeout for redis data
redis_client = await redis.create_connection()
await redis_client.expire(str(token), 3600)
return chat_session.dict()
# @route POST /refresh_token
# @desc Route to refresh token
# @access Public
@chat.post("/refresh_token")
async def refresh_token(request: Request):
return None
# @route Websocket /chat
# @desc Socket for chat bot
# @access Public
@chat.websocket("/chat")
async def websocket_endpoint(websocket: WebSocket, token: str = Depends(get_token)):
await manager.connect(websocket)
redis_client = await redis.create_connection()
producer = Producer(redis_client)
json_client = redis.create_rejson_connection()
try:
while True:
data = await websocket.receive_text()
stream_data = {}
stream_data[token] = data
await producer.add_to_stream(stream_data, "message_channel")
await manager.send_personal_message(f"Response: Simulating response from the GPT service", websocket)
except WebSocketDisconnect:
manager.disconnect(websocket)
여기까지 도달하길 잘했어요! 다음 섹션에서는 AI 모델과 통신하고 클라이언트, 서버, 작업자 및 외부 API 간의 데이터 전송을 처리하는 데 중점을 둘 것입니다.
AI 모델로 챗봇에 지능을 추가하는 방법
이 섹션에서는 변환기 모델과 통신하고, 사용자의 프롬프트를 대화 형식으로 API에 보내고, 채팅 애플리케이션에 대한 응답을 수신 및 변환하는 래퍼를 구축하는 데 중점을 둘 것입니다.
Huggingface를 시작하는 방법
우리는 Hugginface에서 어떤 언어 모델도 구축하거나 배포하지 않을 것입니다. 대신 Huggingface의 가속 추론 API를 사용하여 사전 훈련된 모델에 연결하는 데 중점을 둘 것입니다.
우리가 사용할 모델은 EleutherAI에서 제공하는 GPT-J-6B 모델입니다. 60억 개의 매개변수로 학습된 생성 언어 모델입니다.
Huggingface는 이 모델에 거의 무료로 연결할 수 있는 주문형 제한 API를 제공합니다.
Huggingface를 시작하려면 무료 계정을 만드세요. 설정에서 새 액세스 토큰을 생성하세요. 최대 30,000개의 토큰에 대해 Huggingface는 무료로 추론 API에 대한 액세스를 제공합니다.
여기에서 API 사용량을 모니터링할 수 있습니다. 이 토큰을 안전하게 보관하고 공개적으로 노출하지 않도록 하세요.
참고:우리는 무료 계정을 사용하고 있으므로 HTTP 연결을 사용하여 API와 통신합니다. 그러나 PRO Huggingface 계정은 WebSocket을 통한 스트리밍(병렬 처리 및 일괄 작업 참조)을 지원합니다.
이는 모델과 채팅 애플리케이션 사이의 응답 시간을 크게 향상시키는 데 도움이 될 수 있으며, 후속 기사에서 이 방법을 다루기를 바랍니다.
언어 모델과 상호작용하는 방법
먼저 작업자 디렉터리 내의 .env 파일에 Huggingface 연결 자격 증명을 추가합니다.
export HUGGINFACE_INFERENCE_TOKEN=<HUGGINGFACE ACCESS TOKEN>
export MODEL_URL=https://api-inference.huggingface.co/models/EleutherAI/gpt-j-6B
다음으로 worker.src에서 model라는 폴더를 생성하세요 그런 다음 gptj.py 파일을 추가하세요. . 그런 다음 아래에 GPT 클래스를 추가하세요.
import os
from dotenv import load_dotenv
import requests
import json
load_dotenv()
class GPT:
def __init__(self):
self.url = os.environ.get('MODEL_URL')
self.headers = {
"Authorization": f"Bearer {os.environ.get('HUGGINFACE_INFERENCE_TOKEN')}"}
self.payload = {
"inputs": "",
"parameters": {
"return_full_text": False,
"use_cache": True,
"max_new_tokens": 25
}
}
def query(self, input: str) -> list:
self.payload["inputs"] = input
data = json.dumps(self.payload)
response = requests.request(
"POST", self.url, headers=self.headers, data=data)
print(json.loads(response.content.decode("utf-8")))
return json.loads(response.content.decode("utf-8"))
if __name__ == "__main__":
GPT().query("Will artificial intelligence help humanity conquer the universe?")
GPT 클래스는 Huggingface 모델 url으로 초기화됩니다. , 인증 header , 사전 정의된 payload . 그러나 페이로드 입력은 query에서 제공하는 동적 필드입니다. Huggingface 엔드포인트에 요청을 보내기 전에 업데이트됩니다.
마지막으로 GPT 클래스의 인스턴스에서 쿼리 메서드를 직접 실행하여 이를 테스트합니다. 터미널에서 python src/model/gptj.py를 실행하세요. , 그러면 다음과 같은 응답을 받게 됩니다(귀하의 응답은 확실히 다음과 다를 것이라는 점을 명심하세요):
[{'generated_text': ' (AI) could solve all the problems on this planet? I am of the opinion that in the short term artificial intelligence is much better than human beings, but in the long and distant future human beings will surpass artificial intelligence.\n\nIn the distant'}]
다음으로 입력 형식을 변경하여 모델과의 상호 작용을 더욱 대화적으로 만들기 위해 입력에 약간의 조정을 추가합니다.
GPT 업데이트 다음과 같은 수업:
class GPT:
def __init__(self):
self.url = os.environ.get('MODEL_URL')
self.headers = {
"Authorization": f"Bearer {os.environ.get('HUGGINFACE_INFERENCE_TOKEN')}"}
self.payload = {
"inputs": "",
"parameters": {
"return_full_text": False,
"use_cache": False,
"max_new_tokens": 25
}
}
def query(self, input: str) -> list:
self.payload["inputs"] = f"Human: {input} Bot:"
data = json.dumps(self.payload)
response = requests.request(
"POST", self.url, headers=self.headers, data=data)
data = json.loads(response.content.decode("utf-8"))
text = data[0]['generated_text']
res = str(text.split("Human:")[0]).strip("\n").strip()
return res
if __name__ == "__main__":
GPT().query("Will artificial intelligence help humanity conquer the universe?")
문자열 리터럴 f"Human: {input} Bot:"로 입력을 업데이트했습니다. . 사람의 입력이 문자열에 배치되고 Bot이 응답을 제공합니다. 이 입력 형식은 GPT-J6B를 대화형 모델로 전환합니다. 눈에 띄는 다른 변경 사항은 다음과 같습니다.
- use_cache:입력이 동일할 때 모델이 새 응답을 생성하도록 하려면 이를 False로 설정할 수 있습니다. 사용자가 동일한 메시지로 봇에 계속 스팸을 보내는 경우 무료 토큰이 소진되는 것을 방지하기 위해 프로덕션에서는 이를 True로 두는 것이 좋습니다. 캐시를 사용해도 모델에서 새로운 응답이 실제로 로드되지는 않습니다.
- return_full_text:입력을 반환할 필요가 없으므로 False입니다. 이미 반환되어 있습니다. 응답을 받으면 응답에서 "Bot:"과 선행/후행 공백을 제거하고 응답 텍스트만 반환합니다.
AI 모델의 단기 기억을 시뮬레이션하는 방법
우리가 모델에 보내는 모든 새로운 입력에 대해 모델이 대화 기록을 기억할 방법이 없습니다. 이는 대화에서 맥락을 유지하려는 경우 중요합니다.
하지만 모델에 보내는 토큰 수가 증가하면 처리 비용이 더 많이 들고 응답 시간도 길어진다는 점을 기억하세요.
따라서 단기 기록을 검색하여 모델로 보내는 방법을 찾아야 합니다. 또한 최적의 위치를 파악해야 합니다. 즉, 검색하여 모델에 전송하려는 과거 데이터의 양은 얼마나 됩니까?
채팅 기록을 처리하려면 JSON 데이터베이스로 대체해야 합니다. token를 사용하겠습니다. 마지막 채팅 데이터를 가져온 다음 응답을 받으면 해당 응답을 JSON 데이터베이스에 추가하세요.
worker.src.redis.config.py 업데이트 create_rejson_connection를 포함하려면 방법. 또한 .env 파일을 인증 데이터로 업데이트하고 rejson이 설치되었는지 확인하세요.
당신의 worker.src.redis.config.py 다음과 같아야 합니다:
import os
from dotenv import load_dotenv
import aioredis
from rejson import Client
load_dotenv()
class Redis():
def __init__(self):
"""initialize connection """
self.REDIS_URL = os.environ['REDIS_URL']
self.REDIS_PASSWORD = os.environ['REDIS_PASSWORD']
self.REDIS_USER = os.environ['REDIS_USER']
self.connection_url = f"redis://{self.REDIS_USER}:{self.REDIS_PASSWORD}@{self.REDIS_URL}"
self.REDIS_HOST = os.environ['REDIS_HOST']
self.REDIS_PORT = os.environ['REDIS_PORT']
async def create_connection(self):
self.connection = aioredis.from_url(
self.connection_url, db=0)
return self.connection
def create_rejson_connection(self):
self.redisJson = Client(host=self.REDIS_HOST,
port=self.REDIS_PORT, decode_responses=True, username=self.REDIS_USER, password=self.REDIS_PASSWORD)
return self.redisJson
.env 파일은 다음과 같아야 합니다:
export REDIS_URL=<REDIS URL PROVIDED IN REDIS CLOUD>
export REDIS_USER=<REDIS USER IN REDIS CLOUD>
export REDIS_PASSWORD=<DATABASE PASSWORD IN REDIS CLOUD>
export REDIS_HOST=<REDIS HOST IN REDIS CLOUD>
export REDIS_PORT=<REDIS PORT IN REDIS CLOUD>
export HUGGINFACE_INFERENCE_TOKEN=<HUGGINGFACE ACCESS TOKEN>
export MODEL_URL=https://api-inference.huggingface.co/models/EleutherAI/gpt-j-6B
다음으로 worker.src.redis에서 cache.py라는 새 파일을 생성하세요 그리고 아래 코드를 추가하세요:
from .config import Redis
from rejson import Path
class Cache:
def __init__(self, json_client):
self.json_client = json_client
async def get_chat_history(self, token: str):
data = self.json_client.jsonget(
str(token), Path.rootPath())
return data
캐시는 rejson 클라이언트와 get_chat_history 메소드로 초기화됩니다. Redis에서 해당 토큰에 대한 채팅 기록을 가져오기 위해 토큰을 가져옵니다. rejson에서 Path 객체를 가져와야 합니다.
다음으로 worker.main.py를 업데이트하세요. 아래 코드를 사용하세요:
from src.redis.config import Redis
import asyncio
from src.model.gptj import GPT
from src.redis.cache import Cache
redis = Redis()
async def main():
json_client = redis.create_rejson_connection()
data = await Cache(json_client).get_chat_history(token="18196e23-763b-4808-ae84-064348a0daff")
print(data)
if __name__ == "__main__":
asyncio.run(main())
Postman의 이전 테스트에서 생성된 샘플 토큰을 하드 코딩했습니다. 생성된 토큰이 없으면 /token으로 새 요청을 보내세요. 토큰을 복사한 다음 python main.py를 실행하세요. 터미널에서. 터미널에 다음과 같은 데이터가 표시되어야 합니다:
{'token': '18196e23-763b-4808-ae84-064348a0daff', 'messages': [], 'name': 'Stephen', 'session_start': '2022-07-16 13:20:01.092109'}
다음으로 add_message_to_cache을 추가해야 합니다. Cache에 대한 메소드 특정 토큰에 대해 Redis에 메시지를 추가하는 클래스입니다.
async def add_message_to_cache(self, token: str, message_data: dict):
self.json_client.jsonarrappend(
str(token), Path('.messages'), message_data)
jsonarrappend rejson에서 제공하는 메서드는 새 메시지를 메시지 배열에 추가합니다.
메시지 배열에 액세스하려면 .messages을 제공해야 합니다. Path에 대한 인수로. 메시지 데이터에 다른/중첩 구조가 있는 경우 새 데이터를 추가하려는 배열의 경로를 제공하기만 하면 됩니다.
이 방법을 테스트하려면 main.py 파일의 기본 함수를 아래 코드로 업데이트하세요.
async def main():
json_client = redis.create_rejson_connection()
await Cache(json_client).add_message_to_cache(token="18196e23-763b-4808-ae84-064348a0daff", message_data={
"id": "1",
"msg": "Hello",
"timestamp": "2022-07-16 13:20:01.092109"
})
data = await Cache(json_client).get_chat_history(token="18196e23-763b-4808-ae84-064348a0daff")
print(data)
하드코딩된 메시지를 캐시로 보내고 캐시에서 채팅 기록을 가져옵니다. python main.py를 실행하면 작업자 디렉토리 내의 터미널에서 메시지 배열에 추가된 메시지와 함께 다음과 같은 내용이 터미널에 인쇄되어야 합니다.
{'token': '18196e23-763b-4808-ae84-064348a0daff', 'messages': [{'id': '1', 'msg': 'Hello', 'timestamp': '2022-07-16 13:20:01.092109'}], 'name': 'Stephen', 'session_start': '2022-07-16 13:20:01.092109'}
마지막으로 메시지 데이터를 GPT 모델로 보내기 위해 기본 함수를 업데이트하고 입력을 마지막 4개로 업데이트해야 합니다. 클라이언트와 모델 간에 전송되는 메시지입니다.
먼저 add_message_to_cache를 업데이트하겠습니다. 메시지가 사람인지 봇인지 알려주는 새로운 인수 "source"를 사용하여 함수를 실행하세요. 그런 다음 이 인수를 사용하여 데이터를 캐시에 저장하기 전에 "Human:" 또는 "Bot:" 태그를 데이터에 추가할 수 있습니다.
add_message_to_cache 업데이트 Cache 클래스의 메서드는 다음과 같습니다:
async def add_message_to_cache(self, token: str, source: str, message_data: dict):
if source == "human":
message_data['msg'] = "Human: " + (message_data['msg'])
elif source == "bot":
message_data['msg'] = "Bot: " + (message_data['msg'])
self.json_client.jsonarrappend(
str(token), Path('.messages'), message_data)
Then update the main function in main.py in the worker directory, and run python main.py to see the new results in the Redis database.
async def main():
json_client = redis.create_rejson_connection()
await Cache(json_client).add_message_to_cache(token="18196e23-763b-4808-ae84-064348a0daff", source="human", message_data={
"id": "1",
"msg": "Hello",
"timestamp": "2022-07-16 13:20:01.092109"
})
data = await Cache(json_client).get_chat_history(token="18196e23-763b-4808-ae84-064348a0daff")
print(data)
Next, we need to update the main function to add new messages to the cache, read the previous 4 messages from the cache, and then make an API call to the model using the query method. It'll have a payload consisting of a composite string of the last 4 messages.
You can always tune the number of messages in the history you want to extract, but I think 4 messages is a pretty good number for a demo.
In worker.src , create a new folder schema. Then create a new file named chat.py and paste our message schema in chat.py like so:
from datetime import datetime
from pydantic import BaseModel
from typing import List, Optional
import uuid
class Message(BaseModel):
id = str(uuid.uuid4())
msg: str
timestamp = str(datetime.now())
Next, update the main.py file like below:
async def main():
json_client = redis.create_rejson_connection()
await Cache(json_client).add_message_to_cache(token="18196e23-763b-4808-ae84-064348a0daff", source="human", message_data={
"id": "3",
"msg": "I would like to go to the moon to, would you take me?",
"timestamp": "2022-07-16 13:20:01.092109"
})
data = await Cache(json_client).get_chat_history(token="18196e23-763b-4808-ae84-064348a0daff")
print(data)
message_data = data['messages'][-4:]
input = ["" + i['msg'] for i in message_data]
input = " ".join(input)
res = GPT().query(input=input)
msg = Message(
msg=res
)
print(msg)
await Cache(json_client).add_message_to_cache(token="18196e23-763b-4808-ae84-064348a0daff", source="bot", message_data=msg.dict())
In the code above, we add new message data to the cache. This message will ultimately come from the message queue. Next we get the chat history from the cache, which will now include the most recent data we added.
Note that we are using the same hard-coded token to add to the cache and get from the cache, temporarily just to test this out.
Next, we trim off the cache data and extract only the last 4 items. Then we consolidate the input data by extracting the msg in a list and join it to an empty string.
Finally, we create a new Message instance for the bot response and add the response to the cache specifying the source as "bot"
Next, run python main.py a couple of times, changing the human message and id as desired with each run. You should have a full conversation input and output with the model.
Open Redis Insight and you should have something similar to the below:
Conversational Chat
Stream Consumer and Real-time Data Pull from the Message Queue
Next, we want to create a consumer and update our worker.main.py to connect to the message queue. We want it to pull the token data in real-time, as we are currently hard-coding the tokens and message inputs.
In worker.src.redis create a new file named stream.py . Add a StreamConsumer class with the code below:
class StreamConsumer:
def __init__(self, redis_client):
self.redis_client = redis_client
async def consume_stream(self, count: int, block: int, stream_channel):
response = await self.redis_client.xread(
streams={stream_channel: '0-0'}, count=count, block=block)
return response
async def delete_message(self, stream_channel, message_id):
await self.redis_client.xdel(stream_channel, message_id)
The StreamConsumer class is initialized with a Redis client. The consume_stream method pulls a new message from the queue from the message channel, using the xread method provided by aioredis.
Next, update the worker.main.py file with a while loop to keep the connection to the message channel alive, like so:
from src.redis.config import Redis
import asyncio
from src.model.gptj import GPT
from src.redis.cache import Cache
from src.redis.config import Redis
from src.redis.stream import StreamConsumer
import os
from src.schema.chat import Message
redis = Redis()
async def main():
json_client = redis.create_rejson_connection()
redis_client = await redis.create_connection()
consumer = StreamConsumer(redis_client)
cache = Cache(json_client)
print("Stream consumer started")
print("Stream waiting for new messages")
while True:
response = await consumer.consume_stream(stream_channel="message_channel", count=1, block=0)
if response:
for stream, messages in response:
# Get message from stream, and extract token, message data and message id
for message in messages:
message_id = message[0]
token = [k.decode('utf-8')
for k, v in message[1].items()][0]
message = [v.decode('utf-8')
for k, v in message[1].items()][0]
print(token)
# Create a new message instance and add to cache, specifying the source as human
msg = Message(msg=message)
await cache.add_message_to_cache(token=token, source="human", message_data=msg.dict())
# Get chat history from cache
data = await cache.get_chat_history(token=token)
# Clean message input and send to query
message_data = data['messages'][-4:]
input = ["" + i['msg'] for i in message_data]
input = " ".join(input)
res = GPT().query(input=input)
msg = Message(
msg=res
)
print(msg)
await cache.add_message_to_cache(token=token, source="bot", message_data=msg.dict())
# Delete messaage from queue after it has been processed
await consumer.delete_message(stream_channel="message_channel", message_id=message_id)
if __name__ == "__main__":
asyncio.run(main())
This is quite the update, so let's take it step by step:
We use a while True loop so that the worker can be online listening to messages from the queue.
Next, we await new messages from the message_channel by calling our consume_stream method. If we have a message in the queue, we extract the message_id, token, and message. Then we create a new instance of the Message class, add the message to the cache, and then get the last 4 messages. We set it as input to the GPT model query method.
Once we get a response, we then add the response to the cache using the add_message_to_cache method, then delete the message from the queue.
How to Update the Chat Client with the AI Response
So far, we are sending a chat message from the client to the message_channel (which is received by the worker that queries the AI model) to get a response.
Next, we need to send this response to the client. As long as the socket connection is still open, the client should be able to receive the response.
If the connection is closed, the client can always get a response from the chat history using the refresh_token 끝점.
In worker.src.redis create a new file named producer.py , and add a Producer class similar to what we had on the chat web server:
class Producer:
def __init__(self, redis_client):
self.redis_client = redis_client
async def add_to_stream(self, data: dict, stream_channel) -> bool:
msg_id = await self.redis_client.xadd(name=stream_channel, id="*", fields=data)
print(f"Message id {msg_id} added to {stream_channel} stream")
return msg_id
Next, in the main.py file, update the main function to initialize the producer, create a stream data, and send the response to a response_channel using the add_to_stream method:
from src.redis.config import Redis
import asyncio
from src.model.gptj import GPT
from src.redis.cache import Cache
from src.redis.config import Redis
from src.redis.stream import StreamConsumer
import os
from src.schema.chat import Message
from src.redis.producer import Producer
redis = Redis()
async def main():
json_client = redis.create_rejson_connection()
redis_client = await redis.create_connection()
consumer = StreamConsumer(redis_client)
cache = Cache(json_client)
producer = Producer(redis_client)
print("Stream consumer started")
print("Stream waiting for new messages")
while True:
response = await consumer.consume_stream(stream_channel="message_channel", count=1, block=0)
if response:
for stream, messages in response:
# Get message from stream, and extract token, message data and message id
for message in messages:
message_id = message[0]
token = [k.decode('utf-8')
for k, v in message[1].items()][0]
message = [v.decode('utf-8')
for k, v in message[1].items()][0]
# Create a new message instance and add to cache, specifying the source as human
msg = Message(msg=message)
await cache.add_message_to_cache(token=token, source="human", message_data=msg.dict())
# Get chat history from cache
data = await cache.get_chat_history(token=token)
# Clean message input and send to query
message_data = data['messages'][-4:]
input = ["" + i['msg'] for i in message_data]
input = " ".join(input)
res = GPT().query(input=input)
msg = Message(
msg=res
)
stream_data = {}
stream_data[str(token)] = str(msg.dict())
await producer.add_to_stream(stream_data, "response_channel")
await cache.add_message_to_cache(token=token, source="bot", message_data=msg.dict())
# Delete messaage from queue after it has been processed
await consumer.delete_message(stream_channel="message_channel", message_id=message_id)
if __name__ == "__main__":
asyncio.run(main())
Next, we need to let the client know when we receive responses from the worker in the /chat socket endpoint. We do this by listening to the response stream. We do not need to include a while loop here as the socket will be listening as long as the connection is open.
Note that we also need to check which client the response is for by adding logic to check if the token connected is equal to the token in the response. Then we delete the message in the response queue once it's been read.
In server.src.redis create a new file named stream.py and add our StreamConsumer class like this:
from .config import Redis
class StreamConsumer:
def __init__(self, redis_client):
self.redis_client = redis_client
async def consume_stream(self, count: int, block: int, stream_channel):
response = await self.redis_client.xread(
streams={stream_channel: '0-0'}, count=count, block=block)
return response
async def delete_message(self, stream_channel, message_id):
await self.redis_client.xdel(stream_channel, message_id)
Next, update the /chat socket endpoint like so:
from ..redis.stream import StreamConsumer
@chat.websocket("/chat")
async def websocket_endpoint(websocket: WebSocket, token: str = Depends(get_token)):
await manager.connect(websocket)
redis_client = await redis.create_connection()
producer = Producer(redis_client)
json_client = redis.create_rejson_connection()
consumer = StreamConsumer(redis_client)
try:
while True:
data = await websocket.receive_text()
stream_data = {}
stream_data[str(token)] = str(data)
await producer.add_to_stream(stream_data, "message_channel")
response = await consumer.consume_stream(stream_channel="response_channel", block=0)
print(response)
for stream, messages in response:
for message in messages:
response_token = [k.decode('utf-8')
for k, v in message[1].items()][0]
if token == response_token:
response_message = [v.decode('utf-8')
for k, v in message[1].items()][0]
print(message[0].decode('utf-8'))
print(token)
print(response_token)
await manager.send_personal_message(response_message, websocket)
await consumer.delete_message(stream_channel="response_channel", message_id=message[0].decode('utf-8'))
except WebSocketDisconnect:
manager.disconnect(websocket)
Refresh Token
Finally, we need to update the /refresh_token endpoint to get the chat history from the Redis database using our Cache class.
In server.src.redis , add a cache.py file and add the code below:
from rejson import Path
class Cache:
def __init__(self, json_client):
self.json_client = json_client
async def get_chat_history(self, token: str):
data = self.json_client.jsonget(
str(token), Path.rootPath())
return data
Next, in server.src.routes.chat.py import the Cache class and update the /token endpoint to the below:
from ..redis.cache import Cache
@chat.get("/refresh_token")
async def refresh_token(request: Request, token: str):
json_client = redis.create_rejson_connection()
cache = Cache(json_client)
data = await cache.get_chat_history(token)
if data == None:
raise HTTPException(
status_code=400, detail="Session expired or does not exist")
else:
return data
Now, when we send a GET request to the /refresh_token endpoint with any token, the endpoint will fetch the data from the Redis database.
If the token has not timed out, the data will be sent to the user. Or it'll send a 400 response if the token is not found.
How to Test the Chat with multiple Clients in Postman
Finally, we will test the chat system by creating multiple chat sessions in Postman, connecting multiple clients in Postman, and chatting with the bot on the clients.
Lastly, we will try to get the chat history for the clients and hopefully get a proper response.
Recap
Let's have a quick recap as to what we have achieved with our chat system. The chat client creates a token for each chat session with a client. This token is used to identify each client, and each message sent by clients connected to or web server is queued in a Redis channel (message_chanel), identified by the token.
Our worker environment reads from this channel. It does not have any clue who the client is (except that it's a unique token) and uses the message in the queue to send requests to the Huggingface inference API.
When it gets a response, the response is added to a response channel and the chat history is updated. The client listening to the response_channel immediately sends the response to the client once it receives a response with its token.
If the socket is still open, this response is sent. If the socket is closed, we are certain that the response is preserved because the response is added to the chat history. The client can get the history, even if a page refresh happens or in the event of a lost connection.
Congratulations on getting this far! You have been able to build a working chat system.
In follow-up articles, I will focus on building a chat user interface for the client, creating unit and functional tests, fine-tuning our worker environment for faster response time with WebSockets and asynchronous requests, and ultimately deploying the chat application on AWS.
This Article is part of a series on building full-stack intelligent chatbots with tools like Python, React, Huggingface, Redis, and so on. You can follow the full series on my blog:blog.stephensanwo.dev - AI ChatBot Series**
You can download the full repository on My Github Repository
I wrote this tutorial in collaboration with Redis. Need help getting started with Redis? Try the following resources:
- Try Redis Cloud free of charge
- Watch this video on the benefits of Redis Cloud over other Redis providers
- Redis Developer Hub - tools, guides, and tutorials about Redis
- RedisInsight Desktop GUI
무료로 코딩을 배우세요. freeCodeCamp의 오픈 소스 커리큘럼은 40,000명 이상의 사람들이 개발자로 취업하는 데 도움을 주었습니다. 시작하세요