且构网

分享程序员开发的那些事...
且构网 - 分享程序员编程开发的那些事

修复了在计算多处理期间调用函数的频率时返回负值的问题

更新时间:2023-02-19 23:15:29

我找到了一种无需询问正在运行的 worker 数量的方法:

I found a way to do it without asking for the number of workers running:

import random
import time
from time import sleep
from multiprocessing import Manager, Process, Lock

def foo(call_rate, lock):
    # Shift this to the start of the function
    with lock:
        call_rate['total_time'] += time.time() - call_rate['last_call']
        call_rate['last_call'] = time.time()
        call_rate['total_calls'] += 1
        call_rate['rate'] = call_rate['total_time'] / call_rate['total_calls']
    
    # Mimic blocking of function
    sleep(random.randint(1, 3))

    output = 'result of some logic'


    # By doing this, we are ignoring the time spent within the function
    with lock:
        call_rate['last_call'] = time.time()
    return output

def worker(num, call_rate, lock):
    for _ in range(num):
        # Mimic doing some logic before asking output
        sleep(1)

        foo(call_rate, lock)

if __name__ == '__main__':
    # Create a shared dictionary accessible by all processes
    m = Manager()
    lock = m.Lock()
    call_rate = m.dict({'rate': 0.0, 'total_time': 0.0, 'last_call': time.time(), 'total_calls': 0})

    w = []

    # Create 3 worker processes that run foo() thrice 
    for i in range(3):
        w.append(Process(target=worker, args=(3, call_rate, lock, )))
        w[i].start()
    for i in range(3):
        w[i].join()

    # Output: 0.354s 
    print('foo called once every {}s'.format(call_rate['rate']))

我会解释为什么会这样.在原始代码中,最后一次调用时间是在函数被阻塞后记录的.这意味着需要减去在函数中花费的时间.但是,正如@Booboo 在对他们的回答的评论中已经指出的那样,这是有问题的,因为可能有多个工作人员在运行,我们不能只减去每个工作人员在函数中花费的等待时间.

I will explain why this works. In the original code, the last call time was recorded AFTER the function had blocked. This meant that the time spent in the function need to be subtracted. But, as @Booboo had already pointed out in the comment to their answer, this was problematic because there maybe multiple workers running and we can't just subtract the waiting time EACH worker spends in the function.

对此的一个简单解决方法是在函数开始时记录上次调用时间,其中尚未添加函数内花费的时间.但它仍然没有解决更广泛的问题,因为下一次 foo() 将从工作人员调用,它将包括从上次调用开始在函数内花费的时间,让我们留在第一个再次.但是这个,我不知道为什么我以前没有看到这个,可以很简单地修复;通过在函数退出之前添加这一行:

A simple workaround to this is to record the last call time at the start of the function, where the time spent within the function has not yet been added. But it still doesn't solve the broader problem because the next time foo() will be called from the worker, it will include the time spent within the function from the last call, leaving us at square one again. But this, and I don't know why I didn't see this before, can be fixed very simply; by adding this line just before the function exits:

call_rate['last_call'] = time.time()

这确保当函数退出时,最后一次调用被刷新,这样工作人员似乎根本没有在函数中花费任何时间.这种方法不需要减去任何东西,这就是它起作用的原因.

This makes sure that when the function exits, the last call is refreshed such that it seems the worker did not spend any time in the function at all. This approach does not require subtracting anything and thats why it works.

我做了一个测试,我运行了 10 次,并使用下面的代码计算了一些统计数据:

I did a test where I ran this 10 times and calculated some statistics using the code below:

import random
import time
from time import sleep
from multiprocessing import Manager, Process, Lock
import statistics


def foo(call_rate, lock):
    with lock:
        call_rate['total_time'] += time.time() - call_rate['last_call']
        call_rate['last_call'] = time.time()
        call_rate['total_calls'] += 1
        call_rate['rate'] = call_rate['total_time'] / call_rate['total_calls']
    # Mimic blocking of function
    sleep(2)

    output = 'result of some logic'


    # By doing this, we are ignoring the time spent within the function
    with lock:
        call_rate['last_call'] = time.time()
    return output

