Compartir en Twitter
Go to Homepage

CONSTRUYE UN CHATBOT IA CON REDIS Y PYTHON

October 22, 2025

Introducción al Desarrollo de un Chatbot IA

Desarrollar una aplicación full-stack moderna implica tomar decisiones críticas sobre tecnologías, arquitectura y herramientas. Un chatbot con inteligencia artificial (IA) es un proyecto ideal para aprender conceptos avanzados de desarrollo, desde la creación de APIs hasta la gestión de datos en tiempo real. Este tutorial te guiará paso a paso en la construcción de un chatbot con IA utilizando Python, FastAPI, WebSockets, Redis y la API de inferencia de Hugging Face. Cubriremos la arquitectura, la implementación del servidor, la integración con Redis para sistemas en tiempo real y la conexión con un modelo de lenguaje como GPT-J-6B. Este proyecto está diseñado para desarrolladores con conocimientos básicos de Python y JavaScript, ofreciendo un enfoque práctico para dominar el desarrollo full-stack.

Arquitectura de la Aplicación

La arquitectura de un chatbot full-stack debe ser clara y escalable. Este proyecto utiliza una arquitectura cliente-servidor con comunicación en tiempo real. A continuación, se detalla cada componente:

Interfaz de Usuario (Cliente)

La interfaz de usuario se construye con React 18, una biblioteca popular para interfaces dinámicas. La comunicación con el backend se realiza mediante WebSockets para garantizar interacciones en tiempo real, esenciales para un chatbot interactivo.

Modelo de Lenguaje GPT-J-6B

Utilizamos GPT-J-6B, un modelo de lenguaje de código abierto desarrollado por EleutherAI con 6 mil millones de parámetros. Este modelo, comparable a GPT-3 en ciertas tareas, es accesible a través de la API de inferencia de Hugging Face, que permite integrarlo sin costos iniciales para casos de uso simples. La API simplifica la conexión con el modelo, eliminando la necesidad de entrenar o desplegar modelos localmente.

Redis para Almacenamiento y Comunicación

Redis es un almacén de datos en memoria que permite un acceso rápido a datos tipo JSON. En este proyecto, usamos Redis JSON para almacenar el historial de chats y Redis Streams para gestionar la comunicación en tiempo real entre el cliente, el servidor y la API de Hugging Face. Utilizaremos una instancia gratuita de Redis Enterprise Cloud para pruebas.

Servidor con FastAPI y WebSockets

El backend se implementa con FastAPI, un framework moderno de Python que soporta WebSockets para comunicación bidireccional. Esto es crucial para enviar y recibir mensajes en tiempo real, superando las limitaciones de las conexiones HTTP tradicionales.

Configuración del Entorno de Desarrollo

Para comenzar, necesitas un entorno con Python 3.8+ y Node.js instalados. Este tutorial usa macOS, pero es adaptable a otros sistemas operativos. Sigue estos pasos para configurar el proyecto:

  1. Crea una carpeta llamada fullstack-ai-chatbot:

    mkdir fullstack-ai-chatbot
    cd fullstack-ai-chatbot
    
  2. Crea dos subcarpetas, client y server, para el frontend y backend:

    mkdir client server
    
  3. Inicializa un repositorio Git y crea un archivo .gitignore:

    git init
    touch .gitignore
    
  4. En el archivo .gitignore, agrega:

    env/
    __pycache__/
    node_modules/
    

Construcción del Servidor de Chat con FastAPI

El servidor se construye con FastAPI para manejar la lógica del backend y WebSockets para la comunicación en tiempo real. A continuación, se detalla cómo configurarlo.

Configuración del Entorno de Python

  1. En la carpeta server, crea un entorno virtual:

    cd server
    python3.8 -m venv env
    source env/bin/activate
    
  2. Instala las dependencias necesarias:

    pip install fastapi uuid uvicorn gunicorn websockets python-dotenv aioredis
    
  3. Crea un archivo .env para las variables de entorno:

    touch .env
    
  4. En el archivo .env, agrega:

    export APP_ENV=development
    

