MapReduce#

Tomado y adaptado del curso de Big Data por Raúl Ramos Pollán GitHub.

Procesamiento de información con MAP-REDUCE#

pip install mrjob
Collecting mrjob
  Downloading mrjob-0.7.4-py2.py3-none-any.whl.metadata (7.3 kB)
Requirement already satisfied: PyYAML>=3.10 in /usr/local/lib/python3.12/dist-packages (from mrjob) (6.0.2)
Downloading mrjob-0.7.4-py2.py3-none-any.whl (439 kB)
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 439.6/439.6 kB 6.5 MB/s eta 0:00:00
?25hInstalling collected packages: mrjob
Successfully installed mrjob-0.7.4

Un ejemplo#

Tenemos un log de la siguiente manera

2012-01-01 09:08 BOG Libros 88.56 Discover
2012-01-01 09:52 BGA Libros 62.41 Discover
2012-01-01 10:08 MED Musica 93.37 Visa
2012-01-01 10:58 MED Musica 395.93 Discover
2012-01-01 14:38 BOG Musica 113.24 MasterCard
2012-01-01 14:44 BGA Libros 290.5 Visa
2012-01-01 16:26 MED Musica 246.12 Efectivo

Con el dia, hora, ciudad, producto, importe y medio de pago de compras realizadas en almacenes de una cierta cadena. Queremos obtener el total del importe de compras realizadas en cada ciudad. Una forma de hacerlo es la siguiente:

  1. procesamos cada linea y, de cada una, generamos un par (ciudad, importe), obteniendo los siguientes pares:

    (BOG, 88.56) (BGA, 62.41) (MED, 93.37) (MED, 395.93) (BOG, 113.24) (BGA, 290.5) (MED, 246.12)

  2. agrupamos las tuplas generadas por el valor del primer componente:

    (BOG, [88.56, 113.24]) (BGA, [62.41, 290.5]) (MED, [93.37, 395.93, 246.12])

  3. sumamos los elementos de cada lista para cada ciudad:

    (BOG, 201.8) (BGA, 352.91) (MED, 735.42)

Tres fases#

El primer paso en el ejemplo anterior se denomina MAP y, para cada registro de entrada, genera una tupla de formato (clave, valor).

El segundo paso se denomina SHUFFLE y lo que hace es recopilar todas las tuplas generadas en el MAP anterior de cada clave y construir una lista con todos los valores generados. Es decir, una tupla de formato (clave, lista_de_valores) para cada clave.

El tercer paso se denomina REDUCE y, para cada clave, agrega los resultados de la lista generada en el SHUFFLE anterior.

Esta forma de procesar los datos constituye un modelo de programación llamado MAP-REDUCE y que está implementado por varios frameworks de programación y en varios lenguajes, de forma que el programador solo implementa las funciones MAP y REDUCE y el framework se encarga del shuffle.

tokens=["2012-01-01","09:08","BOG","Libros","88.56","Discover"]

mr-job#

Usaremos el framework mr-job para hacer nuestros programas map-reduce. El siguiente código implementa el proceso que acabamos de describir. Fíjate cómo sólo programamos las funciones mapper y reducer.

Si están trabajando en Google Colab pueden usar !pip install mrjob para instalar la librería.

%%writefile /content/mr-basico.py
from mrjob.job import MRJob
import os

class Compras(MRJob):

    def mapper(self, _, line):
        tokens = line.split()
        yield tokens[2], float(tokens[4])

    def reducer(self, key, values):
        yield key, sum(values)

if __name__ == '__main__':
    c = Compras()
    c.run()
Writing /content/mr-basico.py
%%script python /content/mr-basico.py -r inline --quiet
2012-01-01 09:08 BOG Libros 88.56 Discover
2012-01-01 09:52 BGA Libros 62.41 Discover
2012-01-01 10:08 MED Musica 93.37 Visa
2012-01-01 10:58 MED Musica 395.93 Discover
2012-01-01 14:38 BOG Musica 113.24 MasterCard
2012-01-01 14:44 BGA Libros 290.5 Visa
2012-01-01 16:26 MED Musica 246.12 Efectivo
"BOG"	201.8
"MED"	735.4200000000001
"BGA"	352.90999999999997