def worker(num, call_rate, lock):
    for _ in range(num):
        # Mimic doing some logic before asking output
        sleep(1)

        foo(call_rate, lock)

def main():
    # Create a shared dictionary accessible by all processes
    m = Manager()
    lock = m.Lock()
    call_rate = m.dict({'rate': 0.0, 'total_time': 0.0, 'last_call': time.time(), 'total_calls': 0})

    w = []

    # Create 3 worker processes that run foo() thrice
    for i in range(3):
        w.append(Process(target=worker, args=(3, call_rate, lock, )))
        w[i].start()
    for i in range(3):
        w[i].join()

    return call_rate['rate']

if __name__ == '__main__':
    avgs = []
    for i in range(10):
        avgs.append(main())

    print("Highest is : {}".format(max(avgs)))
    print("Lowest is : {}".format(min(avgs)))
    print("Avergae is : {}".format(statistics.mean(avgs)))

输出:

Highest is : 0.35980285538567436
Lowest is : 0.3536567423078749
Avergae is : 0.356808172331916

作为证明"上述代码确实忽略了函数内花费的时间,您可以使函数块的时间更长,例如 15 秒,并且输出仍将大致相同.

As a 'proof' that the above code does ignore the time spent within the function, you can make the function block for a larger time, say 15s, and the output will still be approximately the same.

更新

不同时间功能块的频率不是0.3s的原因与worker进入和退出有关foo().考虑下面的代码,其中两个 worker 运行一次,执行 foo() 两次并输出 call_rate 每次进入和退出 foo() 以及用于标识工作人员的唯一 ID:

The reason why the frequency is not 0.3s when the function blocks for a varying time has to do with when the workers enter and exit foo(). Consider the code below where two workers are run once which execute foo() twice and output call_rate every enter and exit of foo() along with a unique id to identify the worker:

import random
import time
from time import sleep
from multiprocessing import Manager, Process, Lock
import statistics
import string

def foo(call_rate, lock, id):
    with lock:
        call_rate['total_time'] += time.time() - call_rate['last_call']
        call_rate['last_call'] = time.time()
        call_rate['total_calls'] += 1
        call_rate['rate'] = call_rate['total_time'] / call_rate['total_calls']
        print("{} entered, call rate {}".format(id, call_rate))
    # Mimic blocking of function
    sleep(1)

    output = 'result of some logic'

    # By doing this, we are ignoring the time spent within the function
    with lock:
        call_rate['last_call'] = time.time()
        print("{} exited, call rate {}".format(id, call_rate))
    return output


def id_generator(size=6, chars=string.ascii_uppercase + string.digits):
    return ''.join(random.choice(chars) for _ in range(size))


def worker(num, call_rate, lock):
    id = id_generator()
    for _ in range(num):
        # Mimic doing some logic before asking output
        sleep(1)

        foo(call_rate, lock, id)

def main():
    # Create a shared dictionary accessible by all processes
    m = Manager()
    lock = m.Lock()
    call_rate = m.dict({'rate': 0.0, 'total_time': 0.0, 'last_call': time.time(), 'total_calls': 0})

    w = []

    # Create 3 worker processes that run foo() thrice
    for i in range(2):
        w.append(Process(target=worker, args=(2, call_rate, lock, )))
        w[i].start()
    for i in range(2):
        w[i].join()

    return call_rate['rate']

if __name__ == '__main__':
    avgs = []
    for i in range(1):
        avgs.append(main())

    print("Highest is : {}".format(max(avgs)))
    print("Lowest is : {}".format(min(avgs)))
    print("Avergae is : {}".format(statistics.mean(avgs)))

注意在这段代码中,foo() 总是阻塞 1s.由于有两名工作人员在场,因此该速率应接近 0.5 秒.运行此代码:

Note that in this code, foo() always blocks for 1s. The rate should be close to 0.5s since there are two workers present. Running this code:

输出 #1:

