Multiprocessing.ipynb#
Librería Multiprocessing de Python#
Definición:#
La librería Multiprocessing de Python es una biblioteca estándar que permite la creación de procesos en paralelo en una computadora con múltiples núcleos o CPUs. Proporciona una interfaz de programación para crear y administrar procesos de manera fácil y eficiente, lo que permite acelerar la ejecución de programas que realizan tareas intensivas en cómputo.
1 - ¿Cómo ejecutar funciones en paralelo utilizando la librería multiprocessing de Python?#
La librería multiprocessing de Python permite ejecutar funciones en paralelo utilizando procesos.
Para ello, se utiliza la función Process() de la librería multiprocessing.
La función Process() recibe como argumentos el nombre de la función que se desea ejecutar y los argumentos de la función.
La función Process() devuelve un objeto de tipo Process que se puede almacenar en una lista.
Para ejecutar la función, se utiliza el método start() del objeto Process.
Para esperar a que termine la ejecución de la función, se utiliza el método join() del objeto Process.
En el ejemplo, se crea una función mi_fun que imprime el número del proceso en el que se ejecuta.
Se crea una lista Process_jobs que almacena los objetos Process.
Se crea un bucle for que crea 5 procesos, los almacena en la lista Process_jobs, y los ejecuta.
Se crea un bucle for que espera a que terminen los procesos almacenados en la lista Process_jobs.
import multiprocessing
def mi_fun(i):
print ('Función llamada en el proceso: %s \n' %i)
return
if __name__ == '__main__':
Process_jobs = []
for i in range(5):
p = multiprocessing.Process(target=mi_fun, args=(i,))
Process_jobs.append(p)
p.start()
for p in Process_jobs:
p.join()
Función llamada en el proceso: 0
Función llamada en el proceso: 1
Función llamada en el proceso: 2
Función llamada en el proceso: 3
Función llamada en el proceso: 4
2 - ¿Cómo asignar y obtener los nombres de los procesos?#
El nombre del proceso se puede obtener utilizando la función current_process() de la librería multiprocessing.
El nombre del proceso se puede asignar utilizando el argumento name de la función Process() de la librería multiprocessing.
import multiprocessing
import time
def foo():
name = multiprocessing.current_process().name
print ("Inicio del proceso llamado %s \n" %name)
time.sleep(2)
print ("Final del proceso llamado %s \n" %name)
if __name__ == '__main__':
# Crear procesos
process_with_default_name1 = multiprocessing.Process(target=foo)
process_with_default_name2 = multiprocessing.Process(target=foo)
process_with_name = multiprocessing.Process(name='MeNombraron',target=foo)
# Ejecutar procesos
process_with_name.start()
process_with_default_name1.start()
process_with_default_name2.start()
# Esperar finalización de los procesos
process_with_name.join()
process_with_default_name1.join()
process_with_default_name2.join()
Inicio del proceso llamado MeNombraron
Inicio del proceso llamado Process-15
Inicio del proceso llamado Process-16
Final del proceso llamado MeNombraron
Final del proceso llamado Process-15
Final del proceso llamado Process-16
3 - ¿Cómo ejecutar un proceso en segundo plano? - Ejecutar Python File#
Los procesos en el modo NO_background_process tienen una salida, por lo que el proceso demoníaco finaliza automáticamente después de que finaliza el programa principal para evitar la persistencia de los procesos en ejecución.
import multiprocessing
import time
def foo():
name = multiprocessing.current_process().name
print ("Inicio del proceso llamado %s" %name)
time.sleep(2)
print ("Final del proceso llamado %s" %name)
if __name__ == '__main__':
background_process = multiprocessing.Process(name='background_process',target=foo)
background_process.daemon = True
NO_background_process = multiprocessing.Process(name='NO_background_process',target=foo)
NO_background_process.daemon = False
background_process.start()
NO_background_process.start()
background_process.join()
NO_background_process.join()
Inicio del proceso llamado background_process
Inicio del proceso llamado NO_background_process
Final del proceso llamado background_process
Final del proceso llamado NO_background_process
4 - ¿Cómo matar un proceso?#
El método is_alive() del objeto Process devuelve True si el proceso está en ejecución.
El proceso se puede matar utilizando el método terminate() del objeto Process.
El método terminate() envía una señal al proceso.
El método exitcode del objeto Process devuelve el código de salida del proceso.
Código de salida del proceso:
“== 0”: Esto significa que no se produjo ningún error
“> 0”: Esto significa que el proceso tuvo un error y salió de ese código
“< 0”: Esto significa que el proceso se eliminó con una señal de -1 * ExitCode
import multiprocessing
import time
def foo():
print('Inicio función')
time.sleep(1)
print('Final función')
if __name__ == '__main__':
p = multiprocessing.Process(target=foo)
print('Proceso antes ejecución:', p, p.is_alive())
p.start()
print('Proceso ejecutandose:', p, p.is_alive())
p.terminate()
print('Proceso terminado:', p, p.is_alive())
time.sleep(1)
print('Proceso terminado 1 segundo:', p, p.is_alive())
print('Código de salida del proceso:', p.exitcode)
Proceso antes ejecución: <Process name='Process-24' parent=7109 initial> False
Proceso ejecutandose: <Process name='Process-24' pid=9478 parent=7109 started> True
Proceso terminado: <Process name='Process-24' pid=9478 parent=7109 started> True
Proceso terminado 1 segundo: <Process name='Process-24' pid=9478 parent=7109 stopped exitcode=-SIGTERM> False
Código de salida del proceso: -15
5 - ¿Cómo usar un proceso en una subclase?#
La librería multiprocessing de Python permite crear procesos utilizando subclases.
Para ello, se crea una clase que hereda de la clase Process de la librería multiprocessing.
En el ejemplo, se crea una clase MyProcess que hereda de la clase Process.
La clase MyProcess sobreescribe el método run() de la clase Process.
La clase MyProcess llama al método run() de la clase Process.
Se crea una lista jobs que almacena los objetos MyProcess.
Se crea un bucle for que crea 5 procesos, los almacena en la lista jobs, y los ejecuta.
Se crea un bucle for que espera a que terminen los procesos almacenados en la lista jobs.
import multiprocessing
class MyProcess(multiprocessing.Process):
def run(self):
print ('Ejecución en %s \n' %self.name)
return
if __name__ == '__main__':
jobs = []
for i in range(5):
p = MyProcess()
jobs.append(p)
p.start()
for p in jobs:
p.join()
Ejecución en MyProcess-56
Ejecución en MyProcess-55
Ejecución en MyProcess-58
Ejecución en MyProcess-57
Ejecución en MyProcess-59
6 - ¿Cómo intercambiar objetos entre procesos utilizando una cola?#
La librería multiprocessing de Python permite intercambiar objetos entre procesos utilizando una cola.
Para ello, se utiliza la clase Queue() de la librería multiprocessing.
La clase Queue() recibe como argumento el tamaño máximo de la cola.
La clase Queue() tiene los métodos put() y get() para añadir y obtener objetos de la cola.
En el ejemplo, se crea una clase producer que hereda de la clase Process de la librería multiprocessing.
La clase producer sobreescribe el método run() de la clase Process.
La clase producer añade objetos a la cola utilizando el método put() de la clase Queue.
La clase producer imprime el tamaño de la cola utilizando el método qsize() de la clase Queue.
Se crea una clase consumer que hereda de la clase Process de la librería multiprocessing.
La clase consumer sobreescribe el método run() de la clase Process.
La clase consumer obtiene objetos de la cola utilizando el método get() de la clase Queue.
La clase consumer imprime los objetos obtenidos de la cola.
La clase consumer imprime un mensaje si la cola está vacía.
import multiprocessing
import random
import time
class producer(multiprocessing.Process):
def __init__(self, queue):
multiprocessing.Process.__init__(self)
self.queue = queue
def run(self) :
for i in range(5):
item = random.randint(0, 256)
self.queue.put(item)
print ("Proceso Producer: item %d agregado a la cola %s" % (item,self.name))
time.sleep(0.5)
print ("El tamaño de la cola es %s" % self.queue.qsize())
class consumer(multiprocessing.Process):
def __init__(self, queue):
multiprocessing.Process.__init__(self)
self.queue = queue
def run(self):
while True:
if (self.queue.empty()):
print("La cola está vacía")
break
else :
time.sleep(1)
item = self.queue.get()
print ('Proceso Consumer: item %d sacado de %s \n' % (item, self.name))
time.sleep(0.5)
if __name__ == '__main__':
queue = multiprocessing.Queue()
process_producer = producer(queue)
process_consumer = consumer(queue)
process_producer.start()
process_consumer.start()
process_producer.join()
process_consumer.join()
Proceso Producer: item 56 agregado a la cola producer-60
El tamaño de la cola es 1
Proceso Producer: item 23 agregado a la cola producer-60
Proceso Consumer: item 56 sacado de consumer-61
El tamaño de la cola es 1
Proceso Producer: item 162 agregado a la cola producer-60
El tamaño de la cola es 2
Proceso Producer: item 87 agregado a la cola producer-60
El tamaño de la cola es 3
Proceso Producer: item 140 agregado a la cola producer-60
Proceso Consumer: item 23 sacado de consumer-61
El tamaño de la cola es 3
Proceso Consumer: item 162 sacado de consumer-61
Proceso Consumer: item 87 sacado de consumer-61
Proceso Consumer: item 140 sacado de consumer-61
La cola está vacía
7 - ¿Cómo intercambiar objetos entre procesos utilizando una tubería?#
La librería multiprocessing de Python permite intercambiar objetos entre procesos utilizando una tubería.
Para ello, se utiliza la clase Pipe() de la librería multiprocessing.
La clase Pipe() tiene los métodos send() y recv() para añadir y obtener objetos de la tubería.
Después de iniciar los procesos y establecer las tuberías entre ellos, se cierran los extremos de lectura de pipe_1 y pipe_2 en el proceso principal usando pipe_1[0].close() y pipe_2[0].close().
Esto se hace para indicar que el proceso principal ya no está utilizando los extremos de lectura de las tuberías y para que los procesos secundarios puedan continuar funcionando sin interrupciones.
Luego, se utiliza un bucle try-except para recibir los objetos enviados por el proceso secundario a través de pipe_2.
El bucle continúa hasta que se produce un EOFError, lo que indica que ya no hay más objetos en la tubería.
En cada iteración del bucle, se llama a pipe_2[1].recv() para recibir el objeto enviado por el proceso secundario y se imprime en la consola usando print().
En el código, el índice [0] se utiliza para hacer referencia al extremo de escritura y el índice [1] se utiliza para hacer referencia al extremo de lectura.
import multiprocessing
def create_items(pipe):
output_pipe, _ = pipe
for item in range(10):
output_pipe.send(item)
output_pipe.close()
def multiply_items(pipe_1, pipe_2):
close, input_pipe = pipe_1
close.close()
output_pipe, _ = pipe_2
try:
while True:
item = input_pipe.recv()
output_pipe.send(item * item)
except EOFError:
output_pipe.close()
if __name__== '__main__':
pipe_1 = multiprocessing.Pipe(True)
process_pipe_1 = multiprocessing.Process(target=create_items, args=(pipe_1,))
process_pipe_1.start()
pipe_2 = multiprocessing.Pipe(True)
process_pipe_2 = multiprocessing.Process(target=multiply_items, args=(pipe_1, pipe_2,))
process_pipe_2.start()
pipe_1[0].close()
pipe_2[0].close()
try:
while True:
print(pipe_2[1].recv())
except EOFError:
print ("Final de la comunicación")
0
1
4
9
16
25
36
49
64
81
Final de la comunicación
8 - ¿Cómo sincronizar procesos? - Ejecutar Python File#
La sincronización de procesos se puede lograr utilizando:
la clase Barrier de la librería multiprocessing.
la clase Lock de la librería multiprocessing.
import multiprocessing
from multiprocessing import Barrier, Lock, Process
from time import time
from datetime import datetime
def test_with_barrier(synchronizer, serializer):
name = multiprocessing.current_process().name
synchronizer.wait()
now = time()
with serializer:
print("Proceso %s ----> %s \n" %(name,datetime.fromtimestamp(now)))
#serializer.acquire()
#print("Proceso %s ----> %s" %(name,datetime.fromtimestamp(now)))
#serializer.release()
def test_without_barrier():
name = multiprocessing.current_process().name
now = time()
print("Proceso %s ----> %s \n" %(name ,datetime.fromtimestamp(now)))
if __name__ == '__main__':
synchronizer = Barrier(2)
serializer = Lock()
Process(name='p1 - prueba_con_barrera',target=test_with_barrier,args=(synchronizer,serializer)).start()
Process(name='p2 - prueba_con_barrera',target=test_with_barrier,args=(synchronizer,serializer)).start()
Process(name='p3 - prueba_sin_barrera',target=test_without_barrier).start()
Process(name='p4 - prueba_sin_barrera',target=test_without_barrier).start()
Proceso p1 - prueba_con_barrera ----> 2023-05-07 16:41:32.844288
Proceso p2 - prueba_con_barrera ----> 2023-05-07 16:41:32.846685
Proceso p3 - prueba_sin_barrera ----> 2023-05-07 16:41:32.854381
Proceso p4 - prueba_sin_barrera ----> 2023-05-07 16:41:32.872981
9 - ¿Cómo sincronizar procesos utilizando un administrador de procesos (manager)? - Ejecutar Python File#
La sincronización de procesos se puede lograr utilizando un administrador de procesos (manager).
import multiprocessing
import time
def worker(dictionary, key, item):
print(key,item,"\n")
time.sleep(2)
dictionary[key] = item
if __name__ == '__main__':
mgr = multiprocessing.Manager()
dictionary = mgr.dict()
jobs = [ multiprocessing.Process(target=worker, args=(dictionary, i, i*2)) for i in range(10) ]
start_time = time.time()
for j in jobs:
j.start()
for j in jobs:
j.join()
end_time = time.time()
print ('Results:', dictionary)
print("El tiempo de ejecución fue:", end_time - start_time, "segundos")
01 220 4
3
4
6
8
5
106 712
148
16
9 18
Results: {1: 2, 2: 4, 0: 0, 3: 6, 4: 8, 5: 10, 6: 12, 7: 14, 8: 16, 9: 18}
El tiempo de ejecución fue: 2.338897228240967 segundos
10 - ¿Cómo ejecutar funciones en paralelo utilizando procesos con Pool?#
La función Pool() recibe como argumento el número de procesos que se desea crear.
Para ejecutar la función, se utiliza el método map() del objeto Pool.
Para esperar a que terminen la ejecución de las funciones, se utiliza el método close() y el método join() del objeto Pool.
Cuando se crea la instancia de multiprocessing.Pool con processes=4, se están creando 4 procesos en paralelo para ejecutar la función function_square en cada uno de ellos, utilizando los elementos de la lista inputs como entrada.
Cada proceso recibe un subconjunto de los datos y ejecuta la función sobre ellos de manera independiente y concurrente con los otros procesos.
Al final, se recopilan los resultados y se devuelven como una lista en el orden en que se completaron.
import multiprocessing
def function_square(data):
result = data*data
return result
if __name__ == '__main__':
inputs = list(range(0,100))
pool = multiprocessing.Pool(processes=4)
pool_outputs = pool.map(function_square, inputs)
pool.close()
pool.join()
print ('Pool :', pool_outputs)
Pool : [0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289, 324, 361, 400, 441, 484, 529, 576, 625, 676, 729, 784, 841, 900, 961, 1024, 1089, 1156, 1225, 1296, 1369, 1444, 1521, 1600, 1681, 1764, 1849, 1936, 2025, 2116, 2209, 2304, 2401, 2500, 2601, 2704, 2809, 2916, 3025, 3136, 3249, 3364, 3481, 3600, 3721, 3844, 3969, 4096, 4225, 4356, 4489, 4624, 4761, 4900, 5041, 5184, 5329, 5476, 5625, 5776, 5929, 6084, 6241, 6400, 6561, 6724, 6889, 7056, 7225, 7396, 7569, 7744, 7921, 8100, 8281, 8464, 8649, 8836, 9025, 9216, 9409, 9604, 9801]