Prueba a eliminar el término --quiet de la celda anterior y así verás los mensajes que emite mr-job durante la ejecución del programa. Útil para diagnosticar errores en la ejecución del código.

%%script python /content/mr-basico.py -r inline
2012-01-01 09:08 BOG Libros 88.56 Discover
2012-01-01 09:52 BGA Libros 62.41 Discover
2012-01-01 10:08 MED Musica 93.37 Visa
2012-01-01 10:58 MED Musica 395.93 Discover
2012-01-01 14:38 BOG Musica 113.24 MasterCard
2012-01-01 14:44 BGA Libros 290.5 Visa
2012-01-01 16:26 MED Musica 246.12 Efectivo
"BOG"	201.8
"MED"	735.4200000000001
"BGA"	352.90999999999997
No configs found; falling back on auto-configuration
No configs specified for inline runner
Creating temp directory /tmp/mr-basico.root.20250913.011812.021996
Running step 1 of 1...
reading from STDIN
job output is in /tmp/mr-basico.root.20250913.011812.021996/output
Streaming final output from /tmp/mr-basico.root.20250913.011812.021996/output...
Removing temp directory /tmp/mr-basico.root.20250913.011812.021996...

Otro ejemplo#

En este ejemplo partimos de una tuplas (personaA, personaB) que representa que la personaA considera a la personaB como amiga. La relación no es simétrica. El siguiente programa cuenta cuantas amigos considera cada persona que tiene.

Fíjate que la entrada es en formato JSON y cómo usamos la librería json de Python para convertir la entrada JSON en una lista de valores.

["juan", "pepe"]
["juan", "sebastian"]
["raul", "pepe"]
["ana", "pepe"]
["juan", "ana"]
["ana", "pedro"]
  1. procesamos cada linea y, de cada una, generamos un par (ciudad, importe), obteniendo los siguientes pares:

    (juan, 1) (juan, 1) (raul, 1) (ana, 1) (juan, 1) (ana, 1)

  2. agrupamos las tuplas generadas por el valor del primer componente:

    (juan, [1, 1, 1]) (raul, [1]) (ana, [1, 1])

  3. sumamos los elementos de cada lista para cada ciudad:

    (juan, 3) (raul, 1) (ana, 2)

%%writefile /content/mr-amigos.py
from mrjob.job import MRJob
import json

class Amigos(MRJob):

    def mapper(self, _, line):
        record = json.loads(line)
        yield record[0], 1

    def reducer(self, key, values):
        yield key, sum(values)

if __name__ == '__main__':
    c = Amigos()
    c.run()
Writing /content/mr-amigos.py
%%script python /content/mr-amigos.py -r inline --quiet
["juan", "pepe"]
["juan", "sebastian"]
["raul", "pepe"]
["ana", "pepe"]
["juan", "ana"]
["ana", "pedro"]
"juan"	3
"raul"	1
"ana"	2

Instrumentación y runners#

Usamos la instrumentación para encontrar errores, entender nuestro código, etc. Por ahora usamos stderr.write para saber cuantas veces se llama a cada función. Fíjate como el reducer se llama una vez por cada clave distinta que generamos en el mapper. Cuando usemos varios procesos o máquinas tendremos que recurrir a otros tipos de instrumentación.

Fíjate también que values en la función reducer es un generador [ref], es decir, no contiene en sí una lista de valores, sino que cada vez que va devolviendo valores uno a uno hasta que se acaban. Típicamente esto sucede porque va obteniendo los valores que devuelve de un stream de entrada como un fichero, o una conexión remota. Por esto sólo podemos iterar sobre todos los valores una única vez.

Como ahora en el reducer queremos obtenter ambos el número de valores que tenemos y la suma de los mismos, tenemos que modificar nuestro código.

%%writefile /content/py-files/mr-basico-instrumentado.py
from mrjob.job import MRJob
from sys import stdin, stderr
import os

class Compras(MRJob):

    def mapper(self, _, line):
        tokens = line.split()
        stderr.write("MAPPER >> {0}\n".format(line))
        #print("mapper >>", line)
        yield tokens[2], float(tokens[4])

    def reducer(self, key, values):
        n_values, sum_values = 0, 0
        for i in values:
            n_values += 1
            sum_values += i
        stderr.write("REDUCER >> {0}, number of values {1}\n".format(key, n_values))
        #print("reducer >>", key, "number of values", n_values)
        yield key, sum_values

