Spark Tutorial: Learning Apache Spark


Adapted from GitHub

This tutorial will teach you how to use Apache Spark, a framework for large-scale data processing, within a notebook. Many traditional frameworks were designed to run on a single computer. However, many datasets today are too large to be stored on a single computer, and even when a dataset can be stored on one computer (such as the datasets in this tutorial), it can often be processed much more quickly using multiple computers. Spark has efficient implementations of a number of transformations and actions that can be composed together to perform data processing and analysis. Spark excels at distributing these operations across a cluster while abstracting away many of the underlying implementation details. Spark has been designed with a focus on scalability and efficiency. With Spark you can begin developing your solution on your laptop, using a small dataset, and then use that same code to process terabytes or even petabytes across a distributed cluster.

During this tutorial we will cover:

Part 1: Basic notebook usage and integration with Python

Part 2: An introduction to using Apache Spark with the PySpark Python API running in the browser

Part 3: Using RDDs and chaining together transformations and actions

Part 4: Lambda functions

Part 5: Additional RDD actions

Part 6: Additional RDD transformations

Part 7: Caching RDDs and storage options

The following transformations will be covered:

  • map(), mapPartitions(), mapPartitionsWithIndex(), filter(), flatMap(), reduceByKey(), groupByKey()

The following actions will be covered:

  • first(), take(), takeSample(), takeOrdered(), collect(), count(), countByValue(), reduce(), top()

Also covered:

  • cache(), unpersist(), id(), setName()

Note that, for reference, you can look up the details of these methods in Spark's Python API.

Part 0: Google Colaboratory environment setup

# Install Java (OpenJDK 8, headless)
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# Next, download Apache Spark 3.5.6 prebuilt for Hadoop 3.
!wget https://dlcdn.apache.org/spark/spark-3.5.6/spark-3.5.6-bin-hadoop3.tgz
# Now extract the downloaded archive.
!tar xf spark-3.5.6-bin-hadoop3.tgz

# Setting JVM and Spark path variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.6-bin-hadoop3"

# Installing required packages
!pip install pyspark==3.5.6
!pip install findspark
import findspark
findspark.init()
--2025-09-13 19:13:18--  https://dlcdn.apache.org/spark/spark-3.5.6/spark-3.5.6-bin-hadoop3.tgz
Resolving dlcdn.apache.org (dlcdn.apache.org)... 151.101.2.132, 2a04:4e42::644
Connecting to dlcdn.apache.org (dlcdn.apache.org)|151.101.2.132|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 400923510 (382M) [application/x-gzip]
Saving to: ‘spark-3.5.6-bin-hadoop3.tgz’

spark-3.5.6-bin-had 100%[===================>] 382.35M   302MB/s    in 1.3s    

2025-09-13 19:13:35 (302 MB/s) - ‘spark-3.5.6-bin-hadoop3.tgz’ saved [400923510/400923510]

Collecting pyspark==3.5.6
  Downloading pyspark-3.5.6.tar.gz (317.4 MB)
  Preparing metadata (setup.py) ... done
Requirement already satisfied: py4j==0.10.9.7 in /usr/local/lib/python3.12/dist-packages (from pyspark==3.5.6) (0.10.9.7)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.5.6-py2.py3-none-any.whl size=317895798 sha256=3223be3a1f0f0f064c32140c9d086f7bdfa796afc20b50dcdb8174976e239a73
  Stored in directory: /root/.cache/pip/wheels/64/62/f3/ec15656ea4ada0523cae62a1827fe7beb55d3c8c87174aad4a
Successfully built pyspark
Installing collected packages: pyspark
  Attempting uninstall: pyspark
    Found existing installation: pyspark 3.5.1
    Uninstalling pyspark-3.5.1:
      Successfully uninstalled pyspark-3.5.1
Successfully installed pyspark-3.5.6
Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl.metadata (352 bytes)
Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1
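
If the setup cell above does not behave as expected, one thing worth checking is whether JAVA_HOME and SPARK_HOME point at directories that actually exist. The following is a minimal sketch; `missing_dirs` is a hypothetical helper written for this tutorial and is not part of Spark or findspark.

```python
import os

def missing_dirs(env, names=("JAVA_HOME", "SPARK_HOME")):
    """Return the names whose value is unset or is not an existing directory."""
    return [name for name in names if not os.path.isdir(env.get(name, ""))]

# In the notebook, pass the real environment; an empty list means both paths exist.
print(missing_dirs(os.environ))
```

An empty list suggests the paths are fine; any name that is printed is a variable to set or correct before rerunning `findspark.init()`.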
# Create a SparkContext (sc)
from pyspark import SparkContext
sc = SparkContext("local", "example")
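
The first argument to `SparkContext` is the master URL. The string `"local"` runs Spark on this machine with a single worker thread, `"local[4]"` asks for four threads, and `"local[*]"` asks for one per logical core. The sketch below only illustrates how those three forms read; `local_threads` is a hypothetical helper made up for this tutorial and is not how Spark itself parses the URL.

```python
import os
import re

def local_threads(master):
    """Return the worker-thread count implied by a local master URL.

    Handles only "local", "local[N]" and "local[*]"; anything else
    (for example a spark:// cluster URL) raises ValueError.
    """
    match = re.fullmatch(r"local(?:\[(\*|\d+)\])?", master)
    if match is None:
        raise ValueError("not a simple local master URL: {0!r}".format(master))
    spec = match.group(1)
    if spec is None:          # plain "local"
        return 1
    if spec == "*":           # one thread per logical core
        return os.cpu_count() or 1
    return int(spec)

print(local_threads("local"))     # 1, the setting used in the cell above
print(local_threads("local[4]"))  # 4
```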

Part 1: Basic notebook usage and integration with Python

(1a) Notebook usage

A notebook is made up of a linear sequence of cells. These cells can contain either markdown or code, but we won't mix both in one cell. When a markdown cell is executed it renders formatted text, images, and links, just like HTML in a normal web page. The text you are reading right now is part of a markdown cell. Python code cells allow you to execute arbitrary Python commands, just like in any Python shell. Place your cursor inside the cell below and press "Shift" + "Enter" to execute the code and advance to the next cell. You can also press "Ctrl" + "Enter" to execute the code and remain in the cell. These commands work the same in both markdown and code cells.

# This is a Python cell. You can run normal Python code here...
print('The sum of 1 and 1 is {0}'.format(1+1))
The sum of 1 and 1 is 2
# Here is another Python cell, this time with a variable (x) declaration and an if statement:
x = 42
if x > 40:
    print('The sum of 1 and 2 is {0}'.format(1+2))
The sum of 1 and 2 is 3

(1b) Notebook state

As you work through a notebook it is important that you run all of the code cells. The notebook is stateful, which means that variables and their values are retained until the notebook is detached (in Databricks Cloud) or the kernel is restarted (in IPython notebooks). If you do not run all of the code cells as you proceed through the notebook, your variables will not be properly initialized and later code might fail. You will also need to rerun any cells that you have modified in order for the changes to be available to other cells.

# This cell relies on x already being defined.
# If we had not run the cells in part (1a), this code would fail.
print(x * 2)
84
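
To see what a skipped cell looks like in practice, the sketch below refers to a name that was never assigned and catches the resulting `NameError`. The variable name `y_never_defined` is made up for illustration.

```python
# Referencing a variable whose defining cell was never run raises NameError.
try:
    print(y_never_defined * 2)
    outcome = "ran"
except NameError as err:
    outcome = "failed: {0}".format(err)

print(outcome)
```

Running the cell that defines the missing variable, and then rerunning the failing cell, is the usual way out.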

(1c) Importing libraries

We can import standard Python libraries (modules) the usual way. An import statement will import the specified module. In this tutorial and future labs, we will provide any imports that are necessary.

# Import the regular expression library
import re
m = re.search('(?<=abc)def', 'abcdef')
m.group(0)
'def'
# Import the datetime library
import datetime
print('This was last run on: {0}'.format(datetime.datetime.now()))
This was last run on: 2025-09-06 15:59:22.703444
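
The pattern `(?<=abc)def` in the first cell uses a lookbehind: `abc` must appear immediately before the match but is not part of it, which is why `group(0)` is `'def'` rather than `'abcdef'`. A short sketch that checks this, using only the standard `re` module:

```python
import re

m = re.search('(?<=abc)def', 'abcdef')
# The lookbehind is required but not consumed, so the match starts at index 3.
print(m.group(0), m.span())

# Without the 'abc' prefix there is nothing to match, so search returns None.
no_match = re.search('(?<=abc)def', 'xyzdef')
print(no_match)
```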

Part 2: An introduction to using Apache Spark with the PySpark Python API running in the browser

Spark Context

In Spark, communication occurs between a driver and executors. The driver has Spark jobs that it needs to run, and these jobs are split into tasks that are submitted to the executors for completion. The results from these tasks are delivered back to the driver.
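
The driver/executor split can be pictured with a plain-Python analogy. The sketch below is not Spark: it uses a thread pool from the standard library to stand in for executors, and the names `run_job` and `task` are hypothetical. The "driver" cuts a job into tasks, hands them to workers, and collects the results.

```python
from concurrent.futures import ThreadPoolExecutor

def task(chunk):
    """One task: the work a single executor does on its slice of the data."""
    return sum(x * x for x in chunk)

def run_job(data, num_tasks=4):
    """Driver side: split the job into tasks, send them out, gather results."""
    chunks = [data[i::num_tasks] for i in range(num_tasks)]
    with ThreadPoolExecutor(max_workers=num_tasks) as executors:
        partial_results = list(executors.map(task, chunks))
    # Partial results come back to the driver, which combines them.
    return sum(partial_results)

print(run_job(list(range(10))))  # sum of the squares of 0..9, which is 285
```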

In part 1, we saw that normal Python code can be executed via cells. When using Databricks Cloud, this code is executed on the Spark driver and not on an executor; when using an IPython notebook, it is executed within the kernel associated with the notebook. Since no Spark functionality is actually being used, no tasks are launched on the executors.

In order to use Spark and its API we will need to use a SparkContext. When running Spark, you start a new Spark application by creating a SparkContext. When the SparkContext is created, it asks the master for some cores to use to do work. The master sets these cores aside just for you; they won't be used for other applications. When using Databricks Cloud or the virtual machine provisioned for this class, the SparkContext is created for you automatically as sc. In this notebook it was created explicitly in Part 0.

(2a) Example cluster

The diagram below shows an example cluster, where the cores allocated for an application are outlined in purple.

[Diagram: executors]

You can view the details of your Spark application in the Spark web UI. The web UI is accessible in Databricks Cloud by going to "Clusters" and then clicking on the "View Spark UI" link for your cluster. When running locally you'll find it at localhost:4040. In the web UI, under the "Jobs" tab, you can see a list of jobs that have been scheduled or run. It's likely there isn't anything interesting here yet because we haven't run any jobs, but we'll return to this page later.

At a high level, every Spark application consists of a driver program that launches various parallel operations on executor Java Virtual Machines (JVMs) running either in a cluster or locally on the same machine. In Databricks Cloud, "Databricks Shell" is the driver program. When running locally, "PySparkShell" is the driver program. In all cases, this driver program contains the main loop for the program and creates distributed datasets on the cluster, then applies operations (transformations and actions) to those datasets.

Driver programs access Spark through a SparkContext object, which represents a connection to a computing cluster. A Spark context object (sc) is the main entry point for Spark functionality. A Spark context can be used to create Resilient Distributed Datasets (RDDs) on a cluster.

Try printing the type of sc.

# Display the type of the Spark context sc
print(type(sc))
<class 'pyspark.context.SparkContext'>

(2b) SparkContext attributes

You can use Python's dir() function to get a list of all the attributes (including methods) accessible through the sc object.

# List sc's attributes
dir(sc)
['PACKAGE_EXTENSIONS',
 '__annotations__',
 '__class__',
 '__delattr__',
 '__dict__',
 '__dir__',
 '__doc__',
 '__enter__',
 '__eq__',
 '__exit__',
 '__format__',
 '__ge__',
 '__getattribute__',
 '__getnewargs__',
 '__getstate__',
 '__gt__',
 '__hash__',
 '__init__',
 '__init_subclass__',
 '__le__',
 '__lt__',
 '__module__',
 '__ne__',
 '__new__',
 '__reduce__',
 '__reduce_ex__',
 '__repr__',
 '__setattr__',
 '__sizeof__',
 '__str__',
 '__subclasshook__',
 '__weakref__',
 '_accumulatorServer',
 '_active_spark_context',
 '_assert_on_driver',
 '_batchSize',
 '_callsite',
 '_checkpointFile',
 '_conf',
 '_dictToJavaMap',
 '_do_init',
 '_encryption_enabled',
 '_ensure_initialized',
 '_gateway',
 '_getJavaStorageLevel',
 '_initialize_context',
 '_javaAccumulator',
 '_jsc',
 '_jvm',
 '_lock',
 '_next_accum_id',
 '_pickled_broadcast_vars',
 '_python_includes',
 '_repr_html_',
 '_serialize_to_jvm',
 '_temp_dir',
 '_unbatched_serializer',
 'accumulator',
 'addArchive',
 'addFile',
 'addJobTag',
 'addPyFile',
 'appName',
 'applicationId',
 'binaryFiles',
 'binaryRecords',
 'broadcast',
 'cancelAllJobs',
 'cancelJobGroup',
 'cancelJobsWithTag',
 'clearJobTags',
 'defaultMinPartitions',
 'defaultParallelism',
 'dump_profiles',
 'emptyRDD',
 'environment',
 'getCheckpointDir',
 'getConf',
 'getJobTags',
 'getLocalProperty',
 'getOrCreate',
 'hadoopFile',
 'hadoopRDD',
 'listArchives',
 'listFiles',
 'master',
 'newAPIHadoopFile',
 'newAPIHadoopRDD',
 'parallelize',
 'pickleFile',
 'profiler_collector',
 'pythonExec',
 'pythonVer',
 'range',
 'removeJobTag',
 'resources',
 'runJob',
 'sequenceFile',
 'serializer',
 'setCheckpointDir',
 'setInterruptOnCancel',
 'setJobDescription',
 'setJobGroup',
 'setLocalProperty',
 'setLogLevel',
 'setSystemProperty',
 'show_profiles',
 'sparkHome',
 'sparkUser',
 'startTime',
 'statusTracker',
 'stop',
 'textFile',
 'uiWebUrl',
 'union',
 'version',
 'wholeTextFiles']
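
Many of the names in that list start with an underscore and are internal. A small filter leaves only the public ones. `public_attributes` is a hypothetical helper; it is shown on a built-in list so that it runs anywhere, and in this notebook you would call `public_attributes(sc)`.

```python
def public_attributes(obj):
    """Return the names from dir(obj) that do not start with an underscore."""
    return [name for name in dir(obj) if not name.startswith('_')]

# Demonstrated on a plain list; in the notebook, try public_attributes(sc).
print(public_attributes([]))
```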

(2c) Getting help

Alternatively, you can use Python's help() function to get an easier-to-read list of all the attributes, including examples, that the sc object has.
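
help() also accepts a single method, which keeps the output short; in this notebook that would be, for example, `help(sc.parallelize)`. If you want the help text as a string (say, to show only its first lines), the standard `pydoc` module can render it. The sketch below uses `str.split` as a stand-in so that it runs without Spark.

```python
import pydoc

# render_doc returns the same text help() would print, as a plain string.
text = pydoc.render_doc(str.split, renderer=pydoc.plaintext)

# Show only the first few lines instead of the whole page.
print("\n".join(text.splitlines()[:5]))
```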

# Use help to get more detailed information
help(sc)
Help on SparkContext in module pyspark.context object:

class SparkContext(builtins.object)
 |  SparkContext(master: Optional[str] = None, appName: Optional[str] = None, sparkHome: Optional[str] = None, pyFiles: Optional[List[str]] = None, environment: Optional[Dict[str, Any]] = None, batchSize: int = 0, serializer: 'Serializer' = CloudPickleSerializer(), conf: Optional[pyspark.conf.SparkConf] = None, gateway: Optional[py4j.java_gateway.JavaGateway] = None, jsc: Optional[py4j.java_gateway.JavaObject] = None, profiler_cls: Type[pyspark.profiler.BasicProfiler] = <class 'pyspark.profiler.BasicProfiler'>, udf_profiler_cls: Type[pyspark.profiler.UDFBasicProfiler] = <class 'pyspark.profiler.UDFBasicProfiler'>, memory_profiler_cls: Type[pyspark.profiler.MemoryProfiler] = <class 'pyspark.profiler.MemoryProfiler'>)
 |
 |  Main entry point for Spark functionality. A SparkContext represents the
 |  connection to a Spark cluster, and can be used to create :class:`RDD` and
 |  broadcast variables on that cluster.
 |
 |  When you create a new SparkContext, at least the master and app name should
 |  be set, either through the named parameters here or through `conf`.
 |
 |  Parameters
 |  ----------
 |  master : str, optional
 |      Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
 |  appName : str, optional
 |      A name for your job, to display on the cluster web UI.
 |  sparkHome : str, optional
 |      Location where Spark is installed on cluster nodes.
 |  pyFiles : list, optional
 |      Collection of .zip or .py files to send to the cluster
 |      and add to PYTHONPATH.  These can be paths on the local file
 |      system or HDFS, HTTP, HTTPS, or FTP URLs.
 |  environment : dict, optional
 |      A dictionary of environment variables to set on
 |      worker nodes.
 |  batchSize : int, optional, default 0
 |      The number of Python objects represented as a single
 |      Java object. Set 1 to disable batching, 0 to automatically choose
 |      the batch size based on object sizes, or -1 to use an unlimited
 |      batch size
 |  serializer : :class:`Serializer`, optional, default :class:`CPickleSerializer`
 |      The serializer for RDDs.
 |  conf : :class:`SparkConf`, optional
 |      An object setting Spark properties.
 |  gateway : class:`py4j.java_gateway.JavaGateway`,  optional
 |      Use an existing gateway and JVM, otherwise a new JVM
 |      will be instantiated. This is only used internally.
 |  jsc : class:`py4j.java_gateway.JavaObject`, optional
 |      The JavaSparkContext instance. This is only used internally.
 |  profiler_cls : type, optional, default :class:`BasicProfiler`
 |      A class of custom Profiler used to do profiling
 |  udf_profiler_cls : type, optional, default :class:`UDFBasicProfiler`
 |      A class of custom Profiler used to do udf profiling
 |
 |  Notes
 |  -----
 |  Only one :class:`SparkContext` should be active per JVM. You must `stop()`
 |  the active :class:`SparkContext` before creating a new one.
 |
 |  :class:`SparkContext` instance is not supported to share across multiple
 |  processes out of the box, and PySpark does not guarantee multi-processing execution.
 |  Use threads instead for concurrent processing purpose.
 |
 |  Examples
 |  --------
 |  >>> from pyspark.context import SparkContext
 |  >>> sc = SparkContext('local', 'test')
 |  >>> sc2 = SparkContext('local', 'test2') # doctest: +IGNORE_EXCEPTION_DETAIL
 |  Traceback (most recent call last):
 |      ...
 |  ValueError: ...
 |
 |  Methods defined here:
 |
 |  __enter__(self) -> 'SparkContext'
 |      Enable 'with SparkContext(...) as sc: app(sc)' syntax.
 |
 |  __exit__(self, type: Optional[Type[BaseException]], value: Optional[BaseException], trace: Optional[traceback]) -> None
 |      Enable 'with SparkContext(...) as sc: app' syntax.
 |
 |      Specifically stop the context on exit of the with block.
 |
 |  __getnewargs__(self) -> NoReturn
 |
 |  __init__(self, master: Optional[str] = None, appName: Optional[str] = None, sparkHome: Optional[str] = None, pyFiles: Optional[List[str]] = None, environment: Optional[Dict[str, Any]] = None, batchSize: int = 0, serializer: 'Serializer' = CloudPickleSerializer(), conf: Optional[pyspark.conf.SparkConf] = None, gateway: Optional[py4j.java_gateway.JavaGateway] = None, jsc: Optional[py4j.java_gateway.JavaObject] = None, profiler_cls: Type[pyspark.profiler.BasicProfiler] = <class 'pyspark.profiler.BasicProfiler'>, udf_profiler_cls: Type[pyspark.profiler.UDFBasicProfiler] = <class 'pyspark.profiler.UDFBasicProfiler'>, memory_profiler_cls: Type[pyspark.profiler.MemoryProfiler] = <class 'pyspark.profiler.MemoryProfiler'>)
 |      Initialize self.  See help(type(self)) for accurate signature.
 |
 |  __repr__(self) -> str
 |      Return repr(self).
 |
 |  accumulator(self, value: ~T, accum_param: Optional[ForwardRef('AccumulatorParam[T]')] = None) -> 'Accumulator[T]'
 |      Create an :class:`Accumulator` with the given initial value, using a given
 |      :class:`AccumulatorParam` helper object to define how to add values of the
 |      data type if provided. Default AccumulatorParams are used for integers
 |      and floating-point numbers if you do not provide one. For other types,
 |      a custom AccumulatorParam can be used.
 |
 |      .. versionadded:: 0.7.0
 |
 |      Parameters
 |      ----------
 |      value : T
 |          initialized value
 |      accum_param : :class:`pyspark.AccumulatorParam`, optional
 |          helper object to define how to add values
 |
 |      Returns
 |      -------
 |      :class:`Accumulator`
 |          `Accumulator` object, a shared variable that can be accumulated
 |
 |      Examples
 |      --------
 |      >>> acc = sc.accumulator(9)
 |      >>> acc.value
 |      9
 |      >>> acc += 1
 |      >>> acc.value
 |      10
 |
 |      Accumulator object can be accumulated in RDD operations:
 |
 |      >>> rdd = sc.range(5)
 |      >>> def f(x):
 |      ...     global acc
 |      ...     acc += 1
 |      ...
 |      >>> rdd.foreach(f)
 |      >>> acc.value
 |      15
 |
 |  addArchive(self, path: str) -> None
 |      Add an archive to be downloaded with this Spark job on every node.
 |      The `path` passed can be either a local file, a file in HDFS
 |      (or other Hadoop-supported filesystems), or an HTTP, HTTPS or
 |      FTP URI.
 |
 |      To access the file in Spark jobs, use :meth:`SparkFiles.get` with the
 |      filename to find its download/unpacked location. The given path should
 |      be one of .zip, .tar, .tar.gz, .tgz and .jar.
 |
 |      .. versionadded:: 3.3.0
 |
 |      Parameters
 |      ----------
 |      path : str
 |          can be either a local file, a file in HDFS (or other Hadoop-supported
 |          filesystems), or an HTTP, HTTPS or FTP URI. To access the file in Spark jobs,
 |          use :meth:`SparkFiles.get` to find its download location.
 |
 |      See Also
 |      --------
 |      :meth:`SparkContext.listArchives`
 |      :meth:`SparkFiles.get`
 |
 |      Notes
 |      -----
 |      A path can be added only once. Subsequent additions of the same path are ignored.
 |      This API is experimental.
 |
 |      Examples
 |      --------
 |      Creates a zipped file that contains a text file written '100'.
 |
 |      >>> import os
 |      >>> import tempfile
 |      >>> import zipfile
 |      >>> from pyspark import SparkFiles
 |
 |      >>> with tempfile.TemporaryDirectory() as d:
 |      ...     path = os.path.join(d, "test.txt")
 |      ...     with open(path, "w") as f:
 |      ...         _ = f.write("100")
 |      ...
 |      ...     zip_path1 = os.path.join(d, "test1.zip")
 |      ...     with zipfile.ZipFile(zip_path1, "w", zipfile.ZIP_DEFLATED) as z:
 |      ...         z.write(path, os.path.basename(path))
 |      ...
 |      ...     zip_path2 = os.path.join(d, "test2.zip")
 |      ...     with zipfile.ZipFile(zip_path2, "w", zipfile.ZIP_DEFLATED) as z:
 |      ...         z.write(path, os.path.basename(path))
 |      ...
 |      ...     sc.addArchive(zip_path1)
 |      ...     arch_list1 = sorted(sc.listArchives)
 |      ...
 |      ...     sc.addArchive(zip_path2)
 |      ...     arch_list2 = sorted(sc.listArchives)
 |      ...
 |      ...     # add zip_path2 twice, this addition will be ignored
 |      ...     sc.addArchive(zip_path2)
 |      ...     arch_list3 = sorted(sc.listArchives)
 |      ...
 |      ...     def func(iterator):
 |      ...         with open("%s/test.txt" % SparkFiles.get("test1.zip")) as f:
 |      ...             mul = int(f.readline())
 |      ...             return [x * mul for x in iterator]
 |      ...
 |      ...     collected = sc.parallelize([1, 2, 3, 4]).mapPartitions(func).collect()
 |
 |      >>> arch_list1
 |      ['file:/.../test1.zip']
 |      >>> arch_list2
 |      ['file:/.../test1.zip', 'file:/.../test2.zip']
 |      >>> arch_list3
 |      ['file:/.../test1.zip', 'file:/.../test2.zip']
 |      >>> collected
 |      [100, 200, 300, 400]
 |
 |  addFile(self, path: str, recursive: bool = False) -> None
 |      Add a file to be downloaded with this Spark job on every node.
 |      The `path` passed can be either a local file, a file in HDFS
 |      (or other Hadoop-supported filesystems), or an HTTP, HTTPS or
 |      FTP URI.
 |
 |      To access the file in Spark jobs, use :meth:`SparkFiles.get` with the
 |      filename to find its download location.
 |
 |      A directory can be given if the recursive option is set to True.
 |      Currently directories are only supported for Hadoop-supported filesystems.
 |
 |      .. versionadded:: 0.7.0
 |
 |      Parameters
 |      ----------
 |      path : str
 |          can be either a local file, a file in HDFS (or other Hadoop-supported
 |          filesystems), or an HTTP, HTTPS or FTP URI. To access the file in Spark jobs,
 |          use :meth:`SparkFiles.get` to find its download location.
 |      recursive : bool, default False
 |          whether to recursively add files in the input directory
 |
 |      See Also
 |      --------
 |      :meth:`SparkContext.listFiles`
 |      :meth:`SparkContext.addPyFile`
 |      :meth:`SparkFiles.get`
 |
 |      Notes
 |      -----
 |      A path can be added only once. Subsequent additions of the same path are ignored.
 |
 |      Examples
 |      --------
 |      >>> import os
 |      >>> import tempfile
 |      >>> from pyspark import SparkFiles
 |
 |      >>> with tempfile.TemporaryDirectory() as d:
 |      ...     path1 = os.path.join(d, "test1.txt")
 |      ...     with open(path1, "w") as f:
 |      ...         _ = f.write("100")
 |      ...
 |      ...     path2 = os.path.join(d, "test2.txt")
 |      ...     with open(path2, "w") as f:
 |      ...         _ = f.write("200")
 |      ...
 |      ...     sc.addFile(path1)
 |      ...     file_list1 = sorted(sc.listFiles)
 |      ...
 |      ...     sc.addFile(path2)
 |      ...     file_list2 = sorted(sc.listFiles)
 |      ...
 |      ...     # add path2 twice, this addition will be ignored
 |      ...     sc.addFile(path2)
 |      ...     file_list3 = sorted(sc.listFiles)
 |      ...
 |      ...     def func(iterator):
 |      ...         with open(SparkFiles.get("test1.txt")) as f:
 |      ...             mul = int(f.readline())
 |      ...             return [x * mul for x in iterator]
 |      ...
 |      ...     collected = sc.parallelize([1, 2, 3, 4]).mapPartitions(func).collect()
 |
 |      >>> file_list1
 |      ['file:/.../test1.txt']
 |      >>> file_list2
 |      ['file:/.../test1.txt', 'file:/.../test2.txt']
 |      >>> file_list3
 |      ['file:/.../test1.txt', 'file:/.../test2.txt']
 |      >>> collected
 |      [100, 200, 300, 400]
 |
 |  addJobTag(self, tag: str) -> None
 |      Add a tag to be assigned to all the jobs started by this thread.
 |
 |      .. versionadded:: 3.5.0
 |
 |      Parameters
 |      ----------
 |      tag : str
 |          The tag to be added. Cannot contain ',' (comma) character.
 |
 |      See Also
 |      --------
 |      :meth:`SparkContext.removeJobTag`
 |      :meth:`SparkContext.getJobTags`
 |      :meth:`SparkContext.clearJobTags`
 |      :meth:`SparkContext.cancelJobsWithTag`
 |      :meth:`SparkContext.setInterruptOnCancel`
 |
 |      Examples
 |      --------
 |      >>> import threading
 |      >>> from time import sleep
 |      >>> from pyspark import InheritableThread
 |      >>> sc.setInterruptOnCancel(interruptOnCancel=True)
 |      >>> result = "Not Set"
 |      >>> lock = threading.Lock()
 |      >>> def map_func(x):
 |      ...     sleep(100)
 |      ...     raise RuntimeError("Task should have been cancelled")
 |      ...
 |      >>> def start_job(x):
 |      ...     global result
 |      ...     try:
 |      ...         sc.addJobTag("job_to_cancel")
 |      ...         result = sc.parallelize(range(x)).map(map_func).collect()
 |      ...     except Exception as e:
 |      ...         result = "Cancelled"
 |      ...     lock.release()
 |      ...
 |      >>> def stop_job():
 |      ...     sleep(5)
 |      ...     sc.cancelJobsWithTag("job_to_cancel")
 |      ...
 |      >>> suppress = lock.acquire()
 |      >>> suppress = InheritableThread(target=start_job, args=(10,)).start()
 |      >>> suppress = InheritableThread(target=stop_job).start()
 |      >>> suppress = lock.acquire()
 |      >>> print(result)
 |      Cancelled
 |      >>> sc.clearJobTags()
 |
 |  addPyFile(self, path: str) -> None
 |      Add a .py or .zip dependency for all tasks to be executed on this
 |      SparkContext in the future.  The `path` passed can be either a local
 |      file, a file in HDFS (or other Hadoop-supported filesystems), or an
 |      HTTP, HTTPS or FTP URI.
 |
 |      .. versionadded:: 0.7.0
 |
 |      Parameters
 |      ----------
 |      path : str
 |          can be either a .py file or .zip dependency.
 |
 |      See Also
 |      --------
 |      :meth:`SparkContext.addFile`
 |
 |      Notes
 |      -----
 |      A path can be added only once. Subsequent additions of the same path are ignored.
 |
 |  binaryFiles(self, path: str, minPartitions: Optional[int] = None) -> pyspark.rdd.RDD[typing.Tuple[str, bytes]]
 |      Read a directory of binary files from HDFS, a local file system
 |      (available on all nodes), or any Hadoop-supported file system URI
 |      as a byte array. Each file is read as a single record and returned
 |      in a key-value pair, where the key is the path of each file, the
 |      value is the content of each file.
 |
 |      .. versionadded:: 1.3.0
 |
 |      Parameters
 |      ----------
 |      path : str
 |          directory to the input data files, the path can be comma separated
 |          paths as a list of inputs
 |      minPartitions : int, optional
 |          suggested minimum number of partitions for the resulting RDD
 |
 |      Returns
 |      -------
 |      :class:`RDD`
 |          RDD representing path-content pairs from the file(s).
 |
 |      Notes
 |      -----
 |      Small files are preferred, large file is also allowable, but may cause bad performance.
 |
 |      See Also
 |      --------
 |      :meth:`SparkContext.binaryRecords`
 |
 |      Examples
 |      --------
 |      >>> import os
 |      >>> import tempfile
 |      >>> with tempfile.TemporaryDirectory() as d:
 |      ...     # Write a temporary binary file
 |      ...     with open(os.path.join(d, "1.bin"), "wb") as f1:
 |      ...         _ = f1.write(b"binary data I")
 |      ...
 |      ...     # Write another temporary binary file
 |      ...     with open(os.path.join(d, "2.bin"), "wb") as f2:
 |      ...         _ = f2.write(b"binary data II")
 |      ...
 |      ...     collected = sorted(sc.binaryFiles(d).collect())
 |
 |      >>> collected
 |      [('.../1.bin', b'binary data I'), ('.../2.bin', b'binary data II')]
 |
 |  binaryRecords(self, path: str, recordLength: int) -> pyspark.rdd.RDD[bytes]
 |      Load data from a flat binary file, assuming each record is a set of numbers
 |      with the specified numerical format (see ByteBuffer), and the number of
 |      bytes per record is constant.
 |
 |      .. versionadded:: 1.3.0
 |
 |      Parameters
 |      ----------
 |      path : str
 |          Directory to the input data files
 |      recordLength : int
 |          The length at which to split the records
 |
 |      Returns
 |      -------
 |      :class:`RDD`
 |          RDD of data with values, represented as byte arrays
 |
 |      See Also
 |      --------
 |      :meth:`SparkContext.binaryFiles`
 |
 |      Examples
 |      --------
 |      >>> import os
 |      >>> import tempfile
 |      >>> with tempfile.TemporaryDirectory() as d:
 |      ...     # Write a temporary file
 |      ...     with open(os.path.join(d, "1.bin"), "w") as f:
 |      ...         for i in range(3):
 |      ...             _ = f.write("%04d" % i)
 |      ...
 |      ...     # Write another file
 |      ...     with open(os.path.join(d, "2.bin"), "w") as f:
 |      ...         for i in [-1, -2, -10]:
 |      ...             _ = f.write("%04d" % i)
 |      ...
 |      ...     collected = sorted(sc.binaryRecords(d, 4).collect())
 |
 |      >>> collected
 |      [b'-001', b'-002', b'-010', b'0000', b'0001', b'0002']
 |
 |  broadcast(self, value: ~T) -> 'Broadcast[T]'
 |      Broadcast a read-only variable to the cluster, returning a :class:`Broadcast`
 |      object for reading it in distributed functions. The variable will
 |      be sent to each cluster only once.
 |
 |      .. versionadded:: 0.7.0
 |
 |      Parameters
 |      ----------
 |      value : T
 |          value to broadcast to the Spark nodes
 |
 |      Returns
 |      -------
 |      :class:`Broadcast`
 |          :class:`Broadcast` object, a read-only variable cached on each machine
 |
 |      Examples
 |      --------
 |      >>> mapping = {1: 10001, 2: 10002}
 |      >>> bc = sc.broadcast(mapping)
 |
 |      >>> rdd = sc.range(5)
 |      >>> rdd2 = rdd.map(lambda i: bc.value[i] if i in bc.value else -1)
 |      >>> rdd2.collect()
 |      [-1, 10001, 10002, -1, -1]
 |
 |      >>> bc.destroy()
 |
 |  cancelAllJobs(self) -> None
 |      Cancel all jobs that have been scheduled or are running.
 |
 |      .. versionadded:: 1.1.0
 |
 |      See Also
 |      --------
 |      :meth:`SparkContext.cancelJobGroup`
 |      :meth:`SparkContext.cancelJobsWithTag`
 |      :meth:`SparkContext.runJob`
 |
 |  cancelJobGroup(self, groupId: str) -> None
 |      Cancel active jobs for the specified group. See :meth:`SparkContext.setJobGroup`.
 |      for more information.
 |
 |      .. versionadded:: 1.1.0
 |
 |      Parameters
 |      ----------
 |      groupId : str
 |          The group ID to cancel the job.
 |
 |      See Also
 |      --------
 |      :meth:`SparkContext.setJobGroup`
 |
 |  cancelJobsWithTag(self, tag: str) -> None
 |      Cancel active jobs that have the specified tag. See
 |      :meth:`SparkContext.addJobTag`.
 |
 |      .. versionadded:: 3.5.0
 |
 |      Parameters
 |      ----------
 |      tag : str
 |          The tag to be cancelled. Cannot contain ',' (comma) character.
 |
 |      See Also
 |      --------
 |      :meth:`SparkContext.addJobTag`
 |      :meth:`SparkContext.removeJobTag`
 |      :meth:`SparkContext.getJobTags`
 |      :meth:`SparkContext.clearJobTags`
 |      :meth:`SparkContext.setInterruptOnCancel`
 |
 |  clearJobTags(self) -> None
 |      Clear the current thread's job tags.
 |
 |      .. versionadded:: 3.5.0
 |
 |      See Also
 |      --------
 |      :meth:`SparkContext.addJobTag`
 |      :meth:`SparkContext.removeJobTag`
 |      :meth:`SparkContext.getJobTags`
 |      :meth:`SparkContext.cancelJobsWithTag`
 |      :meth:`SparkContext.setInterruptOnCancel`
 |
 |      Examples
 |      --------
 |      >>> sc.addJobTag("job_to_cancel")
 |      >>> sc.clearJobTags()
 |      >>> sc.getJobTags()
 |      set()
 |
 |  dump_profiles(self, path: str) -> None
 |      Dump the profile stats into directory `path`.
 |
 |      .. versionadded:: 1.2.0
 |
 |      See Also
 |      --------
 |      :meth:`SparkContext.show_profiles`
 |
 |  emptyRDD(self) -> pyspark.rdd.RDD[typing.Any]
 |      Create an :class:`RDD` that has no partitions or elements.
 |
 |      .. versionadded:: 1.5.0
 |
 |      Returns
 |      -------
 |      :class:`RDD`
 |          An empty RDD
 |
 |      Examples
 |      --------
 |      >>> sc.emptyRDD()
 |      EmptyRDD...
 |      >>> sc.emptyRDD().count()
 |      0
 |
 |  getCheckpointDir(self) -> Optional[str]
 |      Return the directory where RDDs are checkpointed. Returns None if no
 |      checkpoint directory has been set.
 |
 |      .. versionadded:: 3.1.0
 |
 |      See Also
 |      --------
 |      :meth:`SparkContext.setCheckpointDir`
 |      :meth:`RDD.checkpoint`
 |      :meth:`RDD.getCheckpointFile`
 |
 |  getConf(self) -> pyspark.conf.SparkConf
 |      Return a copy of this SparkContext's configuration :class:`SparkConf`.
 |
 |      .. versionadded:: 2.1.0
 |
 |  getJobTags(self) -> Set[str]
 |      Get the tags that are currently set to be assigned to all the jobs started by this thread.
 |
 |      .. versionadded:: 3.5.0
 |
 |      Returns
 |      -------
 |      set of str
 |          the tags that are currently set to be assigned to all the jobs started by this thread.
 |
 |      See Also
 |      --------
 |      :meth:`SparkContext.addJobTag`
 |      :meth:`SparkContext.removeJobTag`
 |      :meth:`SparkContext.clearJobTags`
 |      :meth:`SparkContext.cancelJobsWithTag`
 |      :meth:`SparkContext.setInterruptOnCancel`
 |
 |      Examples
 |      --------
 |      >>> sc.addJobTag("job_to_cancel")
 |      >>> sc.getJobTags()
 |      {'job_to_cancel'}
 |      >>> sc.clearJobTags()
 |
 |  getLocalProperty(self, key: str) -> Optional[str]
 |      Get a local property set in this thread, or None if it is missing. See
 |      :meth:`setLocalProperty`.
 |
 |      .. versionadded:: 1.0.0
 |
 |      See Also
 |      --------
 |      :meth:`SparkContext.setLocalProperty`
 |
 |  hadoopFile(self, path: str, inputFormatClass: str, keyClass: str, valueClass: str, keyConverter: Optional[str] = None, valueConverter: Optional[str] = None, conf: Optional[Dict[str, str]] = None, batchSize: int = 0) -> pyspark.rdd.RDD[typing.Tuple[~T, ~U]]
 |      Read an 'old' Hadoop InputFormat with arbitrary key and value class from HDFS,
 |      a local file system (available on all nodes), or any Hadoop-supported file system URI.
 |      The mechanism is the same as for :meth:`SparkContext.sequenceFile`.
 |
 |      .. versionadded:: 1.1.0
 |
 |      A Hadoop configuration can be passed in as a Python dict. This will be converted into a
 |      Configuration in Java.
 |
 |      Parameters
 |      ----------
 |      path : str
 |          path to Hadoop file
 |      inputFormatClass : str
 |          fully qualified classname of Hadoop InputFormat
 |          (e.g. "org.apache.hadoop.mapred.TextInputFormat")
 |      keyClass : str
 |          fully qualified classname of key Writable class (e.g. "org.apache.hadoop.io.Text")
 |      valueClass : str
 |          fully qualified classname of value Writable class
 |          (e.g. "org.apache.hadoop.io.LongWritable")
 |      keyConverter : str, optional
 |          fully qualified name of a function returning key WritableConverter
 |      valueConverter : str, optional
 |          fully qualified name of a function returning value WritableConverter
 |      conf : dict, optional
 |          Hadoop configuration, passed in as a dict
 |      batchSize : int, optional, default 0
 |          The number of Python objects represented as a single
 |          Java object. (default 0, choose batchSize automatically)
 |
 |      Returns
 |      -------
 |      :class:`RDD`
 |          RDD of tuples of key and corresponding value
 |
 |      See Also
 |      --------
 |      :meth:`RDD.saveAsSequenceFile`
 |      :meth:`RDD.saveAsNewAPIHadoopFile`
 |      :meth:`RDD.saveAsHadoopFile`
 |      :meth:`SparkContext.newAPIHadoopFile`
 |      :meth:`SparkContext.hadoopRDD`
 |
 |      Examples
 |      --------
 |      >>> import os
 |      >>> import tempfile
 |
 |      Set the related classes
 |
 |      >>> output_format_class = "org.apache.hadoop.mapred.TextOutputFormat"
 |      >>> input_format_class = "org.apache.hadoop.mapred.TextInputFormat"
 |      >>> key_class = "org.apache.hadoop.io.IntWritable"
 |      >>> value_class = "org.apache.hadoop.io.Text"
 |
 |      >>> with tempfile.TemporaryDirectory() as d:
 |      ...     path = os.path.join(d, "old_hadoop_file")
 |      ...
 |      ...     # Write a temporary Hadoop file
 |      ...     rdd = sc.parallelize([(1, ""), (1, "a"), (3, "x")])
 |      ...     rdd.saveAsHadoopFile(path, output_format_class, key_class, value_class)
 |      ...
 |      ...     loaded = sc.hadoopFile(path, input_format_class, key_class, value_class)
 |      ...     collected = sorted(loaded.collect())
 |
 |      >>> collected
 |      [(0, '1\t'), (0, '1\ta'), (0, '3\tx')]
 |
 |  hadoopRDD(self, inputFormatClass: str, keyClass: str, valueClass: str, keyConverter: Optional[str] = None, valueConverter: Optional[str] = None, conf: Optional[Dict[str, str]] = None, batchSize: int = 0) -> pyspark.rdd.RDD[typing.Tuple[~T, ~U]]
 |      Read an 'old' Hadoop InputFormat with arbitrary key and value class, from an arbitrary
 |      Hadoop configuration, which is passed in as a Python dict.
 |      This will be converted into a Configuration in Java.
 |      The mechanism is the same as for :meth:`SparkContext.sequenceFile`.
 |
 |      .. versionadded:: 1.1.0
 |
 |      Parameters
 |      ----------
 |      inputFormatClass : str
 |          fully qualified classname of Hadoop InputFormat
 |          (e.g. "org.apache.hadoop.mapred.TextInputFormat")
 |      keyClass : str
 |          fully qualified classname of key Writable class (e.g. "org.apache.hadoop.io.Text")
 |      valueClass : str
 |          fully qualified classname of value Writable class
 |          (e.g. "org.apache.hadoop.io.LongWritable")
 |      keyConverter : str, optional
 |          fully qualified name of a function returning key WritableConverter
 |      valueConverter : str, optional
 |          fully qualified name of a function returning value WritableConverter
 |      conf : dict, optional
 |          Hadoop configuration, passed in as a dict
 |      batchSize : int, optional, default 0
 |          The number of Python objects represented as a single
 |          Java object. (default 0, choose batchSize automatically)
 |
 |      Returns
 |      -------
 |      :class:`RDD`
 |          RDD of tuples of key and corresponding value
 |
 |      See Also
 |      --------
 |      :meth:`RDD.saveAsNewAPIHadoopDataset`
 |      :meth:`RDD.saveAsHadoopDataset`
 |      :meth:`SparkContext.newAPIHadoopRDD`
 |      :meth:`SparkContext.hadoopFile`
 |
 |      Examples
 |      --------
 |      >>> import os
 |      >>> import tempfile
 |
 |      Set the related classes
 |
 |      >>> output_format_class = "org.apache.hadoop.mapred.TextOutputFormat"
 |      >>> input_format_class = "org.apache.hadoop.mapred.TextInputFormat"
 |      >>> key_class = "org.apache.hadoop.io.IntWritable"
 |      >>> value_class = "org.apache.hadoop.io.Text"
 |
 |      >>> with tempfile.TemporaryDirectory() as d:
 |      ...     path = os.path.join(d, "old_hadoop_file")
 |      ...
 |      ...     # Create the conf for writing
 |      ...     write_conf = {
 |      ...         "mapred.output.format.class": output_format_class,
 |      ...         "mapreduce.job.output.key.class": key_class,
 |      ...         "mapreduce.job.output.value.class": value_class,
 |      ...         "mapreduce.output.fileoutputformat.outputdir": path,
 |      ...     }
 |      ...
 |      ...     # Write a temporary Hadoop file
 |      ...     rdd = sc.parallelize([(1, ""), (1, "a"), (3, "x")])
 |      ...     rdd.saveAsHadoopDataset(conf=write_conf)
 |      ...
 |      ...     # Create the conf for reading
 |      ...     read_conf = {"mapreduce.input.fileinputformat.inputdir": path}
 |      ...
 |      ...     loaded = sc.hadoopRDD(input_format_class, key_class, value_class, conf=read_conf)
 |      ...     collected = sorted(loaded.collect())
 |
 |      >>> collected
 |      [(0, '1\t'), (0, '1\ta'), (0, '3\tx')]
 |
 |  newAPIHadoopFile(self, path: str, inputFormatClass: str, keyClass: str, valueClass: str, keyConverter: Optional[str] = None, valueConverter: Optional[str] = None, conf: Optional[Dict[str, str]] = None, batchSize: int = 0) -> pyspark.rdd.RDD[typing.Tuple[~T, ~U]]
 |      Read a 'new API' Hadoop InputFormat with arbitrary key and value class from HDFS,
 |      a local file system (available on all nodes), or any Hadoop-supported file system URI.
 |      The mechanism is the same as for :meth:`SparkContext.sequenceFile`.
 |
 |      A Hadoop configuration can be passed in as a Python dict. This will be converted into a
 |      Configuration in Java.
 |
 |      .. versionadded:: 1.1.0
 |
 |      Parameters
 |      ----------
 |      path : str
 |          path to Hadoop file
 |      inputFormatClass : str
 |          fully qualified classname of Hadoop InputFormat
 |          (e.g. "org.apache.hadoop.mapreduce.lib.input.TextInputFormat")
 |      keyClass : str
 |          fully qualified classname of key Writable class
 |          (e.g. "org.apache.hadoop.io.Text")
 |      valueClass : str
 |          fully qualified classname of value Writable class
 |          (e.g. "org.apache.hadoop.io.LongWritable")
 |      keyConverter : str, optional
 |          fully qualified name of a function returning key WritableConverter
 |          None by default
 |      valueConverter : str, optional
 |          fully qualified name of a function returning value WritableConverter
 |          None by default
 |      conf : dict, optional
 |          Hadoop configuration, passed in as a dict
 |          None by default
 |      batchSize : int, optional, default 0
 |          The number of Python objects represented as a single
 |          Java object. (default 0, choose batchSize automatically)
 |
 |      Returns
 |      -------
 |      :class:`RDD`
 |          RDD of tuples of key and corresponding value
 |
 |      See Also
 |      --------
 |      :meth:`RDD.saveAsSequenceFile`
 |      :meth:`RDD.saveAsNewAPIHadoopFile`
 |      :meth:`RDD.saveAsHadoopFile`
 |      :meth:`SparkContext.sequenceFile`
 |      :meth:`SparkContext.hadoopFile`
 |
 |      Examples
 |      --------
 |      >>> import os
 |      >>> import tempfile
 |
 |      Set the related classes
 |
 |      >>> output_format_class = "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat"
 |      >>> input_format_class = "org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat"
 |      >>> key_class = "org.apache.hadoop.io.IntWritable"
 |      >>> value_class = "org.apache.hadoop.io.Text"
 |
 |      >>> with tempfile.TemporaryDirectory() as d:
 |      ...     path = os.path.join(d, "new_hadoop_file")
 |      ...
 |      ...     # Write a temporary Hadoop file
 |      ...     rdd = sc.parallelize([(1, ""), (1, "a"), (3, "x")])
 |      ...     rdd.saveAsNewAPIHadoopFile(path, output_format_class, key_class, value_class)
 |      ...
 |      ...     loaded = sc.newAPIHadoopFile(path, input_format_class, key_class, value_class)
 |      ...     collected = sorted(loaded.collect())
 |
 |      >>> collected
 |      [(1, ''), (1, 'a'), (3, 'x')]
 |
 |  newAPIHadoopRDD(self, inputFormatClass: str, keyClass: str, valueClass: str, keyConverter: Optional[str] = None, valueConverter: Optional[str] = None, conf: Optional[Dict[str, str]] = None, batchSize: int = 0) -> pyspark.rdd.RDD[typing.Tuple[~T, ~U]]
 |      Read a 'new API' Hadoop InputFormat with arbitrary key and value class, from an arbitrary
 |      Hadoop configuration, which is passed in as a Python dict.
 |      This will be converted into a Configuration in Java.
 |      The mechanism is the same as for :meth:`SparkContext.sequenceFile`.
 |
 |      .. versionadded:: 1.1.0
 |
 |      Parameters
 |      ----------
 |      inputFormatClass : str
 |          fully qualified classname of Hadoop InputFormat
 |          (e.g. "org.apache.hadoop.mapreduce.lib.input.TextInputFormat")
 |      keyClass : str
 |          fully qualified classname of key Writable class (e.g. "org.apache.hadoop.io.Text")
 |      valueClass : str
 |          fully qualified classname of value Writable class
 |          (e.g. "org.apache.hadoop.io.LongWritable")
 |      keyConverter : str, optional
 |          fully qualified name of a function returning key WritableConverter
 |          (None by default)
 |      valueConverter : str, optional
 |          fully qualified name of a function returning value WritableConverter
 |          (None by default)
 |      conf : dict, optional
 |          Hadoop configuration, passed in as a dict (None by default)
 |      batchSize : int, optional, default 0
 |          The number of Python objects represented as a single
 |          Java object. (default 0, choose batchSize automatically)
 |
 |      Returns
 |      -------
 |      :class:`RDD`
 |          RDD of tuples of key and corresponding value
 |
 |      See Also
 |      --------
 |      :meth:`RDD.saveAsNewAPIHadoopDataset`
 |      :meth:`RDD.saveAsHadoopDataset`
 |      :meth:`SparkContext.hadoopRDD`
 |      :meth:`SparkContext.hadoopFile`
 |
 |      Examples
 |      --------
 |      >>> import os
 |      >>> import tempfile
 |
 |      Set the related classes
 |
 |      >>> output_format_class = "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat"
 |      >>> input_format_class = "org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat"
 |      >>> key_class = "org.apache.hadoop.io.IntWritable"
 |      >>> value_class = "org.apache.hadoop.io.Text"
 |
 |      >>> with tempfile.TemporaryDirectory() as d:
 |      ...     path = os.path.join(d, "new_hadoop_file")
 |      ...
 |      ...     # Create the conf for writing
 |      ...     write_conf = {
 |      ...         "mapreduce.job.outputformat.class": (output_format_class),
 |      ...         "mapreduce.job.output.key.class": key_class,
 |      ...         "mapreduce.job.output.value.class": value_class,
 |      ...         "mapreduce.output.fileoutputformat.outputdir": path,
 |      ...     }
 |      ...
 |      ...     # Write a temporary Hadoop file
 |      ...     rdd = sc.parallelize([(1, ""), (1, "a"), (3, "x")])
 |      ...     rdd.saveAsNewAPIHadoopDataset(conf=write_conf)
 |      ...
 |      ...     # Create the conf for reading
 |      ...     read_conf = {"mapreduce.input.fileinputformat.inputdir": path}
 |      ...
 |      ...     loaded = sc.newAPIHadoopRDD(input_format_class,
 |      ...         key_class, value_class, conf=read_conf)
 |      ...     collected = sorted(loaded.collect())
 |
 |      >>> collected
 |      [(1, ''), (1, 'a'), (3, 'x')]
 |
 |  parallelize(self, c: Iterable[~T], numSlices: Optional[int] = None) -> pyspark.rdd.RDD[~T]
 |      Distribute a local Python collection to form an RDD. For performance,
 |      using range is recommended if the input represents a range.
 |
 |      .. versionadded:: 0.7.0
 |
 |      Parameters
 |      ----------
 |      c : :class:`collections.abc.Iterable`
 |          iterable collection to distribute
 |      numSlices : int, optional
 |          the number of partitions of the new RDD
 |
 |      Returns
 |      -------
 |      :class:`RDD`
 |          RDD representing distributed collection.
 |
 |      Examples
 |      --------
 |      >>> sc.parallelize([0, 2, 3, 4, 6], 5).glom().collect()
 |      [[0], [2], [3], [4], [6]]
 |      >>> sc.parallelize(range(0, 6, 2), 5).glom().collect()
 |      [[], [0], [], [2], [4]]
 |
 |      Deal with a list of strings.
 |
 |      >>> strings = ["a", "b", "c"]
 |      >>> sc.parallelize(strings, 2).glom().collect()
 |      [['a'], ['b', 'c']]
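
The second example above leaves some partitions empty, which can look surprising. The following pure-Python sketch reproduces that output for a `range` input. `slice_range` is a hypothetical helper written for illustration, not part of the PySpark API, and it assumes that the slice boundaries fall at element index `int(i * size / num_slices)`.

```python
def slice_range(r, num_slices):
    """Split a range into num_slices consecutive sub-ranges (illustrative only)."""
    size = len(r)
    if size == 0:
        return [[] for _ in range(num_slices)]
    step = r.step
    start0 = r.start

    def boundary(i):
        # Assumed rule: the i-th boundary sits at element index int(i * size / num_slices)
        return start0 + int(i * size / num_slices) * step

    return [list(range(boundary(i), boundary(i + 1), step)) for i in range(num_slices)]


# Three elements spread over five slices: two slices end up empty
print(slice_range(range(0, 6, 2), 5))
```

When there are fewer elements than slices, several boundaries coincide, so the slices between them contain nothing.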
 |
 |  pickleFile(self, name: str, minPartitions: Optional[int] = None) -> pyspark.rdd.RDD[typing.Any]
 |      Load an RDD previously saved using :meth:`RDD.saveAsPickleFile` method.
 |
 |      .. versionadded:: 1.1.0
 |
 |      Parameters
 |      ----------
 |      name : str
 |          directory of the input data files; the path can be a comma-separated
 |          list of input paths
 |      minPartitions : int, optional
 |          suggested minimum number of partitions for the resulting RDD
 |
 |      Returns
 |      -------
 |      :class:`RDD`
 |          RDD representing unpickled data from the file(s).
 |
 |      See Also
 |      --------
 |      :meth:`RDD.saveAsPickleFile`
 |
 |      Examples
 |      --------
 |      >>> import os
 |      >>> import tempfile
 |      >>> with tempfile.TemporaryDirectory() as d:
 |      ...     # Write a temporary pickled file
 |      ...     path1 = os.path.join(d, "pickled1")
 |      ...     sc.parallelize(range(10)).saveAsPickleFile(path1, 3)
 |      ...
 |      ...     # Write another temporary pickled file
 |      ...     path2 = os.path.join(d, "pickled2")
 |      ...     sc.parallelize(range(-10, -5)).saveAsPickleFile(path2, 3)
 |      ...
 |      ...     # Load pickled file
 |      ...     collected1 = sorted(sc.pickleFile(path1, 3).collect())
 |      ...     collected2 = sorted(sc.pickleFile(path2, 4).collect())
 |      ...
 |      ...     # Load two pickled files together
 |      ...     collected3 = sorted(sc.pickleFile('{},{}'.format(path1, path2), 5).collect())
 |
 |      >>> collected1
 |      [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
 |      >>> collected2
 |      [-10, -9, -8, -7, -6]
 |      >>> collected3
 |      [-10, -9, -8, -7, -6, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
 |
 |  range(self, start: int, end: Optional[int] = None, step: int = 1, numSlices: Optional[int] = None) -> pyspark.rdd.RDD[int]
 |      Create a new RDD of int containing elements from `start` to `end`
 |      (exclusive), increased by `step` every element. Can be called the same
 |      way as python's built-in range() function. If called with a single argument,
 |      the argument is interpreted as `end`, and `start` is set to 0.
 |
 |      .. versionadded:: 1.5.0
 |
 |      Parameters
 |      ----------
 |      start : int
 |          the start value
 |      end : int, optional
 |          the end value (exclusive)
 |      step : int, optional, default 1
 |          the incremental step
 |      numSlices : int, optional
 |          the number of partitions of the new RDD
 |
 |      Returns
 |      -------
 |      :class:`RDD`
 |          An RDD of int
 |
 |      See Also
 |      --------
 |      :meth:`pyspark.sql.SparkSession.range`
 |
 |      Examples
 |      --------
 |      >>> sc.range(5).collect()
 |      [0, 1, 2, 3, 4]
 |      >>> sc.range(2, 4).collect()
 |      [2, 3]
 |      >>> sc.range(1, 7, 2).collect()
 |      [1, 3, 5]
 |
 |      Generate RDD with a negative step
 |
 |      >>> sc.range(5, 0, -1).collect()
 |      [5, 4, 3, 2, 1]
 |      >>> sc.range(0, 5, -1).collect()
 |      []
 |
 |      Control the number of partitions
 |
 |      >>> sc.range(5, numSlices=1).getNumPartitions()
 |      1
 |      >>> sc.range(5, numSlices=10).getNumPartitions()
 |      10
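
The argument handling described for `range` matches Python's built-in `range()`. Below is a small local sketch of that rule; `range_elements` is a hypothetical name, and the function only computes the elements the RDD would contain, without creating an RDD or any partitions.

```python
def range_elements(start, end=None, step=1):
    """Return the elements described by the (start, end, step) arguments."""
    if end is None:
        # A single argument is interpreted as `end`, with `start` set to 0
        start, end = 0, start
    return list(range(start, end, step))


print(range_elements(5))
print(range_elements(1, 7, 2))
print(range_elements(0, 5, -1))
```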
 |
 |  removeJobTag(self, tag: str) -> None
 |      Remove a tag previously added to be assigned to all the jobs started by this thread.
 |      Noop if such a tag was not added earlier.
 |
 |      .. versionadded:: 3.5.0
 |
 |      Parameters
 |      ----------
 |      tag : str
 |          The tag to be removed. Cannot contain ',' (comma) character.
 |
 |      See Also
 |      --------
 |      :meth:`SparkContext.addJobTag`
 |      :meth:`SparkContext.getJobTags`
 |      :meth:`SparkContext.clearJobTags`
 |      :meth:`SparkContext.cancelJobsWithTag`
 |      :meth:`SparkContext.setInterruptOnCancel`
 |
 |      Examples
 |      --------
 |      >>> sc.addJobTag("job_to_cancel1")
 |      >>> sc.addJobTag("job_to_cancel2")
 |      >>> sc.getJobTags()
 |      {'job_to_cancel1', 'job_to_cancel2'}
 |      >>> sc.removeJobTag("job_to_cancel1")
 |      >>> sc.getJobTags()
 |      {'job_to_cancel2'}
 |      >>> sc.clearJobTags()
 |
 |  runJob(self, rdd: pyspark.rdd.RDD[~T], partitionFunc: Callable[[Iterable[~T]], Iterable[~U]], partitions: Optional[Sequence[int]] = None, allowLocal: bool = False) -> List[~U]
 |      Executes the given partitionFunc on the specified set of partitions,
 |      returning the result as an array of elements.
 |
 |      If 'partitions' is not specified, this will run over all partitions.
 |
 |      .. versionadded:: 1.1.0
 |
 |      Parameters
 |      ----------
 |      rdd : :class:`RDD`
 |          target RDD to run tasks on
 |      partitionFunc : function
 |          a function to run on each partition of the RDD
 |      partitions : list, optional
 |          set of partitions to run on; some jobs may not want to compute on all
 |          partitions of the target RDD, e.g. for operations like `first`
 |      allowLocal : bool, default False
 |          this parameter has no effect
 |
 |      Returns
 |      -------
 |      list
 |          results of specified partitions
 |
 |      See Also
 |      --------
 |      :meth:`SparkContext.cancelAllJobs`
 |
 |      Examples
 |      --------
 |      >>> myRDD = sc.parallelize(range(6), 3)
 |      >>> sc.runJob(myRDD, lambda part: [x * x for x in part])
 |      [0, 1, 4, 9, 16, 25]
 |
 |      >>> myRDD = sc.parallelize(range(6), 3)
 |      >>> sc.runJob(myRDD, lambda part: [x * x for x in part], [0, 2], True)
 |      [0, 1, 16, 25]
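
To see why the second call above returns `[0, 1, 16, 25]`, it may help to model what `runJob` computes: apply `partitionFunc` to each selected partition and concatenate the results in order. The sketch below does this locally. `run_job_locally` is a hypothetical helper, and the layout `[[0, 1], [2, 3], [4, 5]]` is an assumption about how `range(6)` is split into 3 slices.

```python
from typing import Callable, Iterable, List, Optional, Sequence


def run_job_locally(
    partitions: Sequence[Sequence[int]],
    partition_func: Callable[[Iterable[int]], Iterable[int]],
    indices: Optional[Sequence[int]] = None,
) -> List[int]:
    """Apply partition_func to the chosen partitions and flatten the results."""
    if indices is None:
        indices = range(len(partitions))  # default: run over every partition
    results: List[int] = []
    for i in indices:
        results.extend(partition_func(iter(partitions[i])))
    return results


parts = [[0, 1], [2, 3], [4, 5]]  # assumed layout of sc.parallelize(range(6), 3)


def square(part):
    return [x * x for x in part]


print(run_job_locally(parts, square))
print(run_job_locally(parts, square, [0, 2]))
```

Selecting partitions 0 and 2 skips the elements 2 and 3, so their squares never appear in the result.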
 |
 |  sequenceFile(self, path: str, keyClass: Optional[str] = None, valueClass: Optional[str] = None, keyConverter: Optional[str] = None, valueConverter: Optional[str] = None, minSplits: Optional[int] = None, batchSize: int = 0) -> pyspark.rdd.RDD[typing.Tuple[~T, ~U]]
 |      Read a Hadoop SequenceFile with arbitrary key and value Writable class from HDFS,
 |      a local file system (available on all nodes), or any Hadoop-supported file system URI.
 |      The mechanism is as follows:
 |
 |          1. A Java RDD is created from the SequenceFile or other InputFormat, and the key
 |             and value Writable classes
 |          2. Serialization is attempted via pickling
 |          3. If this fails, the fallback is to call 'toString' on each key and value
 |          4. :class:`CPickleSerializer` is used to deserialize pickled objects on the Python side
 |
 |      .. versionadded:: 1.3.0
 |
 |      Parameters
 |      ----------
 |      path : str
 |          path to sequencefile
 |      keyClass : str, optional
 |          fully qualified classname of key Writable class (e.g. "org.apache.hadoop.io.Text")
 |      valueClass : str, optional
 |          fully qualified classname of value Writable class
 |          (e.g. "org.apache.hadoop.io.LongWritable")
 |      keyConverter : str, optional
 |          fully qualified name of a function returning key WritableConverter
 |      valueConverter : str, optional
 |          fully qualified name of a function returning value WritableConverter
 |      minSplits : int, optional
 |          minimum splits in dataset (default min(2, sc.defaultParallelism))
 |      batchSize : int, optional, default 0
 |          The number of Python objects represented as a single
 |          Java object. (default 0, choose batchSize automatically)
 |
 |      Returns
 |      -------
 |      :class:`RDD`
 |          RDD of tuples of key and corresponding value
 |
 |      See Also
 |      --------
 |      :meth:`RDD.saveAsSequenceFile`
 |      :meth:`RDD.saveAsNewAPIHadoopFile`
 |      :meth:`RDD.saveAsHadoopFile`
 |      :meth:`SparkContext.newAPIHadoopFile`
 |      :meth:`SparkContext.hadoopFile`
 |
 |      Examples
 |      --------
 |      >>> import os
 |      >>> import tempfile
 |
 |      Set the class of output format
 |
 |      >>> output_format_class = "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat"
 |
 |      >>> with tempfile.TemporaryDirectory() as d:
 |      ...     path = os.path.join(d, "hadoop_file")
 |      ...
 |      ...     # Write a temporary Hadoop file
 |      ...     rdd = sc.parallelize([(1, {3.0: "bb"}), (2, {1.0: "aa"}), (3, {2.0: "dd"})])
 |      ...     rdd.saveAsNewAPIHadoopFile(path, output_format_class)
 |      ...
 |      ...     collected = sorted(sc.sequenceFile(path).collect())
 |
 |      >>> collected
 |      [(1, {3.0: 'bb'}), (2, {1.0: 'aa'}), (3, {2.0: 'dd'})]
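
Steps 2 and 3 of the mechanism listed for `sequenceFile` (try pickling, fall back to a string form) can be pictured with a loose plain-Python analogy. This is not Spark's code: in Spark the conversion is applied to Writable objects on the JVM side, whereas `to_python_side` and `Unpicklable` here are hypothetical names used only to show the try-then-fall-back pattern.

```python
import pickle


def to_python_side(obj):
    """Round-trip obj through pickle; fall back to its string form on failure."""
    try:
        return pickle.loads(pickle.dumps(obj))
    except Exception:
        # Analogous to calling 'toString' on a key or value that cannot be pickled
        return str(obj)


class Unpicklable:
    """A stand-in for a value that the pickler cannot handle."""

    def __reduce__(self):
        raise TypeError("cannot pickle this object")

    def __str__(self):
        return "Unpicklable()"


print(to_python_side({3.0: "bb"}))
print(to_python_side(Unpicklable()))
```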
 |
 |  setCheckpointDir(self, dirName: str) -> None
 |      Set the directory under which RDDs are going to be checkpointed. The
 |      directory must be an HDFS path if running on a cluster.
 |
 |      .. versionadded:: 0.7.0
 |
 |      Parameters
 |      ----------
 |      dirName : str
 |          path to the directory where checkpoint files will be stored
 |          (must be HDFS path if running in cluster)
 |
 |      See Also
 |      --------
 |      :meth:`SparkContext.getCheckpointDir`
 |      :meth:`RDD.checkpoint`
 |      :meth:`RDD.getCheckpointFile`
 |
 |  setInterruptOnCancel(self, interruptOnCancel: bool) -> None
 |      Set the behavior of job cancellation from jobs started in this thread.
 |
 |      .. versionadded:: 3.5.0
 |
 |      Parameters
 |      ----------
 |      interruptOnCancel : bool
 |          If true, then job cancellation will result in ``Thread.interrupt()``
 |          being called on the job's executor threads. This is useful to help ensure that
 |          the tasks are actually stopped in a timely manner, but is off by default due to
 |          HDFS-1208, where HDFS may respond to ``Thread.interrupt()`` by marking nodes as dead.
 |
 |      See Also
 |      --------
 |      :meth:`SparkContext.addJobTag`
 |      :meth:`SparkContext.removeJobTag`
 |      :meth:`SparkContext.cancelAllJobs`
 |      :meth:`SparkContext.cancelJobGroup`
 |      :meth:`SparkContext.cancelJobsWithTag`
 |
 |  setJobDescription(self, value: str) -> None
 |      Set a human readable description of the current job.
 |
 |      .. versionadded:: 2.3.0
 |
 |      Parameters
 |      ----------
 |      value : str
 |          The job description to set.
 |
 |      Notes
 |      -----
 |      If you run jobs in parallel, use :class:`pyspark.InheritableThread` for thread
 |      local inheritance.
 |
 |  setJobGroup(self, groupId: str, description: str, interruptOnCancel: bool = False) -> None
 |      Assigns a group ID to all the jobs started by this thread until the group ID is set to a
 |      different value or cleared.
 |
 |      Often, a unit of execution in an application consists of multiple Spark actions or jobs.
 |      Application programmers can use this method to group all those jobs together and give a
 |      group description. Once set, the Spark web UI will associate such jobs with this group.
 |
 |      The application can use :meth:`SparkContext.cancelJobGroup` to cancel all
 |      running jobs in this group.
 |
 |      .. versionadded:: 1.0.0
 |
 |      Parameters
 |      ----------
 |      groupId : str
 |          The group ID to assign.
 |      description : str
 |          The description to set for the job group.
 |      interruptOnCancel : bool, optional, default False
 |          whether to interrupt jobs on job cancellation.
 |
 |      Notes
 |      -----
 |      If interruptOnCancel is set to true for the job group, then job cancellation will result
 |      in Thread.interrupt() being called on the job's executor threads. This is useful to help
 |      ensure that the tasks are actually stopped in a timely manner, but is off by default due
 |      to HDFS-1208, where HDFS may respond to Thread.interrupt() by marking nodes as dead.
 |
 |      If you run jobs in parallel, use :class:`pyspark.InheritableThread` for thread
 |      local inheritance.
 |
 |      See Also
 |      --------
 |      :meth:`SparkContext.cancelJobGroup`
 |
 |      Examples
 |      --------
 |      >>> import threading
 |      >>> from time import sleep
 |      >>> from pyspark import InheritableThread
 |      >>> result = "Not Set"
 |      >>> lock = threading.Lock()
 |      >>> def map_func(x):
 |      ...     sleep(100)
 |      ...     raise RuntimeError("Task should have been cancelled")
 |      ...
 |      >>> def start_job(x):
 |      ...     global result
 |      ...     try:
 |      ...         sc.setJobGroup("job_to_cancel", "some description")
 |      ...         result = sc.parallelize(range(x)).map(map_func).collect()
 |      ...     except Exception as e:
 |      ...         result = "Cancelled"
 |      ...     lock.release()
 |      ...
 |      >>> def stop_job():
 |      ...     sleep(5)
 |      ...     sc.cancelJobGroup("job_to_cancel")
 |      ...
 |      >>> suppress = lock.acquire()
 |      >>> suppress = InheritableThread(target=start_job, args=(10,)).start()
 |      >>> suppress = InheritableThread(target=stop_job).start()
 |      >>> suppress = lock.acquire()
 |      >>> print(result)
 |      Cancelled
 |
 |  setLocalProperty(self, key: str, value: str) -> None
 |      Set a local property that affects jobs submitted from this thread, such as the
 |      Spark fair scheduler pool.
 |
 |      .. versionadded:: 1.0.0
 |
 |      Parameters
 |      ----------
 |      key : str
 |          The key of the local property to set.
 |      value : str
 |          The value of the local property to set.
 |
 |      See Also
 |      --------
 |      :meth:`SparkContext.getLocalProperty`
 |
 |      Notes
 |      -----
 |      If you run jobs in parallel, use :class:`pyspark.InheritableThread` for thread
 |      local inheritance.
 |
 |  setLogLevel(self, logLevel: str) -> None
 |      Control our logLevel. This overrides any user-defined log settings.
 |      Valid log levels include: ALL, DEBUG, ERROR, FATAL, INFO, OFF, TRACE, WARN
 |
 |      .. versionadded:: 1.4.0
 |
 |      Parameters
 |      ----------
 |      logLevel : str
 |          The desired log level as a string.
 |
 |      Examples
 |      --------
 |      >>> sc.setLogLevel("WARN")  # doctest: +SKIP
 |
 |  show_profiles(self) -> None
 |      Print the profile stats to stdout.
 |
 |      .. versionadded:: 1.2.0
 |
 |      See Also
 |      --------
 |      :meth:`SparkContext.dump_profiles`
 |
 |  sparkUser(self) -> str
 |      Get SPARK_USER for the user who is running SparkContext.
 |
 |      .. versionadded:: 1.0.0
 |
 |  statusTracker(self) -> pyspark.status.StatusTracker
 |      Return :class:`StatusTracker` object
 |
 |      .. versionadded:: 1.4.0
 |
 |  stop(self) -> None
 |      Shut down the :class:`SparkContext`.
 |
 |      .. versionadded:: 0.7.0
 |
 |  textFile(self, name: str, minPartitions: Optional[int] = None, use_unicode: bool = True) -> pyspark.rdd.RDD[str]
 |      Read a text file from HDFS, a local file system (available on all
 |      nodes), or any Hadoop-supported file system URI, and return it as an
 |      RDD of Strings. The text files must be encoded as UTF-8.
 |
 |      .. versionadded:: 0.7.0
 |
 |      Parameters
 |      ----------
 |      name : str
 |          directory of the input data files; the path can be a comma-separated
 |          list of input paths
 |      minPartitions : int, optional
 |          suggested minimum number of partitions for the resulting RDD
 |      use_unicode : bool, default True
 |          If `use_unicode` is False, the strings will be kept as `str` (encoded
 |          as `utf-8`), which is faster and smaller than unicode.
 |
 |          .. versionadded:: 1.2.0
 |
 |      Returns
 |      -------
 |      :class:`RDD`
 |          RDD representing text data from the file(s).
 |
 |      See Also
 |      --------
 |      :meth:`RDD.saveAsTextFile`
 |      :meth:`SparkContext.wholeTextFiles`
 |
 |      Examples
 |      --------
 |      >>> import os
 |      >>> import tempfile
 |      >>> with tempfile.TemporaryDirectory() as d:
 |      ...     path1 = os.path.join(d, "text1")
 |      ...     path2 = os.path.join(d, "text2")
 |      ...
 |      ...     # Write a temporary text file
 |      ...     sc.parallelize(["x", "y", "z"]).saveAsTextFile(path1)
 |      ...
 |      ...     # Write another temporary text file
 |      ...     sc.parallelize(["aa", "bb", "cc"]).saveAsTextFile(path2)
 |      ...
 |      ...     # Load text file
 |      ...     collected1 = sorted(sc.textFile(path1, 3).collect())
 |      ...     collected2 = sorted(sc.textFile(path2, 4).collect())
 |      ...
 |      ...     # Load two text files together
 |      ...     collected3 = sorted(sc.textFile('{},{}'.format(path1, path2), 5).collect())
 |
 |      >>> collected1
 |      ['x', 'y', 'z']
 |      >>> collected2
 |      ['aa', 'bb', 'cc']
 |      >>> collected3
 |      ['aa', 'bb', 'cc', 'x', 'y', 'z']
 |
 |  union(self, rdds: List[pyspark.rdd.RDD[~T]]) -> pyspark.rdd.RDD[~T]
 |      Build the union of a list of RDDs.
 |
 |      This supports unions() of RDDs with different serialized formats,
 |      although this forces them to be reserialized using the default
 |      serializer:
 |
 |      .. versionadded:: 0.7.0
 |
 |      See Also
 |      --------
 |      :meth:`RDD.union`
 |
 |      Examples
 |      --------
 |      >>> import os
 |      >>> import tempfile
 |      >>> with tempfile.TemporaryDirectory() as d:
 |      ...     # generate a text RDD
 |      ...     with open(os.path.join(d, "union-text.txt"), "w") as f:
 |      ...         _ = f.write("Hello")
 |      ...     text_rdd = sc.textFile(d)
 |      ...
 |      ...     # generate another RDD
 |      ...     parallelized = sc.parallelize(["World!"])
 |      ...
 |      ...     unioned = sorted(sc.union([text_rdd, parallelized]).collect())
 |
 |      >>> unioned
 |      ['Hello', 'World!']
 |
 |  wholeTextFiles(self, path: str, minPartitions: Optional[int] = None, use_unicode: bool = True) -> pyspark.rdd.RDD[typing.Tuple[str, str]]
 |      Read a directory of text files from HDFS, a local file system
 |      (available on all nodes), or any  Hadoop-supported file system
 |      URI. Each file is read as a single record and returned in a
 |      key-value pair, where the key is the path of each file, the
 |      value is the content of each file.
 |      The text files must be encoded as UTF-8.
 |
 |      .. versionadded:: 1.0.0
 |
 |      For example, if you have the following files:
 |
 |      .. code-block:: text
 |
 |          hdfs://a-hdfs-path/part-00000
 |          hdfs://a-hdfs-path/part-00001
 |          ...
 |          hdfs://a-hdfs-path/part-nnnnn
 |
 |      Do ``rdd = sparkContext.wholeTextFiles("hdfs://a-hdfs-path")``,
 |      then ``rdd`` contains:
 |
 |      .. code-block:: text
 |
 |          (a-hdfs-path/part-00000, its content)
 |          (a-hdfs-path/part-00001, its content)
 |          ...
 |          (a-hdfs-path/part-nnnnn, its content)
 |
 |      Parameters
 |      ----------
 |      path : str
 |          directory to the input data files, the path can be comma separated
 |          paths as a list of inputs
 |      minPartitions : int, optional
 |          suggested minimum number of partitions for the resulting RDD
 |      use_unicode : bool, default True
 |          If `use_unicode` is False, the strings will be kept as `str` (encoding
 |          as `utf-8`), which is faster and smaller than unicode.
 |
 |          .. versionadded:: 1.2.0
 |
 |      Returns
 |      -------
 |      :class:`RDD`
 |          RDD representing path-content pairs from the file(s).
 |
 |      Notes
 |      -----
 |      Small files are preferred, as each file will be loaded fully in memory.
 |
 |      See Also
 |      --------
 |      :meth:`RDD.saveAsTextFile`
 |      :meth:`SparkContext.textFile`
 |
 |      Examples
 |      --------
 |      >>> import os
 |      >>> import tempfile
 |      >>> with tempfile.TemporaryDirectory() as d:
 |      ...     # Write a temporary text file
 |      ...     with open(os.path.join(d, "1.txt"), "w") as f:
 |      ...         _ = f.write("123")
 |      ...
 |      ...     # Write another temporary text file
 |      ...     with open(os.path.join(d, "2.txt"), "w") as f:
 |      ...         _ = f.write("xyz")
 |      ...
 |      ...     collected = sorted(sc.wholeTextFiles(d).collect())
 |      >>> collected
 |      [('.../1.txt', '123'), ('.../2.txt', 'xyz')]
 |
 |  ----------------------------------------------------------------------
 |  Class methods defined here:
 |
 |  getOrCreate(conf: Optional[pyspark.conf.SparkConf] = None) -> 'SparkContext'
 |      Get or instantiate a :class:`SparkContext` and register it as a singleton object.
 |
 |      .. versionadded:: 1.4.0
 |
 |      Parameters
 |      ----------
 |      conf : :class:`SparkConf`, optional
 |          :class:`SparkConf` that will be used for initialization of the :class:`SparkContext`.
 |
 |      Returns
 |      -------
 |      :class:`SparkContext`
 |          current :class:`SparkContext`, or a new one if it wasn't created before the function
 |          call.
 |
 |      Examples
 |      --------
 |      >>> SparkContext.getOrCreate()
 |      <SparkContext ...>
 |
 |  setSystemProperty(key: str, value: str) -> None
 |      Set a Java system property, such as `spark.executor.memory`. This must
 |      be invoked before instantiating :class:`SparkContext`.
 |
 |      .. versionadded:: 0.9.0
 |
 |      Parameters
 |      ----------
 |      key : str
 |          The key of a new Java system property.
 |      value : str
 |          The value of a new Java system property.
 |
 |  ----------------------------------------------------------------------
 |  Readonly properties defined here:
 |
 |  applicationId
 |      A unique identifier for the Spark application.
 |      Its format depends on the scheduler implementation.
 |
 |      * in case of local spark app something like 'local-1433865536131'
 |      * in case of YARN something like 'application_1433865536131_34483'
 |
 |      .. versionadded:: 1.5.0
 |
 |      Examples
 |      --------
 |      >>> sc.applicationId  # doctest: +ELLIPSIS
 |      'local-...'
 |
 |  defaultMinPartitions
 |      Default min number of partitions for Hadoop RDDs when not given by user
 |
 |      .. versionadded:: 1.1.0
 |
 |      Examples
 |      --------
 |      >>> sc.defaultMinPartitions > 0
 |      True
 |
 |  defaultParallelism
 |      Default level of parallelism to use when not given by user (e.g. for reduce tasks)
 |
 |      .. versionadded:: 0.7.0
 |
 |      Examples
 |      --------
 |      >>> sc.defaultParallelism > 0
 |      True
 |
 |  listArchives
 |      Returns a list of archive paths that are added to resources.
 |
 |      .. versionadded:: 3.4.0
 |
 |      See Also
 |      --------
 |      :meth:`SparkContext.addArchive`
 |
 |  listFiles
 |      Returns a list of file paths that are added to resources.
 |
 |      .. versionadded:: 3.4.0
 |
 |      See Also
 |      --------
 |      :meth:`SparkContext.addFile`
 |
 |  resources
 |      Return the resource information of this :class:`SparkContext`.
 |      A resource could be a GPU, FPGA, etc.
 |
 |      .. versionadded:: 3.0.0
 |
 |  startTime
 |      Return the epoch time when the :class:`SparkContext` was started.
 |
 |      .. versionadded:: 1.5.0
 |
 |      Examples
 |      --------
 |      >>> _ = sc.startTime
 |
 |  uiWebUrl
 |      Return the URL of the SparkUI instance started by this :class:`SparkContext`
 |
 |      .. versionadded:: 2.1.0
 |
 |      Notes
 |      -----
 |      When the web ui is disabled, e.g., by ``spark.ui.enabled`` set to ``False``,
 |      it returns ``None``.
 |
 |      Examples
 |      --------
 |      >>> sc.uiWebUrl
 |      'http://...'
 |
 |  version
 |      The version of Spark on which this application is running.
 |
 |      .. versionadded:: 1.1.0
 |
 |      Examples
 |      --------
 |      >>> _ = sc.version
 |
 |  ----------------------------------------------------------------------
 |  Data descriptors defined here:
 |
 |  __dict__
 |      dictionary for instance variables
 |
 |  __weakref__
 |      list of weak references to the object
 |
 |  ----------------------------------------------------------------------
 |  Data and other attributes defined here:
 |
 |  PACKAGE_EXTENSIONS = ('.zip', '.egg', '.jar')
 |
 |  __annotations__ = {'PACKAGE_EXTENSIONS': typing.Iterable[str], '_activ...
# After reading the help, we decided to use sc.version to check which version of Spark we are running
sc.version
'3.5.6'
# Help can be used on any Python object
help(map)
Help on class map in module builtins:

class map(object)
 |  map(func, *iterables) --> map object
 |
 |  Make an iterator that computes the function using arguments from
 |  each of the iterables.  Stops when the shortest iterable is exhausted.
 |
 |  Methods defined here:
 |
 |  __getattribute__(self, name, /)
 |      Return getattr(self, name).
 |
 |  __iter__(self, /)
 |      Implement iter(self).
 |
 |  __next__(self, /)
 |      Implement next(self).
 |
 |  __reduce__(...)
 |      Return state information for pickling.
 |
 |  ----------------------------------------------------------------------
 |  Static methods defined here:
 |
 |  __new__(*args, **kwargs)
 |      Create and return a new object.  See help(type) for accurate signature.
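
The help text above says that the built-in map() returns an iterator and stops when the shortest iterable is exhausted. A minimal sketch of both points in plain Python follows; the helper name add_and_log is made up for illustration and is not part of the tutorial.

```python
# Built-in map() is lazy: the function is not called until the iterator is consumed.
calls = []


def add_and_log(x, y):
    # Hypothetical helper: records each call so we can see when work happens
    calls.append((x, y))
    return x + y


lazy_sums = map(add_and_log, [1, 2, 3], [10, 20])
calls_before = len(calls)   # still 0, nothing has been computed yet

sums = list(lazy_sums)      # consuming the iterator triggers the calls
print(calls_before, sums, calls)
```

The second iterable has only two elements, so the third element of the first iterable is never passed to the function.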

Part 3: Using RDDs and chaining together transformations and actions

Working with your first RDD

In Spark, we first create a base Resilient Distributed Dataset (RDD). We can then apply one or more transformations to that base RDD. An RDD is immutable, so once it is created, it cannot be changed. As a result, each transformation creates a new RDD. Finally, we can apply one or more actions to the RDDs. Note that Spark uses lazy evaluation, so transformations are not actually executed until an action occurs.

We will perform several exercises to get a better understanding of RDDs:

  • Create a Python collection of 10,000 integers
  • Create a Spark base RDD from that collection
  • Subtract one from each value using map
  • Perform the collect action to view the results
  • Perform the count action to view the counts
  • Apply the filter transformation and view the results with collect
  • Learn about lambda functions
  • Explore how lazy evaluation works and the debugging challenges it introduces
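
The idea that transformations build new datasets and only an action triggers execution can be sketched without Spark. The class below is a hypothetical plain-Python analogy (LazyPipeline is not a Spark API and says nothing about how Spark is implemented): map and filter only record work, and collect runs it.

```python
class LazyPipeline:
    """Hypothetical stand-in for an RDD: records transformations, runs them on an action."""

    def __init__(self, source, steps=()):
        self._source = source
        self._steps = tuple(steps)

    def map(self, func):
        # Transformation: returns a NEW pipeline; the original is left unchanged
        return LazyPipeline(self._source, self._steps + (("map", func),))

    def filter(self, predicate):
        # Transformation: also only recorded, not executed
        return LazyPipeline(self._source, self._steps + (("filter", predicate),))

    def collect(self):
        # Action: only now is the recorded work actually executed
        items = iter(self._source)
        for kind, func in self._steps:
            items = map(func, items) if kind == "map" else filter(func, items)
        return list(items)


evaluated = []


def subtract_one(x):
    evaluated.append(x)  # lets us observe when the function really runs
    return x - 1


base = LazyPipeline(range(1, 6))
transformed = base.map(subtract_one).filter(lambda x: x % 2 == 0)
work_before_action = len(evaluated)  # nothing has run yet

result = transformed.collect()
print(work_before_action, result)
```

Because each transformation returns a new object, base can still be collected on its own afterwards, which mirrors the immutability described above.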

(3a) Create a Python collection of integers in the range 1 to 10000

We will use the range() function to create a sequence of integers. In Python 3, range() only generates values as they are needed (the behavior of xrange() in Python 2) instead of building the complete list when it is called. Because of this, a range is more memory-efficient than a materialized list, especially for large ranges.

data = range(1, 10001)
print(data)
range(1, 10001)
# data is a range object, which can be indexed like a regular Python list
# Get the last element of the data (index 9999)
data[9999]
10000
# We can check the size of the range using the len() function
len(data)
10000
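
To see why a range is lighter than a list, the sketch below compares the size of the two containers with sys.getsizeof. The exact byte counts depend on the Python build, so none are quoted here, and getsizeof measures only the container, not the integer objects a list refers to.

```python
import sys

lazy_values = range(1, 10001)       # stores only start, stop and step
materialized = list(lazy_values)    # stores a reference to every element

range_bytes = sys.getsizeof(lazy_values)
list_bytes = sys.getsizeof(materialized)  # container only, not the int objects

print(range_bytes, list_bytes)

# Indexing and len() behave the same on both
print(lazy_values[9999], materialized[9999], len(lazy_values), len(materialized))
```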

(3b) Distributed data and using a collection to create an RDD

In Spark, datasets are represented as a list of entries, where the list is broken up into many different partitions that are each stored on a different machine. Each partition holds a unique subset of the entries in the list. Spark calls the datasets that it stores "Resilient Distributed Datasets" (RDDs).

One of the defining features of Spark, compared with other data analytics frameworks (for example, Hadoop), is that it can keep data in memory rather than on disk. This allows Spark applications to run much faster, since they are not slowed down by repeatedly reading data from disk.

The figure below illustrates how Spark breaks a list of data entries into partitions that are each stored in memory on a worker.

[Figure: partitions]

To create the RDD, we use sc.parallelize(), which tells Spark to create a new set of input data based on the data that is passed in. In this example, we will provide a range. The second argument to sc.parallelize() tells Spark how many partitions to break the data into when it stores the data in memory (we will talk more about this later in this tutorial). Note that for better performance when using parallelize, range() is recommended if the input represents a range. This is why we used range() in 3a.

There are many different types of RDDs. The base class for RDDs is pyspark.RDD, and other RDDs subclass pyspark.RDD. Since the other RDD types inherit from pyspark.RDD, they have the same APIs and are functionally identical. We will see that sc.parallelize() returns a pyspark.rdd.PipelinedRDD when its input is a range, as it is in this notebook; when its input is a list, it returns a plain pyspark.RDD.

After we generate RDDs, we can view them in the "Storage" tab of the web UI. You will notice that new datasets are not listed until Spark needs to return a result because an action is being executed. This feature of Spark is called "lazy evaluation". It allows Spark to avoid performing unnecessary computation.

# Parallelize the data using 8 partitions
# This operation turns the local data into an RDD
# Spark uses lazy evaluation, so no Spark jobs are run at this point
xrangeRDD = sc.parallelize(data, 8)
# Let's look at the help for parallelize
help(sc.parallelize)
Help on method parallelize in module pyspark.context:

parallelize(c: Iterable[~T], numSlices: Optional[int] = None) -> pyspark.rdd.RDD[~T] method of pyspark.context.SparkContext instance
    Distribute a local Python collection to form an RDD. Using range
    is recommended if the input represents a range for performance.

    .. versionadded:: 0.7.0

    Parameters
    ----------
    c : :class:`collections.abc.Iterable`
        iterable collection to distribute
    numSlices : int, optional
        the number of partitions of the new RDD

    Returns
    -------
    :class:`RDD`
        RDD representing distributed collection.

    Examples
    --------
    >>> sc.parallelize([0, 2, 3, 4, 6], 5).glom().collect()
    [[0], [2], [3], [4], [6]]
    >>> sc.parallelize(range(0, 6, 2), 5).glom().collect()
    [[], [0], [], [2], [4]]

    Deal with a list of strings.

    >>> strings = ["a", "b", "c"]
    >>> sc.parallelize(strings, 2).glom().collect()
    [['a'], ['b', 'c']]
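
The glom() example in the help output above shows that a range of three values spread over five partitions leaves some partitions empty. The plain-Python sketch below reproduces that output by giving each partition a start offset proportional to its index. The function slice_range is hypothetical and only modeled on the output shown; it is an assumption about how the split behaves, not PySpark's API.

```python
def slice_range(values, num_slices):
    """Split a range into num_slices consecutive sub-ranges (illustrative only)."""
    size = len(values)
    if size == 0:
        return [[] for _ in range(num_slices)]
    step = values[1] - values[0] if size > 1 else 1
    start0 = values[0]

    def start_of(split):
        # Partition `split` begins at a proportional offset into the range
        return start0 + int(split * size / num_slices) * step

    return [
        list(range(start_of(i), start_of(i + 1), step))
        for i in range(num_slices)
    ]


small = slice_range(range(0, 6, 2), 5)
sizes = [len(part) for part in slice_range(range(1, 10001), 8)]

print(small)   # same layout as the glom() example in the help text
print(sizes)   # how 10,000 values spread over 8 partitions
```

Under this assumption, the 10,000 integers from 3a end up evenly spread across the 8 partitions requested in the call to sc.parallelize(data, 8).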
# Let's see what type sc.parallelize() returns
print('type of xrangeRDD: {0}'.format(type(xrangeRDD)))

# What if we parallelize a second range?
dataRange = range(1, 10001)
rangeRDD = sc.parallelize(dataRange, 8)
print('type of dataRangeRDD: {0}'.format(type(rangeRDD)))
type of xrangeRDD: <class 'pyspark.rdd.PipelinedRDD'>
type of dataRangeRDD: <class 'pyspark.rdd.PipelinedRDD'>
# Each RDD gets a unique ID
print('xrangeRDD id: {0}'.format(xrangeRDD.id()))
print('rangeRDD id: {0}'.format(rangeRDD.id()))
xrangeRDD id: 2
rangeRDD id: 3
# We can name each newly created RDD using the setName() method
xrangeRDD.setName('My first RDD')
My first RDD PythonRDD[2] at RDD at PythonRDD.scala:53
# Let's view the lineage (the set of transformations) of the RDD using toDebugString()
print(xrangeRDD.toDebugString())
b'(8) My first RDD PythonRDD[2] at RDD at PythonRDD.scala:53 []\n |  ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:289 []'
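
The lineage above is printed as a bytes literal, so the line break appears as \n instead of starting a new line. A minimal sketch of making it readable follows; it uses the bytes copied from the output above rather than a live RDD, and it assumes the content is UTF-8 text.

```python
# Bytes literal copied from the toDebugString() output shown above
debug_bytes = (
    b"(8) My first RDD PythonRDD[2] at RDD at PythonRDD.scala:53 []\n"
    b" |  ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:289 []"
)

# Decode to str so the newline is rendered as an actual line break
debug_text = debug_bytes.decode("utf-8")
lineage_lines = debug_text.splitlines()

for line in lineage_lines:
    print(line)
```

Read top to bottom, the first line is the named RDD with its 8 partitions and the indented line is the parent it was derived from.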
# Let's use help to see which methods we can call on this RDD
help(xrangeRDD)
Help on PipelinedRDD in module pyspark.rdd object:

class PipelinedRDD(RDD, typing.Generic)
 |  PipelinedRDD(prev: pyspark.rdd.RDD[~T], func: Callable[[int, Iterable[~T]], Iterable[~U]], preservesPartitioning: bool = False, isFromBarrier: bool = False)
 |
 |  Examples
 |  --------
 |  Pipelined maps:
 |
 |  >>> rdd = sc.parallelize([1, 2, 3, 4])
 |  >>> rdd.map(lambda x: 2 * x).cache().map(lambda x: 2 * x).collect()
 |  [4, 8, 12, 16]
 |  >>> rdd.map(lambda x: 2 * x).map(lambda x: 2 * x).collect()
 |  [4, 8, 12, 16]
 |
 |  Pipelined reduces:
 |
 |  >>> from operator import add
 |  >>> rdd.map(lambda x: 2 * x).reduce(add)
 |  20
 |  >>> rdd.flatMap(lambda x: [x, x]).reduce(add)
 |  20
 |
 |  Method resolution order:
 |      PipelinedRDD
 |      RDD
 |      typing.Generic
 |      builtins.object
 |
 |  Methods defined here:
 |
 |  __init__(self, prev: pyspark.rdd.RDD[~T], func: Callable[[int, Iterable[~T]], Iterable[~U]], preservesPartitioning: bool = False, isFromBarrier: bool = False)
 |      Initialize self.  See help(type(self)) for accurate signature.
 |
 |  getNumPartitions(self) -> int
 |      Returns the number of partitions in RDD
 |
 |      .. versionadded:: 1.1.0
 |
 |      Returns
 |      -------
 |      int
 |          number of partitions
 |
 |      Examples
 |      --------
 |      >>> rdd = sc.parallelize([1, 2, 3, 4], 2)
 |      >>> rdd.getNumPartitions()
 |      2
 |
 |  id(self) -> int
 |      A unique ID for this RDD (within its SparkContext).
 |
 |      .. versionadded:: 0.7.0
 |
 |      Returns
 |      -------
 |      int
 |          The unique ID for this :class:`RDD`
 |
 |      Examples
 |      --------
 |      >>> rdd = sc.range(5)
 |      >>> rdd.id()  # doctest: +SKIP
 |      3
 |
 |  ----------------------------------------------------------------------
 |  Data and other attributes defined here:
 |
 |  __annotations__ = {}
 |
 |  __orig_bases__ = (pyspark.rdd.RDD[~U], typing.Generic[~T, ~U])
 |
 |  __parameters__ = (~T, ~U)
 |
 |  ----------------------------------------------------------------------
 |  Methods inherited from RDD:
 |
 |  __add__(self: 'RDD[T]', other: 'RDD[U]') -> 'RDD[Union[T, U]]'
 |      Return the union of this RDD and another one.
 |
 |      Examples
 |      --------
 |      >>> rdd = sc.parallelize([1, 1, 2, 3])
 |      >>> (rdd + rdd).collect()
 |      [1, 1, 2, 3, 1, 1, 2, 3]
 |
 |  __getnewargs__(self) -> NoReturn
 |
 |  __repr__(self) -> str
 |      Return repr(self).
 |
 |  aggregate(self: 'RDD[T]', zeroValue: ~U, seqOp: Callable[[~U, ~T], ~U], combOp: Callable[[~U, ~U], ~U]) -> ~U
 |      Aggregate the elements of each partition, and then the results for all
 |      the partitions, using a given combine functions and a neutral "zero
 |      value."
 |
 |      The functions ``op(t1, t2)`` is allowed to modify ``t1`` and return it
 |      as its result value to avoid object allocation; however, it should not
 |      modify ``t2``.
 |
 |      The first function (seqOp) can return a different result type, U, than
 |      the type of this RDD. Thus, we need one operation for merging a T into
 |      an U and one operation for merging two U
 |
 |      .. versionadded:: 1.1.0
 |
 |      Parameters
 |      ----------
 |      zeroValue : U
 |          the initial value for the accumulated result of each partition
 |      seqOp : function
 |          a function used to accumulate results within a partition
 |      combOp : function
 |          an associative function used to combine results from different partitions
 |
 |      Returns
 |      -------
 |      U
 |          the aggregated result
 |
 |      See Also
 |      --------
 |      :meth:`RDD.reduce`
 |      :meth:`RDD.fold`
 |
 |      Examples
 |      --------
 |      >>> seqOp = (lambda x, y: (x[0] + y, x[1] + 1))
 |      >>> combOp = (lambda x, y: (x[0] + y[0], x[1] + y[1]))
 |      >>> sc.parallelize([1, 2, 3, 4]).aggregate((0, 0), seqOp, combOp)
 |      (10, 4)
 |      >>> sc.parallelize([]).aggregate((0, 0), seqOp, combOp)
 |      (0, 0)
 |
 |  aggregateByKey(self: 'RDD[Tuple[K, V]]', zeroValue: ~U, seqFunc: Callable[[~U, ~V], ~U], combFunc: Callable[[~U, ~U], ~U], numPartitions: Optional[int] = None, partitionFunc: Callable[[~K], int] = <function portable_hash at 0x7d0ad45511c0>) -> 'RDD[Tuple[K, U]]'
 |      Aggregate the values of each key, using given combine functions and a neutral
 |      "zero value". This function can return a different result type, U, than the type
 |      of the values in this RDD, V. Thus, we need one operation for merging a V into
 |      a U and one operation for merging two U's, The former operation is used for merging
 |      values within a partition, and the latter is used for merging values between
 |      partitions. To avoid memory allocation, both of these functions are
 |      allowed to modify and return their first argument instead of creating a new U.
 |
 |      .. versionadded:: 1.1.0
 |
 |      Parameters
 |      ----------
 |      zeroValue : U
 |          the initial value for the accumulated result of each partition
 |      seqFunc : function
 |          a function to merge a V into a U
 |      combFunc : function
 |          a function to combine two U's into a single one
 |      numPartitions : int, optional
 |          the number of partitions in new :class:`RDD`
 |      partitionFunc : function, optional, default `portable_hash`
 |          function to compute the partition index
 |
 |      Returns
 |      -------
 |      :class:`RDD`
 |          a :class:`RDD` containing the keys and the aggregated result for each key
 |
 |      See Also
 |      --------
 |      :meth:`RDD.reduceByKey`
 |      :meth:`RDD.combineByKey`
 |      :meth:`RDD.foldByKey`
 |      :meth:`RDD.groupByKey`
 |
 |      Examples
 |      --------
 |      >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 2)])
 |      >>> seqFunc = (lambda x, y: (x[0] + y, x[1] + 1))
 |      >>> combFunc = (lambda x, y: (x[0] + y[0], x[1] + y[1]))
 |      >>> sorted(rdd.aggregateByKey((0, 0), seqFunc, combFunc).collect())
 |      [('a', (3, 2)), ('b', (1, 1))]
 |
 |  barrier(self: 'RDD[T]') -> 'RDDBarrier[T]'
 |      Marks the current stage as a barrier stage, where Spark must launch all tasks together.
 |      In case of a task failure, instead of only restarting the failed task, Spark will abort the
 |      entire stage and relaunch all tasks for this stage.
 |      The barrier execution mode feature is experimental and it only handles limited scenarios.
 |      Please read the linked SPIP and design docs to understand the limitations and future plans.
 |
 |      .. versionadded:: 2.4.0
 |
 |      Returns
 |      -------
 |      :class:`RDDBarrier`
 |          instance that provides actions within a barrier stage.
 |
 |      See Also
 |      --------
 |      :class:`pyspark.BarrierTaskContext`
 |
 |      Notes
 |      -----
 |      For additional information see
 |
 |      - `SPIP: Barrier Execution Mode <https://issues.apache.org/jira/browse/SPARK-24374>`_
 |      - `Design Doc <https://issues.apache.org/jira/browse/SPARK-24582>`_
 |
 |      This API is experimental
 |
 |  cache(self: 'RDD[T]') -> 'RDD[T]'
 |      Persist this RDD with the default storage level (`MEMORY_ONLY`).
 |
 |      .. versionadded:: 0.7.0
 |
 |      Returns
 |      -------
 |      :class:`RDD`
 |          The same :class:`RDD` with storage level set to `MEMORY_ONLY`
 |
 |      See Also
 |      --------
 |      :meth:`RDD.persist`
 |      :meth:`RDD.unpersist`
 |      :meth:`RDD.getStorageLevel`
 |
 |      Examples
 |      --------
 |      >>> rdd = sc.range(5)
 |      >>> rdd2 = rdd.cache()
 |      >>> rdd2 is rdd
 |      True
 |      >>> str(rdd.getStorageLevel())
 |      'Memory Serialized 1x Replicated'
 |      >>> _ = rdd.unpersist()
 |
 |  cartesian(self: 'RDD[T]', other: 'RDD[U]') -> 'RDD[Tuple[T, U]]'
 |      Return the Cartesian product of this RDD and another one, that is, the
 |      RDD of all pairs of elements ``(a, b)`` where ``a`` is in `self` and
 |      ``b`` is in `other`.
 |
 |      .. versionadded:: 0.7.0
 |
 |      Parameters
 |      ----------
 |      other : :class:`RDD`
 |          another :class:`RDD`
 |
 |      Returns
 |      -------
 |      :class:`RDD`
 |          the Cartesian product of this :class:`RDD` and another one
 |
 |      See Also
 |      --------
 |      :meth:`pyspark.sql.DataFrame.crossJoin`
 |
 |      Examples
 |      --------
 |      >>> rdd = sc.parallelize([1, 2])
 |      >>> sorted(rdd.cartesian(rdd).collect())
 |      [(1, 1), (1, 2), (2, 1), (2, 2)]
 |
 |  checkpoint(self) -> None
 |      Mark this RDD for checkpointing. It will be saved to a file inside the
 |      checkpoint directory set with :meth:`SparkContext.setCheckpointDir` and
 |      all references to its parent RDDs will be removed. This function must
 |      be called before any job has been executed on this RDD. It is strongly
 |      recommended that this RDD is persisted in memory, otherwise saving it
 |      on a file will require recomputation.
 |
 |      .. versionadded:: 0.7.0
 |
 |      See Also
 |      --------
 |      :meth:`RDD.isCheckpointed`
 |      :meth:`RDD.getCheckpointFile`
 |      :meth:`RDD.localCheckpoint`
 |      :meth:`SparkContext.setCheckpointDir`
 |      :meth:`SparkContext.getCheckpointDir`
 |
 |      Examples
 |      --------
 |      >>> rdd = sc.range(5)
 |      >>> rdd.is_checkpointed
 |      False
 |      >>> rdd.getCheckpointFile() == None
 |      True
 |
 |      >>> rdd.checkpoint()
 |      >>> rdd.is_checkpointed
 |      True
 |      >>> rdd.getCheckpointFile() == None
 |      True
 |
 |      >>> rdd.count()
 |      5
 |      >>> rdd.is_checkpointed
 |      True
 |      >>> rdd.getCheckpointFile() == None
 |      False
 |
 |  cleanShuffleDependencies(self, blocking: bool = False) -> None
 |      Removes an RDD's shuffles and it's non-persisted ancestors.
 |
 |      When running without a shuffle service, cleaning up shuffle files enables downscaling.
 |      If you use the RDD after this call, you should checkpoint and materialize it first.
 |
 |      .. versionadded:: 3.3.0
 |
 |      Parameters
 |      ----------
 |      blocking : bool, optional, default False
 |         whether to block on shuffle cleanup tasks
 |
 |      Notes
 |      -----
 |      This API is a developer API.
 |
 |  coalesce(self: 'RDD[T]', numPartitions: int, shuffle: bool = False) -> 'RDD[T]'
 |      Return a new RDD that is reduced into `numPartitions` partitions.
 |
 |      .. versionadded:: 1.0.0
 |
 |      Parameters
 |      ----------
 |      numPartitions : int, optional
 |          the number of partitions in new :class:`RDD`
 |      shuffle : bool, optional, default False
 |          whether to add a shuffle step
 |
 |      Returns
 |      -------
 |      :class:`RDD`
 |          a :class:`RDD` that is reduced into `numPartitions` partitions
 |
 |      See Also
 |      --------
 |      :meth:`RDD.repartition`
 |
 |      Examples
 |      --------
 |      >>> sc.parallelize([1, 2, 3, 4, 5], 3).glom().collect()
 |      [[1], [2, 3], [4, 5]]
 |      >>> sc.parallelize([1, 2, 3, 4, 5], 3).coalesce(1).glom().collect()
 |      [[1, 2, 3, 4, 5]]
 |
 |  cogroup(self: 'RDD[Tuple[K, V]]', other: 'RDD[Tuple[K, U]]', numPartitions: Optional[int] = None) -> 'RDD[Tuple[K, Tuple[ResultIterable[V], ResultIterable[U]]]]'
 |      For each key k in `self` or `other`, return a resulting RDD that
 |      contains a tuple with the list of values for that key in `self` as
 |      well as `other`.
 |
 |      .. versionadded:: 0.7.0
 |
 |      Parameters
 |      ----------
 |      other : :class:`RDD`
 |          another :class:`RDD`
 |
 |      Returns
 |      -------
 |      :class:`RDD`
 |          a :class:`RDD` containing the keys and cogrouped values
 |
 |      See Also
 |      --------
 |      :meth:`RDD.groupWith`
 |      :meth:`RDD.join`
 |
 |      Examples
 |      --------
 |      >>> rdd1 = sc.parallelize([("a", 1), ("b", 4)])
 |      >>> rdd2 = sc.parallelize([("a", 2)])
 |      >>> [(x, tuple(map(list, y))) for x, y in sorted(list(rdd1.cogroup(rdd2).collect()))]
 |      [('a', ([1], [2])), ('b', ([4], []))]
 |
 |  collect(self: 'RDD[T]') -> List[~T]
 |      Return a list that contains all the elements in this RDD.
 |
 |      .. versionadded:: 0.7.0
 |
 |      Returns
 |      -------
 |      list
 |          a list containing all the elements
 |
 |      Notes
 |      -----
 |      This method should only be used if the resulting array is expected
 |      to be small, as all the data is loaded into the driver's memory.
 |
 |      See Also
 |      --------
 |      :meth:`RDD.toLocalIterator`
 |      :meth:`pyspark.sql.DataFrame.collect`
 |
 |      Examples
 |      --------
 |      >>> sc.range(5).collect()
 |      [0, 1, 2, 3, 4]
 |      >>> sc.parallelize(["x", "y", "z"]).collect()
 |      ['x', 'y', 'z']
 |
 |  collectAsMap(self: 'RDD[Tuple[K, V]]') -> Dict[~K, ~V]
 |      Return the key-value pairs in this RDD to the master as a dictionary.
 |
 |      .. versionadded:: 0.7.0
 |
 |      Returns
 |      -------
 |      :class:`dict`
 |          a dictionary of (key, value) pairs
 |
 |      See Also
 |      --------
 |      :meth:`RDD.countByValue`
 |
 |      Notes
 |      -----
 |      This method should only be used if the resulting data is expected
 |      to be small, as all the data is loaded into the driver's memory.
 |
 |      Examples
 |      --------
 |      >>> m = sc.parallelize([(1, 2), (3, 4)]).collectAsMap()
 |      >>> m[1]
 |      2
 |      >>> m[3]
 |      4
 |
 |  collectWithJobGroup(self: 'RDD[T]', groupId: str, description: str, interruptOnCancel: bool = False) -> 'List[T]'
 |      When collect rdd, use this method to specify job group.
 |
 |      .. versionadded:: 3.0.0
 |
 |      .. deprecated:: 3.1.0
 |          Use :class:`pyspark.InheritableThread` with the pinned thread mode enabled.
 |
 |      Parameters
 |      ----------
 |      groupId : str
 |          The group ID to assign.
 |      description : str
 |          The description to set for the job group.
 |      interruptOnCancel : bool, optional, default False
 |          whether to interrupt jobs on job cancellation.
 |
 |      Returns
 |      -------
 |      list
 |          a list containing all the elements
 |
 |      See Also
 |      --------
 |      :meth:`RDD.collect`
 |      :meth:`SparkContext.setJobGroup`
 |
 |  combineByKey(self: 'RDD[Tuple[K, V]]', createCombiner: Callable[[~V], ~U], mergeValue: Callable[[~U, ~V], ~U], mergeCombiners: Callable[[~U, ~U], ~U], numPartitions: Optional[int] = None, partitionFunc: Callable[[~K], int] = <function portable_hash at 0x7d0ad45511c0>) -> 'RDD[Tuple[K, U]]'
 |      Generic function to combine the elements for each key using a custom
 |      set of aggregation functions.
 |
 |      Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a "combined
 |      type" C.
 |
 |      To avoid memory allocation, both mergeValue and mergeCombiners are allowed to
 |      modify and return their first argument instead of creating a new C.
 |
 |      In addition, users can control the partitioning of the output RDD.
 |
 |      .. versionadded:: 0.7.0
 |
 |      Parameters
 |      ----------
 |      createCombiner : function
 |          a function to turns a V into a C
 |      mergeValue : function
 |          a function to merge a V into a C
 |      mergeCombiners : function
 |          a function to combine two C's into a single one
 |      numPartitions : int, optional
 |          the number of partitions in new :class:`RDD`
 |      partitionFunc : function, optional, default `portable_hash`
 |          function to compute the partition index
 |
 |      Returns
 |      -------
 |      :class:`RDD`
 |          a :class:`RDD` containing the keys and the aggregated result for each key
 |
 |      See Also
 |      --------
 |      :meth:`RDD.reduceByKey`
 |      :meth:`RDD.aggregateByKey`
 |      :meth:`RDD.foldByKey`
 |      :meth:`RDD.groupByKey`
 |
 |      Notes
 |      -----
 |      V and C can be different -- for example, one might group an RDD of type
 |      (Int, Int) into an RDD of type (Int, List[Int]).
 |
 |      Examples
 |      --------
 |      >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 2)])
 |      >>> def to_list(a):
 |      ...     return [a]
 |      ...
 |      >>> def append(a, b):
 |      ...     a.append(b)
 |      ...     return a
 |      ...
 |      >>> def extend(a, b):
 |      ...     a.extend(b)
 |      ...     return a
 |      ...
 |      >>> sorted(rdd.combineByKey(to_list, append, extend).collect())
 |      [('a', [1, 2]), ('b', [1])]
 |
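The roles of the three functions can be modeled in plain Python without a Spark cluster. The sketch below is not PySpark's implementation: `combine_by_key` is a hypothetical helper, and the partition layout is supplied by hand as an assumption. It shows `createCombiner` running on the first value seen for a key in a partition, `mergeValue` on later values in the same partition, and `mergeCombiners` across partitions, here used to compute a per-key average.

```python
def combine_by_key(partitions, create_combiner, merge_value, merge_combiners):
    """Hypothetical plain-Python model of combineByKey (not a Spark API)."""
    per_partition = []
    for part in partitions:
        combined = {}
        for key, value in part:
            if key in combined:
                # Later values for a key in this partition are merged in.
                combined[key] = merge_value(combined[key], value)
            else:
                # The first value for a key in this partition starts a combiner.
                combined[key] = create_combiner(value)
        per_partition.append(combined)

    merged = {}
    for combined in per_partition:
        for key, combiner in combined.items():
            if key in merged:
                # Combiners for the same key from different partitions are merged.
                merged[key] = merge_combiners(merged[key], combiner)
            else:
                merged[key] = combiner
    return merged


# Assumed layout: two partitions, with key "a" present in both.
partitions = [[("a", 1), ("b", 1)], [("a", 2), ("a", 6)]]
sums_counts = combine_by_key(
    partitions,
    lambda v: (v, 1),                               # V -> C
    lambda c, v: (c[0] + v, c[1] + 1),              # (C, V) -> C
    lambda c1, c2: (c1[0] + c2[0], c1[1] + c2[1]),  # (C, C) -> C
)
averages = {key: total / count for key, (total, count) in sums_counts.items()}
print(sorted(averages.items()))
```

The combined type here is a `(sum, count)` tuple, which is why V (a number) and C (a tuple) differ.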
 |  count(self) -> int
 |      Return the number of elements in this RDD.
 |
 |      .. versionadded:: 0.7.0
 |
 |      Returns
 |      -------
 |      int
 |          the number of elements
 |
 |      See Also
 |      --------
 |      :meth:`RDD.countApprox`
 |      :meth:`pyspark.sql.DataFrame.count`
 |
 |      Examples
 |      --------
 |      >>> sc.parallelize([2, 3, 4]).count()
 |      3
 |
 |  countApprox(self, timeout: int, confidence: float = 0.95) -> int
 |      Approximate version of count() that returns a potentially incomplete
 |      result within a timeout, even if not all tasks have finished.
 |
 |      .. versionadded:: 1.2.0
 |
 |      Parameters
 |      ----------
 |      timeout : int
 |          maximum time to wait for the job, in milliseconds
 |      confidence : float
 |          the desired statistical confidence in the result
 |
 |      Returns
 |      -------
 |      int
 |          a potentially incomplete result, with error bounds
 |
 |      See Also
 |      --------
 |      :meth:`RDD.count`
 |
 |      Examples
 |      --------
 |      >>> rdd = sc.parallelize(range(1000), 10)
 |      >>> rdd.countApprox(1000, 1.0)
 |      1000
 |
 |  countApproxDistinct(self: 'RDD[T]', relativeSD: float = 0.05) -> int
 |      Return approximate number of distinct elements in the RDD.
 |
 |      .. versionadded:: 1.2.0
 |
 |      Parameters
 |      ----------
 |      relativeSD : float, optional
 |          Relative accuracy. Smaller values create
 |          counters that require more space.
 |          It must be greater than 0.000017.
 |
 |      Returns
 |      -------
 |      int
 |          approximate number of distinct elements
 |
 |      See Also
 |      --------
 |      :meth:`RDD.distinct`
 |
 |      Notes
 |      -----
 |      The algorithm used is based on streamlib's implementation of
 |      `"HyperLogLog in Practice: Algorithmic Engineering of a State
 |      of The Art Cardinality Estimation Algorithm", available here
 |      <https://doi.org/10.1145/2452376.2452456>`_.
 |
 |      Examples
 |      --------
 |      >>> n = sc.parallelize(range(1000)).map(str).countApproxDistinct()
 |      >>> 900 < n < 1100
 |      True
 |      >>> n = sc.parallelize([i % 20 for i in range(1000)]).countApproxDistinct()
 |      >>> 16 < n < 24
 |      True
 |
 |  countByKey(self: 'RDD[Tuple[K, V]]') -> Dict[~K, int]
 |      Count the number of elements for each key, and return the result to the
 |      driver as a dictionary.
 |
 |      .. versionadded:: 0.7.0
 |
 |      Returns
 |      -------
 |      dict
 |          a dictionary of (key, count) pairs
 |
 |      See Also
 |      --------
 |      :meth:`RDD.collectAsMap`
 |      :meth:`RDD.countByValue`
 |
 |      Examples
 |      --------
 |      >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
 |      >>> sorted(rdd.countByKey().items())
 |      [('a', 2), ('b', 1)]
 |
 |  countByValue(self: 'RDD[K]') -> Dict[~K, int]
 |      Return the count of each unique value in this RDD as a dictionary of
 |      (value, count) pairs.
 |
 |      .. versionadded:: 0.7.0
 |
 |      Returns
 |      -------
 |      dict
 |          a dictionary of (value, count) pairs
 |
 |      See Also
 |      --------
 |      :meth:`RDD.collectAsMap`
 |      :meth:`RDD.countByKey`
 |
 |      Examples
 |      --------
 |      >>> sorted(sc.parallelize([1, 2, 1, 2, 2], 2).countByValue().items())
 |      [(1, 2), (2, 3)]
 |
 |  distinct(self: 'RDD[T]', numPartitions: Optional[int] = None) -> 'RDD[T]'
 |      Return a new RDD containing the distinct elements in this RDD.
 |
 |      .. versionadded:: 0.7.0
 |
 |      Parameters
 |      ----------
 |      numPartitions : int, optional
 |          the number of partitions in new :class:`RDD`
 |
 |      Returns
 |      -------
 |      :class:`RDD`
 |          a new :class:`RDD` containing the distinct elements
 |
 |      See Also
 |      --------
 |      :meth:`RDD.countApproxDistinct`
 |
 |      Examples
 |      --------
 |      >>> sorted(sc.parallelize([1, 1, 2, 3]).distinct().collect())
 |      [1, 2, 3]
 |
 |  filter(self: 'RDD[T]', f: Callable[[~T], bool]) -> 'RDD[T]'
 |      Return a new RDD containing only the elements that satisfy a predicate.
 |
 |      .. versionadded:: 0.7.0
 |
 |      Parameters
 |      ----------
 |      f : function
 |          a predicate to run on each element of the RDD; elements for which
 |          it returns True are kept
 |
 |      Returns
 |      -------
 |      :class:`RDD`
 |          a new :class:`RDD` containing only the elements that satisfy the predicate
 |
 |      See Also
 |      --------
 |      :meth:`RDD.map`
 |
 |      Examples
 |      --------
 |      >>> rdd = sc.parallelize([1, 2, 3, 4, 5])
 |      >>> rdd.filter(lambda x: x % 2 == 0).collect()
 |      [2, 4]
 |
 |  first(self: 'RDD[T]') -> ~T
 |      Return the first element in this RDD.
 |
 |      .. versionadded:: 0.7.0
 |
 |      Returns
 |      -------
 |      T
 |          the first element
 |
 |      See Also
 |      --------
 |      :meth:`RDD.take`
 |      :meth:`pyspark.sql.DataFrame.first`
 |      :meth:`pyspark.sql.DataFrame.head`
 |
 |      Examples
 |      --------
 |      >>> sc.parallelize([2, 3, 4]).first()
 |      2
 |      >>> sc.parallelize([]).first()
 |      Traceback (most recent call last):
 |          ...
 |      ValueError: RDD is empty
 |
 |  flatMap(self: 'RDD[T]', f: Callable[[~T], Iterable[~U]], preservesPartitioning: bool = False) -> 'RDD[U]'
 |      Return a new RDD by first applying a function to all elements of this
 |      RDD, and then flattening the results.
 |
 |      .. versionadded:: 0.7.0
 |
 |      Parameters
 |      ----------
 |      f : function
 |          a function to turn a T into a sequence of U
 |      preservesPartitioning : bool, optional, default False
 |          indicates whether the input function preserves the partitioner,
 |          which should be False unless this is a pair RDD and the input
 |          function does not modify the keys
 |
 |      Returns
 |      -------
 |      :class:`RDD`
 |          a new :class:`RDD` by applying a function to all elements
 |
 |      See Also
 |      --------
 |      :meth:`RDD.map`
 |      :meth:`RDD.mapPartitions`
 |      :meth:`RDD.mapPartitionsWithIndex`
 |      :meth:`RDD.mapPartitionsWithSplit`
 |
 |      Examples
 |      --------
 |      >>> rdd = sc.parallelize([2, 3, 4])
 |      >>> sorted(rdd.flatMap(lambda x: range(1, x)).collect())
 |      [1, 1, 1, 2, 2, 3]
 |      >>> sorted(rdd.flatMap(lambda x: [(x, x), (x, x)]).collect())
 |      [(2, 2), (2, 2), (3, 3), (3, 3), (4, 4), (4, 4)]
 |
 |  flatMapValues(self: 'RDD[Tuple[K, V]]', f: Callable[[~V], Iterable[~U]]) -> 'RDD[Tuple[K, U]]'
 |      Pass each value in the key-value pair RDD through a flatMap function
 |      without changing the keys; this also retains the original RDD's
 |      partitioning.
 |
 |      .. versionadded:: 0.7.0
 |
 |      Parameters
 |      ----------
 |      f : function
 |          a function to turn a V into a sequence of U
 |
 |      Returns
 |      -------
 |      :class:`RDD`
 |          a :class:`RDD` containing the keys and the flat-mapped value
 |
 |      See Also
 |      --------
 |      :meth:`RDD.flatMap`
 |      :meth:`RDD.mapValues`
 |
 |      Examples
 |      --------
 |      >>> rdd = sc.parallelize([("a", ["x", "y", "z"]), ("b", ["p", "r"])])
 |      >>> def f(x): return x
 |      ...
 |      >>> rdd.flatMapValues(f).collect()
 |      [('a', 'x'), ('a', 'y'), ('a', 'z'), ('b', 'p'), ('b', 'r')]
 |
 |  fold(self: 'RDD[T]', zeroValue: ~T, op: Callable[[~T, ~T], ~T]) -> ~T
 |      Aggregate the elements of each partition, and then the results for all
 |      the partitions, using a given associative function and a neutral "zero value."
 |
 |      The function ``op(t1, t2)`` is allowed to modify ``t1`` and return it
 |      as its result value to avoid object allocation; however, it should not
 |      modify ``t2``.
 |
 |      This behaves somewhat differently from fold operations implemented
 |      for non-distributed collections in functional languages like Scala.
 |      This fold operation may be applied to partitions individually, and then
 |      fold those results into the final result, rather than apply the fold
 |      to each element sequentially in some defined ordering. For functions
 |      that are not commutative, the result may differ from that of a fold
 |      applied to a non-distributed collection.
 |
 |      .. versionadded:: 0.7.0
 |
 |      Parameters
 |      ----------
 |      zeroValue : T
 |          the initial value for the accumulated result of each partition
 |      op : function
 |          a function used to both accumulate results within a partition and combine
 |          results from different partitions
 |
 |      Returns
 |      -------
 |      T
 |          the aggregated result
 |
 |      See Also
 |      --------
 |      :meth:`RDD.reduce`
 |      :meth:`RDD.aggregate`
 |
 |      Examples
 |      --------
 |      >>> from operator import add
 |      >>> sc.parallelize([1, 2, 3, 4, 5]).fold(0, add)
 |      15
 |
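The two-stage behavior described above can be modeled in plain Python without a Spark cluster. The sketch below is not PySpark's implementation: `fold_partitions` is a hypothetical helper, and the partition layouts are chosen by hand as an assumption. It illustrates why the zero value should be neutral for `op`: in this model it is applied once per partition and once more when the per-partition results are merged.

```python
from functools import reduce
from operator import add


def fold_partitions(partitions, zero_value, op):
    """Hypothetical plain-Python model of a partitioned fold (not a Spark API)."""
    # Fold each partition on its own, starting from the zero value.
    partials = [reduce(op, part, zero_value) for part in partitions]
    # Fold the per-partition results, again starting from the zero value.
    return reduce(op, partials, zero_value)


# A neutral zero value gives the same answer for any partition layout.
neutral_one = fold_partitions([[1, 2, 3, 4, 5]], 0, add)
neutral_two = fold_partitions([[1, 2], [3, 4, 5]], 0, add)

# A non-neutral zero value is added once per partition plus once more,
# so the result depends on how the data happens to be partitioned.
skewed_one = fold_partitions([[1, 2, 3, 4, 5]], 1, add)
skewed_two = fold_partitions([[1, 2], [3, 4, 5]], 1, add)

print(neutral_one, neutral_two, skewed_one, skewed_two)
```

With a zero value of 0 both layouts sum to 15. With a zero value of 1 the one-partition layout gives 17 and the two-partition layout gives 18.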
 |  foldByKey(self: 'RDD[Tuple[K, V]]', zeroValue: ~V, func: Callable[[~V, ~V], ~V], numPartitions: Optional[int] = None, partitionFunc: Callable[[~K], int] = <function portable_hash at 0x7d0ad45511c0>) -> 'RDD[Tuple[K, V]]'
 |      Merge the values for each key using an associative function "func"
 |      and a neutral "zeroValue" which may be added to the result an
 |      arbitrary number of times, and must not change the result
 |      (e.g., 0 for addition, or 1 for multiplication).
 |
 |      .. versionadded:: 1.1.0
 |
 |      Parameters
 |      ----------
 |      zeroValue : V
 |          the initial value for the accumulated result of each partition
 |      func : function
 |          a function to combine two V's into a single one
 |      numPartitions : int, optional
 |          the number of partitions in new :class:`RDD`
 |      partitionFunc : function, optional, default `portable_hash`
 |          function to compute the partition index
 |
 |      Returns
 |      -------
 |      :class:`RDD`
 |          a :class:`RDD` containing the keys and the aggregated result for each key
 |
 |      See Also
 |      --------
 |      :meth:`RDD.reduceByKey`
 |      :meth:`RDD.combineByKey`
 |      :meth:`RDD.aggregateByKey`
 |      :meth:`RDD.groupByKey`
 |
 |      Examples
 |      --------
 |      >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
 |      >>> from operator import add
 |      >>> sorted(rdd.foldByKey(0, add).collect())
 |      [('a', 2), ('b', 1)]
 |
 |  foreach(self: 'RDD[T]', f: Callable[[~T], NoneType]) -> None
 |      Applies a function to all elements of this RDD.
 |
 |      .. versionadded:: 0.7.0
 |
 |      Parameters
 |      ----------
 |      f : function
 |          a function applied to each element
 |
 |      See Also
 |      --------
 |      :meth:`RDD.foreachPartition`
 |      :meth:`pyspark.sql.DataFrame.foreach`
 |      :meth:`pyspark.sql.DataFrame.foreachPartition`
 |
 |      Examples
 |      --------
 |      >>> def f(x): print(x)
 |      ...
 |      >>> sc.parallelize([1, 2, 3, 4, 5]).foreach(f)
 |
 |  foreachPartition(self: 'RDD[T]', f: Callable[[Iterable[~T]], NoneType]) -> None
 |      Applies a function to each partition of this RDD.
 |
 |      .. versionadded:: 1.0.0
 |
 |      Parameters
 |      ----------
 |      f : function
 |          a function applied to each partition
 |
 |      See Also
 |      --------
 |      :meth:`RDD.foreach`
 |      :meth:`pyspark.sql.DataFrame.foreach`
 |      :meth:`pyspark.sql.DataFrame.foreachPartition`
 |
 |      Examples
 |      --------
 |      >>> def f(iterator):
 |      ...     for x in iterator:
 |      ...         print(x)
 |      ...
 |      >>> sc.parallelize([1, 2, 3, 4, 5]).foreachPartition(f)
 |
 |  fullOuterJoin(self: 'RDD[Tuple[K, V]]', other: 'RDD[Tuple[K, U]]', numPartitions: Optional[int] = None) -> 'RDD[Tuple[K, Tuple[Optional[V], Optional[U]]]]'
 |      Perform a full outer join of `self` and `other`.
 |
 |      For each element (k, v) in `self`, the resulting RDD will either
 |      contain all pairs (k, (v, w)) for w in `other`, or the pair
 |      (k, (v, None)) if no elements in `other` have key k.
 |
 |      Similarly, for each element (k, w) in `other`, the resulting RDD will
 |      either contain all pairs (k, (v, w)) for v in `self`, or the pair
 |      (k, (None, w)) if no elements in `self` have key k.
 |
 |      Hash-partitions the resulting RDD into the given number of partitions.
 |
 |      .. versionadded:: 1.2.0
 |
 |      Parameters
 |      ----------
 |      other : :class:`RDD`
 |          another :class:`RDD`
 |      numPartitions : int, optional
 |          the number of partitions in new :class:`RDD`
 |
 |      Returns
 |      -------
 |      :class:`RDD`
 |          a :class:`RDD` containing all pairs of elements with matching keys,
 |          plus None-padded pairs for keys present in only one of the RDDs
 |
 |      See Also
 |      --------
 |      :meth:`RDD.join`
 |      :meth:`RDD.leftOuterJoin`
 |      :meth:`RDD.rightOuterJoin`
 |      :meth:`pyspark.sql.DataFrame.join`
 |
 |      Examples
 |      --------
 |      >>> rdd1 = sc.parallelize([("a", 1), ("b", 4)])
 |      >>> rdd2 = sc.parallelize([("a", 2), ("c", 8)])
 |      >>> sorted(rdd1.fullOuterJoin(rdd2).collect())
 |      [('a', (1, 2)), ('b', (4, None)), ('c', (None, 8))]
 |
 |  getCheckpointFile(self) -> Optional[str]
 |      Gets the name of the file to which this RDD was checkpointed.
 |
 |      Not defined if RDD is checkpointed locally.
 |
 |      .. versionadded:: 0.7.0
 |
 |      Returns
 |      -------
 |      str
 |          the name of the file to which this :class:`RDD` was checkpointed
 |
 |      See Also
 |      --------
 |      :meth:`RDD.checkpoint`
 |      :meth:`SparkContext.setCheckpointDir`
 |      :meth:`SparkContext.getCheckpointDir`
 |
 |  getResourceProfile(self) -> Optional[pyspark.resource.profile.ResourceProfile]
 |      Get the :class:`pyspark.resource.ResourceProfile` specified with this RDD or None
 |      if it wasn't specified.
 |
 |      .. versionadded:: 3.1.0
 |
 |      Returns
 |      -------
 |      :class:`pyspark.resource.ResourceProfile`
 |          The user specified profile or None if none were specified
 |
 |      See Also
 |      --------
 |      :meth:`RDD.withResources`
 |
 |      Notes
 |      -----
 |      This API is experimental.
 |
 |  getStorageLevel(self) -> pyspark.storagelevel.StorageLevel
 |      Get the RDD's current storage level.
 |
 |      .. versionadded:: 1.0.0
 |
 |      Returns
 |      -------
 |      :class:`StorageLevel`
 |          current :class:`StorageLevel`
 |
 |      See Also
 |      --------
 |      :meth:`RDD.name`
 |
 |      Examples
 |      --------
 |      >>> rdd = sc.parallelize([1,2])
 |      >>> rdd.getStorageLevel()
 |      StorageLevel(False, False, False, False, 1)
 |      >>> print(rdd.getStorageLevel())
 |      Serialized 1x Replicated
 |
 |  glom(self: 'RDD[T]') -> 'RDD[List[T]]'
 |      Return an RDD created by coalescing all elements within each partition
 |      into a list.
 |
 |      .. versionadded:: 0.7.0
 |
 |      Returns
 |      -------
 |      :class:`RDD`
 |          a new :class:`RDD` coalescing all elements within each partition into a list
 |
 |      Examples
 |      --------
 |      >>> rdd = sc.parallelize([1, 2, 3, 4], 2)
 |      >>> sorted(rdd.glom().collect())
 |      [[1, 2], [3, 4]]
 |
 |  groupBy(self: 'RDD[T]', f: Callable[[~T], ~K], numPartitions: Optional[int] = None, partitionFunc: Callable[[~K], int] = <function portable_hash at 0x7d0ad45511c0>) -> 'RDD[Tuple[K, Iterable[T]]]'
 |      Return an RDD of grouped items.
 |
 |      .. versionadded:: 0.7.0
 |
 |      Parameters
 |      ----------
 |      f : function
 |          a function to compute the key
 |      numPartitions : int, optional
 |          the number of partitions in new :class:`RDD`
 |      partitionFunc : function, optional, default `portable_hash`
 |          a function to compute the partition index
 |
 |      Returns
 |      -------
 |      :class:`RDD`
 |          a new :class:`RDD` of grouped items
 |
 |      See Also
 |      --------
 |      :meth:`RDD.groupByKey`
 |      :meth:`pyspark.sql.DataFrame.groupBy`
 |
 |      Examples
 |      --------
 |      >>> rdd = sc.parallelize([1, 1, 2, 3, 5, 8])
 |      >>> result = rdd.groupBy(lambda x: x % 2).collect()
 |      >>> sorted([(x, sorted(y)) for (x, y) in result])
 |      [(0, [2, 8]), (1, [1, 1, 3, 5])]
 |
 |  groupByKey(self: 'RDD[Tuple[K, V]]', numPartitions: Optional[int] = None, partitionFunc: Callable[[~K], int] = <function portable_hash at 0x7d0ad45511c0>) -> 'RDD[Tuple[K, Iterable[V]]]'
 |      Group the values for each key in the RDD into a single sequence.
 |      Hash-partitions the resulting RDD with numPartitions partitions.
 |
 |      .. versionadded:: 0.7.0
 |
 |      Parameters
 |      ----------
 |      numPartitions : int, optional
 |          the number of partitions in new :class:`RDD`
 |      partitionFunc : function, optional, default `portable_hash`
 |          function to compute the partition index
 |
 |      Returns
 |      -------
 |      :class:`RDD`
 |          a :class:`RDD` containing the keys and the grouped result for each key
 |
 |      See Also
 |      --------
 |      :meth:`RDD.reduceByKey`
 |      :meth:`RDD.combineByKey`
 |      :meth:`RDD.aggregateByKey`
 |      :meth:`RDD.foldByKey`
 |
 |      Notes
 |      -----
 |      If you are grouping in order to perform an aggregation (such as a
 |      sum or average) over each key, using reduceByKey or aggregateByKey will
 |      provide much better performance.
 |
 |      Examples
 |      --------
 |      >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
 |      >>> sorted(rdd.groupByKey().mapValues(len).collect())
 |      [('a', 2), ('b', 1)]
 |      >>> sorted(rdd.groupByKey().mapValues(list).collect())
 |      [('a', [1, 1]), ('b', [1])]
 |
 |  groupWith(self: 'RDD[Tuple[Any, Any]]', other: 'RDD[Tuple[Any, Any]]', *others: 'RDD[Tuple[Any, Any]]') -> 'RDD[Tuple[Any, Tuple[ResultIterable[Any], ...]]]'
 |      Alias for cogroup but with support for multiple RDDs.
 |
 |      .. versionadded:: 0.7.0
 |
 |      Parameters
 |      ----------
 |      other : :class:`RDD`
 |          another :class:`RDD`
 |      others : :class:`RDD`
 |          other :class:`RDD`\s
 |
 |      Returns
 |      -------
 |      :class:`RDD`
 |          a :class:`RDD` containing the keys and cogrouped values
 |
 |      See Also
 |      --------
 |      :meth:`RDD.cogroup`
 |      :meth:`RDD.join`
 |
 |      Examples
 |      --------
 |      >>> rdd1 = sc.parallelize([("a", 5), ("b", 6)])
 |      >>> rdd2 = sc.parallelize([("a", 1), ("b", 4)])
 |      >>> rdd3 = sc.parallelize([("a", 2)])
 |      >>> rdd4 = sc.parallelize([("b", 42)])
 |      >>> [(x, tuple(map(list, y))) for x, y in
 |      ...     sorted(list(rdd1.groupWith(rdd2, rdd3, rdd4).collect()))]
 |      [('a', ([5], [1], [2], [])), ('b', ([6], [4], [], [42]))]
 |
 |  histogram(self: 'RDD[S]', buckets: Union[int, List[ForwardRef('S')], Tuple[ForwardRef('S'), ...]]) -> Tuple[Sequence[ForwardRef('S')], List[int]]
 |      Compute a histogram using the provided buckets. The buckets
 |      are all open to the right except for the last which is closed.
 |      e.g. [1,10,20,50] means the buckets are [1,10) [10,20) [20,50],
 |      which means 1<=x<10, 10<=x<20, 20<=x<=50. And on the input of 1
 |      and 50 we would have a histogram of 1,0,1.
 |
 |      If your histogram is evenly spaced (e.g. [0, 10, 20, 30]),
 |      this can be switched from an O(log n) insertion to O(1) per
 |      element (where n is the number of buckets).
 |
 |      Buckets must be sorted, not contain any duplicates, and have
 |      at least two elements.
 |
 |      If `buckets` is a number, it will generate buckets which are
 |      evenly spaced between the minimum and maximum of the RDD. For
 |      example, if the min value is 0 and the max is 100, given `buckets`
 |      as 2, the resulting buckets will be [0,50) [50,100]. `buckets` must
 |      be at least 1. An exception is raised if the RDD contains infinity.
 |      If the elements in the RDD do not vary (max == min), a single bucket
 |      will be used.
 |
 |      .. versionadded:: 1.2.0
 |
 |      Parameters
 |      ----------
 |      buckets : int, or list, or tuple
 |          if `buckets` is a number, the data is binned into that many evenly
 |          spaced buckets; otherwise, `buckets` gives the sorted bucket
 |          boundaries used to bin the data.
 |
 |      Returns
 |      -------
 |      tuple
 |          a tuple of buckets and histogram
 |
 |      See Also
 |      --------
 |      :meth:`RDD.stats`
 |
 |      Examples
 |      --------
 |      >>> rdd = sc.parallelize(range(51))
 |      >>> rdd.histogram(2)
 |      ([0, 25, 50], [25, 26])
 |      >>> rdd.histogram([0, 5, 25, 50])
 |      ([0, 5, 25, 50], [5, 20, 26])
 |      >>> rdd.histogram([0, 15, 30, 45, 60])  # evenly spaced buckets
 |      ([0, 15, 30, 45, 60], [15, 15, 15, 6])
 |      >>> rdd = sc.parallelize(["ab", "ac", "b", "bd", "ef"])
 |      >>> rdd.histogram(("a", "b", "c"))
 |      (('a', 'b', 'c'), [2, 2])
 |
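The bucket rule stated above (every bucket is open on the right except the last, which is closed) can be checked in plain Python. The sketch below is not Spark's implementation: `bucket_counts` is a hypothetical helper that uses the standard-library `bisect` module to apply that rule to explicit bucket boundaries.

```python
from bisect import bisect_right


def bucket_counts(values, buckets):
    """Hypothetical plain-Python model of the bucket rule (not a Spark API)."""
    counts = [0] * (len(buckets) - 1)
    for x in values:
        if x < buckets[0] or x > buckets[-1]:
            continue  # values outside the overall range are not counted
        # bisect_right locates the bucket whose left edge is <= x;
        # min(...) places the maximum boundary in the last, closed bucket.
        index = min(bisect_right(buckets, x) - 1, len(counts) - 1)
        counts[index] += 1
    return counts


# Inputs 1 and 50 with boundaries [1, 10, 20, 50], as in the description.
edge_case = bucket_counts([1, 50], [1, 10, 20, 50])

# The same data and boundaries as the rdd.histogram([0, 5, 25, 50]) example.
doc_case = bucket_counts(range(51), [0, 5, 25, 50])

print(edge_case, doc_case)
```

Both results match the counts given in the docstring: `[1, 0, 1]` for the boundary inputs and `[5, 20, 26]` for `range(51)`.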
 |  intersection(self: 'RDD[T]', other: 'RDD[T]') -> 'RDD[T]'
 |      Return the intersection of this RDD and another one. The output will
 |      not contain any duplicate elements, even if the input RDDs did.
 |
 |      .. versionadded:: 1.0.0
 |
 |      Parameters
 |      ----------
 |      other : :class:`RDD`
 |          another :class:`RDD`
 |
 |      Returns
 |      -------
 |      :class:`RDD`
 |          the intersection of this :class:`RDD` and another one
 |
 |      See Also
 |      --------
 |      :meth:`pyspark.sql.DataFrame.intersect`
 |
 |      Notes
 |      -----
 |      This method performs a shuffle internally.
 |
 |      Examples
 |      --------
 |      >>> rdd1 = sc.parallelize([1, 10, 2, 3, 4, 5])
 |      >>> rdd2 = sc.parallelize([1, 6, 2, 3, 7, 8])
 |      >>> rdd1.intersection(rdd2).collect()
 |      [1, 2, 3]
 |
 |  isCheckpointed(self) -> bool
 |      Return whether this RDD is checkpointed and materialized, either reliably or locally.
 |
 |      .. versionadded:: 0.7.0
 |
 |      Returns
 |      -------
 |      bool
 |          whether this :class:`RDD` is checkpointed and materialized, either reliably or locally
 |
 |      See Also
 |      --------
 |      :meth:`RDD.checkpoint`
 |      :meth:`RDD.getCheckpointFile`
 |      :meth:`SparkContext.setCheckpointDir`
 |      :meth:`SparkContext.getCheckpointDir`
 |
 |  isEmpty(self) -> bool
 |      Returns true if and only if the RDD contains no elements at all.
 |
 |      .. versionadded:: 1.3.0
 |
 |      Returns
 |      -------
 |      bool
 |          whether the :class:`RDD` is empty
 |
 |      See Also
 |      --------
 |      :meth:`RDD.first`
 |      :meth:`pyspark.sql.DataFrame.isEmpty`
 |
 |      Notes
 |      -----
 |      An RDD may be empty even when it has at least 1 partition.
 |
 |      Examples
 |      --------
 |      >>> sc.parallelize([]).isEmpty()
 |      True
 |      >>> sc.parallelize([1]).isEmpty()
 |      False
 |
 |  isLocallyCheckpointed(self) -> bool
 |      Return whether this RDD is marked for local checkpointing.
 |
 |      Exposed for testing.
 |
 |      .. versionadded:: 2.2.0
 |
 |      Returns
 |      -------
 |      bool
 |          whether this :class:`RDD` is marked for local checkpointing
 |
 |      See Also
 |      --------
 |      :meth:`RDD.localCheckpoint`
 |
 |  join(self: 'RDD[Tuple[K, V]]', other: 'RDD[Tuple[K, U]]', numPartitions: Optional[int] = None) -> 'RDD[Tuple[K, Tuple[V, U]]]'
 |      Return an RDD containing all pairs of elements with matching keys in
 |      `self` and `other`.
 |
 |      Each pair of elements will be returned as a (k, (v1, v2)) tuple, where
 |      (k, v1) is in `self` and (k, v2) is in `other`.
 |
 |      Performs a hash join across the cluster.
 |
 |      .. versionadded:: 0.7.0
 |
 |      Parameters
 |      ----------
 |      other : :class:`RDD`
 |          another :class:`RDD`
 |      numPartitions : int, optional
 |          the number of partitions in new :class:`RDD`
 |
 |      Returns
 |      -------
 |      :class:`RDD`
 |          a :class:`RDD` containing all pairs of elements with matching keys
 |
 |      See Also
 |      --------
 |      :meth:`RDD.leftOuterJoin`
 |      :meth:`RDD.rightOuterJoin`
 |      :meth:`RDD.fullOuterJoin`
 |      :meth:`RDD.cogroup`
 |      :meth:`RDD.groupWith`
 |      :meth:`pyspark.sql.DataFrame.join`
 |
 |      Examples
 |      --------
 |      >>> rdd1 = sc.parallelize([("a", 1), ("b", 4)])
 |      >>> rdd2 = sc.parallelize([("a", 2), ("a", 3)])
 |      >>> sorted(rdd1.join(rdd2).collect())
 |      [('a', (1, 2)), ('a', (1, 3))]
 |
 |  keyBy(self: 'RDD[T]', f: Callable[[~T], ~K]) -> 'RDD[Tuple[K, T]]'
 |      Creates tuples of the elements in this RDD by applying `f`.
 |
 |      .. versionadded:: 0.9.1
 |
 |      Parameters
 |      ----------
 |      f : function
 |          a function to compute the key
 |
 |      Returns
 |      -------
 |      :class:`RDD`
 |          a :class:`RDD` of (key, element) tuples, where each key is computed by `f`
 |
 |      See Also
 |      --------
 |      :meth:`RDD.map`
 |      :meth:`RDD.keys`
 |      :meth:`RDD.values`
 |
 |      Examples
 |      --------
 |      >>> rdd1 = sc.parallelize(range(0,3)).keyBy(lambda x: x*x)
 |      >>> rdd2 = sc.parallelize(zip(range(0,5), range(0,5)))
 |      >>> [(x, list(map(list, y))) for x, y in sorted(rdd1.cogroup(rdd2).collect())]
 |      [(0, [[0], [0]]), (1, [[1], [1]]), (2, [[], [2]]), (3, [[], [3]]), (4, [[2], [4]])]
 |
 |  keys(self: 'RDD[Tuple[K, V]]') -> 'RDD[K]'
 |      Return an RDD with the keys of each tuple.
 |
 |      .. versionadded:: 0.7.0
 |
 |      Returns
 |      -------
 |      :class:`RDD`
 |          a :class:`RDD` only containing the keys
 |
 |      See Also
 |      --------
 |      :meth:`RDD.values`
 |
 |      Examples
 |      --------
 |      >>> rdd = sc.parallelize([(1, 2), (3, 4)]).keys()
 |      >>> rdd.collect()
 |      [1, 3]
 |
 |  leftOuterJoin(self: 'RDD[Tuple[K, V]]', other: 'RDD[Tuple[K, U]]', numPartitions: Optional[int] = None) -> 'RDD[Tuple[K, Tuple[V, Optional[U]]]]'
 |      Perform a left outer join of `self` and `other`.
 |
 |      For each element (k, v) in `self`, the resulting RDD will either
 |      contain all pairs (k, (v, w)) for w in `other`, or the pair
 |      (k, (v, None)) if no elements in `other` have key k.
 |
 |      Hash-partitions the resulting RDD into the given number of partitions.
 |
 |      .. versionadded:: 0.7.0
 |
 |      Parameters
 |      ----------
 |      other : :class:`RDD`
 |          another :class:`RDD`
 |      numPartitions : int, optional
 |          the number of partitions in new :class:`RDD`
 |
 |      Returns
 |      -------
 |      :class:`RDD`
 |          a :class:`RDD` containing all pairs of elements with matching keys
 |
 |      See Also
 |      --------
 |      :meth:`RDD.join`
 |      :meth:`RDD.rightOuterJoin`
 |      :meth:`RDD.fullOuterJoin`
 |      :meth:`pyspark.sql.DataFrame.join`
 |
 |      Examples
 |      --------
 |      >>> rdd1 = sc.parallelize([("a", 1), ("b", 4)])
 |      >>> rdd2 = sc.parallelize([("a", 2)])
 |      >>> sorted(rdd1.leftOuterJoin(rdd2).collect())
 |      [('a', (1, 2)), ('b', (4, None))]
 |
 |  localCheckpoint(self) -> None
 |      Mark this RDD for local checkpointing using Spark's existing caching layer.
 |
 |      This method is for users who wish to truncate RDD lineages while skipping the expensive
 |      step of replicating the materialized data in a reliable distributed file system. This is
 |      useful for RDDs with long lineages that need to be truncated periodically (e.g. GraphX).
 |
 |      Local checkpointing sacrifices fault-tolerance for performance. In particular, checkpointed
 |      data is written to ephemeral local storage in the executors instead of to a reliable,
 |      fault-tolerant storage. The effect is that if an executor fails during the computation,
 |      the checkpointed data may no longer be accessible, causing an irrecoverable job failure.
 |
 |      This is NOT safe to use with dynamic allocation, which removes executors along
 |      with their cached blocks. If you must use both features, you are advised to set
 |      `spark.dynamicAllocation.cachedExecutorIdleTimeout` to a high value.
 |
 |      The checkpoint directory set through :meth:`SparkContext.setCheckpointDir` is not used.
 |
 |      .. versionadded:: 2.2.0
 |
 |      See Also
 |      --------
 |      :meth:`RDD.checkpoint`
 |      :meth:`RDD.isLocallyCheckpointed`
 |
 |      Examples
 |      --------
 |      >>> rdd = sc.range(5)
 |      >>> rdd.isLocallyCheckpointed()
 |      False
 |
 |      >>> rdd.localCheckpoint()
 |      >>> rdd.isLocallyCheckpointed()
 |      True
 |
 |  lookup(self: 'RDD[Tuple[K, V]]', key: ~K) -> List[~V]
 |      Return the list of values in the RDD for key `key`. This operation
 |      is done efficiently if the RDD has a known partitioner by only
 |      searching the partition that the key maps to.
 |
 |      .. versionadded:: 1.2.0
 |
 |      Parameters
 |      ----------
 |      key : K
 |          the key to look up
 |
 |      Returns
 |      -------
 |      list
 |          the list of values in the :class:`RDD` for key `key`
 |
 |      Examples
 |      --------
 |      >>> l = range(1000)
 |      >>> rdd = sc.parallelize(zip(l, l), 10)
 |      >>> rdd.lookup(42)  # slow
 |      [42]
 |      >>> sorted_rdd = rdd.sortByKey()
 |      >>> sorted_rdd.lookup(42)  # fast
 |      [42]
 |      >>> sorted_rdd.lookup(1024)
 |      []
 |      >>> rdd2 = sc.parallelize([(('a', 'b'), 'c')]).groupByKey()
 |      >>> list(rdd2.lookup(('a', 'b'))[0])
 |      ['c']
 |
 |  map(self: 'RDD[T]', f: Callable[[~T], ~U], preservesPartitioning: bool = False) -> 'RDD[U]'
 |      Return a new RDD by applying a function to each element of this RDD.
 |
 |      .. versionadded:: 0.7.0
 |
 |      Parameters
 |      ----------
 |      f : function
 |          a function to run on each element of the RDD
 |      preservesPartitioning : bool, optional, default False
 |          indicates whether the input function preserves the partitioner,
 |          which should be False unless this is a pair RDD and the input
 |          function does not modify the keys
 |
 |      Returns
 |      -------
 |      :class:`RDD`
 |          a new :class:`RDD` by applying a function to all elements
 |
 |      See Also
 |      --------
 |      :meth:`RDD.flatMap`
 |      :meth:`RDD.mapPartitions`
 |      :meth:`RDD.mapPartitionsWithIndex`
 |      :meth:`RDD.mapPartitionsWithSplit`
 |
 |      Examples
 |      --------
 |      >>> rdd = sc.parallelize(["b", "a", "c"])
 |      >>> sorted(rdd.map(lambda x: (x, 1)).collect())
 |      [('a', 1), ('b', 1), ('c', 1)]
 |
 |  mapPartitions(self: 'RDD[T]', f: Callable[[Iterable[~T]], Iterable[~U]], preservesPartitioning: bool = False) -> 'RDD[U]'
 |      Return a new RDD by applying a function to each partition of this RDD.
 |
 |      .. versionadded:: 0.7.0
 |
 |      Parameters
 |      ----------
 |      f : function
 |          a function to run on each partition of the RDD
 |      preservesPartitioning : bool, optional, default False
 |          indicates whether the input function preserves the partitioner,
 |          which should be False unless this is a pair RDD and the input
 |          function does not modify the keys
 |
 |      Returns
 |      -------
 |      :class:`RDD`
 |          a new :class:`RDD` by applying a function to each partition
 |
 |      See Also
 |      --------
 |      :meth:`RDD.map`
 |      :meth:`RDD.flatMap`
 |      :meth:`RDD.mapPartitionsWithIndex`
 |      :meth:`RDD.mapPartitionsWithSplit`
 |      :meth:`RDDBarrier.mapPartitions`
 |
 |      Examples
 |      --------
 |      >>> rdd = sc.parallelize([1, 2, 3, 4], 2)
 |      >>> def f(iterator): yield sum(iterator)
 |      ...
 |      >>> rdd.mapPartitions(f).collect()
 |      [3, 7]
 |
 |  mapPartitionsWithIndex(self: 'RDD[T]', f: Callable[[int, Iterable[~T]], Iterable[~U]], preservesPartitioning: bool = False) -> 'RDD[U]'
 |      Return a new RDD by applying a function to each partition of this RDD,
 |      while tracking the index of the original partition.
 |
 |      .. versionadded:: 0.7.0
 |
 |      Parameters
 |      ----------
 |      f : function
 |          a function to run on each partition of the RDD
 |      preservesPartitioning : bool, optional, default False
 |          indicates whether the input function preserves the partitioner,
 |          which should be False unless this is a pair RDD and the input
 |          function doesn't modify the keys
 |
 |      Returns
 |      -------
 |      :class:`RDD`
 |          a new :class:`RDD` by applying a function to each partition
 |
 |      See Also
 |      --------
 |      :meth:`RDD.map`
 |      :meth:`RDD.flatMap`
 |      :meth:`RDD.mapPartitions`
 |      :meth:`RDD.mapPartitionsWithSplit`
 |      :meth:`RDDBarrier.mapPartitionsWithIndex`
 |
 |      Examples
 |      --------
 |      >>> rdd = sc.parallelize([1, 2, 3, 4], 4)
 |      >>> def f(splitIndex, iterator): yield splitIndex
 |      ...
 |      >>> rdd.mapPartitionsWithIndex(f).sum()
 |      6
 |
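The difference between `map`, `mapPartitions`, and `mapPartitionsWithIndex` can be sketched in plain Python. This is only a rough model, not how Spark executes these calls: an RDD is represented here as a list of partitions, and the `model_*` helper names are hypothetical.

```python
# Hypothetical stand-in for an RDD: a list of partitions, each a list of elements.

def model_map(parts, f):
    """Apply f to every element, one element at a time."""
    return [[f(x) for x in part] for part in parts]

def model_map_partitions(parts, f):
    """Call f once per partition, passing an iterator over that partition."""
    return [list(f(iter(part))) for part in parts]

def model_map_partitions_with_index(parts, f):
    """Like model_map_partitions, but f also receives the partition index."""
    return [list(f(i, iter(part))) for i, part in enumerate(parts)]

def partition_sum(iterator):
    yield sum(iterator)

def partition_index(index, iterator):
    yield index

two_parts = [[1, 2], [3, 4]]          # models sc.parallelize([1, 2, 3, 4], 2)
four_parts = [[1], [2], [3], [4]]     # models sc.parallelize([1, 2, 3, 4], 4)

doubled = model_map(two_parts, lambda x: x * 2)
sums = model_map_partitions(two_parts, partition_sum)
indices = model_map_partitions_with_index(four_parts, partition_index)
```

In this model `sums` holds one value per partition (3 and 7), and the partition indices 0 to 3 add up to 6, which matches the docstring examples above.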
 |  mapPartitionsWithSplit(self: 'RDD[T]', f: Callable[[int, Iterable[~T]], Iterable[~U]], preservesPartitioning: bool = False) -> 'RDD[U]'
 |      Return a new RDD by applying a function to each partition of this RDD,
 |      while tracking the index of the original partition.
 |
 |      .. versionadded:: 0.7.0
 |
 |      .. deprecated:: 0.9.0
 |          use :meth:`RDD.mapPartitionsWithIndex` instead.
 |
 |      Parameters
 |      ----------
 |      f : function
 |          a function to run on each partition of the RDD
 |      preservesPartitioning : bool, optional, default False
 |          indicates whether the input function preserves the partitioner,
 |          which should be False unless this is a pair RDD and the input
 |          function doesn't modify the keys
 |
 |      Returns
 |      -------
 |      :class:`RDD`
 |          a new :class:`RDD` by applying a function to each partition
 |
 |      See Also
 |      --------
 |      :meth:`RDD.map`
 |      :meth:`RDD.flatMap`
 |      :meth:`RDD.mapPartitions`
 |      :meth:`RDD.mapPartitionsWithIndex`
 |
 |      Examples
 |      --------
 |      >>> rdd = sc.parallelize([1, 2, 3, 4], 4)
 |      >>> def f(splitIndex, iterator): yield splitIndex
 |      ...
 |      >>> rdd.mapPartitionsWithSplit(f).sum()
 |      6
 |
 |  mapValues(self: 'RDD[Tuple[K, V]]', f: Callable[[~V], ~U]) -> 'RDD[Tuple[K, U]]'
 |      Pass each value in the key-value pair RDD through a map function
 |      without changing the keys; this also retains the original RDD's
 |      partitioning.
 |
 |      .. versionadded:: 0.7.0
 |
 |      Parameters
 |      ----------
 |      f : function
 |          a function to turn a V into a U
 |
 |      Returns
 |      -------
 |      :class:`RDD`
 |          a :class:`RDD` containing the keys and the mapped value
 |
 |      See Also
 |      --------
 |      :meth:`RDD.map`
 |      :meth:`RDD.flatMapValues`
 |
 |      Examples
 |      --------
 |      >>> rdd = sc.parallelize([("a", ["apple", "banana", "lemon"]), ("b", ["grapes"])])
 |      >>> def f(x): return len(x)
 |      ...
 |      >>> rdd.mapValues(f).collect()
 |      [('a', 3), ('b', 1)]
 |
 |  max(self: 'RDD[T]', key: Optional[Callable[[~T], ForwardRef('S')]] = None) -> ~T
 |      Find the maximum item in this RDD.
 |
 |      .. versionadded:: 1.0.0
 |
 |      Parameters
 |      ----------
 |      key : function, optional
 |          a function used to generate the key for comparison
 |
 |      Returns
 |      -------
 |      T
 |          the maximum item
 |
 |      See Also
 |      --------
 |      :meth:`RDD.min`
 |
 |      Examples
 |      --------
 |      >>> rdd = sc.parallelize([1.0, 5.0, 43.0, 10.0])
 |      >>> rdd.max()
 |      43.0
 |      >>> rdd.max(key=str)
 |      5.0
 |
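The `key=str` result above can look surprising. The built-in `max` accepts the same kind of `key` function, so a plain-Python check (no Spark involved) shows why 5.0 wins: the values are compared by their string forms.

```python
values = [1.0, 5.0, 43.0, 10.0]

# Without a key, the values are compared numerically.
largest_numeric = max(values)

# With key=str, the strings "1.0", "5.0", "43.0", "10.0" are compared
# character by character, so "5.0" ranks highest because '5' > '4' > '1'.
largest_by_str = max(values, key=str)

# The full ordering under key=str, smallest first.
ordered_by_str = sorted(values, key=str)
```

The same reasoning explains why `min(key=str)` in the `min` docstring returns 10.0 rather than 2.0.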
 |  mean(self: 'RDD[NumberOrArray]') -> float
 |      Compute the mean of this RDD's elements.
 |
 |      .. versionadded:: 0.9.1
 |
 |      Returns
 |      -------
 |      float
 |          the mean of all elements
 |
 |      See Also
 |      --------
 |      :meth:`RDD.stats`
 |      :meth:`RDD.sum`
 |      :meth:`RDD.meanApprox`
 |
 |      Examples
 |      --------
 |      >>> sc.parallelize([1, 2, 3]).mean()
 |      2.0
 |
 |  meanApprox(self: 'RDD[Union[float, int]]', timeout: int, confidence: float = 0.95) -> pyspark.rdd.BoundedFloat
 |      Approximate operation that returns the mean once the timeout
 |      elapses or the desired confidence is met.
 |
 |      .. versionadded:: 1.2.0
 |
 |      Parameters
 |      ----------
 |      timeout : int
 |          maximum time to wait for the job, in milliseconds
 |      confidence : float
 |          the desired statistical confidence in the result
 |
 |      Returns
 |      -------
 |      :class:`BoundedFloat`
 |          a potentially incomplete result, with error bounds
 |
 |      See Also
 |      --------
 |      :meth:`RDD.mean`
 |
 |      Examples
 |      --------
 |      >>> rdd = sc.parallelize(range(1000), 10)
 |      >>> r = sum(range(1000)) / 1000.0
 |      >>> abs(rdd.meanApprox(1000) - r) / r < 0.05
 |      True
 |
 |  min(self: 'RDD[T]', key: Optional[Callable[[~T], ForwardRef('S')]] = None) -> ~T
 |      Find the minimum item in this RDD.
 |
 |      .. versionadded:: 1.0.0
 |
 |      Parameters
 |      ----------
 |      key : function, optional
 |          a function used to generate the key for comparison
 |
 |      Returns
 |      -------
 |      T
 |          the minimum item
 |
 |      See Also
 |      --------
 |      :meth:`RDD.max`
 |
 |      Examples
 |      --------
 |      >>> rdd = sc.parallelize([2.0, 5.0, 43.0, 10.0])
 |      >>> rdd.min()
 |      2.0
 |      >>> rdd.min(key=str)
 |      10.0
 |
 |  name(self) -> Optional[str]
 |      Return the name of this RDD.
 |
 |      .. versionadded:: 1.0.0
 |
 |      Returns
 |      -------
 |      str
 |          :class:`RDD` name
 |
 |      See Also
 |      --------
 |      :meth:`RDD.setName`
 |
 |      Examples
 |      --------
 |      >>> rdd = sc.range(5)
 |      >>> rdd.name() == None
 |      True
 |
 |  partitionBy(self: 'RDD[Tuple[K, V]]', numPartitions: Optional[int], partitionFunc: Callable[[~K], int] = portable_hash) -> 'RDD[Tuple[K, V]]'
 |      Return a copy of the RDD partitioned using the specified partitioner.
 |
 |      .. versionadded:: 0.7.0
 |
 |      Parameters
 |      ----------
 |      numPartitions : int, optional
 |          the number of partitions in new :class:`RDD`
 |      partitionFunc : function, optional, default `portable_hash`
 |          function to compute the partition index
 |
 |      Returns
 |      -------
 |      :class:`RDD`
 |          a :class:`RDD` partitioned using the specified partitioner
 |
 |      See Also
 |      --------
 |      :meth:`RDD.repartition`
 |      :meth:`RDD.repartitionAndSortWithinPartitions`
 |
 |      Examples
 |      --------
 |      >>> pairs = sc.parallelize([1, 2, 3, 4, 2, 4, 1]).map(lambda x: (x, x))
 |      >>> sets = pairs.partitionBy(2).glom().collect()
 |      >>> len(set(sets[0]).intersection(set(sets[1])))
 |      0
 |
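As a rough illustration of what a partition function does, the sketch below assigns each pair to the bucket `partition_func(key) % num_partitions`. The helper `model_partition_by` is hypothetical, and Python's built-in `hash` stands in for `portable_hash` as an assumption; for the small integer keys used here, `hash` returns the integer itself.

```python
def model_partition_by(pairs, num_partitions, partition_func=hash):
    """Place each (key, value) pair in the bucket chosen by its key."""
    buckets = [[] for _ in range(num_partitions)]
    for key, value in pairs:
        buckets[partition_func(key) % num_partitions].append((key, value))
    return buckets

pairs = [(x, x) for x in [1, 2, 3, 4, 2, 4, 1]]
buckets = model_partition_by(pairs, 2)

# Every occurrence of a key lands in the same bucket, so the buckets
# never share a key.
shared = set(buckets[0]) & set(buckets[1])
```

Because the bucket depends only on the key, equal keys always end up together, which is why the intersection in the docstring example is empty.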
 |  persist(self: 'RDD[T]', storageLevel: pyspark.storagelevel.StorageLevel = StorageLevel(False, True, False, False, 1)) -> 'RDD[T]'
 |      Set this RDD's storage level to persist its values across operations
 |      after the first time it is computed. This can only be used to assign
 |      a new storage level if the RDD does not have a storage level set yet.
 |      If no storage level is specified, it defaults to `MEMORY_ONLY`.
 |
 |      .. versionadded:: 0.9.1
 |
 |      Parameters
 |      ----------
 |      storageLevel : :class:`StorageLevel`, default `MEMORY_ONLY`
 |          the target storage level
 |
 |      Returns
 |      -------
 |      :class:`RDD`
 |          The same :class:`RDD` with storage level set to `storageLevel`.
 |
 |      See Also
 |      --------
 |      :meth:`RDD.cache`
 |      :meth:`RDD.unpersist`
 |      :meth:`RDD.getStorageLevel`
 |
 |      Examples
 |      --------
 |      >>> rdd = sc.parallelize(["b", "a", "c"])
 |      >>> rdd.persist().is_cached
 |      True
 |      >>> str(rdd.getStorageLevel())
 |      'Memory Serialized 1x Replicated'
 |      >>> _ = rdd.unpersist()
 |      >>> rdd.is_cached
 |      False
 |
 |      >>> from pyspark import StorageLevel
 |      >>> rdd2 = sc.range(5)
 |      >>> _ = rdd2.persist(StorageLevel.MEMORY_AND_DISK)
 |      >>> rdd2.is_cached
 |      True
 |      >>> str(rdd2.getStorageLevel())
 |      'Disk Memory Serialized 1x Replicated'
 |
 |      Cannot override an existing storage level
 |
 |      >>> _ = rdd2.persist(StorageLevel.MEMORY_ONLY_2)
 |      Traceback (most recent call last):
 |          ...
 |      py4j.protocol.Py4JJavaError: ...
 |
 |      Assign another storage level after `unpersist`
 |
 |      >>> _ = rdd2.unpersist()
 |      >>> rdd2.is_cached
 |      False
 |      >>> _ = rdd2.persist(StorageLevel.MEMORY_ONLY_2)
 |      >>> str(rdd2.getStorageLevel())
 |      'Memory Serialized 2x Replicated'
 |      >>> rdd2.is_cached
 |      True
 |      >>> _ = rdd2.unpersist()
 |
 |  pipe(self, command: str, env: Optional[Dict[str, str]] = None, checkCode: bool = False) -> 'RDD[str]'
 |      Return an RDD created by piping elements to a forked external process.
 |
 |      .. versionadded:: 0.7.0
 |
 |      Parameters
 |      ----------
 |      command : str
 |          command to run.
 |      env : dict, optional
 |          environment variables to set.
 |      checkCode : bool, optional
 |          whether to check the return value of the shell command.
 |
 |      Returns
 |      -------
 |      :class:`RDD`
 |          a new :class:`RDD` of strings
 |
 |      Examples
 |      --------
 |      >>> sc.parallelize(['1', '2', '', '3']).pipe('cat').collect()
 |      ['1', '2', '', '3']
 |
 |  randomSplit(self: 'RDD[T]', weights: Sequence[Union[int, float]], seed: Optional[int] = None) -> 'List[RDD[T]]'
 |      Randomly splits this RDD with the provided weights.
 |
 |      .. versionadded:: 1.3.0
 |
 |      Parameters
 |      ----------
 |      weights : list
 |          weights for splits, will be normalized if they don't sum to 1
 |      seed : int, optional
 |          random seed
 |
 |      Returns
 |      -------
 |      list
 |          split :class:`RDD`\s in a list
 |
 |      See Also
 |      --------
 |      :meth:`pyspark.sql.DataFrame.randomSplit`
 |
 |      Examples
 |      --------
 |      >>> rdd = sc.parallelize(range(500), 1)
 |      >>> rdd1, rdd2 = rdd.randomSplit([2, 3], 17)
 |      >>> len(rdd1.collect() + rdd2.collect())
 |      500
 |      >>> 150 < rdd1.count() < 250
 |      True
 |      >>> 250 < rdd2.count() < 350
 |      True
 |
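The note that weights "will be normalized if they don't sum to 1" can be illustrated with a small plain-Python model. This is a sketch under assumptions, not Spark's sampler: the helpers `cumulative_bounds` and `model_random_split` are hypothetical, and `random.Random` stands in for Spark's random number generator.

```python
import bisect
import random

def cumulative_bounds(weights):
    """Normalize the weights and return cumulative boundaries in [0, 1]."""
    total = float(sum(weights))
    bounds = [0.0]
    for w in weights:
        bounds.append(bounds[-1] + w / total)
    return bounds

def model_random_split(items, weights, seed):
    """Send each item to the split whose range contains a uniform draw."""
    bounds = cumulative_bounds(weights)
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for item in items:
        u = rng.random()
        # Clamp to the last split to guard against rounding in the bounds.
        index = min(bisect.bisect_right(bounds, u) - 1, len(weights) - 1)
        splits[index].append(item)
    return splits

bounds = cumulative_bounds([2, 3])
first, second = model_random_split(range(500), [2, 3], 17)
```

With weights `[2, 3]` the boundaries are approximately 0, 0.4, and 1, so the first split receives roughly 40% of the items. Each item goes to exactly one split, which is why the two parts in the docstring example always total 500.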
 |  reduce(self: 'RDD[T]', f: Callable[[~T, ~T], ~T]) -> ~T
 |      Reduces the elements of this RDD using the specified commutative and
 |      associative binary operator. Currently reduces partitions locally.
 |
 |      .. versionadded:: 0.7.0
 |
 |      Parameters
 |      ----------
 |      f : function
 |          the reduce function
 |
 |      Returns
 |      -------
 |      T
 |          the aggregated result
 |
 |      See Also
 |      --------
 |      :meth:`RDD.treeReduce`
 |      :meth:`RDD.aggregate`
 |      :meth:`RDD.treeAggregate`
 |
 |      Examples
 |      --------
 |      >>> from operator import add
 |      >>> sc.parallelize([1, 2, 3, 4, 5]).reduce(add)
 |      15
 |      >>> sc.parallelize((2 for _ in range(10))).map(lambda x: 1).cache().reduce(add)
 |      10
 |      >>> sc.parallelize([]).reduce(add)
 |      Traceback (most recent call last):
 |          ...
 |      ValueError: Can not reduce() empty RDD
 |
 |  reduceByKey(self: 'RDD[Tuple[K, V]]', func: Callable[[~V, ~V], ~V], numPartitions: Optional[int] = None, partitionFunc: Callable[[~K], int] = portable_hash) -> 'RDD[Tuple[K, V]]'
 |      Merge the values for each key using an associative and commutative reduce function.
 |
 |      This will also perform the merging locally on each mapper before
 |      sending results to a reducer, similarly to a "combiner" in MapReduce.
 |
 |      Output will be partitioned with `numPartitions` partitions, or
 |      the default parallelism level if `numPartitions` is not specified.
 |      Default partitioner is hash-partition.
 |
 |      .. versionadded:: 1.6.0
 |
 |      Parameters
 |      ----------
 |      func : function
 |          the reduce function
 |      numPartitions : int, optional
 |          the number of partitions in new :class:`RDD`
 |      partitionFunc : function, optional, default `portable_hash`
 |          function to compute the partition index
 |
 |      Returns
 |      -------
 |      :class:`RDD`
 |          a :class:`RDD` containing the keys and the aggregated result for each key
 |
 |      See Also
 |      --------
 |      :meth:`RDD.reduceByKeyLocally`
 |      :meth:`RDD.combineByKey`
 |      :meth:`RDD.aggregateByKey`
 |      :meth:`RDD.foldByKey`
 |      :meth:`RDD.groupByKey`
 |
 |      Examples
 |      --------
 |      >>> from operator import add
 |      >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
 |      >>> sorted(rdd.reduceByKey(add).collect())
 |      [('a', 2), ('b', 1)]
 |
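The "combiner" behaviour described above can be modelled in plain Python. This is a simplified sketch with hypothetical helpers (`combine_locally`, `model_reduce_by_key`), not Spark's shuffle: each partition first merges its own values per key, and only those partial results are merged across partitions.

```python
from operator import add

def combine_locally(partition, func):
    """Merge the values for each key within a single partition."""
    merged = {}
    for key, value in partition:
        merged[key] = func(merged[key], value) if key in merged else value
    return merged

def model_reduce_by_key(partitions, func):
    """Merge the per-partition partial results into one result per key."""
    result = {}
    for partition in partitions:
        for key, value in combine_locally(partition, func).items():
            result[key] = func(result[key], value) if key in result else value
    return result

# Two partitions holding the pairs from the docstring example.
partitions = [[("a", 1), ("b", 1)], [("a", 1)]]
partials = [combine_locally(p, add) for p in partitions]
totals = model_reduce_by_key(partitions, add)
```

Because values are merged in an unspecified order and in two stages, the reduce function needs to be associative and commutative, as the docstring requires.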
 |  reduceByKeyLocally(self: 'RDD[Tuple[K, V]]', func: Callable[[~V, ~V], ~V]) -> Dict[~K, ~V]
 |      Merge the values for each key using an associative and commutative reduce function, but
 |      return the results immediately to the master as a dictionary.
 |
 |      This will also perform the merging locally on each mapper before
 |      sending results to a reducer, similarly to a "combiner" in MapReduce.
 |
 |      .. versionadded:: 0.7.0
 |
 |      Parameters
 |      ----------
 |      func : function
 |          the reduce function
 |
 |      Returns
 |      -------
 |      dict
 |          a dict containing the keys and the aggregated result for each key
 |
 |      See Also
 |      --------
 |      :meth:`RDD.reduceByKey`
 |      :meth:`RDD.aggregateByKey`
 |
 |      Examples
 |      --------
 |      >>> from operator import add
 |      >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
 |      >>> sorted(rdd.reduceByKeyLocally(add).items())
 |      [('a', 2), ('b', 1)]
 |
 |  repartition(self: 'RDD[T]', numPartitions: int) -> 'RDD[T]'
 |      Return a new RDD that has exactly numPartitions partitions.
 |
 |      Can increase or decrease the level of parallelism in this RDD.
 |      Internally, this uses a shuffle to redistribute data.
 |      If you are decreasing the number of partitions in this RDD, consider
 |      using `coalesce`, which can avoid performing a shuffle.
 |
 |      .. versionadded:: 1.0.0
 |
 |      Parameters
 |      ----------
 |      numPartitions : int
 |          the number of partitions in new :class:`RDD`
 |
 |      Returns
 |      -------
 |      :class:`RDD`
 |          a :class:`RDD` with exactly numPartitions partitions
 |
 |      See Also
 |      --------
 |      :meth:`RDD.coalesce`
 |      :meth:`RDD.partitionBy`
 |      :meth:`RDD.repartitionAndSortWithinPartitions`
 |
 |      Examples
 |      --------
 |      >>> rdd = sc.parallelize([1,2,3,4,5,6,7], 4)
 |      >>> sorted(rdd.glom().collect())
 |      [[1], [2, 3], [4, 5], [6, 7]]
 |      >>> len(rdd.repartition(2).glom().collect())
 |      2
 |      >>> len(rdd.repartition(10).glom().collect())
 |      10
 |
 |  repartitionAndSortWithinPartitions(self: 'RDD[Tuple[Any, Any]]', numPartitions: Optional[int] = None, partitionFunc: Callable[[Any], int] = portable_hash, ascending: bool = True, keyfunc: Callable[[Any], Any] = lambda x: x) -> 'RDD[Tuple[Any, Any]]'
 |      Repartition the RDD according to the given partitioner and, within each resulting partition,
 |      sort records by their keys.
 |
 |      .. versionadded:: 1.2.0
 |
 |      Parameters
 |      ----------
 |      numPartitions : int, optional
 |          the number of partitions in new :class:`RDD`
 |      partitionFunc : function, optional, default `portable_hash`
 |          a function to compute the partition index
 |      ascending : bool, optional, default True
 |          sort the keys in ascending or descending order
 |      keyfunc : function, optional, default identity mapping
 |          a function to compute the key
 |
 |      Returns
 |      -------
 |      :class:`RDD`
 |          a new :class:`RDD`
 |
 |      See Also
 |      --------
 |      :meth:`RDD.repartition`
 |      :meth:`RDD.partitionBy`
 |      :meth:`RDD.sortBy`
 |      :meth:`RDD.sortByKey`
 |
 |      Examples
 |      --------
 |      >>> rdd = sc.parallelize([(0, 5), (3, 8), (2, 6), (0, 8), (3, 8), (1, 3)])
 |      >>> rdd2 = rdd.repartitionAndSortWithinPartitions(2, lambda x: x % 2, True)
 |      >>> rdd2.glom().collect()
 |      [[(0, 5), (0, 8), (2, 6)], [(1, 3), (3, 8), (3, 8)]]
 |
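The two steps in the description (repartition, then sort each partition by key) can be sketched in plain Python. The helper `model_repartition_and_sort` is hypothetical and only models the observable result, not Spark's shuffle-based implementation.

```python
def model_repartition_and_sort(pairs, num_partitions, partition_func,
                               ascending=True, keyfunc=lambda k: k):
    """Bucket pairs by key, then sort each bucket by its keys."""
    buckets = [[] for _ in range(num_partitions)]
    for key, value in pairs:
        buckets[partition_func(key) % num_partitions].append((key, value))
    # sorted() is stable, so pairs with equal keys keep their arrival order.
    return [
        sorted(bucket, key=lambda kv: keyfunc(kv[0]), reverse=not ascending)
        for bucket in buckets
    ]

pairs = [(0, 5), (3, 8), (2, 6), (0, 8), (3, 8), (1, 3)]
result = model_repartition_and_sort(pairs, 2, lambda x: x % 2, True)
```

Even keys land in the first partition and odd keys in the second, and each partition is sorted independently, so the overall result is not globally sorted.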
 |  rightOuterJoin(self: 'RDD[Tuple[K, V]]', other: 'RDD[Tuple[K, U]]', numPartitions: Optional[int] = None) -> 'RDD[Tuple[K, Tuple[Optional[V], U]]]'
 |      Perform a right outer join of `self` and `other`.
 |
 |      For each element (k, w) in `other`, the resulting RDD will either
 |      contain all pairs (k, (v, w)) for v in `self`, or the pair (k, (None, w))
 |      if no elements in `self` have key k.
 |
 |      Hash-partitions the resulting RDD into the given number of partitions.
 |
 |      .. versionadded:: 0.7.0
 |
 |      Parameters
 |      ----------
 |      other : :class:`RDD`
 |          another :class:`RDD`
 |      numPartitions : int, optional
 |          the number of partitions in new :class:`RDD`
 |
 |      Returns
 |      -------
 |      :class:`RDD`
 |          a :class:`RDD` containing all pairs of elements with matching keys
 |
 |      See Also
 |      --------
 |      :meth:`RDD.join`
 |      :meth:`RDD.leftOuterJoin`
 |      :meth:`RDD.fullOuterJoin`
 |      :meth:`pyspark.sql.DataFrame.join`
 |
 |      Examples
 |      --------
 |      >>> rdd1 = sc.parallelize([("a", 1), ("b", 4)])
 |      >>> rdd2 = sc.parallelize([("a", 2)])
 |      >>> sorted(rdd2.rightOuterJoin(rdd1).collect())
 |      [('a', (2, 1)), ('b', (None, 4))]
 |
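The rule above (every element of `other` appears in the result, paired with `None` when `self` has no matching key) can be modelled in plain Python. The helper `model_right_outer_join` is hypothetical and ignores partitioning entirely.

```python
from collections import defaultdict

def model_right_outer_join(left, right):
    """Keep every (k, w) from right; pair it with each matching v from left."""
    left_values = defaultdict(list)
    for key, value in left:
        left_values[key].append(value)

    joined = []
    for key, w in right:
        matches = left_values.get(key)
        if matches:
            joined.extend((key, (v, w)) for v in matches)
        else:
            # No element of left has this key, so the left side is None.
            joined.append((key, (None, w)))
    return joined

rdd1 = [("a", 1), ("b", 4)]
rdd2 = [("a", 2)]
joined = model_right_outer_join(rdd2, rdd1)   # models rdd2.rightOuterJoin(rdd1)
```

Here `rdd2` plays the role of `self` and `rdd1` the role of `other`, so the key `"b"` survives with `None` on the left side.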
 |  sample(self: 'RDD[T]', withReplacement: bool, fraction: float, seed: Optional[int] = None) -> 'RDD[T]'
 |      Return a sampled subset of this RDD.
 |
 |      .. versionadded:: 0.7.0
 |
 |      Parameters
 |      ----------
 |      withReplacement : bool
 |          whether elements can be sampled multiple times (replaced when sampled out)
 |      fraction : float
 |          expected size of the sample as a fraction of this RDD's size
 |          without replacement: probability that each element is chosen; fraction must be in [0, 1]
 |          with replacement: expected number of times each element is chosen; fraction must be >= 0
 |      seed : int, optional
 |          seed for the random number generator
 |
 |      Returns
 |      -------
 |      :class:`RDD`
 |          a new :class:`RDD` containing a sampled subset of elements
 |
 |      See Also
 |      --------
 |      :meth:`RDD.takeSample`
 |      :meth:`RDD.sampleByKey`
 |      :meth:`pyspark.sql.DataFrame.sample`
 |
 |      Notes
 |      -----
 |      This is not guaranteed to provide exactly the fraction specified of the total
 |      count of the given :class:`RDD`.
 |
 |      Examples
 |      --------
 |      >>> rdd = sc.parallelize(range(100), 4)
 |      >>> 6 <= rdd.sample(False, 0.1, 81).count() <= 14
 |      True
 |
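The docstring example checks a range rather than an exact count because the sample size is random. A plain-Python sketch of sampling without replacement, where each element is kept independently with probability `fraction`, shows why. The helper `model_sample_without_replacement` is hypothetical, and `random.Random` is an assumed stand-in for Spark's sampler, so its output for a given seed will differ from Spark's.

```python
import random

def model_sample_without_replacement(items, fraction, seed):
    """Keep each item independently with probability `fraction`."""
    rng = random.Random(seed)
    return [x for x in items if rng.random() < fraction]

picked = model_sample_without_replacement(range(100), 0.1, 81)
picked_again = model_sample_without_replacement(range(100), 0.1, 81)
```

The expected size is `fraction` times the number of items (10 here), but any particular run can return a few more or a few fewer. Reusing the same seed reproduces the same sample.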
 |  sampleByKey(self: 'RDD[Tuple[K, V]]', withReplacement: bool, fractions: Dict[~K, Union[float, int]], seed: Optional[int] = None) -> 'RDD[Tuple[K, V]]'
 |      Return a subset of this RDD sampled by key (via stratified sampling).
 |      Create a sample of this RDD using variable sampling rates for
 |      different keys as specified by fractions, a key to sampling rate map.
 |
 |      .. versionadded:: 0.7.0
 |
 |      Parameters
 |      ----------
 |      withReplacement : bool
 |          whether to sample with or without replacement
 |      fractions : dict
 |          map of specific keys to sampling rates
 |      seed : int, optional
 |          seed for the random number generator
 |
 |      Returns
 |      -------
 |      :class:`RDD`
 |          a :class:`RDD` containing the stratified sampling result
 |
 |      See Also
 |      --------
 |      :meth:`RDD.sample`
 |
 |      Examples
 |      --------
 |      >>> fractions = {"a": 0.2, "b": 0.1}
 |      >>> rdd = sc.parallelize(fractions.keys()).cartesian(sc.parallelize(range(0, 1000)))
 |      >>> sample = dict(rdd.sampleByKey(False, fractions, 2).groupByKey().collect())
 |      >>> 100 < len(sample["a"]) < 300 and 50 < len(sample["b"]) < 150
 |      True
 |      >>> max(sample["a"]) <= 999 and min(sample["a"]) >= 0
 |      True
 |      >>> max(sample["b"]) <= 999 and min(sample["b"]) >= 0
 |      True
 |
 |  sampleStdev(self: 'RDD[NumberOrArray]') -> float
 |      Compute the sample standard deviation of this RDD's elements (which
 |      corrects for bias in estimating the standard deviation by dividing by
 |      N-1 instead of N).
 |
 |      .. versionadded:: 0.9.1
 |
 |      Returns
 |      -------
 |      float
 |          the sample standard deviation of all elements
 |
 |      See Also
 |      --------
 |      :meth:`RDD.stats`
 |      :meth:`RDD.stdev`
 |      :meth:`RDD.variance`
 |      :meth:`RDD.sampleVariance`
 |
 |      Examples
 |      --------
 |      >>> sc.parallelize([1, 2, 3]).sampleStdev()
 |      1.0
 |
 |  sampleVariance(self: 'RDD[NumberOrArray]') -> float
 |      Compute the sample variance of this RDD's elements (which corrects
 |      for bias in estimating the variance by dividing by N-1 instead of N).
 |
 |      .. versionadded:: 0.9.1
 |
 |      Returns
 |      -------
 |      float
 |          the sample variance of all elements
 |
 |      See Also
 |      --------
 |      :meth:`RDD.stats`
 |      :meth:`RDD.variance`
 |      :meth:`RDD.stdev`
 |      :meth:`RDD.sampleStdev`
 |
 |      Examples
 |      --------
 |      >>> sc.parallelize([1, 2, 3]).sampleVariance()
 |      1.0
 |
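The N-1 correction mentioned for `sampleStdev` and `sampleVariance` can be checked by hand on the docstring data. The sketch below uses plain Python only; the variable names are illustrative.

```python
import math
import statistics

data = [1, 2, 3]
n = len(data)
mean = sum(data) / n
squared_deviations = sum((x - mean) ** 2 for x in data)

# Population variance divides by N; sample variance divides by N-1.
population_variance = squared_deviations / n
sample_variance = squared_deviations / (n - 1)
sample_stdev = math.sqrt(sample_variance)

# The standard library uses the same N-1 convention for statistics.variance.
stdlib_sample_variance = statistics.variance(data)
```

For `[1, 2, 3]` the squared deviations sum to 2, so the sample variance is 2 / 2 = 1.0, while the population variance is 2 / 3.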
 |  saveAsHadoopDataset(self: 'RDD[Tuple[K, V]]', conf: Dict[str, str], keyConverter: Optional[str] = None, valueConverter: Optional[str] = None) -> None
 |      Output a Python RDD of key-value pairs (of form ``RDD[(K, V)]``) to any Hadoop file
 |      system, using the old Hadoop OutputFormat API (mapred package). Keys/values are
 |      converted for output using either user specified converters or, by default,
 |      "org.apache.spark.api.python.JavaToWritableConverter".
 |
 |      .. versionadded:: 1.1.0
 |
 |      Parameters
 |      ----------
 |      conf : dict
 |          Hadoop job configuration
 |      keyConverter : str, optional
 |          fully qualified classname of key converter (None by default)
 |      valueConverter : str, optional
 |          fully qualified classname of value converter (None by default)
 |
 |      See Also
 |      --------
 |      :meth:`SparkContext.hadoopRDD`
 |      :meth:`RDD.saveAsNewAPIHadoopDataset`
 |      :meth:`RDD.saveAsHadoopFile`
 |      :meth:`RDD.saveAsNewAPIHadoopFile`
 |      :meth:`RDD.saveAsSequenceFile`
 |
 |      Examples
 |      --------
 |      >>> import os
 |      >>> import tempfile
 |
 |      Set the related classes
 |
 |      >>> output_format_class = "org.apache.hadoop.mapred.TextOutputFormat"
 |      >>> input_format_class = "org.apache.hadoop.mapred.TextInputFormat"
 |      >>> key_class = "org.apache.hadoop.io.IntWritable"
 |      >>> value_class = "org.apache.hadoop.io.Text"
 |
 |      >>> with tempfile.TemporaryDirectory() as d:
 |      ...     path = os.path.join(d, "old_hadoop_file")
 |      ...
 |      ...     # Create the conf for writing
 |      ...     write_conf = {
 |      ...         "mapred.output.format.class": output_format_class,
 |      ...         "mapreduce.job.output.key.class": key_class,
 |      ...         "mapreduce.job.output.value.class": value_class,
 |      ...         "mapreduce.output.fileoutputformat.outputdir": path,
 |      ...     }
 |      ...
 |      ...     # Write a temporary Hadoop file
 |      ...     rdd = sc.parallelize([(1, ""), (1, "a"), (3, "x")])
 |      ...     rdd.saveAsHadoopDataset(conf=write_conf)
 |      ...
 |      ...     # Create the conf for reading
 |      ...     read_conf = {"mapreduce.input.fileinputformat.inputdir": path}
 |      ...
 |      ...     # Load this Hadoop file as an RDD
 |      ...     loaded = sc.hadoopRDD(input_format_class, key_class, value_class, conf=read_conf)
 |      ...     sorted(loaded.collect())
 |      [(0, '1\t'), (0, '1\ta'), (0, '3\tx')]
 |
 |  saveAsHadoopFile(self: 'RDD[Tuple[K, V]]', path: str, outputFormatClass: str, keyClass: Optional[str] = None, valueClass: Optional[str] = None, keyConverter: Optional[str] = None, valueConverter: Optional[str] = None, conf: Optional[Dict[str, str]] = None, compressionCodecClass: Optional[str] = None) -> None
 |      Output a Python RDD of key-value pairs (of form ``RDD[(K, V)]``) to any Hadoop file
 |      system, using the old Hadoop OutputFormat API (mapred package). Key and value types
 |      will be inferred if not specified. Keys and values are converted for output using either
 |      user specified converters or "org.apache.spark.api.python.JavaToWritableConverter". The
 |      `conf` is applied on top of the base Hadoop conf associated with the SparkContext
 |      of this RDD to create a merged Hadoop MapReduce job configuration for saving the data.
 |
 |      .. versionadded:: 1.1.0
 |
 |      Parameters
 |      ----------
 |      path : str
 |          path to Hadoop file
 |      outputFormatClass : str
 |          fully qualified classname of Hadoop OutputFormat
 |          (e.g. "org.apache.hadoop.mapred.SequenceFileOutputFormat")
 |      keyClass : str, optional
 |          fully qualified classname of key Writable class
 |          (e.g. "org.apache.hadoop.io.IntWritable", None by default)
 |      valueClass : str, optional
 |          fully qualified classname of value Writable class
 |          (e.g. "org.apache.hadoop.io.Text", None by default)
 |      keyConverter : str, optional
 |          fully qualified classname of key converter (None by default)
 |      valueConverter : str, optional
 |          fully qualified classname of value converter (None by default)
 |      conf : dict, optional
 |          Hadoop job configuration (None by default)
 |      compressionCodecClass : str, optional
 |          fully qualified classname of the compression codec class
 |          (e.g. "org.apache.hadoop.io.compress.GzipCodec", None by default)
 |
 |      See Also
 |      --------
 |      :meth:`SparkContext.hadoopFile`
 |      :meth:`RDD.saveAsNewAPIHadoopFile`
 |      :meth:`RDD.saveAsHadoopDataset`
 |      :meth:`RDD.saveAsNewAPIHadoopDataset`
 |      :meth:`RDD.saveAsSequenceFile`
 |
 |      Examples
 |      --------
 |      >>> import os
 |      >>> import tempfile
 |
 |      Set the related classes
 |
 |      >>> output_format_class = "org.apache.hadoop.mapred.TextOutputFormat"
 |      >>> input_format_class = "org.apache.hadoop.mapred.TextInputFormat"
 |      >>> key_class = "org.apache.hadoop.io.IntWritable"
 |      >>> value_class = "org.apache.hadoop.io.Text"
 |
 |      >>> with tempfile.TemporaryDirectory() as d:
 |      ...     path = os.path.join(d, "old_hadoop_file")
 |      ...
 |      ...     # Write a temporary Hadoop file
 |      ...     rdd = sc.parallelize([(1, ""), (1, "a"), (3, "x")])
 |      ...     rdd.saveAsHadoopFile(path, output_format_class, key_class, value_class)
 |      ...
 |      ...     # Load this Hadoop file as an RDD
 |      ...     loaded = sc.hadoopFile(path, input_format_class, key_class, value_class)
 |      ...     sorted(loaded.collect())
 |      [(0, '1\t'), (0, '1\ta'), (0, '3\tx')]
 |
 |  saveAsNewAPIHadoopDataset(self: 'RDD[Tuple[K, V]]', conf: Dict[str, str], keyConverter: Optional[str] = None, valueConverter: Optional[str] = None) -> None
 |      Output a Python RDD of key-value pairs (of form ``RDD[(K, V)]``) to any Hadoop file
 |      system, using the new Hadoop OutputFormat API (mapreduce package). Keys/values are
 |      converted for output using either user specified converters or, by default,
 |      "org.apache.spark.api.python.JavaToWritableConverter".
 |
 |      .. versionadded:: 1.1.0
 |
 |      Parameters
 |      ----------
 |      conf : dict
 |          Hadoop job configuration
 |      keyConverter : str, optional
 |          fully qualified classname of key converter (None by default)
 |      valueConverter : str, optional
 |          fully qualified classname of value converter (None by default)
 |
 |      See Also
 |      --------
 |      :meth:`SparkContext.newAPIHadoopRDD`
 |      :meth:`RDD.saveAsHadoopDataset`
 |      :meth:`RDD.saveAsHadoopFile`
 |      :meth:`RDD.saveAsNewAPIHadoopFile`
 |      :meth:`RDD.saveAsSequenceFile`
 |
 |      Examples
 |      --------
 |      >>> import os
 |      >>> import tempfile
 |
 |      Set the related classes
 |
 |      >>> output_format_class = "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat"
 |      >>> input_format_class = "org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat"
 |      >>> key_class = "org.apache.hadoop.io.IntWritable"
 |      >>> value_class = "org.apache.hadoop.io.Text"
 |
 |      >>> with tempfile.TemporaryDirectory() as d:
 |      ...     path = os.path.join(d, "new_hadoop_file")
 |      ...
 |      ...     # Create the conf for writing
 |      ...     write_conf = {
 |      ...         "mapreduce.job.outputformat.class": (output_format_class),
 |      ...         "mapreduce.job.output.key.class": key_class,
 |      ...         "mapreduce.job.output.value.class": value_class,
 |      ...         "mapreduce.output.fileoutputformat.outputdir": path,
 |      ...     }
 |      ...
 |      ...     # Write a temporary Hadoop file
 |      ...     rdd = sc.parallelize([(1, ""), (1, "a"), (3, "x")])
 |      ...     rdd.saveAsNewAPIHadoopDataset(conf=write_conf)
 |      ...
 |      ...     # Create the conf for reading
 |      ...     read_conf = {"mapreduce.input.fileinputformat.inputdir": path}
 |      ...
 |      ...     # Load this Hadoop file as an RDD
 |      ...     loaded = sc.newAPIHadoopRDD(input_format_class,
 |      ...         key_class, value_class, conf=read_conf)
 |      ...     sorted(loaded.collect())
 |      [(1, ''), (1, 'a'), (3, 'x')]
 |
 |  saveAsNewAPIHadoopFile(self: 'RDD[Tuple[K, V]]', path: str, outputFormatClass: str, keyClass: Optional[str] = None, valueClass: Optional[str] = None, keyConverter: Optional[str] = None, valueConverter: Optional[str] = None, conf: Optional[Dict[str, str]] = None) -> None
 |      Output a Python RDD of key-value pairs (of form ``RDD[(K, V)]``) to any Hadoop file
 |      system, using the new Hadoop OutputFormat API (mapreduce package). Key and value types
 |      will be inferred if not specified. Keys and values are converted for output using either
 |      user specified converters or "org.apache.spark.api.python.JavaToWritableConverter". The
 |      `conf` is applied on top of the base Hadoop conf associated with the SparkContext
 |      of this RDD to create a merged Hadoop MapReduce job configuration for saving the data.
 |
 |      .. versionadded:: 1.1.0
 |
 |      Parameters
 |      ----------
 |      path : str
 |          path to Hadoop file
 |      outputFormatClass : str
 |          fully qualified classname of Hadoop OutputFormat
 |          (e.g. "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat")
 |      keyClass : str, optional
 |          fully qualified classname of key Writable class
 |           (e.g. "org.apache.hadoop.io.IntWritable", None by default)
 |      valueClass : str, optional
 |          fully qualified classname of value Writable class
 |          (e.g. "org.apache.hadoop.io.Text", None by default)
 |      keyConverter : str, optional
 |          fully qualified classname of key converter (None by default)
 |      valueConverter : str, optional
 |          fully qualified classname of value converter (None by default)
 |      conf : dict, optional
 |          Hadoop job configuration (None by default)
 |
 |      See Also
 |      --------
 |      :meth:`SparkContext.newAPIHadoopFile`
 |      :meth:`RDD.saveAsHadoopDataset`
 |      :meth:`RDD.saveAsNewAPIHadoopDataset`
 |      :meth:`RDD.saveAsHadoopFile`
 |      :meth:`RDD.saveAsSequenceFile`
 |
 |      Examples
 |      --------
 |      >>> import os
 |      >>> import tempfile
 |
 |      Set the class of output format
 |
 |      >>> output_format_class = "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat"
 |
 |      >>> with tempfile.TemporaryDirectory() as d:
 |      ...     path = os.path.join(d, "hadoop_file")
 |      ...
 |      ...     # Write a temporary Hadoop file
 |      ...     rdd = sc.parallelize([(1, {3.0: "bb"}), (2, {1.0: "aa"}), (3, {2.0: "dd"})])
 |      ...     rdd.saveAsNewAPIHadoopFile(path, output_format_class)
 |      ...
 |      ...     # Load this Hadoop file as an RDD
 |      ...     sorted(sc.sequenceFile(path).collect())
 |      [(1, {3.0: 'bb'}), (2, {1.0: 'aa'}), (3, {2.0: 'dd'})]
 |
 |  saveAsPickleFile(self, path: str, batchSize: int = 10) -> None
 |      Save this RDD as a SequenceFile of serialized objects. The serializer
 |      used is :class:`pyspark.serializers.CPickleSerializer`, default batch size
 |      is 10.
 |
 |      .. versionadded:: 1.1.0
 |
 |      Parameters
 |      ----------
 |      path : str
 |          path to pickled file
 |      batchSize : int, optional, default 10
 |          the number of Python objects represented as a single Java object.
 |
 |      See Also
 |      --------
 |      :meth:`SparkContext.pickleFile`
 |
 |      Examples
 |      --------
 |      >>> import os
 |      >>> import tempfile
 |      >>> with tempfile.TemporaryDirectory() as d:
 |      ...     path = os.path.join(d, "pickle_file")
 |      ...
 |      ...     # Write a temporary pickled file
 |      ...     sc.parallelize(range(10)).saveAsPickleFile(path, 3)
 |      ...
 |      ...     # Load picked file as an RDD
 |      ...     sorted(sc.pickleFile(path, 3).collect())
 |      [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
 |
 |  saveAsSequenceFile(self: 'RDD[Tuple[K, V]]', path: str, compressionCodecClass: Optional[str] = None) -> None
 |      Output a Python RDD of key-value pairs (of form ``RDD[(K, V)]``) to any Hadoop file
 |      system, using the "org.apache.hadoop.io.Writable" types that we convert from the
 |      RDD's key and value types. The mechanism is as follows:
 |
 |          1. Pickle is used to convert pickled Python RDD into RDD of Java objects.
 |          2. Keys and values of this Java RDD are converted to Writables and written out.
 |
 |      .. versionadded:: 1.1.0
 |
 |      Parameters
 |      ----------
 |      path : str
 |          path to sequence file
 |      compressionCodecClass : str, optional
 |          fully qualified classname of the compression codec class
 |          i.e. "org.apache.hadoop.io.compress.GzipCodec" (None by default)
 |
 |      See Also
 |      --------
 |      :meth:`SparkContext.sequenceFile`
 |      :meth:`RDD.saveAsHadoopFile`
 |      :meth:`RDD.saveAsNewAPIHadoopFile`
 |      :meth:`RDD.saveAsHadoopDataset`
 |      :meth:`RDD.saveAsNewAPIHadoopDataset`
 |      :meth:`RDD.saveAsSequenceFile`
 |
 |      Examples
 |      --------
 |      >>> import os
 |      >>> import tempfile
 |
 |      Set the related classes
 |
 |      >>> with tempfile.TemporaryDirectory() as d:
 |      ...     path = os.path.join(d, "sequence_file")
 |      ...
 |      ...     # Write a temporary sequence file
 |      ...     rdd = sc.parallelize([(1, ""), (1, "a"), (3, "x")])
 |      ...     rdd.saveAsSequenceFile(path)
 |      ...
 |      ...     # Load this sequence file as an RDD
 |      ...     loaded = sc.sequenceFile(path)
 |      ...     sorted(loaded.collect())
 |      [(1, ''), (1, 'a'), (3, 'x')]
 |
 |  saveAsTextFile(self, path: str, compressionCodecClass: Optional[str] = None) -> None
 |      Save this RDD as a text file, using string representations of elements.
 |
 |      .. versionadded:: 0.7.0
 |
 |      Parameters
 |      ----------
 |      path : str
 |          path to text file
 |      compressionCodecClass : str, optional
 |          fully qualified classname of the compression codec class
 |          i.e. "org.apache.hadoop.io.compress.GzipCodec" (None by default)
 |
 |      See Also
 |      --------
 |      :meth:`SparkContext.textFile`
 |      :meth:`SparkContext.wholeTextFiles`
 |
 |      Examples
 |      --------
 |      >>> import os
 |      >>> import tempfile
 |      >>> from fileinput import input
 |      >>> from glob import glob
 |      >>> with tempfile.TemporaryDirectory() as d1:
 |      ...     path1 = os.path.join(d1, "text_file1")
 |      ...
 |      ...     # Write a temporary text file
 |      ...     sc.parallelize(range(10)).saveAsTextFile(path1)
 |      ...
 |      ...     # Load text file as an RDD
 |      ...     ''.join(sorted(input(glob(path1 + "/part-0000*"))))
 |      '0\n1\n2\n3\n4\n5\n6\n7\n8\n9\n'
 |
 |      Empty lines are tolerated when saving to text files.
 |
 |      >>> with tempfile.TemporaryDirectory() as d2:
 |      ...     path2 = os.path.join(d2, "text2_file2")
 |      ...
 |      ...     # Write another temporary text file
 |      ...     sc.parallelize(['', 'foo', '', 'bar', '']).saveAsTextFile(path2)
 |      ...
 |      ...     # Load text file as an RDD
 |      ...     ''.join(sorted(input(glob(path2 + "/part-0000*"))))
 |      '\n\n\nbar\nfoo\n'
 |
 |      Using compressionCodecClass
 |
 |      >>> from fileinput import input, hook_compressed
 |      >>> with tempfile.TemporaryDirectory() as d3:
 |      ...     path3 = os.path.join(d3, "text3")
 |      ...     codec = "org.apache.hadoop.io.compress.GzipCodec"
 |      ...
 |      ...     # Write another temporary text file with specified codec
 |      ...     sc.parallelize(['foo', 'bar']).saveAsTextFile(path3, codec)
 |      ...
 |      ...     # Load text file as an RDD
 |      ...     result = sorted(input(glob(path3 + "/part*.gz"), openhook=hook_compressed))
 |      ...     ''.join([r.decode('utf-8') if isinstance(r, bytes) else r for r in result])
 |      'bar\nfoo\n'
 |
 |  setName(self: 'RDD[T]', name: str) -> 'RDD[T]'
 |      Assign a name to this RDD.
 |
 |      .. versionadded:: 1.0.0
 |
 |      Parameters
 |      ----------
 |      name : str
 |          new name
 |
 |      Returns
 |      -------
 |      :class:`RDD`
 |          the same :class:`RDD` with name updated
 |
 |      See Also
 |      --------
 |      :meth:`RDD.name`
 |
 |      Examples
 |      --------
 |      >>> rdd = sc.parallelize([1, 2])
 |      >>> rdd.setName('I am an RDD').name()
 |      'I am an RDD'
 |
 |  sortBy(self: 'RDD[T]', keyfunc: Callable[[~T], ForwardRef('S')], ascending: bool = True, numPartitions: Optional[int] = None) -> 'RDD[T]'
 |      Sorts this RDD by the given keyfunc
 |
 |      .. versionadded:: 1.1.0
 |
 |      Parameters
 |      ----------
 |      keyfunc : function
 |          a function to compute the key
 |      ascending : bool, optional, default True
 |          sort the keys in ascending or descending order
 |      numPartitions : int, optional
 |          the number of partitions in new :class:`RDD`
 |
 |      Returns
 |      -------
 |      :class:`RDD`
 |          a new :class:`RDD`
 |
 |      See Also
 |      --------
 |      :meth:`RDD.sortByKey`
 |      :meth:`pyspark.sql.DataFrame.sort`
 |
 |      Examples
 |      --------
 |      >>> tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
 |      >>> sc.parallelize(tmp).sortBy(lambda x: x[0]).collect()
 |      [('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]
 |      >>> sc.parallelize(tmp).sortBy(lambda x: x[1]).collect()
 |      [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
 |
 |  sortByKey(self: 'RDD[Tuple[K, V]]', ascending: Optional[bool] = True, numPartitions: Optional[int] = None, keyfunc: Callable[[Any], Any] = <function RDD.<lambda> at 0x7d0ad465b880>) -> 'RDD[Tuple[K, V]]'
 |      Sorts this RDD, which is assumed to consist of (key, value) pairs.
 |
 |      .. versionadded:: 0.9.1
 |
 |      Parameters
 |      ----------
 |      ascending : bool, optional, default True
 |          sort the keys in ascending or descending order
 |      numPartitions : int, optional
 |          the number of partitions in new :class:`RDD`
 |      keyfunc : function, optional, default identity mapping
 |          a function to compute the key
 |
 |      Returns
 |      -------
 |      :class:`RDD`
 |          a new :class:`RDD`
 |
 |      See Also
 |      --------
 |      :meth:`RDD.sortBy`
 |      :meth:`pyspark.sql.DataFrame.sort`
 |
 |      Examples
 |      --------
 |      >>> tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
 |      >>> sc.parallelize(tmp).sortByKey().first()
 |      ('1', 3)
 |      >>> sc.parallelize(tmp).sortByKey(True, 1).collect()
 |      [('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]
 |      >>> sc.parallelize(tmp).sortByKey(True, 2).collect()
 |      [('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]
 |      >>> tmp2 = [('Mary', 1), ('had', 2), ('a', 3), ('little', 4), ('lamb', 5)]
 |      >>> tmp2.extend([('whose', 6), ('fleece', 7), ('was', 8), ('white', 9)])
 |      >>> sc.parallelize(tmp2).sortByKey(True, 3, keyfunc=lambda k: k.lower()).collect()
 |      [('a', 3), ('fleece', 7), ('had', 2), ('lamb', 5),...('white', 9), ('whose', 6)]
 |
 |  stats(self: 'RDD[NumberOrArray]') -> pyspark.statcounter.StatCounter
 |      Return a :class:`StatCounter` object that captures the mean, variance
 |      and count of the RDD's elements in one operation.
 |
 |      .. versionadded:: 0.9.1
 |
 |      Returns
 |      -------
 |      :class:`StatCounter`
 |          a :class:`StatCounter` capturing the mean, variance and count of all elements
 |
 |      See Also
 |      --------
 |      :meth:`RDD.stdev`
 |      :meth:`RDD.sampleStdev`
 |      :meth:`RDD.variance`
 |      :meth:`RDD.sampleVariance`
 |      :meth:`RDD.histogram`
 |      :meth:`pyspark.sql.DataFrame.stat`
 |
 |  stdev(self: 'RDD[NumberOrArray]') -> float
 |      Compute the standard deviation of this RDD's elements.
 |
 |      .. versionadded:: 0.9.1
 |
 |      Returns
 |      -------
 |      float
 |          the standard deviation of all elements
 |
 |      See Also
 |      --------
 |      :meth:`RDD.stats`
 |      :meth:`RDD.sampleStdev`
 |      :meth:`RDD.variance`
 |      :meth:`RDD.sampleVariance`
 |
 |      Examples
 |      --------
 |      >>> sc.parallelize([1, 2, 3]).stdev()
 |      0.816...
 |
 |  subtract(self: 'RDD[T]', other: 'RDD[T]', numPartitions: Optional[int] = None) -> 'RDD[T]'
 |      Return each value in `self` that is not contained in `other`.
 |
 |      .. versionadded:: 0.9.1
 |
 |      Parameters
 |      ----------
 |      other : :class:`RDD`
 |          another :class:`RDD`
 |      numPartitions : int, optional
 |          the number of partitions in new :class:`RDD`
 |
 |      Returns
 |      -------
 |      :class:`RDD`
 |          a :class:`RDD` with the elements from this that are not in `other`
 |
 |      See Also
 |      --------
 |      :meth:`RDD.subtractByKey`
 |
 |      Examples
 |      --------
 |      >>> rdd1 = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 3)])
 |      >>> rdd2 = sc.parallelize([("a", 3), ("c", None)])
 |      >>> sorted(rdd1.subtract(rdd2).collect())
 |      [('a', 1), ('b', 4), ('b', 5)]
 |
 |  subtractByKey(self: 'RDD[Tuple[K, V]]', other: 'RDD[Tuple[K, Any]]', numPartitions: Optional[int] = None) -> 'RDD[Tuple[K, V]]'
 |      Return each (key, value) pair in `self` that has no pair with matching
 |      key in `other`.
 |
 |      .. versionadded:: 0.9.1
 |
 |      Parameters
 |      ----------
 |      other : :class:`RDD`
 |          another :class:`RDD`
 |      numPartitions : int, optional
 |          the number of partitions in new :class:`RDD`
 |
 |      Returns
 |      -------
 |      :class:`RDD`
 |          a :class:`RDD` with the pairs from this whose keys are not in `other`
 |
 |      See Also
 |      --------
 |      :meth:`RDD.subtract`
 |
 |      Examples
 |      --------
 |      >>> rdd1 = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 2)])
 |      >>> rdd2 = sc.parallelize([("a", 3), ("c", None)])
 |      >>> sorted(rdd1.subtractByKey(rdd2).collect())
 |      [('b', 4), ('b', 5)]
 |
 |  sum(self: 'RDD[NumberOrArray]') -> 'NumberOrArray'
 |      Add up the elements in this RDD.
 |
 |      .. versionadded:: 0.7.0
 |
 |      Returns
 |      -------
 |      float, int, or complex
 |          the sum of all elements
 |
 |      See Also
 |      --------
 |      :meth:`RDD.mean`
 |      :meth:`RDD.sumApprox`
 |
 |      Examples
 |      --------
 |      >>> sc.parallelize([1.0, 2.0, 3.0]).sum()
 |      6.0
 |
 |  sumApprox(self: 'RDD[Union[float, int]]', timeout: int, confidence: float = 0.95) -> pyspark.rdd.BoundedFloat
 |      Approximate operation to return the sum within a timeout
 |      or meet the confidence.
 |
 |      .. versionadded:: 1.2.0
 |
 |      Parameters
 |      ----------
 |      timeout : int
 |          maximum time to wait for the job, in milliseconds
 |      confidence : float
 |          the desired statistical confidence in the result
 |
 |      Returns
 |      -------
 |      :class:`BoundedFloat`
 |          a potentially incomplete result, with error bounds
 |
 |      See Also
 |      --------
 |      :meth:`RDD.sum`
 |
 |      Examples
 |      --------
 |      >>> rdd = sc.parallelize(range(1000), 10)
 |      >>> r = sum(range(1000))
 |      >>> abs(rdd.sumApprox(1000) - r) / r < 0.05
 |      True
 |
 |  take(self: 'RDD[T]', num: int) -> List[~T]
 |      Take the first num elements of the RDD.
 |
 |      It works by first scanning one partition, and use the results from
 |      that partition to estimate the number of additional partitions needed
 |      to satisfy the limit.
 |
 |      Translated from the Scala implementation in RDD#take().
 |
 |      .. versionadded:: 0.7.0
 |
 |      Parameters
 |      ----------
 |      num : int
 |          first number of elements
 |
 |      Returns
 |      -------
 |      list
 |          the first `num` elements
 |
 |      See Also
 |      --------
 |      :meth:`RDD.first`
 |      :meth:`pyspark.sql.DataFrame.take`
 |
 |      Notes
 |      -----
 |      This method should only be used if the resulting array is expected
 |      to be small, as all the data is loaded into the driver's memory.
 |
 |      Examples
 |      --------
 |      >>> sc.parallelize([2, 3, 4, 5, 6]).cache().take(2)
 |      [2, 3]
 |      >>> sc.parallelize([2, 3, 4, 5, 6]).take(10)
 |      [2, 3, 4, 5, 6]
 |      >>> sc.parallelize(range(100), 100).filter(lambda x: x > 90).take(3)
 |      [91, 92, 93]
 |
 |  takeOrdered(self: 'RDD[T]', num: int, key: Optional[Callable[[~T], ForwardRef('S')]] = None) -> List[~T]
 |      Get the N elements from an RDD ordered in ascending order or as
 |      specified by the optional key function.
 |
 |      .. versionadded:: 1.0.0
 |
 |      Parameters
 |      ----------
 |      num : int
 |          top N
 |      key : function, optional
 |          a function used to generate key for comparing
 |
 |      Returns
 |      -------
 |      list
 |          the top N elements
 |
 |      See Also
 |      --------
 |      :meth:`RDD.top`
 |      :meth:`RDD.max`
 |      :meth:`RDD.min`
 |
 |      Notes
 |      -----
 |      This method should only be used if the resulting array is expected
 |      to be small, as all the data is loaded into the driver's memory.
 |
 |      Examples
 |      --------
 |      >>> sc.parallelize([10, 1, 2, 9, 3, 4, 5, 6, 7]).takeOrdered(6)
 |      [1, 2, 3, 4, 5, 6]
 |      >>> sc.parallelize([10, 1, 2, 9, 3, 4, 5, 6, 7], 2).takeOrdered(6, key=lambda x: -x)
 |      [10, 9, 7, 6, 5, 4]
 |      >>> sc.emptyRDD().takeOrdered(3)
 |      []
 |
 |  takeSample(self: 'RDD[T]', withReplacement: bool, num: int, seed: Optional[int] = None) -> List[~T]
 |      Return a fixed-size sampled subset of this RDD.
 |
 |      .. versionadded:: 1.3.0
 |
 |      Parameters
 |      ----------
 |      withReplacement : bool
 |          whether sampling is done with replacement
 |      num : int
 |          size of the returned sample
 |      seed : int, optional
 |          random seed
 |
 |      Returns
 |      -------
 |      list
 |          a fixed-size sampled subset of this :class:`RDD` in an array
 |
 |      See Also
 |      --------
 |      :meth:`RDD.sample`
 |
 |      Notes
 |      -----
 |      This method should only be used if the resulting array is expected
 |      to be small, as all the data is loaded into the driver's memory.
 |
 |      Examples
 |      --------
 |      >>> import sys
 |      >>> rdd = sc.parallelize(range(0, 10))
 |      >>> len(rdd.takeSample(True, 20, 1))
 |      20
 |      >>> len(rdd.takeSample(False, 5, 2))
 |      5
 |      >>> len(rdd.takeSample(False, 15, 3))
 |      10
 |      >>> sc.range(0, 10).takeSample(False, sys.maxsize)
 |      Traceback (most recent call last):
 |          ...
 |      ValueError: Sample size cannot be greater than ...
 |
 |  toDF(self: 'RDD[Any]', schema: Optional[Any] = None, sampleRatio: Optional[float] = None) -> 'DataFrame'
 |
 |  toDebugString(self) -> Optional[bytes]
 |      A description of this RDD and its recursive dependencies for debugging.
 |
 |      .. versionadded:: 1.0.0
 |
 |      Returns
 |      -------
 |      bytes
 |          debugging information of this :class:`RDD`
 |
 |      Examples
 |      --------
 |      >>> rdd = sc.range(5)
 |      >>> rdd.toDebugString()
 |      b'...PythonRDD...ParallelCollectionRDD...'
 |
 |  toLocalIterator(self: 'RDD[T]', prefetchPartitions: bool = False) -> Iterator[~T]
 |      Return an iterator that contains all of the elements in this RDD.
 |      The iterator will consume as much memory as the largest partition in this RDD.
 |      With prefetch it may consume up to the memory of the 2 largest partitions.
 |
 |      .. versionadded:: 1.3.0
 |
 |      Parameters
 |      ----------
 |      prefetchPartitions : bool, optional
 |          If Spark should pre-fetch the next partition
 |          before it is needed.
 |
 |      Returns
 |      -------
 |      :class:`collections.abc.Iterator`
 |          an iterator that contains all of the elements in this :class:`RDD`
 |
 |      See Also
 |      --------
 |      :meth:`RDD.collect`
 |      :meth:`pyspark.sql.DataFrame.toLocalIterator`
 |
 |      Examples
 |      --------
 |      >>> rdd = sc.parallelize(range(10))
 |      >>> [x for x in rdd.toLocalIterator()]
 |      [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
 |
 |  top(self: 'RDD[T]', num: int, key: Optional[Callable[[~T], ForwardRef('S')]] = None) -> List[~T]
 |      Get the top N elements from an RDD.
 |
 |      .. versionadded:: 1.0.0
 |
 |      Parameters
 |      ----------
 |      num : int
 |          top N
 |      key : function, optional
 |          a function used to generate key for comparing
 |
 |      Returns
 |      -------
 |      list
 |          the top N elements
 |
 |      See Also
 |      --------
 |      :meth:`RDD.takeOrdered`
 |      :meth:`RDD.max`
 |      :meth:`RDD.min`
 |
 |      Notes
 |      -----
 |      This method should only be used if the resulting array is expected
 |      to be small, as all the data is loaded into the driver's memory.
 |
 |      It returns the list sorted in descending order.
 |
 |      Examples
 |      --------
 |      >>> sc.parallelize([10, 4, 2, 12, 3]).top(1)
 |      [12]
 |      >>> sc.parallelize([2, 3, 4, 5, 6], 2).top(2)
 |      [6, 5]
 |      >>> sc.parallelize([10, 4, 2, 12, 3]).top(3, key=str)
 |      [4, 3, 2]
 |
 |  treeAggregate(self: 'RDD[T]', zeroValue: ~U, seqOp: Callable[[~U, ~T], ~U], combOp: Callable[[~U, ~U], ~U], depth: int = 2) -> ~U
 |      Aggregates the elements of this RDD in a multi-level tree
 |      pattern.
 |
 |      .. versionadded:: 1.3.0
 |
 |      Parameters
 |      ----------
 |      zeroValue : U
 |          the initial value for the accumulated result of each partition
 |      seqOp : function
 |          a function used to accumulate results within a partition
 |      combOp : function
 |          an associative function used to combine results from different partitions
 |      depth : int, optional, default 2
 |          suggested depth of the tree
 |
 |      Returns
 |      -------
 |      U
 |          the aggregated result
 |
 |      See Also
 |      --------
 |      :meth:`RDD.aggregate`
 |      :meth:`RDD.treeReduce`
 |
 |      Examples
 |      --------
 |      >>> add = lambda x, y: x + y
 |      >>> rdd = sc.parallelize([-5, -4, -3, -2, -1, 1, 2, 3, 4], 10)
 |      >>> rdd.treeAggregate(0, add, add)
 |      -5
 |      >>> rdd.treeAggregate(0, add, add, 1)
 |      -5
 |      >>> rdd.treeAggregate(0, add, add, 2)
 |      -5
 |      >>> rdd.treeAggregate(0, add, add, 5)
 |      -5
 |      >>> rdd.treeAggregate(0, add, add, 10)
 |      -5
 |
 |  treeReduce(self: 'RDD[T]', f: Callable[[~T, ~T], ~T], depth: int = 2) -> ~T
 |      Reduces the elements of this RDD in a multi-level tree pattern.
 |
 |      .. versionadded:: 1.3.0
 |
 |      Parameters
 |      ----------
 |      f : function
 |          the reduce function
 |      depth : int, optional, default 2
 |          suggested depth of the tree (default: 2)
 |
 |      Returns
 |      -------
 |      T
 |          the aggregated result
 |
 |      See Also
 |      --------
 |      :meth:`RDD.reduce`
 |      :meth:`RDD.aggregate`
 |      :meth:`RDD.treeAggregate`
 |
 |      Examples
 |      --------
 |      >>> add = lambda x, y: x + y
 |      >>> rdd = sc.parallelize([-5, -4, -3, -2, -1, 1, 2, 3, 4], 10)
 |      >>> rdd.treeReduce(add)
 |      -5
 |      >>> rdd.treeReduce(add, 1)
 |      -5
 |      >>> rdd.treeReduce(add, 2)
 |      -5
 |      >>> rdd.treeReduce(add, 5)
 |      -5
 |      >>> rdd.treeReduce(add, 10)
 |      -5
 |
 |  union(self: 'RDD[T]', other: 'RDD[U]') -> 'RDD[Union[T, U]]'
 |      Return the union of this RDD and another one.
 |
 |      .. versionadded:: 0.7.0
 |
 |      Parameters
 |      ----------
 |      other : :class:`RDD`
 |          another :class:`RDD`
 |
 |      Returns
 |      -------
 |      :class:`RDD`
 |          the union of this :class:`RDD` and another one
 |
 |      See Also
 |      --------
 |      :meth:`SparkContext.union`
 |      :meth:`pyspark.sql.DataFrame.union`
 |
 |      Examples
 |      --------
 |      >>> rdd = sc.parallelize([1, 1, 2, 3])
 |      >>> rdd.union(rdd).collect()
 |      [1, 1, 2, 3, 1, 1, 2, 3]
 |
 |  unpersist(self: 'RDD[T]', blocking: bool = False) -> 'RDD[T]'
 |      Mark the RDD as non-persistent, and remove all blocks for it from
 |      memory and disk.
 |
 |      .. versionadded:: 0.9.1
 |
 |      Parameters
 |      ----------
 |      blocking : bool, optional, default False
 |          whether to block until all blocks are deleted
 |
 |          .. versionadded:: 3.0.0
 |
 |      Returns
 |      -------
 |      :class:`RDD`
 |          The same :class:`RDD`
 |
 |      See Also
 |      --------
 |      :meth:`RDD.cache`
 |      :meth:`RDD.persist`
 |      :meth:`RDD.getStorageLevel`
 |
 |      Examples
 |      --------
 |      >>> rdd = sc.range(5)
 |      >>> rdd.is_cached
 |      False
 |      >>> _ = rdd.unpersist()
 |      >>> rdd.is_cached
 |      False
 |      >>> _ = rdd.cache()
 |      >>> rdd.is_cached
 |      True
 |      >>> _ = rdd.unpersist()
 |      >>> rdd.is_cached
 |      False
 |      >>> _ = rdd.unpersist()
 |
 |  values(self: 'RDD[Tuple[K, V]]') -> 'RDD[V]'
 |      Return an RDD with the values of each tuple.
 |
 |      .. versionadded:: 0.7.0
 |
 |      Returns
 |      -------
 |      :class:`RDD`
 |          a :class:`RDD` only containing the values
 |
 |      See Also
 |      --------
 |      :meth:`RDD.keys`
 |
 |      Examples
 |      --------
 |      >>> rdd = sc.parallelize([(1, 2), (3, 4)]).values()
 |      >>> rdd.collect()
 |      [2, 4]
 |
 |  variance(self: 'RDD[NumberOrArray]') -> float
 |      Compute the variance of this RDD's elements.
 |
 |      .. versionadded:: 0.9.1
 |
 |      Returns
 |      -------
 |      float
 |          the variance of all elements
 |
 |      See Also
 |      --------
 |      :meth:`RDD.stats`
 |      :meth:`RDD.sampleVariance`
 |      :meth:`RDD.stdev`
 |      :meth:`RDD.sampleStdev`
 |
 |      Examples
 |      --------
 |      >>> sc.parallelize([1, 2, 3]).variance()
 |      0.666...
 |
 |  withResources(self: 'RDD[T]', profile: pyspark.resource.profile.ResourceProfile) -> 'RDD[T]'
 |      Specify a :class:`pyspark.resource.ResourceProfile` to use when calculating this RDD.
 |      This is only supported on certain cluster managers and currently requires dynamic
 |      allocation to be enabled. It will result in new executors with the resources specified
 |      being acquired to calculate the RDD.
 |
 |      .. versionadded:: 3.1.0
 |
 |      Parameters
 |      ----------
 |      profile : :class:`pyspark.resource.ResourceProfile`
 |          a resource profile
 |
 |      Returns
 |      -------
 |      :class:`RDD`
 |          the same :class:`RDD` with user specified profile
 |
 |      See Also
 |      --------
 |      :meth:`RDD.getResourceProfile`
 |
 |      Notes
 |      -----
 |      This API is experimental
 |
 |  zip(self: 'RDD[T]', other: 'RDD[U]') -> 'RDD[Tuple[T, U]]'
 |      Zips this RDD with another one, returning key-value pairs with the
 |      first element in each RDD second element in each RDD, etc. Assumes
 |      that the two RDDs have the same number of partitions and the same
 |      number of elements in each partition (e.g. one was made through
 |      a map on the other).
 |
 |      .. versionadded:: 1.0.0
 |
 |      Parameters
 |      ----------
 |      other : :class:`RDD`
 |          another :class:`RDD`
 |
 |      Returns
 |      -------
 |      :class:`RDD`
 |          a :class:`RDD` containing the zipped key-value pairs
 |
 |      See Also
 |      --------
 |      :meth:`RDD.zipWithIndex`
 |      :meth:`RDD.zipWithUniqueId`
 |
 |      Examples
 |      --------
 |      >>> rdd1 = sc.parallelize(range(0,5))
 |      >>> rdd2 = sc.parallelize(range(1000, 1005))
 |      >>> rdd1.zip(rdd2).collect()
 |      [(0, 1000), (1, 1001), (2, 1002), (3, 1003), (4, 1004)]
 |
 |  zipWithIndex(self: 'RDD[T]') -> 'RDD[Tuple[T, int]]'
 |      Zips this RDD with its element indices.
 |
 |      The ordering is first based on the partition index and then the
 |      ordering of items within each partition. So the first item in
 |      the first partition gets index 0, and the last item in the last
 |      partition receives the largest index.
 |
 |      This method needs to trigger a spark job when this RDD contains
 |      more than one partitions.
 |
 |      .. versionadded:: 1.2.0
 |
 |      Returns
 |      -------
 |      :class:`RDD`
 |          a :class:`RDD` containing the zipped key-index pairs
 |
 |      See Also
 |      --------
 |      :meth:`RDD.zip`
 |      :meth:`RDD.zipWithUniqueId`
 |
 |      Examples
 |      --------
 |      >>> sc.parallelize(["a", "b", "c", "d"], 3).zipWithIndex().collect()
 |      [('a', 0), ('b', 1), ('c', 2), ('d', 3)]
 |
 |  zipWithUniqueId(self: 'RDD[T]') -> 'RDD[Tuple[T, int]]'
 |      Zips this RDD with generated unique Long ids.
 |
 |      Items in the kth partition will get ids k, n+k, 2*n+k, ..., where
 |      n is the number of partitions. So there may exist gaps, but this
 |      method won't trigger a spark job, which is different from
 |      :meth:`zipWithIndex`.
 |
 |      .. versionadded:: 1.2.0
 |
 |      Returns
 |      -------
 |      :class:`RDD`
 |          a :class:`RDD` containing the zipped key-UniqueId pairs
 |
 |      See Also
 |      --------
 |      :meth:`RDD.zip`
 |      :meth:`RDD.zipWithIndex`
 |
 |      Examples
 |      --------
 |      >>> sc.parallelize(["a", "b", "c", "d", "e"], 3).zipWithUniqueId().collect()
 |      [('a', 0), ('b', 1), ('c', 4), ('d', 2), ('e', 5)]
 |
 |  ----------------------------------------------------------------------
 |  Readonly properties inherited from RDD:
 |
 |  context
 |      The :class:`SparkContext` that this RDD was created on.
 |
 |      .. versionadded:: 0.7.0
 |
 |      Returns
 |      -------
 |      :class:`SparkContext`
 |          The :class:`SparkContext` that this RDD was created on
 |
 |      Examples
 |      --------
 |      >>> rdd = sc.range(5)
 |      >>> rdd.context
 |      <SparkContext ...>
 |      >>> rdd.context is sc
 |      True
 |
 |  ----------------------------------------------------------------------
 |  Data descriptors inherited from RDD:
 |
 |  __dict__
 |      dictionary for instance variables
 |
 |  __weakref__
 |      list of weak references to the object
 |
 |  ----------------------------------------------------------------------
 |  Class methods inherited from typing.Generic:
 |
 |  __class_getitem__(...)
 |      Parameterizes a generic class.
 |
 |      At least, parameterizing a generic class is the *main* thing this
 |      method does. For example, for some generic class `Foo`, this is called
 |      when we do `Foo[int]` - there, with `cls=Foo` and `params=int`.
 |
 |      However, note that this method is also called when defining generic
 |      classes in the first place with `class Foo[T]: ...`.
 |
 |  __init_subclass__(...)
 |      Function to initialize subclasses.
# Let's see how many partitions the RDD is split into, using getNumPartitions()
xrangeRDD.getNumPartitions()
8

(3c) Subtract one from each value using map

So far, we have created a distributed dataset that is split into many partitions, where each partition is stored on a single machine in our cluster. Let's look at what happens when we perform a basic operation on the dataset. Many useful data analysis operations can be specified as "do something to each element in the dataset". These data-parallel operations are convenient because each element in the dataset can be processed individually: the operation on one entry does not affect the operations on any of the other entries. Therefore, Spark can parallelize the operation.

map(f), the most common Spark transformation, is one such example: it applies a function f to each element in the dataset and outputs the resulting dataset. When you run map() on a dataset, a single stage of tasks is launched. A stage is a group of tasks that all perform the same computation, but on different input data. One task is launched for each partition, as shown in the example below. A task is a unit of execution that runs on a single machine. When we run map(f) within a partition, a new task applies f to all of the entries in that particular partition and outputs a new partition. In the illustrated example, the dataset is split into four partitions, so four map() tasks are launched. (xrangeRDD itself reported 8 partitions above, so running map() on it launches eight tasks.)

[Figure: tasks]

The figure below shows how this would work on the smaller dataset from the earlier figures. Note that one task is launched for each partition.

[Figure: one map() task per partition on the smaller dataset]

When the map() transformation is applied, each element in the parent RDD maps to exactly one element in the new RDD. So, if the parent RDD has twenty elements, the new RDD will also have twenty elements.
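The one-task-per-partition behaviour described above can be modelled in plain Python. This is only a sketch for intuition, not how Spark is implemented: `split_into_partitions` and `run_map_stage` are hypothetical helpers invented for this illustration, and everything runs in a single process rather than on a cluster.

```python
# Plain-Python model of a map() stage; none of these helpers are Spark APIs.

def split_into_partitions(data, num_partitions):
    """Split `data` into `num_partitions` contiguous chunks (hypothetical helper)."""
    data = list(data)
    size, extra = divmod(len(data), num_partitions)
    partitions, start = [], 0
    for i in range(num_partitions):
        end = start + size + (1 if i < extra else 0)
        partitions.append(data[start:end])
        start = end
    return partitions


def sub(value):
    """Subtract one from `value`."""
    return value - 1


def run_map_stage(partitions, f):
    """Run one 'task' per partition; each task applies f to every entry."""
    return [[f(x) for x in partition] for partition in partitions]


parent = split_into_partitions(range(1, 21), 4)  # 20 elements, 4 partitions
child = run_map_stage(parent, sub)

print(len(child))                  # 4: one output partition per task
print(sum(len(p) for p in child))  # 20: same element count as the parent
print(child[0])                    # [0, 1, 2, 3, 4]
```

Because each task only reads its own partition, the tasks are independent of one another, which is what lets Spark schedule them on different machines.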

Now we will use map() to subtract one from each value in the base RDD we just created. First, we define a Python function called sub() that subtracts one from its integer input. Second, we pass each element in the base RDD to a map() transformation that applies sub() to each element. Finally, we print the RDD's transformation hierarchy using toDebugString().

# Create the sub function to subtract 1
def sub(value):
    """Subtracts one from `value`.

    Args:
        value (int): A number.

    Returns:
        int: `value` minus one.
    """
    return (value - 1)

# Transform xrangeRDD through the map transformation using the sub function
# Because map is a transformation and Spark uses lazy evaluation, no jobs, stages,
# or tasks will be launched when we run this code.
subRDD = xrangeRDD.map(sub)

# Let's see the RDD transformation hierarchy
print(subRDD.toDebugString())
b'(8) PythonRDD[5] at RDD at PythonRDD.scala:53 []\n |  ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:289 []'

(3d) Perform action collect to view results

To see the list of elements decremented by one, we need to create a new list on the driver from the data distributed across the executor nodes. To do this, we call the collect() method on our RDD. collect() is often used after a filter or other operation to ensure that we are only returning a small amount of data to the driver. This matters because the data returned to the driver must fit into the driver's available memory; if it does not, the driver will crash.

The collect() method is the first action operation we have encountered. Action operations cause Spark to perform the (lazy) transformation operations that are required to compute the result returned by the action. In our example, this means that tasks will now be launched to perform the parallelize, map, and collect operations.

In this example, the dataset is split into four partitions, so four collect() tasks are launched. Each task collects the entries in its partition and sends the result to the SparkContext, which creates a list of the values, as shown in the figure below.

collect() figure

The figures above showed what would happen if we ran collect() on a small example dataset with just four partitions.
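
To make the difference between a lazy transformation and an action more concrete, here is a minimal pure-Python sketch. `LazyDataset` is a hypothetical toy class, not part of Spark; it only assumes that a transformation records what to do and that an action actually does it.

```python
class LazyDataset:
    """Toy stand-in for an RDD: records transformations, runs them on collect()."""

    def __init__(self, data, steps=()):
        self._data = data
        self._steps = tuple(steps)

    def map(self, f):
        # Transformation: only remember f; the data is not touched yet.
        return LazyDataset(self._data, self._steps + (f,))

    def collect(self):
        # Action: apply every recorded function, in order, to every item.
        result = []
        for item in self._data:
            for f in self._steps:
                item = f(item)
            result.append(item)
        return result

calls = []

def sub(value):
    calls.append(value)  # side effect so we can observe when sub() really runs
    return value - 1

lazy = LazyDataset([1, 2, 3, 4]).map(sub)
print(len(calls))      # 0: map() alone has computed nothing
print(lazy.collect())  # [0, 1, 2, 3]
print(len(calls))      # 4: sub() ran only once collect() was called
```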

Now let's run collect() on subRDD.

# Let's collect the data
print(subRDD.collect())


(3d) Perform action count to view counts

One of the most basic jobs that we can run is the count() job, which counts the number of elements in an RDD using the count() action. Since map() creates a new RDD with the same number of elements as the starting RDD, we expect that applying count() to each RDD will return the same result.

Note that count() is itself an action operation, so even if we had not already performed an action with collect(), executing count() would cause Spark to perform the transformation operations.

Each task counts the entries in its partition and sends the result to your SparkContext, which adds up all of the counts. The figure below shows what would happen if we ran count() on a small example dataset with just four partitions.

count() figure

print(xrangeRDD.count())
print(subRDD.count())
10000
10000
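
The two-level counting described above (each task counts its partition, then the driver adds the partial counts) can be sketched in plain Python. The four partitions below are made-up illustration data, not the tutorial's RDD.

```python
# Hypothetical dataset split into four unevenly sized partitions.
partitions = [[1, 2, 3], [4, 5], [6, 7, 8, 9], [10]]

partial_counts = [len(p) for p in partitions]  # one count per simulated task
total = sum(partial_counts)                    # the driver adds the partial counts

print(partial_counts)  # [3, 2, 4, 1]
print(total)           # 10
```

Only one small integer per partition travels back to the driver, which is why count() is cheap compared with collect().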

(3e) Apply transformation filter and view results with collect

Next, we'll create a new RDD that only contains the values less than ten by using the filter(f) data-parallel operation. The filter(f) method is a transformation operation that creates a new RDD from the input RDD by applying the filter function f to each item in the parent RDD and only passing along those elements for which the filter function returns True. Elements for which it does not return True are dropped. Like map(), filter() can be applied individually to each entry in the dataset, so it is easily parallelized using Spark.

The figure below shows how this would work on the small four-partition dataset.

filter() figure

To filter this dataset, we'll define a function called ten(), which returns True if the input is less than 10 and False otherwise. This function will be passed to the filter() transformation as the filter function f.

To view the filtered list of elements less than ten, we need to create a new list on the driver from the distributed data on the executor nodes. We use the collect() method to return a list that contains all of the elements in this filtered RDD to the driver program.

# Define a function to filter a single value
def ten(value):
    """Return whether value is below ten.

    Args:
        value (int): A number.

    Returns:
        bool: Whether `value` is less than ten.
    """
    if (value < 10):
        return True
    else:
        return False
# The ten function could also be written concisely as: def ten(value): return value < 10

# Pass the function ten to the filter transformation
# filter is a transformation, so no tasks are run
filteredRDD = subRDD.filter(ten)

# View the results using collect()
# collect is an action and causes the filter transformation to be executed
print(filteredRDD.collect())
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
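
Unlike map(), filter() can shrink the dataset, and it does so independently inside each partition. The following pure-Python sketch illustrates this on an assumed four-partition layout (illustrative data, not Spark's actual partitioning of the tutorial's RDD).

```python
# Hypothetical four-partition dataset holding the integers 0..19.
partitions = [list(range(start, start + 5)) for start in range(0, 20, 5)]

def ten(value):
    return value < 10

# Each simulated task filters only its own partition.
filtered = [[x for x in partition if ten(x)] for partition in partitions]

print(filtered)  # [[0, 1, 2, 3, 4], [5, 6, 7, 8, 9], [], []]
```

The partition count stays the same, but some partitions may end up empty after filtering.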

Part 4: Lambda Functions

(4a) Using Python lambda functions

Python supports the use of small one-line anonymous functions that are not bound to a name at runtime. Borrowed from LISP, these lambda functions can be used wherever function objects are required. They are syntactically restricted to a single expression. Remember that lambda functions are a matter of style and using them is never required: semantically, they are just syntactic sugar for a normal function definition. You can always define a separate normal function instead, but using a lambda function is an equivalent and more compact form of coding. Ideally, you should consider using lambda functions where you want to encapsulate non-reusable code without littering your code with one-line functions.
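
As a small illustration of that equivalence, the sketch below defines the same predicate twice, once with def and once with lambda, and applies both with Python's built-in filter(). The name `ten_lambda` is introduced here only for the comparison.

```python
def ten(value):
    return value < 10

# Same behavior, written as an anonymous function bound to a variable.
ten_lambda = lambda value: value < 10

values = [3, 10, 42, 7]
print(list(filter(ten, values)))         # [3, 7]
print(list(filter(ten_lambda, values)))  # [3, 7]

# The only visible difference is the function's name.
print(ten.__name__, ten_lambda.__name__)  # ten <lambda>
```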

Here, instead of defining a separate function for the filter() transformation, we will use an inline lambda function.

lambdaRDD = subRDD.filter(lambda x: x < 10)
lambdaRDD.collect()
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
# Let's collect the even values less than 10
evenRDD = lambdaRDD.filter(lambda x: x % 2 == 0)
evenRDD.collect()
[0, 2, 4, 6, 8]

Part 5: Additional RDD actions

(5a) Other common actions

Let's investigate the additional actions: first(), take(), top(), takeOrdered(), and reduce().

One useful thing to do when we have a new dataset is to look at the first few entries to get a rough idea of what information is available. In Spark, we can do that using the first(), take(), top(), and takeOrdered() actions. Note that for the first() and take() actions, the elements that are returned depend on how the RDD is partitioned.

Instead of using the collect() action, we can use the take(n) action to return the first n elements of the RDD. The first() action returns the first element of an RDD, and is equivalent to take(1)[0].

The takeOrdered() action returns the first n elements of the RDD, using either their natural order or a custom key function. The key advantage of using takeOrdered() instead of first() or take() is that takeOrdered() returns a deterministic result, while the other two actions may return different results depending on the number of partitions or the execution environment. takeOrdered() returns the list sorted in ascending order. The top() action is similar to takeOrdered(), except that it returns the list in descending order.
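
For intuition, the ordering semantics of takeOrdered() and top() resemble those of `heapq.nsmallest` and `heapq.nlargest` from Python's standard library. The sketch below is an analogy on a local list with made-up data; it makes no claim about how Spark implements these actions.

```python
import heapq

# Deliberately unordered local data standing in for an RDD's contents.
data = [7, 2, 9, 0, 5, 3, 8, 1, 6, 4]

print(heapq.nsmallest(3, data))                    # like takeOrdered(3): [0, 1, 2]
print(heapq.nlargest(5, data))                     # like top(5): [9, 8, 7, 6, 5]
print(heapq.nsmallest(4, data, key=lambda s: -s))  # key reverses the order: [9, 8, 7, 6]
```

Because the result depends only on the values and the ordering, not on where each value is stored, it stays the same however the data is split up.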

The reduce() action reduces the elements of an RDD to a single value by applying a function that takes two parameters and returns a single value. The function should be commutative and associative, because reduce() is applied at the partition level and then again to aggregate the results from the partitions. If these rules do not hold, the results from reduce() will be inconsistent. Reducing locally at the partitions makes reduce() very efficient.
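
The following pure-Python sketch shows why the function should be commutative and associative. It assumes a simplified two-step model (reduce inside each partition, then reduce the partial results); `partitioned_reduce` is a hypothetical helper, and the partition layouts are chosen by hand.

```python
from functools import reduce

def partitioned_reduce(partitions, f):
    """Reduce each non-empty partition locally, then reduce the partial results."""
    partials = [reduce(f, p) for p in partitions if p]
    return reduce(f, partials)

data = list(range(10))
one_partition = [data]
two_partitions = [data[:5], data[5:]]

add = lambda a, b: a + b
subtract = lambda a, b: a - b

# Addition gives the same answer regardless of the partitioning.
print(partitioned_reduce(one_partition, add), partitioned_reduce(two_partitions, add))            # 45 45
# Subtraction does not: (0-1-2-3-4) - (5-6-7-8-9) = -10 - (-25) = 15.
print(partitioned_reduce(one_partition, subtract), partitioned_reduce(two_partitions, subtract))  # -45 15
```

In a real cluster the partition layout and the order in which partial results arrive are not under your control, so a non-associative or non-commutative function may or may not give the answer you expect on any particular run.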

# Let's get the first element
print(filteredRDD.first())
# The first 4
print(filteredRDD.take(4))
# Note that it is OK to ask for more elements than the RDD has
print(filteredRDD.take(12))
0
[0, 1, 2, 3]
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
# Retrieve the three smallest elements
print(filteredRDD.takeOrdered(3))
# Retrieve the five largest elements
print(filteredRDD.top(5))
[0, 1, 2]
[9, 8, 7, 6, 5]
# Pass a lambda function to takeOrdered to reverse the order
filteredRDD.takeOrdered(4, lambda s: -s)
[9, 8, 7, 6]
# Get Python's add function
from operator import add
# Efficiently sum the RDD using reduce
print(filteredRDD.reduce(add))
# Sum using reduce with a lambda function
print(filteredRDD.reduce(lambda a, b: a + b))
# Note that subtraction is neither associative nor commutative, so its result can depend on partitioning
print(filteredRDD.reduce(lambda a, b: a - b))
print(filteredRDD.repartition(4).reduce(lambda a, b: a - b))
# Whereas addition is both
print(filteredRDD.repartition(4).reduce(lambda a, b: a + b))
45
45
-45
-45
45

(5b) Advanced actions

Here are two additional actions that are useful for retrieving information from an RDD: takeSample() and countByValue().

The takeSample() action returns a list containing a random sample of elements from the dataset. It takes a withReplacement argument, which specifies whether it is OK to randomly pick the same item multiple times from the parent RDD (so when withReplacement=True, you can get the same item back multiple times). It also takes an optional seed parameter that lets you specify a seed value for the random number generator, so that reproducible results can be obtained.
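
The roles of withReplacement and seed can be illustrated with Python's `random` module. `take_sample` below is a hypothetical local helper, not Spark's sampler, so the values it returns will not match the values Spark returns for the same seed.

```python
import random

def take_sample(data, with_replacement, num, seed=None):
    """Local sketch of sampling with or without replacement."""
    rng = random.Random(seed)  # a fixed seed makes the sample reproducible
    if with_replacement:
        # The same item may be picked more than once.
        return [rng.choice(data) for _ in range(num)]
    # Without replacement, each item can appear at most once.
    return rng.sample(data, min(num, len(data)))

data = list(range(10))
first = take_sample(data, with_replacement=False, num=6, seed=500)
second = take_sample(data, with_replacement=False, num=6, seed=500)

print(first == second)         # True: same seed, same sample
print(len(set(first)) == 6)    # True: no repeats without replacement
print(len(take_sample(data, with_replacement=True, num=15)))  # 15: more draws than items is fine
```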

The countByValue() action returns the count of each unique value in the RDD as a dictionary that maps values to counts.
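
A local analogue of this action is `collections.Counter`. The sketch below assumes the same partition-then-merge model used for count(): each partition gets its own counter and the partial counters are then added together. The three-way split of the data is arbitrary.

```python
from collections import Counter

values = [1, 2, 3, 1, 2, 3, 1, 2, 1, 2, 3, 3, 3, 4, 5, 4, 6]
partitions = [values[:6], values[6:12], values[12:]]  # arbitrary three-way split

partial_counts = [Counter(p) for p in partitions]  # one counter per simulated task
total_counts = sum(partial_counts, Counter())      # merge the partial counters

print(sorted(total_counts.items()))
# [(1, 4), (2, 4), (3, 5), (4, 2), (5, 1), (6, 1)]
```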

# takeSample reusing elements
print(filteredRDD.takeSample(withReplacement=True, num=6))
# takeSample without reuse
print(filteredRDD.takeSample(withReplacement=False, num=6))
[4, 7, 5, 8, 4, 9]
[4, 0, 8, 6, 2, 3]
# Set seed for predictability
print(filteredRDD.takeSample(withReplacement=False, num=6, seed=500))
# Try re-running this cell and the cell above -- the results from this cell will remain constant
# Use ctrl-enter to run without moving to the next cell
[6, 4, 0, 1, 3, 5]
# Create a new base RDD to show countByValue
repetitiveRDD = sc.parallelize([1, 2, 3, 1, 2, 3, 1, 2, 1, 2, 3, 3, 3, 4, 5, 4, 6])
print(repetitiveRDD.countByValue())
defaultdict(<class 'int'>, {1: 4, 2: 4, 3: 5, 4: 2, 5: 1, 6: 1})

Part 6: Additional RDD transformations

(6a) flatMap

When performing a map() transformation using a function, sometimes the function will return more (or fewer) than one element. We would like the new RDD to consist of the elements produced by the function. Simply applying a map() transformation would yield a new RDD made up of iterators. Each iterator could have zero or more elements. Instead, we often want an RDD consisting of the values contained in those iterators. The solution is to use a flatMap() transformation. flatMap() is similar to map(), except that with flatMap() each input item can be mapped to zero or more output elements.
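
On a local list, the same distinction can be reproduced with a list comprehension (for map) and `itertools.chain.from_iterable` (for flatMap). This is a plain-Python analogy with a shortened word list, not Spark code.

```python
from itertools import chain

words = ['cat', 'elephant', 'rat']
singular_and_plural = lambda x: (x, x + 's')

# map-like: one output element (a tuple) per input element.
mapped = [singular_and_plural(w) for w in words]
# flatMap-like: the per-element results are concatenated into one flat sequence.
flat_mapped = list(chain.from_iterable(singular_and_plural(w) for w in words))

print(mapped)       # [('cat', 'cats'), ('elephant', 'elephants'), ('rat', 'rats')]
print(flat_mapped)  # ['cat', 'cats', 'elephant', 'elephants', 'rat', 'rats']
```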

To demonstrate flatMap(), we will first emit a word along with its plural, and then a range that grows in length with each subsequent operation.

# Let's create a new base RDD to work from
wordsList = ['cat', 'elephant', 'rat', 'rat', 'cat']
wordsRDD = sc.parallelize(wordsList, 4)

# Use map
singularAndPluralWordsRDDMap = wordsRDD.map(lambda x: (x, x + 's'))
# Use flatMap
singularAndPluralWordsRDD = wordsRDD.flatMap(lambda x: (x, x + 's'))

# View the results
print(singularAndPluralWordsRDDMap.collect())
print(singularAndPluralWordsRDD.collect())
# View the number of elements in the RDD
print(singularAndPluralWordsRDDMap.count())
print(singularAndPluralWordsRDD.count())
[('cat', 'cats'), ('elephant', 'elephants'), ('rat', 'rats'), ('rat', 'rats'), ('cat', 'cats')]
['cat', 'cats', 'elephant', 'elephants', 'rat', 'rats', 'rat', 'rats', 'cat', 'cats']
5
10
simpleRDD = sc.parallelize([2, 3, 4])
print(simpleRDD.map(lambda x: range(1, x)).collect())
print(simpleRDD.flatMap(lambda x: range(1, x)).collect())
[range(1, 2), range(1, 3), range(1, 4)]
[1, 1, 2, 1, 2, 3]

(6b) groupByKey and reduceByKey

Let's investigate the additional transformations: groupByKey() and reduceByKey().

Both of these transformations operate on pair RDDs. A pair RDD is an RDD where each element is a (key, value) tuple. For example, sc.parallelize([('a', 1), ('a', 2), ('b', 1)]) would create a pair RDD where the keys are 'a', 'a', 'b' and the values are 1, 2, 1.

The reduceByKey() transformation gathers together pairs that have the same key and applies a function to two associated values at a time. reduceByKey() operates by applying the function first within each partition on a per-key basis and then across the partitions.

While both the groupByKey() and reduceByKey() transformations can often be used to solve the same problem and will produce the same answer, the reduceByKey() transformation works much better for large distributed datasets. This is because Spark knows it can combine output with a common key on each partition before shuffling (redistributing) the data across nodes. Only use groupByKey() if the operation would not benefit from reducing the data before the shuffle occurs.

Look at the diagram below to understand how reduceByKey works. Notice how pairs on the same machine with the same key are combined (using the lambda function passed into reduceByKey) before the data is shuffled. Then the lambda function is called again to reduce all the values from each partition and produce one final result.

reduceByKey() figure

On the other hand, when using the groupByKey() transformation, all the key-value pairs are shuffled around, causing a lot of unnecessary data to be transferred over the network.

To determine which machine to shuffle a pair to, Spark calls a partitioning function on the key of the pair. Spark spills data to disk when there is more data shuffled onto a single executor machine than can fit in memory. However, it flushes the data out to disk one key at a time, so if a single key has more key-value pairs than can fit in memory, an out-of-memory exception occurs. This will be handled more gracefully in a later release of Spark so that the job can still proceed, but it should still be avoided. When Spark needs to spill to disk, performance is severely impacted.

groupByKey() figure

As your dataset grows larger, the difference in the amount of data that needs to be shuffled between the reduceByKey() and groupByKey() transformations becomes increasingly pronounced.
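
The difference in shuffled data can be made tangible with a small pure-Python model. It assumes a dataset of (key, 1) pairs spread over two partitions and simply counts how many records would have to cross the network in each strategy; both helper functions are hypothetical and only mimic the behavior described above.

```python
# Hypothetical pair dataset spread over two partitions.
partitions = [
    [('a', 1), ('b', 1), ('a', 1)],
    [('a', 1), ('b', 1), ('b', 1)],
]

def group_then_sum(partitions):
    """groupByKey-style: every pair is shuffled, values are summed afterwards."""
    shuffled = [pair for p in partitions for pair in p]
    totals = {}
    for key, value in shuffled:
        totals[key] = totals.get(key, 0) + value
    return totals, len(shuffled)

def reduce_by_key(partitions, f):
    """reduceByKey-style: combine per key inside each partition, then shuffle."""
    shuffled = []
    for p in partitions:
        local = {}
        for key, value in p:
            local[key] = f(local[key], value) if key in local else value
        shuffled.extend(local.items())  # only one record per key per partition
    totals = {}
    for key, value in shuffled:
        totals[key] = f(totals[key], value) if key in totals else value
    return totals, len(shuffled)

print(group_then_sum(partitions))                    # ({'a': 3, 'b': 3}, 6)
print(reduce_by_key(partitions, lambda x, y: x + y)) # ({'a': 3, 'b': 3}, 4)
```

Both strategies produce the same totals, but the pre-combined version shuffles at most one record per key per partition, so the gap widens as the number of values per key grows.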

Here are more transformations to prefer over groupByKey():

  • combineByKey() can be used when you are combining elements but your return type differs from your input value type.

  • foldByKey() merges the values for each key using an associative function and a neutral "zero value".
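
To illustrate the case where the return type differs from the input value type, the sketch below models the three functions that combineByKey() takes (createCombiner, mergeValue, mergeCombiners) in plain Python and uses them to compute a per-key average. The local `combine_by_key` helper and the sample data are assumptions made for illustration; it is not Spark's implementation.

```python
def combine_by_key(partitions, create_combiner, merge_value, merge_combiners):
    """Local model: build combiners per partition, then merge them across partitions."""
    partials = []
    for partition in partitions:
        local = {}
        for key, value in partition:
            if key in local:
                local[key] = merge_value(local[key], value)
            else:
                local[key] = create_combiner(value)
        partials.append(local)

    result = {}
    for local in partials:
        for key, combiner in local.items():
            result[key] = merge_combiners(result[key], combiner) if key in result else combiner
    return result

# Input values are ints; the combiner is a (sum, count) tuple, a different type.
partitions = [[('a', 1), ('a', 2)], [('b', 1), ('a', 6)]]
sums_and_counts = combine_by_key(
    partitions,
    lambda v: (v, 1),                         # createCombiner
    lambda acc, v: (acc[0] + v, acc[1] + 1),  # mergeValue
    lambda x, y: (x[0] + y[0], x[1] + y[1]),  # mergeCombiners
)
averages = {key: total / count for key, (total, count) in sums_and_counts.items()}

print(sums_and_counts)  # {'a': (9, 3), 'b': (1, 1)}
print(averages)         # {'a': 3.0, 'b': 1.0}
```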

Now let's go through a simple groupByKey() and reduceByKey() example.

pairRDD = sc.parallelize([('a', 1), ('a', 2), ('b', 1)])
# mapValues is only used to improve the format for printing
print(pairRDD.groupByKey().mapValues(lambda x: list(x)).collect())

# Different ways to sum by key
print(pairRDD.groupByKey().map(lambda kv: (kv[0], sum(kv[1]))).collect())
# Using mapValues, which is recommended when the key doesn't change
print(pairRDD.groupByKey().mapValues(lambda x: sum(x)).collect())
# reduceByKey is more efficient / scalable
print(pairRDD.reduceByKey(add).collect())
[('a', [1, 2]), ('b', [1])]
[('a', 3), ('b', 1)]
[('a', 3), ('b', 1)]
[('a', 3), ('b', 1)]

(6c) Advanced transformations [Optional]

Let's investigate the advanced transformations: mapPartitions() and mapPartitionsWithIndex().

The mapPartitions() transformation uses a function that takes in an iterator (over the items in that specific partition) and returns an iterator. The function is applied on a partition-by-partition basis.

The mapPartitionsWithIndex() transformation uses a function that takes in a partition index (think of this as the partition number) and an iterator (over the items in that specific partition). For every partition (index, iterator) pair, the function returns an iterable of output elements; in the example below, it returns a one-element list holding a tuple of the partition index and the items in that partition.
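
A plain-Python model can help show why the function's return value is wrapped in a list. The hypothetical `map_partitions_with_index` helper below assumes that whatever iterable the function returns for a partition is flattened into the output; the partition layout is hand-picked.

```python
def map_partitions_with_index(partitions, f):
    """Local model: call f(index, iterator) once per partition and flatten the results."""
    output = []
    for index, partition in enumerate(partitions):
        output.extend(f(index, iter(partition)))
    return output

# Hand-picked layout: three partitions with one word, one with two.
partitions = [['cat'], ['elephant'], ['rat'], ['rat', 'cat']]

# Returning a one-element list keeps each (index, items) tuple intact.
wrapped = map_partitions_with_index(partitions, lambda i, it: [(i, list(it))])
# Returning the bare tuple lets its two members be flattened into the output.
unwrapped = map_partitions_with_index(partitions, lambda i, it: (i, list(it)))

print(wrapped)    # [(0, ['cat']), (1, ['elephant']), (2, ['rat']), (3, ['rat', 'cat'])]
print(unwrapped)  # [0, ['cat'], 1, ['elephant'], 2, ['rat'], 3, ['rat', 'cat']]
```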

# mapPartitions takes a function that takes an iterator and returns an iterator
print(wordsRDD.collect())
itemsRDD = wordsRDD.mapPartitions(lambda iterator: [','.join(iterator)])
print(itemsRDD.collect())
['cat', 'elephant', 'rat', 'rat', 'cat']
['cat', 'elephant', 'rat', 'rat,cat']
itemsByPartRDD = wordsRDD.mapPartitionsWithIndex(lambda index, iterator: [(index, list(iterator))])
# We can see that three of the workers (partitions) have one element and the fourth worker has two
# elements, although things do not bode well for the rat...
print(itemsByPartRDD.collect())
# Rerun without returning a list (acts more like flatMap)
itemsByPartRDD = wordsRDD.mapPartitionsWithIndex(lambda index, iterator: (index, list(iterator)))
print(itemsByPartRDD.collect())
[(0, ['cat']), (1, ['elephant']), (2, ['rat']), (3, ['rat', 'cat'])]
[0, ['cat'], 1, ['elephant'], 2, ['rat'], 3, ['rat', 'cat']]

Part 7: Caching RDDs and storage options

(7a) Caching RDDs

For efficiency, Spark keeps your RDDs in memory. By keeping the contents in memory, Spark can quickly access the data. However, memory is limited, so if you try to keep too many RDDs in memory, Spark will automatically delete RDDs from memory to make space for new RDDs. If you later refer to one of those RDDs, Spark will automatically recreate it for you, but that takes time.

So, if you plan to use an RDD more than once, you should tell Spark to cache that RDD. You can use the cache() operation to keep the RDD in memory. However, if you cache too many RDDs and Spark runs out of memory, it will delete the least recently used (LRU) RDD first. Again, the RDD will be automatically recreated when it is accessed.
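
The evict-then-recompute behavior described above can be modeled with a tiny least-recently-used store. `LruStore` is a hypothetical toy class built on `collections.OrderedDict`; it counts entries rather than bytes and is only meant to illustrate the LRU policy, not Spark's memory manager.

```python
from collections import OrderedDict

class LruStore:
    """Toy cache: keeps at most `capacity` datasets, evicting the least recently used."""

    def __init__(self, capacity):
        self.capacity = capacity
        self.cached = OrderedDict()
        self.recomputations = 0

    def get(self, name, compute):
        if name in self.cached:
            self.cached.move_to_end(name)    # mark as most recently used
            return self.cached[name]
        self.recomputations += 1             # cache miss: rebuild the dataset
        value = compute()
        self.cached[name] = value
        if len(self.cached) > self.capacity:
            self.cached.popitem(last=False)  # evict the least recently used entry
        return value

store = LruStore(capacity=2)
store.get('a', lambda: [1, 2])
store.get('b', lambda: [3, 4])
store.get('a', lambda: [1, 2])  # hit: 'a' becomes the most recently used
store.get('c', lambda: [5, 6])  # evicts 'b', the least recently used
store.get('b', lambda: [3, 4])  # miss: 'b' must be recomputed, evicting 'a'

print(store.recomputations)  # 4
print(list(store.cached))    # ['c', 'b']
```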

You can check whether an RDD is cached by using the is_cached attribute, and you can see your cached RDD in the "Storage" section of the Spark web UI. If you click on the RDD's name, you can see more information about where the RDD is stored.

# Name the RDD
filteredRDD.setName('My Filtered RDD')
# Cache the RDD
filteredRDD.cache()
# Is it cached?
print(filteredRDD.is_cached)
True

(7b) Unpersist and storage options

Spark automatically manages the RDDs cached in memory and, if it runs out of memory, evicts them (or saves them to disk when the storage level allows it). For efficiency, once you are finished using an RDD, you can optionally tell Spark to stop caching it by calling the RDD's unpersist() method, which informs Spark that you no longer need the RDD in memory.

You can see the set of transformations that were applied to create an RDD by using the toDebugString() method, which also provides storage information, and you can directly query the current storage information for an RDD using the getStorageLevel() operation.

Advanced: Spark provides many more options for managing how RDDs are stored in memory, or even for saving them to disk. You can explore the API for the RDD persist() operation using Python's help() command. The persist() operation optionally takes a pySpark StorageLevel object.

# Note that toDebugString also provides storage information
print(filteredRDD.toDebugString())
b'(8) My Filtered RDD PythonRDD[7] at collect at <ipython-input-36-bdec22de0542>:23 [Memory Serialized 1x Replicated]\n |  ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:289 [Memory Serialized 1x Replicated]'
# If we are done with the RDD we can unpersist it so that its memory can be reclaimed
filteredRDD.unpersist()
# Storage level for a non-cached RDD
print(filteredRDD.getStorageLevel())
filteredRDD.cache()
# Storage level for a cached RDD
print(filteredRDD.getStorageLevel())
Serialized 1x Replicated
Memory Serialized 1x Replicated