Multiprocessing-Multithreading.md

Overview

Multiprocessing and multithreading are both ways to achieve multitasking.

Multithreading

  • Threads run in the same process and share the same address space, so a global variable can be accessed (and modified) by every thread
```python
import time
import threading

def calc_square(numbers):
    print("calculate square numbers")
    for n in numbers:
        time.sleep(1)
        print('square:', n * n)

def calc_cube(numbers):
    print("calculate cube of numbers")
    for n in numbers:
        time.sleep(1)
        print('cube:', n * n * n)

arr = [2, 3, 8, 9]

t = time.time()
t1 = threading.Thread(target=calc_square, args=(arr,))
t2 = threading.Thread(target=calc_cube, args=(arr,))
t1.start()
t2.start()
t1.join()  # wait until both threads finish
t2.join()
print("done in : ", time.time() - t)
print("Hah... I am done with all my work now!")
```

Real example: making concurrent API calls with worker threads and queues

```python
import json
from queue import Queue
from threading import Thread

import requests

endpoint = 'http://localhost:8000/api/v1/search'  # point this at the service under test
num_threads = 20
requests_q = Queue(maxsize=0)
write_q = Queue(maxsize=0)

def make_query(rq, wq):
    while True:
        next_request = rq.get()
        to_save = {'input': next_request, 'output': None}
        try:
            res = requests.post(endpoint, json=next_request)
            to_save['output'] = res.json()
        except Exception as e:
            print(e)
        # hand the result to the writer thread
        wq.put(to_save)
        rq.task_done()

def write_to_file(wq):
    with open('output.json', 'w') as out_file:
        while True:
            line_to_write = wq.get()
            print(json.dumps(line_to_write), file=out_file)
            wq.task_done()

for i in range(num_threads):
    worker = Thread(target=make_query, args=(requests_q, write_q))
    # daemon threads do not block the main program from exiting; they are
    # killed abruptly at exit, so use them only when a thread dying
    # mid-task cannot lose or corrupt data
    worker.daemon = True
    worker.start()

writer = Thread(target=write_to_file, args=(write_q,))
writer.daemon = True
writer.start()

# populate the request queue, one JSON object per line
with open('input.json') as f:
    for line in f:
        requests_q.put(json.loads(line))

# block until every request has been made and every result written
requests_q.join()
write_q.join()
```
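
For a task like this, the standard library's concurrent.futures.ThreadPoolExecutor can express the same fan-out more compactly. A minimal sketch under the same assumptions (the endpoint URL and the one-JSON-object-per-line input file are illustrative, as above):

```python
import json
from concurrent.futures import ThreadPoolExecutor

import requests

endpoint = 'http://localhost:8000/api/v1/search'  # illustrative, as above

def query(payload):
    try:
        return {'input': payload, 'output': requests.post(endpoint, json=payload).json()}
    except Exception as e:
        print(e)
        return {'input': payload, 'output': None}

with open('input.json') as f:
    payloads = [json.loads(line) for line in f]

# map() fans the payloads out over 20 threads and yields results in input order
with ThreadPoolExecutor(max_workers=20) as pool, open('output.json', 'w') as out:
    for record in pool.map(query, payloads):
        print(json.dumps(record), file=out)
```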

Multiprocessing

Multiprocessing via Queue

```python
import multiprocessing

def calc_square(numbers, q):
    for n in numbers:
        q.put(n * n)

if __name__ == "__main__":
    numbers = [2, 3, 5]
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=calc_square, args=(numbers, q))
    p.start()
    p.join()
    # drain the results the child process put on the queue
    while not q.empty():
        print(q.get())
```
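
This works here because the results are tiny, but note a caveat from the multiprocessing documentation: a process that has put items on a queue waits for them to be flushed to the underlying pipe before terminating, so calling p.join() before draining the queue can deadlock once the queued data exceeds the pipe buffer. For larger results, drain with q.get() first and join the process afterwards.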

Lock

Process or Thread 1 -> Shared Resource (Memory, Files, Databases etc.)
Process or Thread 2 -> Shared Resource (Memory, Files, Databases etc.)

When two processes or threads update a shared resource at the same time, their read-modify-write steps can interleave and updates can be lost; a lock serializes access to the critical section so only one of them runs it at a time.

```python
import time
import multiprocessing

def deposit(balance, lock):
    for i in range(100):
        time.sleep(0.01)
        # critical section (lock)
        lock.acquire()
        balance.value = balance.value + 1
        lock.release()

def withdraw(balance, lock):
    for i in range(100):
        time.sleep(0.01)
        # critical section (lock)
        lock.acquire()
        balance.value = balance.value - 1
        lock.release()

if __name__ == '__main__':
    balance = multiprocessing.Value('i', 200)
    lock = multiprocessing.Lock()
    d = multiprocessing.Process(target=deposit, args=(balance, lock))
    w = multiprocessing.Process(target=withdraw, args=(balance, lock))
    d.start()
    w.start()
    d.join()
    w.join()
    print(balance.value)
```
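
Without the lock, the final balance would often differ from 200: balance.value = balance.value + 1 is a read-modify-write, so two processes can read the same old value and one of the updates is lost. The lock serializes the critical section, and the 100 deposits and 100 withdrawals cancel out exactly.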

Pool

```python
from multiprocessing import Pool

def f(n):
    return n * n

if __name__ == "__main__":
    p = Pool(processes=3)
    result = p.map(f, [1, 2, 3, 4, 5])
    for n in result:
        print(n)
```
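
Pool.map splits the input list into chunks, farms them out to the worker processes, and returns the results in input order. The same example written with Pool as a context manager, so the pool is closed and joined automatically (if processes is omitted, Pool defaults to the machine's CPU count):

```python
from multiprocessing import Pool

def f(n):
    return n * n

if __name__ == "__main__":
    # workers are closed and joined automatically when the with-block exits
    with Pool(processes=3) as p:
        print(p.map(f, [1, 2, 3, 4, 5]))  # [1, 4, 9, 16, 25]
```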

Multiprocessing Queue vs. Queue Module

  • Different processes use separate memory spaces
  • Processes are heavier-weight than threads
  • The benefit of multiprocessing is that an error or memory leak in one process won't hurt the execution of another

Multiprocessing Queue

```python
import multiprocessing
q = multiprocessing.Queue()
```
  • Lives in shared memory
  • Used to share data between processes

Queue Module

```python
import queue
q = queue.Queue()
```
  • Lives in in-process memory
  • Used to share data between threads
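
A minimal sketch contrasting the two (names are illustrative): a plain global mutated in a child process is invisible to the parent, while a multiprocessing.Queue carries data across the process boundary:

```python
import multiprocessing

counter = 0  # plain global: each process works on its own copy

def work(q):
    global counter
    counter += 1    # modifies only the child process's copy
    q.put(counter)  # the queue moves the value across the process boundary

if __name__ == "__main__":
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=work, args=(q,))
    p.start()
    p.join()
    print(q.get())  # 1 -- came back through the queue
    print(counter)  # 0 -- the parent's global was never touched
```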

Questions

  • How to know when all threads are done with their tasks?
    Keep a global counter initialized to the total number of tasks, and have each thread decrement it under a global lock as it finishes a task. The thread that brings the counter to 0 reports back to the main thread that all the work is done (see the sketch below). When tasks flow through a queue, queue.join() achieves the same thing, as in the API example above.
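
A minimal sketch of that counter pattern (the threading.Event used to signal the main thread is an implementation choice of this sketch, not from the original notes):

```python
import threading
import time

NUM_TASKS = 10
counter = NUM_TASKS              # remaining tasks
counter_lock = threading.Lock()
all_done = threading.Event()

def worker(task_id):
    global counter
    time.sleep(0.1)              # stand-in for real work
    with counter_lock:           # update the shared counter atomically
        counter -= 1
        if counter == 0:
            all_done.set()       # the thread that hits 0 reports back

for i in range(NUM_TASKS):
    threading.Thread(target=worker, args=(i,), daemon=True).start()

all_done.wait()                  # main thread blocks until every task is done
print("all tasks finished")
```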
References

  • https://www.youtube.com/playlist?list=PLeo1K3hjS3uub3PRhdoCTY8BxMKSW7RjN