
How to Get concurrent.futures ProcessPoolExecutor to Work with a Dictionary?

I watched the Python multiprocessing tutorial on YouTube; here's the link: https://www.youtube.com/watch?v=fKl2JW_qrso&t=2316s&ab_channel=CoreySchafer Then I tried to apply

Solution 1:

This problem could also be solved with multithreading rather than multiprocessing. Most of the time spent in getCurrency_data is waiting for the requests.get call to return, so there is little contention among the threads for the Global Interpreter Lock (GIL). But the processing that BeautifulSoup does on the returned data is CPU-intensive, so there will always be some contention for the GIL. This suggests that:

(1) Multiprocessing will probably perform slightly better than multithreading, but only if you create as many processes as you have URLs to retrieve, because your "worker" function spends most of its time waiting; and (2) you should use a requests.Session instance to retrieve the URLs, since all of them go to the same website and a session can improve efficiency by reusing connections.

To convert your program to multiprocessing or multithreading (try it both ways; you only need to change ProcessPoolExecutor to ThreadPoolExecutor, though I found that multiprocessing was slightly faster), the function getCurrency_data should process only a single URL and return the data it has retrieved to the main process. The main process should then accumulate the data returned by all the subprocesses and initialize the dataframe:

import requests
from bs4 import BeautifulSoup
import pandas as pd
import time
import concurrent.futures
from functools import partial

def getCurrency_data(session, item):
    # item is one (key, value) pair from links.items()
    key, value = item

    data = session.get(value)
    soup = BeautifulSoup(data.content, 'html.parser')

    # Current rate and change: the first two <span> texts found in the
    # 'top bold inlineblock' divs
    tags1 = soup.find_all('div', {'class':'top bold inlineblock'})
    span_tag = []
    for div in tags1:
        spans = div.find_all('span')
        for span in spans:
            x = span.text
            span_tag.append(x)
    current_tmp = span_tag[0]
    change_tmp = span_tag[1]
    currency_tmp = key

    # Previous close, open and 1-year change from the overview table
    cur = []
    tags2 = soup.find('div', {'class':'clear overviewDataTable overviewDataTableWithTooltip'})
    for a in tags2.find_all('div', {'class':'first inlineblock'}):
        for b in a.find_all('span', {'class':'float_lang_base_2 bold'}):
            cur.append(b.text)
    prevclose_tmp = cur[0]
    open_tmp = cur[1]
    oneyearchange_tmp = cur[2]

    return currency_tmp, current_tmp, change_tmp, prevclose_tmp, open_tmp, oneyearchange_tmp


def main():

    t1 = time.perf_counter()

    links = {"USD-IDR":"https://www.investing.com/currencies/usd-idr",
             "USD-JPY":"https://www.investing.com/currencies/usd-jpy",
             "USD-CNY":"https://www.investing.com/currencies/usd-cny",
             "USD-EUR":"https://www.investing.com/currencies/usd-eur",
             "USD-SGD":"https://www.investing.com/currencies/usd-sgd",
             "USD-THB":"https://www.investing.com/currencies/usd-thb",
             "USD-MXN":"https://www.investing.com/currencies/usd-mxn",
             "USD-MYR":"https://www.investing.com/currencies/usd-myr",
             "USD-KRW":"https://www.investing.com/currencies/usd-krw",
             "USD-INR":"https://www.investing.com/currencies/usd-inr"}

    with requests.Session() as session:
        user_agent = "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.88 Safari/537.37"
        session.headers = {'User-Agent': user_agent}
        currency = []
        current = []
        change = []
        prev_close = []
        open_ = []
        oneyear_change = []
        with concurrent.futures.ProcessPoolExecutor(max_workers=len(links)) as executor:
            for return_value in executor.map(partial(getCurrency_data, session), links.items()):
                currency_tmp, current_tmp, change_tmp, prevclose_tmp, open_tmp, oneyearchange_tmp = return_value
                currency.append(currency_tmp)
                current.append(current_tmp)
                change.append(change_tmp)
                prev_close.append(prevclose_tmp)
                open_.append(open_tmp)
                oneyear_change.append(oneyearchange_tmp)
        df_currency = pd.DataFrame(columns=['Currency', 'Current', 'Change', 'Prev. Close', 'Open', '1 Year Change'])
        df_currency["Currency"] = currency
        df_currency["Current"] = current
        df_currency["Change"] = change
        df_currency["Prev. Close"] = prev_close
        df_currency["Open"] = open_
        df_currency["1 Year Change"] = oneyear_change

    t2 = time.perf_counter()

    print(f'Finished in {t2-t1} seconds')

    print(df_currency)

# Required for Windows:
if __name__ == '__main__':
    main()

Prints:

Finished in 4.4468559 seconds
  Currency   Current   Change Prev. Close      Open 1 Year Change
0  USD-IDR  14,452.5    +52.5      14,400    14,450       - 2.49%
1  USD-JPY    109.81    +0.09      109.72    109.73         3.47%
2  USD-CNY    6.5006  +0.0064      6.4942    6.4951       - 6.13%
3  USD-EUR    0.8564   0.0001      0.8563    0.8565         1.33%
4  USD-SGD    1.3628  -0.0014      1.3643     1.364       - 0.44%
5  USD-THB    33.370   +0.020       33.35     33.34         6.54%
6  USD-MXN   20.3829  +0.2309      20.152    20.152       - 8.88%
7  USD-MYR    4.2375  +0.0005       4.237    4.2395         1.67%
8  USD-KRW  1,182.31    +6.03    1,176.28  1,175.46       - 0.69%
9  USD-INR    74.400   +0.030       74.37     74.38       - 0.62%
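
The part of the program above that deals with the dictionary is small: executor.map is given links.items(), so each call of the worker receives one (key, value) tuple and unpacks it. Here is a minimal sketch of just that pattern, with no network access. The names fetch_rate and map_over_dict and the example.com URLs are made up for illustration and are not part of the original program.

```python
import concurrent.futures
from functools import partial


def fetch_rate(prefix, item):
    """Hypothetical stand-in worker: gets one (key, value) pair from dict.items()."""
    key, url = item
    # A real worker would download and parse `url`; this one only builds a label
    # from the last path segment so that the example runs offline.
    return key, f"{prefix}:{url.rsplit('/', 1)[-1]}"


def map_over_dict(executor_cls, links):
    """Map the worker over the dictionary's items; results come back in input order."""
    # partial() fixes the first argument, as partial(getCurrency_data, session) does above.
    worker = partial(fetch_rate, "page")
    with executor_cls(max_workers=len(links)) as executor:
        return list(executor.map(worker, links.items()))


if __name__ == "__main__":
    links = {
        "USD-JPY": "https://example.com/currencies/usd-jpy",
        "USD-EUR": "https://example.com/currencies/usd-eur",
    }
    # Swap in concurrent.futures.ThreadPoolExecutor to compare the two approaches.
    for row in map_over_dict(concurrent.futures.ProcessPoolExecutor, links):
        print(row)
```

Because the worker is a module-level function and its arguments are plain tuples and strings, everything that crosses the process boundary can be pickled, which is what ProcessPoolExecutor needs.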

The Best of Both Worlds

Since there is a fair bit of overhead in creating processes, it can be most efficient to separate the work into what is primarily I/O (i.e. retrieving the URL) and what is primarily CPU (parsing and processing the retrieved HTML document), using multithreading for the former and multiprocessing for the latter. That way you do not create more processes than you actually need.

As before, the multithreading pool size should be equal to the number of URLs needed to be retrieved (as long as that number is not unreasonably large; creating hundreds of threads should not be a problem) and the multiprocessing pool size should use at most the number of CPU cores you have. We therefore create both pools and pass the multiprocessing pool to our worker function, which retrieves the URL and then submits the data to the multiprocessing pool for parsing and processing that data.
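
The two-pool arrangement just described can be sketched with the standard library alone. In this sketch time.sleep stands in for the URL retrieval and a word count stands in for the HTML parsing; parse_page, fetch_and_parse, run_hybrid and the example.com URLs are hypothetical names chosen for illustration, not the code of this answer.

```python
import concurrent.futures
import time
from functools import partial


def parse_page(key, html):
    """CPU-bound stand-in for the parsing step; runs in the CPU pool."""
    return key, len(html.split())


def fetch_and_parse(cpu_pool, item):
    """Runs in a thread: wait on simulated I/O, then hand the parsing to cpu_pool."""
    key, url = item
    time.sleep(0.05)  # placeholder for the HTTP request; no network access here
    html = f"<html> page for {url} </html>"
    # The thread blocks on result(), which is cheap: it is only waiting.
    return cpu_pool.submit(parse_page, key, html).result()


def run_hybrid(links, cpu_pool):
    """Use one thread per URL for the waiting and cpu_pool for the parsing."""
    with concurrent.futures.ThreadPoolExecutor(max_workers=len(links)) as io_pool:
        return list(io_pool.map(partial(fetch_and_parse, cpu_pool), links.items()))


if __name__ == "__main__":
    links = {
        "USD-JPY": "https://example.com/usd-jpy",
        "USD-EUR": "https://example.com/usd-eur",
    }
    # The pool size of 2 is arbitrary here; see the text for how to choose it.
    with concurrent.futures.ProcessPoolExecutor(max_workers=2) as cpu_pool:
        print(run_hybrid(links, cpu_pool))
```

Only the key and the page text are sent to the process pool, so nothing tied to the thread (such as an open session) has to be pickled.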

In the code below I create a multiprocessing pool whose size equals the number of physical cores I have (4). That is half the number of logical cores I have, and the logical-core count would otherwise be the default pool size if I did not specify one. To determine the number of physical cores dynamically, you can install the psutil package from the PyPI repository:

import requests
from bs4 import BeautifulSoup
import pandas as pd
import time
import concurrent.futures
from functools import partial
import psutil

def process_data(key, data):
    # Runs in the process pool: data is the raw HTML of one page
    soup = BeautifulSoup(data, 'html.parser')

    # Current rate and change: the first two <span> texts found in the
    # 'top bold inlineblock' divs
    tags1 = soup.find_all('div', {'class':'top bold inlineblock'})
    span_tag = []
    for div in tags1:
        spans = div.find_all('span')
        for span in spans:
            x = span.text
            span_tag.append(x)
    current_tmp = span_tag[0]
    change_tmp = span_tag[1]
    currency_tmp = key

    # Previous close, open and 1-year change from the overview table
    cur = []
    tags2 = soup.find('div', {'class':'clear overviewDataTable overviewDataTableWithTooltip'})
    for a in tags2.find_all('div', {'class':'first inlineblock'}):
        for b in a.find_all('span', {'class':'float_lang_base_2 bold'}):
            cur.append(b.text)
    prevclose_tmp = cur[0]
    open_tmp = cur[1]
    oneyearchange_tmp = cur[2]

    return currency_tmp, current_tmp, change_tmp, prevclose_tmp, open_tmp, oneyearchange_tmp


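def getCurrency_data(session, pool_executor, item):
    # Runs in a thread: item is one (key, value) pair from links.items()
    key, value = item

    data = session.get(value)
    # Hand the CPU-bound parsing to the process pool and wait for its result
    f = pool_executor.submit(process_data, key, data.content)
    return f.result()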

def main():

    t1 = time.perf_counter()

    links = {"USD-IDR":"https://www.investing.com/currencies/usd-idr",
             "USD-JPY":"https://www.investing.com/currencies/usd-jpy",
             "USD-CNY":"https://www.investing.com/currencies/usd-cny",
             "USD-EUR":"https://www.investing.com/currencies/usd-eur",
             "USD-SGD":"https://www.investing.com/currencies/usd-sgd",
             "USD-THB":"https://www.investing.com/currencies/usd-thb",
             "USD-MXN":"https://www.investing.com/currencies/usd-mxn",
             "USD-MYR":"https://www.investing.com/currencies/usd-myr",
             "USD-KRW":"https://www.investing.com/currencies/usd-krw",
             "USD-INR":"https://www.investing.com/currencies/usd-inr"}

    with requests.Session() as session:
        user_agent = "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.88 Safari/537.37"
        session.headers = {'User-Agent': user_agent}
        currency = []
        current = []
        change = []
        prev_close = []
        open_ = []
        oneyear_change = []
        with concurrent.futures.ProcessPoolExecutor(psutil.cpu_count(logical=False)) as pool_executor, \
        concurrent.futures.ThreadPoolExecutor(max_workers=len(links)) as executor:
            for return_value in executor.map(partial(getCurrency_data, session, pool_executor), links.items()):
                currency_tmp, current_tmp, change_tmp, prevclose_tmp, open_tmp, oneyearchange_tmp = return_value
                currency.append(currency_tmp)
                current.append(current_tmp)
                change.append(change_tmp)
                prev_close.append(prevclose_tmp)
                open_.append(open_tmp)
                oneyear_change.append(oneyearchange_tmp)
        df_currency = pd.DataFrame(columns=['Currency', 'Current', 'Change', 'Prev. Close', 'Open', '1 Year Change'])
        df_currency["Currency"] = currency
        df_currency["Current"] = current
        df_currency["Change"] = change
        df_currency["Prev. Close"] = prev_close
        df_currency["Open"] = open_
        df_currency["1 Year Change"] = oneyear_change

    t2 = time.perf_counter()

    print(f'Finished in {t2-t1} seconds')

    print(df_currency)

# Required for Windows:
if __name__ == '__main__':
    main()

Prints:

Finished in 3.5800665 seconds
  Currency   Current   Change Prev. Close      Open 1 Year Change
0  USD-IDR  14,452.5    +52.5      14,400    14,450       - 2.49%
1  USD-JPY    109.81    +0.09      109.72    109.73         3.47%
2  USD-CNY    6.5015  +0.0073      6.4942    6.4951       - 6.13%
3  USD-EUR    0.8545  -0.0018      0.8563    0.8565         1.33%
4  USD-SGD    1.3615  -0.0027      1.3643     1.364       - 0.44%
5  USD-THB    33.360   +0.010       33.35     33.34         6.54%
6  USD-MXN   20.4000  +0.2480      20.152    20.152       - 8.88%
7  USD-MYR    4.2375  +0.0005       4.237    4.2395         1.67%
8  USD-KRW  1,177.58    +1.30    1,176.28  1,175.46       - 0.69%
9  USD-INR    74.352   -0.018       74.37     74.38       - 0.62%
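
One caveat about sizing the process pool with psutil: according to its documentation, psutil.cpu_count(logical=False) returns None when the number of physical cores cannot be determined, and os.cpu_count() can likewise return None. The following is a small sketch of a defensive helper, assuming you want to fall back to the logical-core count when psutil is missing or gives no answer. The function name physical_core_count is made up for this example.

```python
import os


def physical_core_count():
    """Best-effort physical core count, with fallbacks (hypothetical helper)."""
    try:
        import psutil  # third-party; optional in this sketch
    except ImportError:
        psutil = None

    if psutil is not None:
        cores = psutil.cpu_count(logical=False)
        if cores:  # None means psutil could not determine the value
            return cores

    # Fall back to the logical core count, or 1 if even that is unknown.
    return os.cpu_count() or 1


if __name__ == "__main__":
    print(physical_core_count())
```

The result can be passed as max_workers when creating the ProcessPoolExecutor, in place of calling psutil.cpu_count(logical=False) directly.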
