Backend FastAPI#
Usemos FastAPI para construir backend en Python
Configuremos el ambiente#
requirements.txt
fastapi[all]
uvicorn[standard]
!pip install -r requirements.txt
from fastapi import FastAPI
app = FastAPI()
@app.get("/")
async def root():
return {"message": "Hello World"}
!uvicorn main:app --reload
Métodos Get - Post - Put - Delete#
# @title Cargar Datos
users = [
{
"name": "John",
"lastname": "Doe",
"language": "English",
"email": "john.doe@example.com",
"location_id": 1,
"password": "password123",
"skill_id": 101
},
{
"name": "Alice",
"lastname": "Smith",
"language": "Spanish",
"email": "alice.smith@example.com",
"location_id": 2,
"password": "alicepwd456",
"skill_id": 102
},
{
"name": "Bob",
"lastname": "Johnson",
"language": "French",
"email": "bob.johnson@example.com",
"location_id": 3,
"password": "bobbie789",
"skill_id": 103
},
{
"name": "Eva",
"lastname": "Brown",
"language": "German",
"email": "eva.brown@example.com",
"location_id": 4,
"password": "evapass123",
"skill_id": 104
},
{
"name": "Michael",
"lastname": "Davis",
"language": "Italian",
"email": "michael.davis@example.com",
"location_id": 5,
"password": "mdp7890",
"skill_id": 105
},
{
"name": "Sophia",
"lastname": "Garcia",
"language": "Portuguese",
"email": "sophia.garcia@example.com",
"location_id": 6,
"password": "sophiapwd12",
"skill_id": 106
},
{
"name": "Daniel",
"lastname": "Rodriguez",
"language": "Chinese",
"email": "daniel.rodriguez@example.com",
"location_id": 7,
"password": "dan1234",
"skill_id": 107
},
{
"name": "Olivia",
"lastname": "Martinez",
"language": "Japanese",
"email": "olivia.martinez@example.com",
"location_id": 8,
"password": "oliviapass567",
"skill_id": 108
},
{
"name": "William",
"lastname": "Lopez",
"language": "Korean",
"email": "william.lopez@example.com",
"location_id": 9,
"password": "willpwd90",
"skill_id": 109
},
{
"name": "Ava",
"lastname": "Lee",
"language": "Russian",
"email": "ava.lee@example.com",
"location_id": 10,
"password": "avapass321",
"skill_id": 110
}
]
Documentación:#
localhost:8000/docs
@app.get("/")
async def root():
return {"message": "Hello World"}
from pydantic import BaseModel
class User(BaseModel):
user_id = int
name = str
lastname = str
language = str
email = str
location_id = int
password = str
skill_id = int
Probemos la API#
@app.get("/items/{item_id}")
async def read_item(item_id: int):
try:
item = users[item_id]
return User(**item)
except:
print ({"Error": "item_id"})
@app.post("/items/")
async def create_item(item: User):
users.append(item) # Agregar el nuevo registro a la base de datos (en este caso, una lista)
return item
@app.put("/items/{item_id}")
async def update_item(item_id: int, item: User):
if item_id < 0 or item_id >= len(users):
return {"error": "El id del recurso no existe"}
# Actualizar el recurso existente por su id
users[item_id] = item.dict()
return {"message": "Recurso actualizado correctamente"}
# Endpoint DELETE para eliminar un recurso por su id
@app.delete("/items/{item_id}")
async def delete_item(item_id: int):
index_to_remove = None
# Buscar el índice del recurso por su id
for i, item in enumerate(users):
if item["id"] == item_id:
index_to_remove = i
break
if index_to_remove is not None:
# Eliminar el recurso de la lista por su índice
deleted_item = users.pop(index_to_remove)
return {"message": f"Recurso con id {item_id} eliminado correctamente"}
else:
return {"error": f"El recurso con id {item_id} no existe"}
Hablemos de esquemas#
from pydantic import BaseModel
class User(BaseModel):
user_id : int
name : str
lastname : str
language : str
email : str
location_id : int
password : str
skill_id : int
Hablemos de ORM’s#
sqlalchemy
from sqlalchemy import create_engine
engine = create_engine("sqlite:///test.db", echo=True)
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base
session = sessionmaker(bind=engine)
Base = declarative_base()
Hablemos de modelos (ORM’s)#
from sqlalchemy import Column, Integer, String, DateTime, Boolean, Float
from sqlalchemy import Column, Integer, String, DateTime, Boolean, Float
class User(Base):
__tablename__ = "users"
user_id = Column(Integer, primary_key=True, autoincrement=True)
name = Column(String(50))
lastname = Column(String(50))
language = Column(String)
email = Column(String)
location_id = Column(Integer)
password = Column(String)
skill_id = Column(Integer)
Base.metadata.create_all(engine)
@app.post("/users")
async def create_user(user: User):
session.add(user)
session.commit()
return user
@app.put("/items/{item_id}")
async def update_item(item_id: str, item: Item):
return {"item_name": item.name, "item_id": item_id}
@app.get("/users")
async def get_users():
users = session.query(User).all()
return users
@app.get("/users/{user_id}")
async def get_user(user_id: int):
user = session.query(User).filter(User.user_id == user_id).first()
return user