Multiprocessing.ipynb#

Librería Multiprocessing de Python#

Definición:#

La librería Multiprocessing de Python es una biblioteca estándar que permite la creación de procesos en paralelo en una computadora con múltiples núcleos o CPUs. Proporciona una interfaz de programación para crear y administrar procesos de manera fácil y eficiente, lo que permite acelerar la ejecución de programas que realizan tareas intensivas en cómputo.

1 - ¿Cómo ejecutar funciones en paralelo utilizando la librería multiprocessing de Python?#

  • La librería multiprocessing de Python permite ejecutar funciones en paralelo utilizando procesos.

  • Para ello, se utiliza la función Process() de la librería multiprocessing.

  • La función Process() recibe como argumentos el nombre de la función que se desea ejecutar y los argumentos de la función.

  • La función Process() devuelve un objeto de tipo Process que se puede almacenar en una lista.

  • Para ejecutar la función, se utiliza el método start() del objeto Process.

  • Para esperar a que termine la ejecución de la función, se utiliza el método join() del objeto Process.

  • En el ejemplo, se crea una función mi_fun que imprime el número del proceso en el que se ejecuta.

  • Se crea una lista Process_jobs que almacena los objetos Process.

  • Se crea un bucle for que crea 5 procesos, los almacena en la lista Process_jobs, y los ejecuta.

  • Se crea un bucle for que espera a que terminen los procesos almacenados en la lista Process_jobs.

import multiprocessing

def mi_fun(i):
    print ('Función llamada en el proceso: %s \n' %i)
    return

if __name__ == '__main__':
    Process_jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=mi_fun, args=(i,))
        Process_jobs.append(p)
        p.start()
    for p in Process_jobs:
        p.join()
Función llamada en el proceso: 0 

Función llamada en el proceso: 1 

Función llamada en el proceso: 2 

Función llamada en el proceso: 3 

Función llamada en el proceso: 4 

2 - ¿Cómo asignar y obtener los nombres de los procesos?#

  • El nombre del proceso se puede obtener utilizando la función current_process() de la librería multiprocessing.

  • El nombre del proceso se puede asignar utilizando el argumento name de la función Process() de la librería multiprocessing.

import multiprocessing
import time

def foo():
    name = multiprocessing.current_process().name
    print ("Inicio del proceso llamado %s \n" %name)
    time.sleep(2)
    print ("Final del proceso llamado %s \n" %name)

if __name__ == '__main__':
    # Crear procesos
    process_with_default_name1 = multiprocessing.Process(target=foo)
    process_with_default_name2 = multiprocessing.Process(target=foo)
    process_with_name = multiprocessing.Process(name='MeNombraron',target=foo)

    # Ejecutar procesos
    process_with_name.start()
    process_with_default_name1.start()
    process_with_default_name2.start()

    # Esperar finalización de los procesos
    process_with_name.join()
    process_with_default_name1.join()
    process_with_default_name2.join()
Inicio del proceso llamado MeNombraron 

Inicio del proceso llamado Process-15 

Inicio del proceso llamado Process-16 

Final del proceso llamado MeNombraron 

Final del proceso llamado Process-15 

Final del proceso llamado Process-16 

3 - ¿Cómo ejecutar un proceso en segundo plano? - Ejecutar Python File#

  • Los procesos en el modo NO_background_process tienen una salida, por lo que el proceso demoníaco finaliza automáticamente después de que finaliza el programa principal para evitar la persistencia de los procesos en ejecución.

import multiprocessing
import time

def foo():
    name = multiprocessing.current_process().name
    print ("Inicio del proceso llamado %s" %name)
    time.sleep(2)
    print ("Final del proceso llamado %s" %name)

if __name__ == '__main__':
    background_process = multiprocessing.Process(name='background_process',target=foo)
    background_process.daemon = True

    NO_background_process = multiprocessing.Process(name='NO_background_process',target=foo)    
    NO_background_process.daemon = False
    
    background_process.start()
    NO_background_process.start()    
    background_process.join()
    NO_background_process.join()    
Inicio del proceso llamado background_process
Inicio del proceso llamado NO_background_process
Final del proceso llamado background_process
Final del proceso llamado NO_background_process

