Integración de Apache Spark y MongoDB con PySpark#
Objetivo del Notebook: Aprender a conectar Apache Spark con una base de datos MongoDB para realizar operaciones de lectura, análisis y escritura. Este es un patrón muy común en arquitecturas de Big Data, donde MongoDB se utiliza como un sistema de almacenamiento flexible y escalable (Operational Datastore) y Spark se usa para el procesamiento y análisis de datos a gran escala.  Aprenderemos a:
Configurar el entorno: Iniciar una sesión de Spark con el conector oficial de MongoDB.
Conectar con MongoDB Atlas: Usaremos una base de datos en la nube para un ejemplo realista.
Poblar MongoDB: Insertaremos datos de ejemplo directamente desde el notebook.
Leer datos: Cargar una colección de MongoDB en un DataFrame de Spark.
Analizar datos: Utilizar el poder de Spark SQL y las operaciones de DataFrame para procesar la información.
Escribir datos: Guardar los resultados de nuestro análisis de vuelta en una nueva colección en MongoDB.
1. Configuración del Entorno#
Esta es la parte más importante. Necesitamos tres cosas:
Una base de datos MongoDB: Usaremos el servicio gratuito de MongoDB Atlas.
Las librerías necesarias:
pysparkpara Spark ypymongopara interactuar con MongoDB.El Conector de Spark para MongoDB: Un paquete que le permite a Spark comunicarse con MongoDB.
1.1. Pasos para Configurar MongoDB Atlas (¡Acción Requerida!)#
Si no tienes una cuenta, sigue estos pasos (toma 5-10 minutos):
Ve a MongoDB Atlas y regístrate.
Crea un clúster gratuito (Tier M0). Puedes elegir cualquier proveedor de nube y región.
Crea un usuario de base de datos: En la sección «Database Access», crea un usuario. Guarda bien el nombre de usuario y la contraseña.
Configura el acceso de red: En la sección «Network Access», agrega tu dirección IP actual o permite el acceso desde cualquier lugar (
0.0.0.0/0- solo para este tutorial, no es seguro para producción).Obtén la cadena de conexión: Ve a «Database», haz clic en «Connect» en tu clúster, selecciona «Connect your application» y copia la cadena de conexión. Se verá algo así:
mongodb+srv://<username>:<password>@clustername.mongodb.net/.
# =================================================================
# 1.2. Instalar las dependencias
# =================================================================
!pip install pyspark pymongo -q
?25l ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 0.0/1.7 MB ? eta -:--:--
━━╺━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 0.1/1.7 MB 3.6 MB/s eta 0:00:01
━━━━━━━━━━━━╸━━━━━━━━━━━━━━━━━━━━━━━━━━━ 0.6/1.7 MB 8.2 MB/s eta 0:00:01
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╸ 1.7/1.7 MB 16.6 MB/s eta 0:00:01
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 1.7/1.7 MB 14.1 MB/s eta 0:00:00
?25h?25l ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 0.0/331.1 kB ? eta -:--:--
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 331.1/331.1 kB 22.5 MB/s eta 0:00:00
?25h
# =================================================================
# 1.3. Iniciar la Sesión de Spark con el Conector de MongoDB
# =================================================================
from pyspark.sql import SparkSession
# Reemplaza esta versión si es necesario. Búscala en el repositorio de Maven.
mongo_spark_connector_version = "3.0.1"
mongo_spark_connector = f"org.mongodb.spark:mongo-spark-connector_2.12:{mongo_spark_connector_version}"
spark = SparkSession.builder\
.appName("SparkMongoDBIntegration")\
.config("spark.jars.packages", mongo_spark_connector)\
.getOrCreate()
print("SparkSession iniciada con el conector de MongoDB.")
SparkSession iniciada con el conector de MongoDB.
2. Poblando MongoDB con Datos de Ejemplo#
Para que nuestro notebook sea autocontenido, vamos a insertar algunos datos en nuestra base de datos de Atlas usando la librería pymongo. Esto también nos sirve para verificar que nuestra cadena de conexión es correcta.
import pymongo
# =================================================================
# ¡ACCIÓN REQUERIDA!
# Reemplaza la cadena de conexión con la tuya de MongoDB Atlas.
# Asegúrate de poner tu usuario y contraseña.
# =================================================================
# =================================================================
# ¡ACCIÓN REQUERIDA!
# Reemplaza la cadena de conexión con la tuya de MongoDB Atlas.
# =================================================================
MONGO_URI = ""
# Definimos el nombre de nuestra base de datos y colección
DB_NAME = "universidad"
COLLECTION_NAME = "cientificos"
# Datos de ejemplo que vamos a insertar
cientificos_data = [
{"nombre": "Albert", "apellido": "Einstein", "campo": "Física", "nacimiento": 1879},
{"nombre": "Marie", "apellido": "Curie", "campo": "Química", "nacimiento": 1867},
{"nombre": "Isaac", "apellido": "Newton", "campo": "Física", "nacimiento": 1643},
{"nombre": "Charles", "apellido": "Darwin", "campo": "Biología", "nacimiento": 1809},
{"nombre": "Rosalind", "apellido": "Franklin", "campo": "Química", "nacimiento": 1920}
]
# Conectamos a MongoDB y poblamos la colección
try:
client = pymongo.MongoClient(MONGO_URI)
db = client[DB_NAME]
collection = db[COLLECTION_NAME]
# Limpiamos la colección por si ya existían datos
collection.delete_many({})
# Insertamos los nuevos datos
collection.insert_many(cientificos_data)
print(f"✅ Datos insertados correctamente en la colección '{COLLECTION_NAME}'.")
client.close()
except Exception as e:
print(f"❌ Error al conectar o insertar datos: {e}")
✅ Datos insertados correctamente en la colección 'cientificos'.
3. Leer Datos de MongoDB con Spark#
Ahora que tenemos datos en MongoDB, vamos a usar Spark para leerlos y cargarlos en un DataFrame. Un DataFrame es una tabla de datos distribuida con columnas nombradas.
# Leemos los datos desde MongoDB a un DataFrame de Spark
df_cientificos = spark.read.format("mongo")\
.option("uri", MONGO_URI)\
.option("database", DB_NAME)\
.option("collection", COLLECTION_NAME)\
.load()
# Mostramos el esquema inferido por Spark
print("Esquema del DataFrame:")
df_cientificos.printSchema()
# Mostramos los datos cargados
print("\nDatos cargados desde MongoDB:")
df_cientificos.show()
Esquema del DataFrame:
root
|-- _id: struct (nullable = true)
| |-- oid: string (nullable = true)
|-- apellido: string (nullable = true)
|-- campo: string (nullable = true)
|-- nacimiento: integer (nullable = true)
|-- nombre: string (nullable = true)
Datos cargados desde MongoDB:
+--------------------+--------+--------+----------+--------+
| _id|apellido| campo|nacimiento| nombre|
+--------------------+--------+--------+----------+--------+
|{68ccb513e7510f01...|Einstein| Física| 1879| Albert|
|{68ccb513e7510f01...| Curie| Química| 1867| Marie|
|{68ccb513e7510f01...| Newton| Física| 1643| Isaac|
|{68ccb513e7510f01...| Darwin|Biología| 1809| Charles|
|{68ccb513e7510f01...|Franklin| Química| 1920|Rosalind|
+--------------------+--------+--------+----------+--------+
4. Analizar los Datos con Spark#
Una vez que los datos están en un DataFrame de Spark, podemos usar todo su poder para el análisis. Realizaremos un análisis simple: encontrar a los científicos nacidos en el siglo XIX y agruparlos por campo.
from pyspark.sql.functions import count
# Filtramos los científicos nacidos en el siglo XIX (entre 1801 y 1900)
df_siglo_xix = df_cientificos.filter(
(df_cientificos.nacimiento > 1800) & (df_cientificos.nacimiento <= 1900)
)
print("Científicos nacidos en el siglo XIX:")
df_siglo_xix.show()
# Agrupamos por campo y contamos cuántos hay en cada uno
df_conteo_por_campo = df_siglo_xix.groupBy("campo").agg(
count("*").alias("numero_de_cientificos")
)
print("\nConteo de científicos del siglo XIX por campo:")
df_conteo_por_campo.show()
Científicos nacidos en el siglo XIX:
+--------------------+--------+--------+----------+-------+
| _id|apellido| campo|nacimiento| nombre|
+--------------------+--------+--------+----------+-------+
|{68ccb513e7510f01...|Einstein| Física| 1879| Albert|
|{68ccb513e7510f01...| Curie| Química| 1867| Marie|
|{68ccb513e7510f01...| Darwin|Biología| 1809|Charles|
+--------------------+--------+--------+----------+-------+
Conteo de científicos del siglo XIX por campo:
+--------+---------------------+
| campo|numero_de_cientificos|
+--------+---------------------+
| Física| 1|
| Química| 1|
|Biología| 1|
+--------+---------------------+
5. Escribir los Resultados de Vuelta a MongoDB#
Finalmente, guardaremos el resultado de nuestro análisis (el DataFrame df_conteo_por_campo) en una nueva colección en MongoDB.
# Definimos el nombre de la nueva colección para los resultados
RESULTS_COLLECTION_NAME = "conteo_por_campo"
# Escribimos el DataFrame de resultados en MongoDB
# El modo "overwrite" borrará la colección si ya existe y la creará de nuevo.
df_conteo_por_campo.write.format("mongo")\
.option("uri", MONGO_URI)\
.option("database", DB_NAME)\
.option("collection", RESULTS_COLLECTION_NAME)\
.mode("overwrite")\
.save()
print(f"✅ Resultados guardados en la colección '{RESULTS_COLLECTION_NAME}'.")
# Verificación final usando pymongo para confirmar que los datos se escribieron
try:
client = pymongo.MongoClient(MONGO_URI)
db = client[DB_NAME]
results_collection = db[RESULTS_COLLECTION_NAME]
print("\nVerificando los datos escritos en MongoDB:")
for doc in results_collection.find():
print(doc)
client.close()
except Exception as e:
print(f"❌ Error al verificar los datos: {e}")
# Detenemos la sesión de Spark
spark.stop()
✅ Resultados guardados en la colección 'conteo_por_campo'.
Verificando los datos escritos en MongoDB:
{'_id': ObjectId('68ccb54c98f4a667c551597b'), 'campo': 'Física', 'numero_de_cientificos': 1}
{'_id': ObjectId('68ccb54c98f4a667c551597c'), 'campo': 'Química', 'numero_de_cientificos': 1}
{'_id': ObjectId('68ccb54c98f4a667c551597d'), 'campo': 'Biología', 'numero_de_cientificos': 1}
#Ejemplo con datos reales
Vamos a reemplazar los datos de los científicos con un conjunto de datos público muy popular: Títulos de Netflix. Este dataset contiene información sobre películas y shows de TV, su tipo, país de origen, año de lanzamiento, etc. Es perfecto para realizar agregaciones y filtros realistas.
Integración de Apache Spark y MongoDB con PySpark: Caso Práctico con Datos de Netflix#
Objetivo del Notebook: Aprender a conectar Apache Spark con una base de datos MongoDB para realizar operaciones de lectura, análisis y escritura. Este es un patrón muy común en arquitecturas de Big Data, donde MongoDB se utiliza como un sistema de almacenamiento flexible y Spark se usa para el procesamiento a gran escala.
Aprenderemos a:
Configurar el entorno: Iniciar una sesión de Spark con el conector oficial de MongoDB.
Conectar con MongoDB Atlas: Usaremos una base de datos en la nube para un ejemplo realista.
Poblar MongoDB: Descargaremos un dataset real (Títulos de Netflix) y lo insertaremos en nuestra base de datos.
Leer datos: Cargar la colección de Netflix en un DataFrame de Spark.
Analizar datos: Utilizar el poder de Spark para realizar agregaciones y filtros sobre los datos.
Escribir datos: Guardar los resultados de nuestro análisis de vuelta en una nueva colección en MongoDB.
# =================================================================
# Paso 1: Instalar las dependencias
# =================================================================
!pip install pyspark pymongo pandas -q
# =================================================================
# Paso 2: Iniciar la Sesión de Spark con el Conector de MongoDB
# =================================================================
from pyspark.sql import SparkSession
# Versión del conector de Spark para MongoDB
mongo_spark_connector_version = "3.0.1"
mongo_spark_connector = f"org.mongodb.spark:mongo-spark-connector_2.12:{mongo_spark_connector_version}"
spark = SparkSession.builder\
.appName("SparkMongoDBNetflix")\
.config("spark.jars.packages", mongo_spark_connector)\
.getOrCreate()
print("✅ SparkSession iniciada con el conector de MongoDB.")
✅ SparkSession iniciada con el conector de MongoDB.
3. Poblando MongoDB con el Dataset de Netflix#
Ahora, vamos a descargar el dataset de Netflix, procesarlo con la librería Pandas y subirlo a nuestra base de datos en MongoDB Atlas.
import pymongo
import pandas as pd
import requests
from io import StringIO
# =================================================================
# ¡ACCIÓN REQUERIDA!
# Reemplaza la cadena de conexión con la tuya de MongoDB Atlas.
# Asegúrate de poner tu usuario y contraseña.
# =================================================================
MONGO_URI = ""
# Definimos el nombre de nuestra base de datos y colección
DB_NAME = "netflix"
COLLECTION_NAME = "titles"
# URL del dataset de Netflix en formato CSV
url = "https://raw.githubusercontent.com/rfordatascience/tidytuesday/master/data/2021/2021-04-20/netflix_titles.csv"
print("📥 Descargando el dataset de Netflix...")
try:
# Descargar el contenido del archivo CSV
response = requests.get(url)
response.raise_for_status() # Lanza un error si la descarga falla
# Leer el CSV en un DataFrame de Pandas
csv_data = StringIO(response.text)
df_pandas = pd.read_csv(csv_data)
# Limpieza básica: Reemplazar valores nulos (NaN) para evitar errores en MongoDB
df_pandas.fillna("", inplace=True)
# Convertir el DataFrame a una lista de diccionarios (formato JSON)
data_to_insert = df_pandas.to_dict('records')
print(f"📄 Se leyeron {len(data_to_insert)} documentos del archivo.")
# Conectar a MongoDB y poblar la colección
print("⏳ Conectando a MongoDB Atlas y poblando la colección...")
client = pymongo.MongoClient(MONGO_URI)
db = client[DB_NAME]
collection = db[COLLECTION_NAME]
# Limpiamos la colección por si ya existían datos
collection.delete_many({})
# Insertamos los nuevos datos
collection.insert_many(data_to_insert)
print(f"✅ ¡Éxito! {len(data_to_insert)} documentos insertados en la colección '{COLLECTION_NAME}'.")
client.close()
except Exception as e:
print(f"❌ Ocurrió un error: {e}")
📥 Descargando el dataset de Netflix...
📄 Se leyeron 7787 documentos del archivo.
⏳ Conectando a MongoDB Atlas y poblando la colección...
✅ ¡Éxito! 7787 documentos insertados en la colección 'titles'.
4. Leer Datos de MongoDB con Spark#
Con los datos ya en MongoDB, usaremos Spark para leer la colección completa y cargarla en un DataFrame distribuido.
# Leemos los datos desde la colección de Netflix
df_netflix = spark.read.format("mongo")\
.option("uri", MONGO_URI)\
.option("database", DB_NAME)\
.option("collection", COLLECTION_NAME)\
.load()
print("📝 Esquema del DataFrame de Netflix:")
df_netflix.printSchema()
print("\n🎬 Algunos datos cargados desde la colección de Netflix:")
df_netflix.show(5)
📝 Esquema del DataFrame de Netflix:
root
|-- _id: struct (nullable = true)
| |-- oid: string (nullable = true)
|-- cast: string (nullable = true)
|-- country: string (nullable = true)
|-- date_added: string (nullable = true)
|-- description: string (nullable = true)
|-- director: string (nullable = true)
|-- duration: string (nullable = true)
|-- listed_in: string (nullable = true)
|-- rating: string (nullable = true)
|-- release_year: integer (nullable = true)
|-- show_id: string (nullable = true)
|-- title: string (nullable = true)
|-- type: string (nullable = true)
🎬 Algunos datos cargados desde la colección de Netflix:
+--------------------+--------------------+-------------+-----------------+--------------------+-----------------+---------+--------------------+------+------------+-------+-----+-------+
| _id| cast| country| date_added| description| director| duration| listed_in|rating|release_year|show_id|title| type|
+--------------------+--------------------+-------------+-----------------+--------------------+-----------------+---------+--------------------+------+------------+-------+-----+-------+
|{68ccb6974b8b31bd...|João Miguel, Bian...| Brazil| August 14, 2020|In a future where...| |4 Seasons|International TV ...| TV-MA| 2020| s1| 3%|TV Show|
|{68ccb6974b8b31bd...|Demián Bichir, Hé...| Mexico|December 23, 2016|After a devastati...|Jorge Michel Grau| 93 min|Dramas, Internati...| TV-MA| 2016| s2| 7:19| Movie|
|{68ccb6974b8b31bd...|Tedd Chan, Stella...| Singapore|December 20, 2018|When an army recr...| Gilbert Chan| 78 min|Horror Movies, In...| R| 2011| s3|23:59| Movie|
|{68ccb6974b8b31bd...|Elijah Wood, John...|United States|November 16, 2017|In a postapocalyp...| Shane Acker| 80 min|Action & Adventur...| PG-13| 2009| s4| 9| Movie|
|{68ccb6974b8b31bd...|Jim Sturgess, Kev...|United States| January 1, 2020|A brilliant group...| Robert Luketic| 123 min| Dramas| PG-13| 2008| s5| 21| Movie|
+--------------------+--------------------+-------------+-----------------+--------------------+-----------------+---------+--------------------+------+------------+-------+-----+-------+
only showing top 5 rows
5. Analizar los Datos con Spark#
Ahora realizaremos un análisis sobre el DataFrame:
Contar cuántas producciones son Películas (Movie) vs. Shows de TV (TV Show).
Encontrar el Top 5 de países con la mayor cantidad de producciones en Netflix.
from pyspark.sql.functions import col, desc
# 1. Conteo por tipo (Movie vs. TV Show)
df_conteo_tipo = df_netflix.groupBy("type").count()
print("📊 Conteo de producciones por tipo:")
df_conteo_tipo.show()
# 2. Top 5 países con más producciones
# Filtramos para excluir registros donde el país no está especificado
df_conteo_pais = df_netflix.filter(col("country") != "")\
.groupBy("country")\
.count()\
.orderBy(desc("count"))\
.limit(5)
print("\n🏆 Top 5 países con más producciones en Netflix:")
df_conteo_pais.show()
📊 Conteo de producciones por tipo:
+-------+-----+
| type|count|
+-------+-----+
|TV Show| 2410|
| Movie| 5377|
+-------+-----+
🏆 Top 5 países con más producciones en Netflix:
+--------------+-----+
| country|count|
+--------------+-----+
| United States| 2555|
| India| 923|
|United Kingdom| 397|
| Japan| 226|
| South Korea| 183|
+--------------+-----+
6. Escribir Resultados de Vuelta a MongoDB#
Finalmente, guardaremos el resultado de nuestro análisis (el Top 5 de países) en una nueva colección en MongoDB para que pueda ser consultado por otras aplicaciones.
# Definimos el nombre de la nueva colección para los resultados
RESULTS_COLLECTION_NAME = "top_paises_productores"
# Escribimos el DataFrame de resultados en MongoDB
# El modo "overwrite" borrará la colección si ya existe y la creará de nuevo.
df_conteo_pais.write.format("mongo")\
.option("uri", MONGO_URI)\
.option("database", DB_NAME)\
.option("collection", RESULTS_COLLECTION_NAME)\
.mode("overwrite")\
.save()
print(f"✅ Resultados guardados en la colección '{RESULTS_COLLECTION_NAME}'.")
# Verificación final usando pymongo para confirmar que los datos se escribieron
try:
client = pymongo.MongoClient(MONGO_URI)
db = client[DB_NAME]
results_collection = db[RESULTS_COLLECTION_NAME]
print("\n🧐 Verificando los datos escritos en MongoDB:")
for doc in results_collection.find():
print(doc)
client.close()
except Exception as e:
print(f"❌ Error al verificar los datos: {e}")
# Detenemos la sesión de Spark para liberar recursos
spark.stop()
✅ Resultados guardados en la colección 'top_paises_productores'.
🧐 Verificando los datos escritos en MongoDB:
{'_id': ObjectId('68ccb6b9a0f4ab6fc60db1ae'), 'country': 'United States', 'count': 2555}
{'_id': ObjectId('68ccb6b9a0f4ab6fc60db1af'), 'country': 'India', 'count': 923}
{'_id': ObjectId('68ccb6b9a0f4ab6fc60db1b0'), 'country': 'United Kingdom', 'count': 397}
{'_id': ObjectId('68ccb6b9a0f4ab6fc60db1b1'), 'country': 'Japan', 'count': 226}
{'_id': ObjectId('68ccb6b9a0f4ab6fc60db1b2'), 'country': 'South Korea', 'count': 183}