if __name__ == '__main__':
    c = Compras()
    c.run()
Writing /content/py-files/mr-basico-instrumentado.py
%%script python /content/py-files/mr-basico-instrumentado.py --quiet
2012-01-01 09:08 BOG Libros 88.56 Discover
2012-01-01 09:52 BGA Libros 62.41 Discover
2012-01-01 10:08 MED Musica 93.37 Visa
2012-01-01 10:58 MED Musica 395.93 Discover
2012-01-01 14:38 BOG Musica 113.24 MasterCard
2012-01-01 14:44 BGA Libros 290.5 Visa
2012-01-01 16:26 MED Musica 246.12 Efectivo
"MED"	735.4200000000001
"BOG"	201.8
"BGA"	352.90999999999997
MAPPER >> 2012-01-01 09:08 BOG Libros 88.56 Discover
MAPPER >> 2012-01-01 09:52 BGA Libros 62.41 Discover
MAPPER >> 2012-01-01 10:08 MED Musica 93.37 Visa
MAPPER >> 2012-01-01 10:58 MED Musica 395.93 Discover
MAPPER >> 2012-01-01 14:38 BOG Musica 113.24 MasterCard
MAPPER >> 2012-01-01 14:44 BGA Libros 290.5 Visa
MAPPER >> 2012-01-01 16:26 MED Musica 246.12 Efectivo
REDUCER >> BGA, number of values 2
REDUCER >> BOG, number of values 2
REDUCER >> MED, number of values 3

Local runner#

Los programas en mr-job pueden ejecutarse en distintos runners. Fíjate en la opción -r cuando ejecutamos nuestro código. Los runners posibles son los siguientes:

  • inline: todos los mapper y reducer corren en el mismo proceso. Esta opción es útil para empezar a desarrollar un código y verificarlo funcionalmente.

  • local: cada mapper y reducer corren en distintos procesos independientes en la misma máquina. Con esta opción podemos hacer una primera simulación de nuetro código en un entorno distribuido.

  • hadoop: nuestro código se ejecuta en un cluster Hadoop

Instrumentar código con un local runner o en Hadoop ya no es tan fácil porque cada ejecución de las funciones mapper y reducer se hace en procesos o en máquinas distintas. Si no tenemos un mecanismo para recoger y coordinar la instrumentación generada en los distintos procesos o máquinas perderemos al menos parte de ella.

%%script python /content/py-files/mr-basico-instrumentado.py -r local --quiet
2012-01-01 09:08 BOG Libros 88.56 Discover
2012-01-01 09:52 BGA Libros 62.41 Discover
2012-01-01 10:08 MED Musica 93.37 Visa
2012-01-01 10:58 MED Musica 395.93 Discover
2012-01-01 14:38 BOG Musica 113.24 MasterCard
2012-01-01 14:44 BGA Libros 290.5 Visa
2012-01-01 16:26 MED Musica 246.12 Efectivo
"MED"	735.4200000000001
"BOG"	201.8
"BGA"	352.90999999999997

Distintos frameworks usan distintos mecanismos para instrumentar el código distribuido. En este caso (mrjob/Hadoop) usamos los counters, y el framework asegura un estado global de los mismos de manera consistente. Ahora necesitamos mostrar los mensajes de salida del framework para ver el valor final de los contadores.

Usamos a partir de ahora el fichero Data/compras.txt que tiene 30 registros con el mismo formato que el usado hasta ahora.

