How To Fix Multithreading/multiprocessing With Dictionaries?
Solution 1:
You can use threads and a queue to communicate. First, start get_ips_from_sysinfo as a single thread to monitor and process any finished sysinfo, storing its output in output_list; then fire all the get_sys_info threads. Be careful not to run out of memory with 100k threads.
from threading import Thread
from queue import Queue

jobs = Queue()      # buffer for sysinfo
output_list = []    # store ips


def get_sys_info(self, host_id, appliance):
    sysinfo = self.hx_request(
        "https://{}:3000//hx/api/v3/hosts/{}/sysinfo".format(appliance, host_id))
    jobs.put(sysinfo)   # add sysinfo to the jobs queue
    return sysinfo      # comment out if you don't need it


def get_ips_from_sysinfo(self):
    """Runs continuously until all jobs are finished."""
    while True:
        # get sysinfo from the jobs queue
        sysinfo = jobs.get()    # it will wait here for a new entry
        if sysinfo == 'exit':
            print('we are done here')
            break
        sysinfo = sysinfo["data"]
        network_array = sysinfo.get("networkArray", {})
        network_info = network_array.get("networkInfo", [])
        ips = []
        for ni in network_info:
            ip_array = ni.get("ipArray", {})
            ip_info = ip_array.get("ipInfo", [])
            for i in ip_info:
                ips.append(i)
        output_list.append(ips)
if __name__ == "__main__":
    # start our listener thread
    Thread(target=rr.get_ips_from_sysinfo).start()

    threads = []
    for i in ids:
        t = Thread(target=rr.get_sys_info, args=(i, appliance))
        threads.append(t)
        t.start()

    # wait for the worker threads to finish, then terminate
    # get_ips_from_sysinfo() by sending the 'exit' flag
    for t in threads:
        t.join()
    jobs.put('exit')
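If you want to try the pattern in isolation first, here is a minimal self-contained sketch of the same queue-and-sentinel arrangement; fake_get_sys_info is a hypothetical stub standing in for the real API call:

import time, random
from threading import Thread
from queue import Queue

jobs = Queue()      # buffer between producers and the consumer
output_list = []    # collected results

def fake_get_sys_info(host_id):
    """Hypothetical stub standing in for the real network call."""
    time.sleep(random.random() / 10)    # simulate network latency
    jobs.put({"host": host_id, "ips": ["10.0.0.{}".format(host_id)]})

def consume():
    while True:
        sysinfo = jobs.get()            # blocks until a producer puts an item
        if sysinfo == 'exit':           # sentinel: every producer has finished
            break
        output_list.append(sysinfo["ips"])

if __name__ == "__main__":
    listener = Thread(target=consume)
    listener.start()
    producers = [Thread(target=fake_get_sys_info, args=(i,)) for i in range(20)]
    for t in producers:
        t.start()
    for t in producers:
        t.join()
    jobs.put('exit')                    # unblock and stop the consumer
    listener.join()
    print(len(output_list), "results collected")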
Solution 2:
As @wwii commented, concurrent.futures offers some conveniences that may help you, particularly since this looks like a batch job.
Your performance hit most likely comes from the network calls, so multithreading is probably more suitable for your use case (here is a comparison with multiprocessing). If not, you can switch the pool from threads to processes while using the same APIs.
from concurrent.futures import ThreadPoolExecutor, as_completed
# You can import ProcessPoolExecutor instead and use the same APIs


def thread_worker(instance, host_id, appliance):
    """Wrapper for your class's `get_sys_info` method"""
    sysinfo = instance.get_sys_info(host_id, appliance)
    return sysinfo, instance


# instantiate the class that contains the methods in your example code
# I will call it `RR`
instances = (RR(*your_args, **your_kwds) for your_args, your_kwds
             in zip(iterable_of_args, iterable_of_kwds))
all_host_ids = another_iterable
all_appliances = still_another_iterable

if __name__ == "__main__":
    with ThreadPoolExecutor(max_workers=50) as executor:  # assuming 10 threads per core; your example uses 5 processes
        pool = {executor.submit(thread_worker, instance, _id, _app): (_id, _app)
                for instance, _id, _app in zip(instances, all_host_ids, all_appliances)}

        # handle the `sysinfo` dicts as they arrive
        for future in as_completed(pool):
            _result = future.result()
            if isinstance(_result, Exception):  # just one way of handling exceptions
                # do something
                print(f"{pool[future]} raised {_result}")
            else:
                # enqueue results for parallel processing in a separate stage, or
                # process the results serially
                _sysinfo, _instance = _result
                ips = _instance.get_ips_from_sysinfo(_sysinfo)
                # do something with `ips`
You can streamline this example by refactoring your methods into functions, if indeed they don't make use of state, as seems to be the case in your code.
If extracting the sysinfo data is expensive, you can enqueue the results and in turn feed those to a ProcessPoolExecutor that calls get_ips_from_sysinfo on the queued dicts.
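As a rough sketch of that two-stage arrangement (fetch_sysinfo and parse_sysinfo are hypothetical stand-ins; the parser must live at module level so the process pool can pickle it):

import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed

def fetch_sysinfo(url):
    """Hypothetical stand-in for the I/O-bound network call."""
    time.sleep(0.1)                      # simulate network latency
    return {"data": {"url": url}}

def parse_sysinfo(sysinfo):
    """Hypothetical CPU-bound extraction; module-level so it is picklable."""
    return sysinfo["data"]["url"].upper()

if __name__ == "__main__":
    urls = [f"https://host{n}/sysinfo" for n in range(20)]
    # threads for the fetches, processes for the parsing
    with ThreadPoolExecutor(max_workers=10) as tpool, ProcessPoolExecutor() as ppool:
        fetched = [tpool.submit(fetch_sysinfo, u) for u in urls]
        parsed = [ppool.submit(parse_sysinfo, f.result())
                  for f in as_completed(fetched)]
        print([p.result() for p in as_completed(parsed)])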
Solution 3:
For whatever reason I was a little leery about calling an instance method in numerous threads - but it seems to work. I made this toy example using concurrent.futures - hopefully it mimics your actual situation well enough. This submits 4000 instance method calls to a thread pool of (at most) 500 workers. Playing around with the max_workers value, I found that execution-time improvements were pretty linear up to about 1000 workers, then the improvement ratio started tailing off.
import concurrent.futures, time, random

a = [.001 * n for n in range(1, 4001)]


class F:
    def __init__(self, name):
        self.name = f'{name}:{self.__class__.__name__}'

    def apicall(self, n):
        wait = random.choice(a)
        time.sleep(wait)
        return (n, wait, self.name)


f = F('foo')

if __name__ == '__main__':
    nworkers = 500
    with concurrent.futures.ThreadPoolExecutor(nworkers) as executor:
        # t = time.time()
        futures = [executor.submit(f.apicall, n) for n in range(4000)]
        results = [future.result() for future in concurrent.futures.as_completed(futures)]
        # t = time.time() - t
        # q = sum(r[1] for r in results)
        # print(f'# workers:{nworkers} - ratio:{q/t}')
I didn't account for possible Exceptions being thrown during the method call but the example in the docs is pretty clear how to handle that.
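For reference, here is a variant of the result loop that follows the docs' try/except pattern, reusing f from the toy example above:

import concurrent.futures

if __name__ == '__main__':
    with concurrent.futures.ThreadPoolExecutor(500) as executor:
        futures = {executor.submit(f.apicall, n): n for n in range(4000)}
        results = []
        for future in concurrent.futures.as_completed(futures):
            n = futures[future]
            try:
                results.append(future.result())   # re-raises anything apicall raised
            except Exception as exc:
                print(f'call {n} raised {exc!r}')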
Solution 4:
So... after days of looking at the suggestions on here (thank you so much!!!) and a couple of outside readings (Fluent Python, Ch. 17, and Effective Python: 59 Specific Ways...), this is what I ended up with:
import json
from concurrent.futures import ThreadPoolExecutor
from tqdm import tqdm


def get_ips_from_sysinfo(url):
    sysinfo = lx_request(url)   # pool.map passes one URL at a time
    ip_dict = []
    sysinfo = sysinfo["data"]
    hostname = sysinfo.get("hostname")
    network_array = sysinfo.get("networkArray", {})
    network_info = network_array.get("networkInfo", [])
    entry = {}
    entry["hostname"] = hostname
    entry["ip_addrs"] = []
    for ni in network_info:
        ip_array = ni.get("ipArray", {})
        ip_info = ip_array.get("ipInfo", [])
        for ip in ip_info:
            ip_addr = ip.get("ipAddress", None)
            if not ip_addr:
                ip_addr = ip.get("ipv6Address", None)
            if ip_addr is None:
                continue
            if not is_ip_private(ip_addr):
                entry["ip_addrs"].append(ip_addr)
    if len(entry["ip_addrs"]) > 0:   # only keep hosts with at least one address
        ip_dict.append(entry)
    return ip_dict
urls = get_sys_info(appliance, ids)


def main():
    pool = ThreadPoolExecutor(max_workers=15)
    results = list(tqdm(pool.map(get_ips_from_sysinfo, urls), total=len(urls)))
    with open("ip_array.json", "w+") as f:
        json.dump(results, f, indent=2, sort_keys=True)


main()
*Modified: this works now; hope it helps someone else.
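One gap worth closing: is_ip_private isn't defined in the snippet above. A minimal sketch built on the standard-library ipaddress module could look like this:

import ipaddress

def is_ip_private(addr):
    """True if addr parses as an IPv4/IPv6 address in a private range."""
    try:
        return ipaddress.ip_address(addr).is_private
    except ValueError:      # not a parseable IP string
        return False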