Configuración del Servidor FastAPI

Crea un archivo main.py en la carpeta server con el siguiente código para iniciar un servidor de desarrollo:

from fastapi import FastAPI, Request
import uvicorn
import os
from dotenv import load_dotenv

load_dotenv()

api = FastAPI()

@api.get("/test")
async def root():
    return {"msg": "API está en línea"}

if __name__ == "__main__":
    if os.environ.get('APP_ENV') == "development":
        uvicorn.run("main:api", host="0.0.0.0", port=3500, workers=4, reload=True)

Este código inicializa un servidor FastAPI que escucha en el puerto 3500. El endpoint /test devuelve un mensaje JSON para verificar que el servidor está funcionando. Ejecuta el servidor con:

python main.py

Navega a http://localhost:3500/test en tu navegador para confirmar que la API está activa.

Creación de Rutas para la API

Crea una carpeta src/routes dentro de server y un archivo chat.py con las rutas iniciales:

from fastapi import APIRouter, FastAPI, WebSocket, Request

chat = APIRouter()

@chat.post("/token")
async def token_generator(request: Request):
    return None

@chat.post("/refresh_token")
async def refresh_token(request: Request):
    return None

@chat.websocket("/chat")
async def websocket_endpoint(websocket: WebSocket):
    return None

Conecta estas rutas al servidor principal actualizando main.py:

from fastapi import FastAPI, Request
import uvicorn
import os
from dotenv import load_dotenv
from src.routes.chat import chat

load_dotenv()

api = FastAPI()
api.include_router(chat)

@api.get("/test")
async def root():
    return {"msg": "API está en línea"}

if __name__ == "__main__":
    if os.environ.get('APP_ENV') == "development":
        uvicorn.run("main:api", host="0.0.0.0", port=3500, workers=4, reload=True)

Generación de Tokens de Sesión

Para identificar sesiones de usuario, usamos UUID para generar tokens únicos. Actualiza chat.py para incluir la lógica del endpoint /token:

from fastapi import APIRouter, FastAPI, WebSocket, Request, HTTPException
import uuid

chat = APIRouter()

@chat.post("/token")
async def token_generator(name: str, request: Request):
    if not name:
        raise HTTPException(status_code=400, detail={"loc": "name", "msg": "Ingrese un nombre válido"})
    token = str(uuid.uuid4())
    data = {"name": name, "token": token}
    return data

Este código genera un token único para cada usuario basado en su nombre. Prueba este endpoint en Postman enviando una solicitud POST a http://localhost:3500/token?name=TuNombre.

Gestión de Conexiones WebSocket

Crea una carpeta src/socket y un archivo connection.py para manejar conexiones WebSocket:

from fastapi import WebSocket
from typing import List

