CONSTRUYE UN CHATBOT IA CON REDIS Y PYTHON
Introducción al Desarrollo de un Chatbot IA
Desarrollar una aplicación full-stack moderna implica tomar decisiones críticas sobre tecnologías, arquitectura y herramientas. Un chatbot con inteligencia artificial (IA) es un proyecto ideal para aprender conceptos avanzados de desarrollo, desde la creación de APIs hasta la gestión de datos en tiempo real. Este tutorial te guiará paso a paso en la construcción de un chatbot con IA utilizando Python, FastAPI, WebSockets, Redis y la API de inferencia de Hugging Face. Cubriremos la arquitectura, la implementación del servidor, la integración con Redis para sistemas en tiempo real y la conexión con un modelo de lenguaje como GPT-J-6B. Este proyecto está diseñado para desarrolladores con conocimientos básicos de Python y JavaScript, ofreciendo un enfoque práctico para dominar el desarrollo full-stack.
Arquitectura de la Aplicación
La arquitectura de un chatbot full-stack debe ser clara y escalable. Este proyecto utiliza una arquitectura cliente-servidor con comunicación en tiempo real. A continuación, se detalla cada componente:
Interfaz de Usuario (Cliente)
La interfaz de usuario se construye con React 18, una biblioteca popular para interfaces dinámicas. La comunicación con el backend se realiza mediante WebSockets para garantizar interacciones en tiempo real, esenciales para un chatbot interactivo.
Modelo de Lenguaje GPT-J-6B
Utilizamos GPT-J-6B, un modelo de lenguaje de código abierto desarrollado por EleutherAI con 6 mil millones de parámetros. Este modelo, comparable a GPT-3 en ciertas tareas, es accesible a través de la API de inferencia de Hugging Face, que permite integrarlo sin costos iniciales para casos de uso simples. La API simplifica la conexión con el modelo, eliminando la necesidad de entrenar o desplegar modelos localmente.
Redis para Almacenamiento y Comunicación
Redis es un almacén de datos en memoria que permite un acceso rápido a datos tipo JSON. En este proyecto, usamos Redis JSON para almacenar el historial de chats y Redis Streams para gestionar la comunicación en tiempo real entre el cliente, el servidor y la API de Hugging Face. Utilizaremos una instancia gratuita de Redis Enterprise Cloud para pruebas.
Servidor con FastAPI y WebSockets
El backend se implementa con FastAPI, un framework moderno de Python que soporta WebSockets para comunicación bidireccional. Esto es crucial para enviar y recibir mensajes en tiempo real, superando las limitaciones de las conexiones HTTP tradicionales.
Configuración del Entorno de Desarrollo
Para comenzar, necesitas un entorno con Python 3.8+ y Node.js instalados. Este tutorial usa macOS, pero es adaptable a otros sistemas operativos. Sigue estos pasos para configurar el proyecto:
-
Crea una carpeta llamada
fullstack-ai-chatbot:mkdir fullstack-ai-chatbot cd fullstack-ai-chatbot -
Crea dos subcarpetas,
clientyserver, para el frontend y backend:mkdir client server -
Inicializa un repositorio Git y crea un archivo
.gitignore:git init touch .gitignore -
En el archivo
.gitignore, agrega:env/ __pycache__/ node_modules/
Construcción del Servidor de Chat con FastAPI
El servidor se construye con FastAPI para manejar la lógica del backend y WebSockets para la comunicación en tiempo real. A continuación, se detalla cómo configurarlo.
Configuración del Entorno de Python
-
En la carpeta
server, crea un entorno virtual:cd server python3.8 -m venv env source env/bin/activate -
Instala las dependencias necesarias:
pip install fastapi uuid uvicorn gunicorn websockets python-dotenv aioredis -
Crea un archivo
.envpara las variables de entorno:touch .env -
En el archivo
.env, agrega:export APP_ENV=development
Configuración del Servidor FastAPI
Crea un archivo main.py en la carpeta server con el siguiente código para iniciar un servidor de desarrollo:
from fastapi import FastAPI, Request
import uvicorn
import os
from dotenv import load_dotenv
load_dotenv()
api = FastAPI()
@api.get("/test")
async def root():
return {"msg": "API está en línea"}
if __name__ == "__main__":
if os.environ.get('APP_ENV') == "development":
uvicorn.run("main:api", host="0.0.0.0", port=3500, workers=4, reload=True)
Este código inicializa un servidor FastAPI que escucha en el puerto 3500. El endpoint /test devuelve un mensaje JSON para verificar que el servidor está funcionando. Ejecuta el servidor con:
python main.py
Navega a http://localhost:3500/test en tu navegador para confirmar que la API está activa.
Creación de Rutas para la API
Crea una carpeta src/routes dentro de server y un archivo chat.py con las rutas iniciales:
from fastapi import APIRouter, FastAPI, WebSocket, Request
chat = APIRouter()
@chat.post("/token")
async def token_generator(request: Request):
return None
@chat.post("/refresh_token")
async def refresh_token(request: Request):
return None
@chat.websocket("/chat")
async def websocket_endpoint(websocket: WebSocket):
return None
Conecta estas rutas al servidor principal actualizando main.py:
from fastapi import FastAPI, Request
import uvicorn
import os
from dotenv import load_dotenv
from src.routes.chat import chat
load_dotenv()
api = FastAPI()
api.include_router(chat)
@api.get("/test")
async def root():
return {"msg": "API está en línea"}
if __name__ == "__main__":
if os.environ.get('APP_ENV') == "development":
uvicorn.run("main:api", host="0.0.0.0", port=3500, workers=4, reload=True)
Generación de Tokens de Sesión
Para identificar sesiones de usuario, usamos UUID para generar tokens únicos. Actualiza chat.py para incluir la lógica del endpoint /token:
from fastapi import APIRouter, FastAPI, WebSocket, Request, HTTPException
import uuid
chat = APIRouter()
@chat.post("/token")
async def token_generator(name: str, request: Request):
if not name:
raise HTTPException(status_code=400, detail={"loc": "name", "msg": "Ingrese un nombre válido"})
token = str(uuid.uuid4())
data = {"name": name, "token": token}
return data
Este código genera un token único para cada usuario basado en su nombre. Prueba este endpoint en Postman enviando una solicitud POST a http://localhost:3500/token?name=TuNombre.
Gestión de Conexiones WebSocket
Crea una carpeta src/socket y un archivo connection.py para manejar conexiones WebSocket:
from fastapi import WebSocket
from typing import List
class ConnectionManager:
def __init__(self):
self.active_connections: List[WebSocket] = []
async def connect(self, websocket: WebSocket):
await websocket.accept()
self.active_connections.append(websocket)
def disconnect(self, websocket: WebSocket):
self.active_connections.remove(websocket)
async def send_personal_message(self, message: str, websocket: WebSocket):
await websocket.send_text(message)
Actualiza el endpoint /chat en chat.py para usar esta clase:
from ..socket.connection import ConnectionManager
manager = ConnectionManager()
@chat.websocket("/chat")
async def websocket_endpoint(websocket: WebSocket):
await manager.connect(websocket)
try:
while True:
data = await websocket.receive_text()
print(data)
await manager.send_personal_message("Respuesta: Simulando respuesta del servicio GPT", websocket)
except:
manager.disconnect(websocket)
Prueba el WebSocket en Postman conectándote a ws://localhost:3500/chat y enviando un mensaje como “Hola Bot”. Deberías recibir una respuesta simulada.
Inyección de Dependencias
Para validar tokens en las conexiones WebSocket, crea un archivo src/socket/utils.py:
from fastapi import WebSocket, status, Query
from typing import Optional
async def get_token(websocket: WebSocket, token: Optional[str] = Query(None)):
if not token:
await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
return token
Actualiza el endpoint /chat para incluir esta dependencia:
from ..socket.utils import get_token
from fastapi import Depends
@chat.websocket("/chat")
async def websocket_endpoint(websocket: WebSocket, token: str = Depends(get_token)):
await manager.connect(websocket)
try:
while True:
data = await websocket.receive_text()
print(data)
await manager.send_personal_message("Respuesta: Simulando respuesta del servicio GPT", websocket)
except:
manager.disconnect(websocket)
Implementación de Sistemas en Tiempo Real con Redis
Redis permite manejar datos en tiempo real y almacenar el historial de chats. A continuación, configuraremos Redis para nuestro chatbot.
Configuración de Redis
-
Regístrate en Redis Enterprise Cloud para obtener una instancia gratuita.
-
Crea una carpeta
workeren el directorio raíz del proyecto:mkdir worker cd worker python3.8 -m venv env source env/bin/activate pip install aiohttp aioredis python-dotenv -
Crea un archivo
.envenworkercon las credenciales de Redis:export REDIS_URL=<TU_URL_REDIS> export REDIS_USER=<TU_USUARIO_REDIS> export REDIS_PASSWORD=<TU_CONTRASEÑA_REDIS> export REDIS_HOST=<TU_HOST_REDIS> export REDIS_PORT=<TU_PUERTO_REDIS> -
En
worker/src/redis, crea un archivoconfig.py:
import os
from dotenv import load_dotenv
import aioredis
load_dotenv()
class Redis:
def __init__(self):
self.REDIS_URL = os.environ['REDIS_URL']
self.REDIS_PASSWORD = os.environ['REDIS_PASSWORD']
self.REDIS_USER = os.environ['REDIS_USER']
self.connection_url = f"redis://{self.REDIS_USER}:{self.REDIS_PASSWORD}@{self.REDIS_URL}"
async def create_connection(self):
self.connection = aioredis.from_url(self.connection_url, db=0)
return self.connection
Uso de Redis Streams
Crea un archivo producer.py en server/src/redis para enviar mensajes a un canal de Redis:
from .config import Redis
class Producer:
def __init__(self, redis_client):
self.redis_client = redis_client
async def add_to_stream(self, data: dict, stream_channel):
msg_id = await self.redis_client.xadd(name=stream_channel, id="*", fields=data)
print(f"Mensaje id {msg_id} añadido al canal {stream_channel}")
return msg_id
Actualiza chat.py para usar el productor:
from ..redis.producer import Producer
from ..redis.config import Redis
redis = Redis()
@chat.websocket("/chat")
async def websocket_endpoint(websocket: WebSocket, token: str = Depends(get_token)):
await manager.connect(websocket)
redis_client = await redis.create_connection()
producer = Producer(redis_client)
try:
while True:
data = await websocket.receive_text()
stream_data = {token: data}
await producer.add_to_stream(stream_data, "message_channel")
await manager.send_personal_message("Respuesta: Simulando respuesta del servicio GPT", websocket)
except:
manager.disconnect(websocket)
Modelado de Datos de Chat
Crea un archivo server/src/schema/chat.py para definir el esquema de los mensajes:
from datetime import datetime
from pydantic import BaseModel
from typing import List
import uuid
class Message(BaseModel):
id: str = str(uuid.uuid4())
msg: str
timestamp: str = str(datetime.now())
class Chat(BaseModel):
token: str
messages: List[Message]
name: str
session_start: str = str(datetime.now())
Almacenamiento con Redis JSON
Instala rejson en el entorno del servidor:
pip install rejson
Actualiza server/src/redis/config.py para incluir soporte para JSON:
import os
from dotenv import load_dotenv
import aioredis
from rejson import Client
load_dotenv()
class Redis:
def __init__(self):
self.REDIS_URL = os.environ['REDIS_URL']
self.REDIS_PASSWORD = os.environ['REDIS_PASSWORD']
self.REDIS_USER = os.environ['REDIS_USER']
self.REDIS_HOST = os.environ['REDIS_HOST']
self.REDIS_PORT = os.environ['REDIS_PORT']
self.connection_url = f"redis://{self.REDIS_USER}:{self.REDIS_PASSWORD}@{self.REDIS_URL}"
async def create_connection(self):
self.connection = aioredis.from_url(self.connection_url, db=0)
return self.connection
def create_rejson_connection(self):
self.redisJson = Client(host=self.REDIS_HOST, port=self.REDIS_PORT, decode_responses=True, username=self.REDIS_USER, password=self.REDIS_PASSWORD)
return self.redisJson
Actualiza el endpoint /token en chat.py para almacenar sesiones en Redis:
from ..schema.chat import Chat
from rejson import Path
@chat.post("/token")
async def token_generator(name: str, request: Request):
token = str(uuid.uuid4())
if not name:
raise HTTPException(status_code=400, detail={"loc": "name", "msg": "Ingrese un nombre válido"})
json_client = redis.create_rejson_connection()
chat_session = Chat(token=token, messages=[], name=name)
json_client.jsonset(str(token), Path.rootPath(), chat_session.dict())
redis_client = await redis.create_connection()
await redis_client.expire(str(token), 3600)
return chat_session.dict()
Integración con Modelos de IA
Para añadir inteligencia al chatbot, integramos la API de inferencia de Hugging Face con el modelo GPT-J-6B.
Configuración de Hugging Face
- Crea una cuenta en Hugging Face y genera un token de acceso.
- Agrega las credenciales al archivo
.envenworker:
export HUGGINFACE_INFERENCE_TOKEN=<TU_TOKEN>
export MODEL_URL=https://api-inference.huggingface.co/models/EleutherAI/gpt-j-6B
- Crea un archivo
worker/src/model/gptj.py:
import os
from dotenv import load_dotenv
import requests
import json
load_dotenv()
class GPT:
def __init__(self):
self.url = os.environ.get('MODEL_URL')
self.headers = {"Authorization": f"Bearer {os.environ.get('HUGGINFACE_INFERENCE_TOKEN')}"}
self.payload = {
"inputs": "",
"parameters": {"return_full_text": False, "use_cache": True, "max_new_tokens": 25}
}
def query(self, input: str) -> str:
self.payload["inputs"] = f"Human: {input} Bot:"
data = json.dumps(self.payload)
response = requests.post(self.url, headers=self.headers, data=data)
data = json.loads(response.content.decode("utf-8"))
return data[0]['generated_text'].strip()
Gestión de la Memoria Conversacional
Para mantener el contexto, almacenamos el historial de mensajes en Redis. Crea un archivo worker/src/redis/cache.py:
from .config import Redis
from rejson import Path
class Cache:
def __init__(self, json_client):
self.json_client = json_client
async def get_chat_history(self, token: str):
return self.json_client.jsonget(str(token), Path.rootPath())
async def add_message_to_cache(self, token: str, source: str, message_data: dict):
if source == "human":
message_data['msg'] = "Human: " + message_data['msg']
elif source == "bot":
message_data['msg'] = "Bot: " + message_data['msg']
self.json_client.jsonarrappend(str(token), Path('.messages'), message_data)
Actualiza worker/src/redis/config.py para incluir soporte para JSON, similar al servidor.
Consumo de Mensajes en Tiempo Real
Crea un archivo worker/src/redis/stream.py para consumir mensajes del canal:
class StreamConsumer:
def __init__(self, redis_client):
self.redis_client = redis_client
async def consume_stream(self, count: int, block: int, stream_channel):
response = await self.redis_client.xread(streams={stream_channel: '0-0'}, count=count, block=block)
return response
async def delete_message(self, stream_channel, message_id):
await self.redis_client.xdel(stream_channel, message_id)
Actualiza worker/main.py para integrar el consumidor y el modelo:
from src.redis.config import Redis
from src.model.gptj import GPT
from src.redis.cache import Cache
from src.redis.stream import StreamConsumer
from src.redis.producer import Producer
from src.schema.chat import Message
import asyncio
redis = Redis()
async def main():
json_client = redis.create_rejson_connection()
redis_client = await redis.create_connection()
consumer = StreamConsumer(redis_client)
cache = Cache(json_client)
producer = Producer(redis_client)
while True:
response = await consumer.consume_stream(stream_channel="message_channel", count=1, block=0)
if response:
for stream, messages in response:
for message in messages:
message_id = message[0]
token = list(message[1].keys())[0].decode('utf-8')
message_text = list(message[1].values())[0].decode('utf-8')
msg = Message(msg=message_text)
await cache.add_message_to_cache(token=token, source="human", message_data=msg.dict())
data = await cache.get_chat_history(token=token)
message_data = data['messages'][-4:]
input = " ".join([i['msg'] for i in message_data])
res = GPT().query(input=input)
msg = Message(msg=res)
stream_data = {token: str(msg.dict())}
await producer.add_to_stream(stream_data, "response_channel")
await cache.add_message_to_cache(token=token, source="bot", message_data=msg.dict())
await consumer.delete_message(stream_channel="message_channel", message_id=message_id)
Respuestas al Cliente
Crea un archivo server/src/redis/stream.py similar al del worker y actualiza el endpoint /chat para escuchar respuestas:
from ..redis.stream import StreamConsumer
@chat.websocket("/chat")
async def websocket_endpoint(websocket: WebSocket, token: str = Depends(get_token)):
await manager.connect(websocket)
redis_client = await redis.create_connection()
producer = Producer(redis_client)
json_client = redis.create_rejson_connection()
consumer = StreamConsumer(redis_client)
try:
while True:
data = await websocket.receive_text()
stream_data = {token: data}
await producer.add_to_stream(stream_data, "message_channel")
response = await consumer.consume_stream(stream_channel="response_channel", block=0)
for stream, messages in response:
for message in messages:
response_token = list(message[1].keys())[0].decode('utf-8')
if token == response_token:
response_message = list(message[1].values())[0].decode('utf-8')
await manager.send_personal_message(response_message, websocket)
await consumer.delete_message(stream_channel="response_channel", message_id=message[0].decode('utf-8'))
except:
manager.disconnect(websocket)
Recuperación del Historial
Actualiza el endpoint /refresh_token en chat.py:
from ..redis.cache import Cache
@chat.get("/refresh_token")
async def refresh_token(request: Request, token: str):
json_client = redis.create_rejson_connection()
cache = Cache(json_client)
data = await cache.get_chat_history(token)
if not data:
raise HTTPException(status_code=400, detail="Sesión expirada o no existe")
return data
Conclusiones
Este tutorial te ha guiado en la construcción de un chatbot full-stack utilizando tecnologías modernas como FastAPI, WebSockets, Redis y la API de Hugging Face. Has aprendido a configurar un servidor, gestionar conexiones en tiempo real, almacenar datos con Redis y comunicarte con un modelo de IA. Este proyecto es un punto de partida para explorar aplicaciones más complejas, como interfaces de usuario avanzadas o despliegues en la nube. Continúa experimentando con estas herramientas para construir aplicaciones innovadoras y escalables.