4 - ¿Cómo matar un proceso?#

  • El método is_alive() del objeto Process devuelve True si el proceso está en ejecución.

  • El proceso se puede matar utilizando el método terminate() del objeto Process.

  • El método terminate() envía una señal al proceso.

  • El método exitcode del objeto Process devuelve el código de salida del proceso.

  • Código de salida del proceso:

    • “== 0”: Esto significa que no se produjo ningún error

    • “> 0”: Esto significa que el proceso tuvo un error y salió de ese código

    • “< 0”: Esto significa que el proceso se eliminó con una señal de -1 * ExitCode

import multiprocessing
import time

def foo():
    print('Inicio función')
    time.sleep(1)
    print('Final función')

if __name__ == '__main__':
    p = multiprocessing.Process(target=foo)
    print('Proceso antes ejecución:', p, p.is_alive())
    
    p.start()
    print('Proceso ejecutandose:', p, p.is_alive())
    
    p.terminate()
    print('Proceso terminado:', p, p.is_alive())

    time.sleep(1)
    print('Proceso terminado 1 segundo:', p, p.is_alive())

    print('Código de salida del proceso:', p.exitcode)
Proceso antes ejecución: <Process name='Process-24' parent=7109 initial> False
Proceso ejecutandose: <Process name='Process-24' pid=9478 parent=7109 started> True
Proceso terminado: <Process name='Process-24' pid=9478 parent=7109 started> True
Proceso terminado 1 segundo: <Process name='Process-24' pid=9478 parent=7109 stopped exitcode=-SIGTERM> False
Código de salida del proceso: -15

5 - ¿Cómo usar un proceso en una subclase?#

  • La librería multiprocessing de Python permite crear procesos utilizando subclases.

  • Para ello, se crea una clase que hereda de la clase Process de la librería multiprocessing.

  • En el ejemplo, se crea una clase MyProcess que hereda de la clase Process.

  • La clase MyProcess sobreescribe el método run() de la clase Process.

  • La clase MyProcess llama al método run() de la clase Process.

  • Se crea una lista jobs que almacena los objetos MyProcess.

  • Se crea un bucle for que crea 5 procesos, los almacena en la lista jobs, y los ejecuta.

  • Se crea un bucle for que espera a que terminen los procesos almacenados en la lista jobs.

import multiprocessing

class MyProcess(multiprocessing.Process):

    def run(self):
        print ('Ejecución en %s \n' %self.name)
        return

if __name__ == '__main__':
    jobs = []

    for i in range(5):
        p = MyProcess()
        jobs.append(p)
        p.start()

    for p in jobs:        
        p.join()
Ejecución en MyProcess-56 
Ejecución en MyProcess-55 


Ejecución en MyProcess-58 
Ejecución en MyProcess-57 


Ejecución en MyProcess-59 

6 - ¿Cómo intercambiar objetos entre procesos utilizando una cola?#

  • La librería multiprocessing de Python permite intercambiar objetos entre procesos utilizando una cola.

  • Para ello, se utiliza la clase Queue() de la librería multiprocessing.

  • La clase Queue() recibe como argumento el tamaño máximo de la cola.

  • La clase Queue() tiene los métodos put() y get() para añadir y obtener objetos de la cola.

  • En el ejemplo, se crea una clase producer que hereda de la clase Process de la librería multiprocessing.

  • La clase producer sobreescribe el método run() de la clase Process.

  • La clase producer añade objetos a la cola utilizando el método put() de la clase Queue.

  • La clase producer imprime el tamaño de la cola utilizando el método qsize() de la clase Queue.

  • Se crea una clase consumer que hereda de la clase Process de la librería multiprocessing.

  • La clase consumer sobreescribe el método run() de la clase Process.

  • La clase consumer obtiene objetos de la cola utilizando el método get() de la clase Queue.

  • La clase consumer imprime los objetos obtenidos de la cola.

  • La clase consumer imprime un mensaje si la cola está vacía.

import multiprocessing
import random
import time

class producer(multiprocessing.Process):
    def __init__(self, queue):
        multiprocessing.Process.__init__(self)
        self.queue = queue

    def run(self) :
        for i in range(5):
            item = random.randint(0, 256)
            self.queue.put(item) 
            print ("Proceso Producer: item %d agregado a la cola %s" % (item,self.name))
            time.sleep(0.5)
            print ("El tamaño de la cola es %s" % self.queue.qsize())
       