class ConnectionManager:
    def __init__(self):
        self.active_connections: List[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        self.active_connections.remove(websocket)

    async def send_personal_message(self, message: str, websocket: WebSocket):
        await websocket.send_text(message)

Actualiza el endpoint /chat en chat.py para usar esta clase:

from ..socket.connection import ConnectionManager

manager = ConnectionManager()

@chat.websocket("/chat")
async def websocket_endpoint(websocket: WebSocket):
    await manager.connect(websocket)
    try:
        while True:
            data = await websocket.receive_text()
            print(data)
            await manager.send_personal_message("Respuesta: Simulando respuesta del servicio GPT", websocket)
    except:
        manager.disconnect(websocket)

Prueba el WebSocket en Postman conectándote a ws://localhost:3500/chat y enviando un mensaje como “Hola Bot”. Deberías recibir una respuesta simulada.

Inyección de Dependencias

Para validar tokens en las conexiones WebSocket, crea un archivo src/socket/utils.py:

from fastapi import WebSocket, status, Query
from typing import Optional

async def get_token(websocket: WebSocket, token: Optional[str] = Query(None)):
    if not token:
        await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
    return token

Actualiza el endpoint /chat para incluir esta dependencia:

from ..socket.utils import get_token
from fastapi import Depends

@chat.websocket("/chat")
async def websocket_endpoint(websocket: WebSocket, token: str = Depends(get_token)):
    await manager.connect(websocket)
    try:
        while True:
            data = await websocket.receive_text()
            print(data)
            await manager.send_personal_message("Respuesta: Simulando respuesta del servicio GPT", websocket)
    except:
        manager.disconnect(websocket)

Implementación de Sistemas en Tiempo Real con Redis

Redis permite manejar datos en tiempo real y almacenar el historial de chats. A continuación, configuraremos Redis para nuestro chatbot.

Configuración de Redis

  1. Regístrate en Redis Enterprise Cloud para obtener una instancia gratuita.

  2. Crea una carpeta worker en el directorio raíz del proyecto:

    mkdir worker
    cd worker
    python3.8 -m venv env
    source env/bin/activate
    pip install aiohttp aioredis python-dotenv
    
  3. Crea un archivo .env en worker con las credenciales de Redis:

    export REDIS_URL=<TU_URL_REDIS>
    export REDIS_USER=<TU_USUARIO_REDIS>
    export REDIS_PASSWORD=<TU_CONTRASEÑA_REDIS>
    export REDIS_HOST=<TU_HOST_REDIS>
    export REDIS_PORT=<TU_PUERTO_REDIS>
    
  4. En worker/src/redis, crea un archivo config.py:

import os
from dotenv import load_dotenv
import aioredis

load_dotenv()

class Redis:
    def __init__(self):
        self.REDIS_URL = os.environ['REDIS_URL']
        self.REDIS_PASSWORD = os.environ['REDIS_PASSWORD']
        self.REDIS_USER = os.environ['REDIS_USER']
        self.connection_url = f"redis://{self.REDIS_USER}:{self.REDIS_PASSWORD}@{self.REDIS_URL}"

    async def create_connection(self):
        self.connection = aioredis.from_url(self.connection_url, db=0)
        return self.connection

Uso de Redis Streams

Crea un archivo producer.py en server/src/redis para enviar mensajes a un canal de Redis:

from .config import Redis

class Producer:
    def __init__(self, redis_client):
        self.redis_client = redis_client

    async def add_to_stream(self, data: dict, stream_channel):
        msg_id = await self.redis_client.xadd(name=stream_channel, id="*", fields=data)
        print(f"Mensaje id {msg_id} añadido al canal {stream_channel}")
        return msg_id

Actualiza chat.py para usar el productor:

from ..redis.producer import Producer
from ..redis.config import Redis

redis = Redis()

@chat.websocket("/chat")
async def websocket_endpoint(websocket: WebSocket, token: str = Depends(get_token)):
    await manager.connect(websocket)
    redis_client = await redis.create_connection()
    producer = Producer(redis_client)
    try:
        while True:
            data = await websocket.receive_text()
            stream_data = {token: data}
            await producer.add_to_stream(stream_data, "message_channel")
            await manager.send_personal_message("Respuesta: Simulando respuesta del servicio GPT", websocket)
    except:
        manager.disconnect(websocket)

Modelado de Datos de Chat

Crea un archivo server/src/schema/chat.py para definir el esquema de los mensajes:

from datetime import datetime
from pydantic import BaseModel
from typing import List
import uuid

class Message(BaseModel):
    id: str = str(uuid.uuid4())
    msg: str
    timestamp: str = str(datetime.now())

class Chat(BaseModel):
    token: str
    messages: List[Message]
    name: str
    session_start: str = str(datetime.now())

Almacenamiento con Redis JSON

Instala rejson en el entorno del servidor:

pip install rejson

Actualiza server/src/redis/config.py para incluir soporte para JSON:

import os
from dotenv import load_dotenv
import aioredis
from rejson import Client

load_dotenv()

class Redis:
    def __init__(self):
        self.REDIS_URL = os.environ['REDIS_URL']
        self.REDIS_PASSWORD = os.environ['REDIS_PASSWORD']
        self.REDIS_USER = os.environ['REDIS_USER']
        self.REDIS_HOST = os.environ['REDIS_HOST']
        self.REDIS_PORT = os.environ['REDIS_PORT']
        self.connection_url = f"redis://{self.REDIS_USER}:{self.REDIS_PASSWORD}@{self.REDIS_URL}"

    async def create_connection(self):
        self.connection = aioredis.from_url(self.connection_url, db=0)
        return self.connection

    def create_rejson_connection(self):
        self.redisJson = Client(host=self.REDIS_HOST, port=self.REDIS_PORT, decode_responses=True, username=self.REDIS_USER, password=self.REDIS_PASSWORD)
        return self.redisJson

Actualiza el endpoint /token en chat.py para almacenar sesiones en Redis:

from ..schema.chat import Chat
from rejson import Path

@chat.post("/token")
async def token_generator(name: str, request: Request):
    token = str(uuid.uuid4())
    if not name:
        raise HTTPException(status_code=400, detail={"loc": "name", "msg": "Ingrese un nombre válido"})
    json_client = redis.create_rejson_connection()
    chat_session = Chat(token=token, messages=[], name=name)
    json_client.jsonset(str(token), Path.rootPath(), chat_session.dict())
    redis_client = await redis.create_connection()
    await redis_client.expire(str(token), 3600)
    return chat_session.dict()

Integración con Modelos de IA

Para añadir inteligencia al chatbot, integramos la API de inferencia de Hugging Face con el modelo GPT-J-6B.

Configuración de Hugging Face

  1. Crea una cuenta en Hugging Face y genera un token de acceso.
  2. Agrega las credenciales al archivo .env en worker:
export HUGGINFACE_INFERENCE_TOKEN=<TU_TOKEN>
export MODEL_URL=https://api-inference.huggingface.co/models/EleutherAI/gpt-j-6B
  1. Crea un archivo worker/src/model/gptj.py:
import os
from dotenv import load_dotenv
import requests
import json

load_dotenv()

class GPT:
    def __init__(self):
        self.url = os.environ.get('MODEL_URL')
        self.headers = {"Authorization": f"Bearer {os.environ.get('HUGGINFACE_INFERENCE_TOKEN')}"}
        self.payload = {
            "inputs": "",
            "parameters": {"return_full_text": False, "use_cache": True, "max_new_tokens": 25}
        }

    def query(self, input: str) -> str:
        self.payload["inputs"] = f"Human: {input} Bot:"
        data = json.dumps(self.payload)
        response = requests.post(self.url, headers=self.headers, data=data)
        data = json.loads(response.content.decode("utf-8"))
        return data[0]['generated_text'].strip()

Gestión de la Memoria Conversacional

Para mantener el contexto, almacenamos el historial de mensajes en Redis. Crea un archivo worker/src/redis/cache.py:

from .config import Redis
from rejson import Path

class Cache:
    def __init__(self, json_client):
        self.json_client = json_client

    async def get_chat_history(self, token: str):
        return self.json_client.jsonget(str(token), Path.rootPath())

    async def add_message_to_cache(self, token: str, source: str, message_data: dict):
        if source == "human":
            message_data['msg'] = "Human: " + message_data['msg']
        elif source == "bot":
            message_data['msg'] = "Bot: " + message_data['msg']
        self.json_client.jsonarrappend(str(token), Path('.messages'), message_data)

Actualiza worker/src/redis/config.py para incluir soporte para JSON, similar al servidor.

Consumo de Mensajes en Tiempo Real

Crea un archivo worker/src/redis/stream.py para consumir mensajes del canal:

class StreamConsumer:
    def __init__(self, redis_client):
        self.redis_client = redis_client

    async def consume_stream(self, count: int, block: int, stream_channel):
        response = await self.redis_client.xread(streams={stream_channel: '0-0'}, count=count, block=block)
        return response

    async def delete_message(self, stream_channel, message_id):
        await self.redis_client.xdel(stream_channel, message_id)

Actualiza worker/main.py para integrar el consumidor y el modelo:

from src.redis.config import Redis
from src.model.gptj import GPT
from src.redis.cache import Cache
from src.redis.stream import StreamConsumer
from src.redis.producer import Producer
from src.schema.chat import Message
import asyncio

redis = Redis()

async def main():
    json_client = redis.create_rejson_connection()
    redis_client = await redis.create_connection()
    consumer = StreamConsumer(redis_client)
    cache = Cache(json_client)
    producer = Producer(redis_client)

    while True:
        response = await consumer.consume_stream(stream_channel="message_channel", count=1, block=0)
        if response:
            for stream, messages in response:
                for message in messages:
                    message_id = message[0]
                    token = list(message[1].keys())[0].decode('utf-8')
                    message_text = list(message[1].values())[0].decode('utf-8')
                    msg = Message(msg=message_text)
                    await cache.add_message_to_cache(token=token, source="human", message_data=msg.dict())
                    data = await cache.get_chat_history(token=token)
                    message_data = data['messages'][-4:]
                    input = " ".join([i['msg'] for i in message_data])
                    res = GPT().query(input=input)
                    msg = Message(msg=res)
                    stream_data = {token: str(msg.dict())}
                    await producer.add_to_stream(stream_data, "response_channel")
                    await cache.add_message_to_cache(token=token, source="bot", message_data=msg.dict())
                    await consumer.delete_message(stream_channel="message_channel", message_id=message_id)

Respuestas al Cliente

Crea un archivo server/src/redis/stream.py similar al del worker y actualiza el endpoint /chat para escuchar respuestas:

from ..redis.stream import StreamConsumer

@chat.websocket("/chat")
async def websocket_endpoint(websocket: WebSocket, token: str = Depends(get_token)):
    await manager.connect(websocket)
    redis_client = await redis.create_connection()
    producer = Producer(redis_client)
    json_client = redis.create_rejson_connection()
    consumer = StreamConsumer(redis_client)
    try:
        while True:
            data = await websocket.receive_text()
            stream_data = {token: data}
            await producer.add_to_stream(stream_data, "message_channel")
            response = await consumer.consume_stream(stream_channel="response_channel", block=0)
            for stream, messages in response:
                for message in messages:
                    response_token = list(message[1].keys())[0].decode('utf-8')
                    if token == response_token:
                        response_message = list(message[1].values())[0].decode('utf-8')
                        await manager.send_personal_message(response_message, websocket)
                    await consumer.delete_message(stream_channel="response_channel", message_id=message[0].decode('utf-8'))
    except:
        manager.disconnect(websocket)

Recuperación del Historial

Actualiza el endpoint /refresh_token en chat.py:

from ..redis.cache import Cache

@chat.get("/refresh_token")
async def refresh_token(request: Request, token: str):
    json_client = redis.create_rejson_connection()
    cache = Cache(json_client)
    data = await cache.get_chat_history(token)
    if not data:
        raise HTTPException(status_code=400, detail="Sesión expirada o no existe")
    return data

Conclusiones

Este tutorial te ha guiado en la construcción de un chatbot full-stack utilizando tecnologías modernas como FastAPI, WebSockets, Redis y la API de Hugging Face. Has aprendido a configurar un servidor, gestionar conexiones en tiempo real, almacenar datos con Redis y comunicarte con un modelo de IA. Este proyecto es un punto de partida para explorar aplicaciones más complejas, como interfaces de usuario avanzadas o despliegues en la nube. Continúa experimentando con estas herramientas para construir aplicaciones innovadoras y escalables.