!cat compras.txt
2012-01-01 09:08 BOG Libros 88.56 Discover
2012-01-01 09:09 BGA Libros 337.71 Efectivo
2012-01-01 09:52 BGA Libros 450.33 MasterCard
2012-01-01 09:52 BGA Libros 62.41 Discover
2012-01-01 10:08 MED Musica 93.37 Visa
2012-01-01 10:22 BGA Musica 369.94 MasterCard
2012-01-01 10:58 MED Musica 119.12 Efectivo
2012-01-01 10:58 MED Musica 395.93 Discover
2012-01-01 11:01 MED Musica 476.82 Efectivo
2012-01-01 11:15 MED Musica 22.34 Visa
2012-01-01 11:31 BGA Musica 114.03 Efectivo
2012-01-01 11:36 BOG Musica 296.76 Discover
2012-01-01 11:52 BGA Musica 347.24 Visa
2012-01-01 12:01 MED Libros 154.86 Discover
2012-01-01 12:08 MED Libros 391.65 Visa
2012-01-01 12:19 BOG Libros 165.05 Efectivo
2012-01-01 12:19 BOG Libros 293.76 Discover
2012-01-01 12:48 MED Libros 212.47 MasterCard
2012-01-01 12:55 BOG Libros 352.38 Discover
2012-01-01 13:04 BOG Musica 303.96 MasterCard
2012-01-01 13:12 MED Musica 429.44 Efectivo
2012-01-01 13:44 BOG Libros 249.6 MasterCard
2012-01-01 14:20 BOG Libros 71.13 MasterCard
2012-01-01 14:34 BGA Libros 72.04 Visa
2012-01-01 14:38 BOG Musica 113.24 MasterCard
2012-01-01 14:44 BGA Libros 290.5 Visa
2012-01-01 15:22 BGA Libros 0.79 MasterCard
2012-01-01 15:51 BOG Libros 74.13 Efectivo
2012-01-01 16:01 MED Libros 92.19 MasterCard
2012-01-01 16:26 MED Musica 2246.12 Efectivo
%%writefile /content/py-files/mr-basico-instrumentado-counters.py
from mrjob.job import MRJob
from sys import stdin
import os

class Compras(MRJob):

    def mapper(self, _, line):
        tokens = line.split()
        self.increment_counter("llamadas al map", "numero", 1)
        yield tokens[2], float(tokens[4])

    def reducer(self, key, values):
        n_values, sum_values = 0,0
        for i in values:
            n_values += 1
            sum_values += i
        self.increment_counter("longitud valores reduce", key, n_values )
        yield key, sum_values

if __name__ == '__main__':
    c = Compras()
    c.run()
Writing /content/py-files/mr-basico-instrumentado-counters.py
%%script python /content/py-files/mr-basico-instrumentado-counters.py -r local compras.txt
--
"MED"	4634.3099999999995
"BOG"	2008.5699999999997
"BGA"	2044.9899999999998
No configs found; falling back on auto-configuration
No configs specified for local runner
Creating temp directory /tmp/mr-basico-instrumentado-counters.root.20230822.165351.715020
Running step 1 of 1...

Counters: 4
	llamadas al map
		numero=30
	longitud valores reduce
		BGA=9
		BOG=10
		MED=11

job output is in /tmp/mr-basico-instrumentado-counters.root.20230822.165351.715020/output
Streaming final output from /tmp/mr-basico-instrumentado-counters.root.20230822.165351.715020/output...
Removing temp directory /tmp/mr-basico-instrumentado-counters.root.20230822.165351.715020...

Controlando el número de mappers y reducers#

Ahora cambiamos la instrumentación para que cuente el número de llamadas de cada función por cada proceso. Puedes controlar el número de mappers y reducers al ejecutar tu programa como se indica abajo. Prueba a ejecutar el código con distintos números de mappers.

%%writefile /content/py-files/mr-basico-instrumentado-counters-tasks.py
from mrjob.job import MRJob
from sys import stdin
import os

class Compras(MRJob):

    def mapper(self, _, line):
        tokens = line.split()
        self.increment_counter("map: proceso "+str(os.getpid()), "numero", 1)
        yield tokens[2], float(tokens[4])

    def reducer(self, key, values):
        n_values, sum_values = 0,0
        for i in values:
            n_values += 1
            sum_values += i
        self.increment_counter("longitud valores reduce: proceso "+str(os.getpid()), key, n_values )
        yield key, sum_values


if __name__ == '__main__':
    c = Compras()
    c.run()
Writing /content/py-files/mr-basico-instrumentado-counters-tasks.py

la linea entera que se ejecuta a continuación es la siguiente

python files/mr-basico-instrumentado-counters-tasks.py
       -r local
       --jobconf mapred.map.tasks=5
       --jobconf mapred.reduce.tasks=1
       ../Data/compras.txt