XEC6AU entered, call rate {'rate': 1.1851444244384766, 'total_time': 1.1851444244384766, 'last_call': 1624950732.381014, 'total_calls': 1}
O43FUI entered, call rate {'rate': 0.6178374290466309, 'total_time': 1.2356748580932617, 'last_call': 1624950732.4325447, 'total_calls': 2}
XEC6AU exited, call rate {'rate': 0.6178374290466309, 'total_time': 1.2356748580932617, 'last_call': 1624950733.4327667, 'total_calls': 2}
O43FUI exited, call rate {'rate': 0.6178374290466309, 'total_time': 1.2356748580932617, 'last_call': 1624950733.4484024, 'total_calls': 2}
XEC6AU entered, call rate {'rate': 0.7401185035705566, 'total_time': 2.22035551071167, 'last_call': 1624950734.433083, 'total_calls': 3}
O43FUI entered, call rate {'rate': 0.558994710445404, 'total_time': 2.235978841781616, 'last_call': 1624950734.4487064, 'total_calls': 4}
XEC6AU exited, call rate {'rate': 0.558994710445404, 'total_time': 2.235978841781616, 'last_call': 1624950735.4333804, 'total_calls': 4}
O43FUI exited, call rate {'rate': 0.558994710445404, 'total_time': 2.235978841781616, 'last_call': 1624950735.4958992, 'total_calls': 4}
Highest is : 0.558994710445404
Lowest is : 0.558994710445404
Avergae is : 0.558994710445404

速率为 0.5 秒,这应该是预期的.请注意工作人员如何同时进入和退出功能.现在将函数阻塞时间从 1s 更改为 random.randint(1, 10),这就是我得到的:

The rate is 0.5s, which should be expected. Notice how both the workers enter and exit the functions simultaneously. Now after changing the function blocking time from 1s to random.randint(1, 10), this is what I get:

输出 #2

NHXAKF entered, call rate {'rate': 1.1722326278686523, 'total_time': 1.1722326278686523, 'last_call': 1624886294.4630196, 'total_calls': 1}
R2DD8H entered, call rate {'rate': 0.5939309597015381, 'total_time': 1.1878619194030762, 'last_call': 1624886294.478649, 'total_calls': 2}
NHXAKF exited, call rate {'rate': 0.5939309597015381, 'total_time': 1.1878619194030762, 'last_call': 1624886300.4648588, 'total_calls': 2}
NHXAKF entered, call rate {'rate': 0.7293914159138998, 'total_time': 2.188174247741699, 'last_call': 1624886301.465171, 'total_calls': 3}
R2DD8H exited, call rate {'rate': 0.7293914159138998, 'total_time': 2.188174247741699, 'last_call': 1624886302.4811018, 'total_calls': 3}
R2DD8H entered, call rate {'rate': 0.7971136569976807, 'total_time': 3.1884546279907227, 'last_call': 1624886303.4813821, 'total_calls': 4}
NHXAKF exited, call rate {'rate': 0.7971136569976807, 'total_time': 3.1884546279907227, 'last_call': 1624886304.4660738, 'total_calls': 4}
R2DD8H exited, call rate {'rate': 0.7971136569976807, 'total_time': 3.1884546279907227, 'last_call': 1624886307.4826, 'total_calls': 4}
Highest is : 0.7971136569976807
Lowest is : 0.7971136569976807
Avergae is : 0.7971136569976807

与以前不同,该比率几乎为 0.8.此外,两个工作人员也不再一起进入和退出该功能.这当然是由于一个阻塞的时间比另一个更长.但是因为它们不再同步,它们在不同的时间等待 1,而不是在 worker() 函数内一起等待.您甚至可以在 call_rate['total_time'] 中看到这一点.对于输出 #1,worker 是同步的,它是 ~2s,而对于输出 #2,它是 ~3s.因此,利率的差异.所以 0.8s 是在这种情况下工作人员调用 foo() 的真实速率,而不是假设的 0.5s.将速率乘以进程数会忽略这种细微差别.

The rate, unlike before, is almost 0.8. Moreover, both workers are no longer entering and exiting the function together either. This is ofcourse due to one blocking for a longer time than the other. But because they are no longer in sync, they are waiting for 1s at separate times instead of together inside of the worker() function. You can even see that in the call_rate['total_time']. For Output #1, where the workers are in sync, it is ~2s, while for Output #2 it is ~3s. And hence the difference in rates. So the 0.8s is the true rate of the workers calling foo() in this scenario, not the assumed 0.5s. Multiplying the rate by the number of processes would miss this nuance.