class consumer(multiprocessing.Process):
    def __init__(self, queue):
        multiprocessing.Process.__init__(self)
        self.queue = queue

    def run(self):
        while True:
            if (self.queue.empty()):
                print("La cola está vacía")
                break
            else :
                time.sleep(1)
                item = self.queue.get()
                print ('Proceso Consumer: item %d sacado de %s \n' % (item, self.name))
                time.sleep(0.5)

if __name__ == '__main__':
        queue = multiprocessing.Queue()
        process_producer = producer(queue)
        process_consumer = consumer(queue)
        process_producer.start()
        process_consumer.start()
        process_producer.join()
        process_consumer.join()
Proceso Producer: item 56 agregado a la cola producer-60
El tamaño de la cola es 1
Proceso Producer: item 23 agregado a la cola producer-60
Proceso Consumer: item 56 sacado de consumer-61 

El tamaño de la cola es 1
Proceso Producer: item 162 agregado a la cola producer-60
El tamaño de la cola es 2
Proceso Producer: item 87 agregado a la cola producer-60
El tamaño de la cola es 3
Proceso Producer: item 140 agregado a la cola producer-60
Proceso Consumer: item 23 sacado de consumer-61 

El tamaño de la cola es 3
Proceso Consumer: item 162 sacado de consumer-61 

Proceso Consumer: item 87 sacado de consumer-61 

Proceso Consumer: item 140 sacado de consumer-61 

La cola está vacía

7 - ¿Cómo intercambiar objetos entre procesos utilizando una tubería?#

  • La librería multiprocessing de Python permite intercambiar objetos entre procesos utilizando una tubería.

  • Para ello, se utiliza la clase Pipe() de la librería multiprocessing.

  • La clase Pipe() tiene los métodos send() y recv() para añadir y obtener objetos de la tubería.

  • Después de iniciar los procesos y establecer las tuberías entre ellos, se cierran los extremos de lectura de pipe_1 y pipe_2 en el proceso principal usando pipe_1[0].close() y pipe_2[0].close().

  • Esto se hace para indicar que el proceso principal ya no está utilizando los extremos de lectura de las tuberías y para que los procesos secundarios puedan continuar funcionando sin interrupciones.

  • Luego, se utiliza un bucle try-except para recibir los objetos enviados por el proceso secundario a través de pipe_2.

  • El bucle continúa hasta que se produce un EOFError, lo que indica que ya no hay más objetos en la tubería.

  • En cada iteración del bucle, se llama a pipe_2[1].recv() para recibir el objeto enviado por el proceso secundario y se imprime en la consola usando print().

  • En el código, el índice [0] se utiliza para hacer referencia al extremo de escritura y el índice [1] se utiliza para hacer referencia al extremo de lectura.

import multiprocessing 

def create_items(pipe):
    output_pipe, _ = pipe
    for item in range(10):
        output_pipe.send(item)
    output_pipe.close()

def multiply_items(pipe_1, pipe_2):
    close, input_pipe = pipe_1
    close.close()
    output_pipe, _ = pipe_2
    try:
        while True:
            item = input_pipe.recv()
            output_pipe.send(item * item)
    except EOFError:
        output_pipe.close()

if __name__== '__main__':
    pipe_1 = multiprocessing.Pipe(True)
    process_pipe_1 = multiprocessing.Process(target=create_items, args=(pipe_1,))
    process_pipe_1.start()

    pipe_2 = multiprocessing.Pipe(True)
    process_pipe_2 = multiprocessing.Process(target=multiply_items, args=(pipe_1, pipe_2,))
    process_pipe_2.start()
 
    pipe_1[0].close()
    pipe_2[0].close()

    try:
        while True:            
            print(pipe_2[1].recv())
    except EOFError:
        print ("Final de la comunicación")
0
1
4
9
16
25
36
49
64
81
Final de la comunicación

8 - ¿Cómo sincronizar procesos? - Ejecutar Python File#

  • La sincronización de procesos se puede lograr utilizando:

    • la clase Barrier de la librería multiprocessing.

    • la clase Lock de la librería multiprocessing.

import multiprocessing
from multiprocessing import Barrier, Lock, Process
from time import time
from datetime import datetime

def test_with_barrier(synchronizer, serializer):
    name = multiprocessing.current_process().name
    synchronizer.wait()
    now = time()
    
    with serializer:
        print("Proceso %s ----> %s \n" %(name,datetime.fromtimestamp(now)))

    #serializer.acquire()
    #print("Proceso %s ----> %s" %(name,datetime.fromtimestamp(now)))
    #serializer.release()