%%script python /content/py-files/mr-basico-instrumentado-counters-tasks.py -r local --jobconf mapred.map.tasks=5 --jobconf mapred.reduce.tasks=1 compras.txt
--
"MED"	4634.3099999999995
"BOG"	2008.5699999999997
"BGA"	2044.9899999999998
No configs found; falling back on auto-configuration
No configs specified for local runner
Creating temp directory /tmp/mr-basico-instrumentado-counters-tasks.root.20230822.165512.835914
Running step 1 of 1...

Counters: 7
	longitud valores reduce: proceso 3862
		BOG=10
	longitud valores reduce: proceso 3863
		BGA=9
	longitud valores reduce: proceso 3866
		MED=11
	map: proceso 3840
		numero=8
	map: proceso 3841
		numero=8
	map: proceso 3846
		numero=8
	map: proceso 3847
		numero=6

job output is in /tmp/mr-basico-instrumentado-counters-tasks.root.20230822.165512.835914/output
Streaming final output from /tmp/mr-basico-instrumentado-counters-tasks.root.20230822.165512.835914/output...
Removing temp directory /tmp/mr-basico-instrumentado-counters-tasks.root.20230822.165512.835914...

Combiners#

los combiners permiten resumir los datos que emite cada proceso map antes de que lleguen al reduce y así reducir el tráfico de red que sale de los procesos map y que entra en los reducers. Los combiners se ejecutan en la misma máquina que el map, alimentándose directamente de la salida de éste. Típicamente un combiner tiene la misma implementación que el reducer si es que éste es asociativo. Si no, puede tener otra implementación particular. Fíjate en la relación entre el número de procesos map y el número de valores que le entran a cada reduce.

%%writefile /content/py-files/mr-basico-combiners.py
from mrjob.job import MRJob
from sys import stdin
import os

class Compras(MRJob):

    def mapper(self, _, line):
        tokens = line.split()
        self.increment_counter("map: ", "numero", 1)
        yield tokens[2], float(tokens[4])

    def combiner(self, key, values):
        yield key, sum(values)

    def reducer(self, key, values):
        n_values, sum_values = 0,0
        for i in values:
            n_values += 1
            sum_values += i
        self.increment_counter("longitud valores reduce: ", key, n_values )
        yield key, sum_values

if __name__ == '__main__':
    c = Compras()
    c.run()
Writing /content/py-files/mr-basico-combiners.py
%%script python /content/py-files/mr-basico-combiners.py -r local compras.txt
--
"MED"	4634.3099999999995
"BOG"	2008.5699999999997
"BGA"	2044.9899999999998
No configs found; falling back on auto-configuration
No configs specified for local runner
Creating temp directory /tmp/mr-basico-combiners.root.20230822.165559.388036
Running step 1 of 1...

Counters: 4
	longitud valores reduce: 
		BGA=4
		BOG=4
		MED=4
	map: 
		numero=30

job output is in /tmp/mr-basico-combiners.root.20230822.165559.388036/output
Streaming final output from /tmp/mr-basico-combiners.root.20230822.165559.388036/output...
Removing temp directory /tmp/mr-basico-combiners.root.20230822.165559.388036...

Ejemplo WordCount#

%%writefile /content/py-files/mr-basico-wordcount.py
from mrjob.job import MRJob
import re
WORD_RE = re.compile(r"[\w']+")

class MRWordFreqCount(MRJob):

    def mapper(self, _, line):
        for word in WORD_RE.findall(line):
            yield word.lower(), 1

    def combiner(self, word, counts):
        yield word, sum(counts)

    def reducer(self, word, counts):
        yield word, sum(counts)


if __name__ == '__main__':
    MRWordFreqCount.run()
Writing /content/py-files/mr-basico-wordcount.py
%%script python /content/py-files/mr-basico-wordcount.py -r inline -q
Ancient influences have helped spawn variant interpretations
of the nature of history which have evolved over the centuries
"nature"	1
"of"	2
"over"	1
"spawn"	1
"the"	2
"helped"	1
"history"	1
"influences"	1
"interpretations"	1
"variant"	1
"which"	1
"ancient"	1
"centuries"	1
"evolved"	1
"have"	2
%%writefile /content/py-files/mr-wordcount-max.py
from mrjob.job import MRJob
from mrjob.step import MRStep
import re

WORD_RE = re.compile(r"[\w']+")


class MRMostUsedWord(MRJob):

    def mapper_get_words(self, _, line):
        # yield each word in the line
        for word in WORD_RE.findall(line):
            yield (word.lower(), 1)

    def combiner_count_words(self, word, counts):
        # sum the words we've seen so far
        yield (word, sum(counts))

    def reducer_count_words(self, word, counts):
        # send all (num_occurrences, word) pairs to the same reducer.
        # num_occurrences is so we can easily use Python's max() function.
        yield None, (sum(counts), word)

    # discard the key; it is just None
    def reducer_find_max_word(self, _, word_count_pairs):
        # each item of word_count_pairs is (count, word),
        # so yielding one results in key=counts, value=word
        yield max(word_count_pairs)

    def steps(self):
        return [
            MRStep(mapper=self.mapper_get_words,
                   combiner=self.combiner_count_words,
                   reducer=self.reducer_count_words),
            MRStep(reducer=self.reducer_find_max_word)
        ]


if __name__ == '__main__':
    MRMostUsedWord.run()
Writing /content/py-files/mr-wordcount-max.py
%%script python /content/py-files/mr-wordcount-max.py -r inline -q text.txt
--
239	"the"
%%writefile /content/py-files/mr-wordcount-max-clean.py
from mrjob.job import MRJob
from mrjob.step import MRStep
import re

WORD_RE = re.compile(r"[\w']+")


class MRMostUsedWord(MRJob):

    stopwords = ['ourselves', 'hers', 'between', 'yourself', 'but', 'again', 'there', 'about', 'once', 'during', 'out',
    'very', 'having', 'with', 'they', 'own', 'an', 'be', 'some', 'for', 'do', 'its', 'yours', 'such', 'into', 'of', 'most',
    'itself', 'other', 'off', 'is', 's', 'am', 'or', 'who', 'as', 'from', 'him', 'each', 'the', 'themselves', 'until', 'below',
    'are', 'we', 'these', 'your', 'his', 'through', 'don', 'nor', 'me', 'were', 'her', 'more', 'himself', 'this', 'down', 'should',
    'our', 'their', 'while', 'above', 'both', 'up', 'to', 'ours', 'had', 'she', 'all', 'no', 'when', 'at', 'any', 'before', 'them',
    'same', 'and', 'been', 'have', 'in', 'will', 'on', 'does', 'yourselves', 'then', 'that', 'because', 'what', 'over', 'why', 'so',
    'can', 'did', 'not', 'now', 'under', 'he', 'you', 'herself', 'has', 'just', 'where', 'too', 'only', 'myself', 'which', 'those',
    'i', 'after', 'few', 'whom', 't', 'being', 'if', 'theirs', 'my', 'against', 'a', 'by', 'doing', 'it', 'how', 'further', 'was', 'here', 'than']

    def mapper_get_words(self, _, line):
        # yield each word in the line
        for word in WORD_RE.findall(line):
            if word.lower() not in self.stopwords:
                yield (word.lower(), 1)

    def combiner_count_words(self, word, counts):
        # sum the words we've seen so far
        yield (word, sum(counts))

    def reducer_count_words(self, word, counts):
        # send all (num_occurrences, word) pairs to the same reducer.
        # num_occurrences is so we can easily use Python's max() function.
        yield None, (sum(counts), word)

    # discard the key; it is just None
    def reducer_find_max_word(self, _, word_count_pairs):
        # each item of word_count_pairs is (count, word),
        # so yielding one results in key=counts, value=word
        yield max(word_count_pairs)

    def steps(self):
        return [
            MRStep(mapper=self.mapper_get_words,
                   combiner=self.combiner_count_words,
                   reducer=self.reducer_count_words),
            MRStep(reducer=self.reducer_find_max_word)
        ]


if __name__ == '__main__':
    MRMostUsedWord.run()
Writing /content/py-files/mr-wordcount-max-clean.py
%%script python /content/py-files/mr-wordcount-max-clean.py -r inline -q text.txt
--
58	"blind"