def test_without_barrier():
    name = multiprocessing.current_process().name
    now = time()
    print("Proceso %s ----> %s \n" %(name ,datetime.fromtimestamp(now)))

if __name__ == '__main__':
    synchronizer = Barrier(2)
    serializer = Lock()
    Process(name='p1 - prueba_con_barrera',target=test_with_barrier,args=(synchronizer,serializer)).start()
    Process(name='p2 - prueba_con_barrera',target=test_with_barrier,args=(synchronizer,serializer)).start()
    Process(name='p3 - prueba_sin_barrera',target=test_without_barrier).start()
    Process(name='p4 - prueba_sin_barrera',target=test_without_barrier).start()
    
Proceso p1 - prueba_con_barrera ----> 2023-05-07 16:41:32.844288 

Proceso p2 - prueba_con_barrera ----> 2023-05-07 16:41:32.846685 
Proceso p3 - prueba_sin_barrera ----> 2023-05-07 16:41:32.854381 
Proceso p4 - prueba_sin_barrera ----> 2023-05-07 16:41:32.872981 

9 - ¿Cómo sincronizar procesos utilizando un administrador de procesos (manager)? - Ejecutar Python File#

  • La sincronización de procesos se puede lograr utilizando un administrador de procesos (manager).

import multiprocessing
import time

def worker(dictionary, key, item):
    print(key,item,"\n")
    time.sleep(2)
    dictionary[key] = item

if __name__ == '__main__':
    mgr = multiprocessing.Manager()
    dictionary = mgr.dict()
    jobs = [ multiprocessing.Process(target=worker, args=(dictionary, i, i*2)) for i in range(10) ]
    
    start_time = time.time()
    for j in jobs:
        j.start()
    for j in jobs:
        j.join()

    end_time = time.time()
    print ('Results:', dictionary)

    print("El tiempo de ejecución fue:", end_time - start_time, "segundos")
01  220 4
 3  

 4
6 
 8

5
  

106  712
  

148
  
16
 

9 18 

Results: {1: 2, 2: 4, 0: 0, 3: 6, 4: 8, 5: 10, 6: 12, 7: 14, 8: 16, 9: 18}
El tiempo de ejecución fue: 2.338897228240967 segundos

10 - ¿Cómo ejecutar funciones en paralelo utilizando procesos con Pool?#

  • La función Pool() recibe como argumento el número de procesos que se desea crear.

  • Para ejecutar la función, se utiliza el método map() del objeto Pool.

  • Para esperar a que terminen la ejecución de las funciones, se utiliza el método close() y el método join() del objeto Pool.

  • Cuando se crea la instancia de multiprocessing.Pool con processes=4, se están creando 4 procesos en paralelo para ejecutar la función function_square en cada uno de ellos, utilizando los elementos de la lista inputs como entrada.

  • Cada proceso recibe un subconjunto de los datos y ejecuta la función sobre ellos de manera independiente y concurrente con los otros procesos.

  • Al final, se recopilan los resultados y se devuelven como una lista en el orden en que se completaron.

import multiprocessing

def function_square(data):
    result = data*data
    return result

if __name__ == '__main__':
    inputs = list(range(0,100))
    pool = multiprocessing.Pool(processes=4)
    pool_outputs = pool.map(function_square, inputs)

    pool.close() 
    pool.join()  
    print ('Pool    :', pool_outputs)
Pool    : [0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289, 324, 361, 400, 441, 484, 529, 576, 625, 676, 729, 784, 841, 900, 961, 1024, 1089, 1156, 1225, 1296, 1369, 1444, 1521, 1600, 1681, 1764, 1849, 1936, 2025, 2116, 2209, 2304, 2401, 2500, 2601, 2704, 2809, 2916, 3025, 3136, 3249, 3364, 3481, 3600, 3721, 3844, 3969, 4096, 4225, 4356, 4489, 4624, 4761, 4900, 5041, 5184, 5329, 5476, 5625, 5776, 5929, 6084, 6241, 6400, 6561, 6724, 6889, 7056, 7225, 7396, 7569, 7744, 7921, 8100, 8281, 8464, 8649, 8836, 9025, 9216, 9409, 9604, 9801]