
Limiting concurrency and rate for Python threads

Developer https://www.devze.com 2023-04-08 04:01 Source: Web
Given a number of threads, I want to limit the rate of calls to the worker function to, say, one call per second.


My idea was to keep track of the last time a call was made across all threads and compare it to the current time in each thread. Then, if current_time - last_time < rate, I let the thread sleep for a bit. Something is wrong with my implementation - I presume I may have gotten the wrong idea about how locks work.

My code:

from queue import Queue  # Python 3; the module was named `Queue` on Python 2
from threading import Thread, Lock
import time

num_worker_threads = 2
rate = 1
q = Queue()
lock = Lock()
last_time = [time.time()]

def do_work(i, idx):
    # Do work here, print is just a dummy.
    print('Thread: {0}, Item: {1}, Time: {2}'.format(i, idx, time.time()))

def worker(i):
    while True:
        lock.acquire()
        current_time = time.time()
        interval = current_time - last_time[0]
        last_time[0] = current_time
        if interval < rate:
            time.sleep(rate - interval)
        lock.release()
        item = q.get()
        do_work(i, item)
        q.task_done()

for i in range(num_worker_threads):
    t = Thread(target=worker, args=[i])
    t.daemon = True
    t.start()

for item in range(10):
    q.put(item)

q.join()

I was expecting to see one call per second to do_work, however, I get mostly 2 calls at the same time (1 for each thread), followed by a one second pause. What is wrong?


Ok, some edit. The advice to simply throttle the rate at which items are put into the queue was good, but I remembered that I also had to handle the case where items are re-added to the queue by the workers. Canonical examples: pagination, or back-off-and-retry in network tasks. I came up with the following. I guess that for actual network tasks the eventlet/gevent libraries may be easier on resources, but this is just an example. It uses a priority queue to pile up the requests and an extra thread to shovel items from the pile to the actual task queue at an even rate. I simulated re-insertion into the pile by the workers; re-inserted items are then treated first.

import time
import random

from queue import Queue, PriorityQueue
from threading import Thread

rate = 0.1

def worker(q, q_pile, idx):
    while True:
        item = q.get()
        print("Thread: {0} processed: {1}".format(idx, item[1]))
        if random.random() > 0.3:
            # Simulate pagination/retry: push the item back onto the pile.
            print("Thread: {0} reinserting item: {1}".format(idx, item[1]))
            q_pile.put((-1 * time.time(), item[1]))
        q.task_done()

def schedule(q_pile, q):
    while True:
        if not q_pile.empty():
            print("Items on pile: {0}".format(q_pile.qsize()))
            q.put(q_pile.get())
            q_pile.task_done()
        time.sleep(rate)

def main():

    q_pile = PriorityQueue()
    q = Queue()

    for i in range(5):
        t = Thread(target=worker, args=[q, q_pile, i])
        t.daemon = True
        t.start()

    t_schedule = Thread(target=schedule, args=[q_pile, q])
    t_schedule.daemon = True
    t_schedule.start()

    for i in range(10):
        q_pile.put((-1 * time.time(), i))
    q_pile.join()
    q.join()

if __name__ == '__main__':
    main()


I get mostly 2 calls at the same time (1 for each thread), followed by a one second pause. What is wrong?

That's exactly what you should expect from your implementation. Let's say the time t starts at 0 and the rate is 1:

Thread1 does this:

    lock.acquire() # both threads wait here, one gets the lock
    current_time = time.time() # we start at t=0
    interval = current_time - last_time[0] # so interval = 0
    last_time[0] = current_time # last_time = t = 0
    if interval < rate: # rate = 1 so we sleep
        time.sleep(rate - interval) # to t=1
    lock.release() # now the other thread wakes up
    # it's t=1 and we do the job

Thread2 does this:

    lock.acquire() # we get the lock at t=1 
    current_time = time.time() # still t=1
    interval = current_time - last_time[0] # interval = 1
    last_time[0] = current_time
    if interval < rate: # interval = rate = 1 so we don't sleep
        time.sleep(rate - interval)
    lock.release() 
    # both threads start the work around t=1

My advice is to limit the speed at which the items are put into the queue.
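A rough sketch of that producer-side approach, assuming a setup like the one in the question (the rate is shortened to 0.1s so the demo finishes quickly, and the timestamp list is just there to show the spacing):

```python
import time
from queue import Queue
from threading import Thread

rate = 0.1  # seconds between items; shortened from 1s for a quick demo
q = Queue()
timestamps = []  # record when each item is handled, for illustration only

def worker():
    while True:
        item = q.get()
        timestamps.append(time.time())  # the "work" is just recording the time
        q.task_done()

for _ in range(2):
    Thread(target=worker, daemon=True).start()

# Throttle at the producer: the queue receives at most one item per `rate`,
# so the workers cannot possibly run faster than that, no matter how many.
for item in range(5):
    q.put(item)
    time.sleep(rate)

q.join()

gaps = [b - a for a, b in zip(timestamps, timestamps[1:])]
print(all(g >= rate * 0.5 for g in gaps))
```

No locks are needed because the single producer is the only place where timing matters; the workers simply drain the queue as items arrive.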


It seems weird to me to try to limit the rate across multiple threads. If you limit each thread independently you can avoid all the locking nonsense.
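One way that per-thread idea could look (a sketch, not the question's code): each of N workers sleeps N * rate between jobs, so the aggregate rate still averages one job per `rate` without any shared state. The trade-off is that bursts of up to N simultaneous jobs are possible, since the threads are not coordinated.

```python
import time
from queue import Queue
from threading import Thread

num_workers = 2
rate = 0.1                        # target average interval between jobs (shortened for demo)
per_thread = rate * num_workers   # each thread alone runs at 1 job per `per_thread`
q = Queue()
done = []

def worker():
    while True:
        item = q.get()
        done.append(item)          # dummy "work"
        time.sleep(per_thread)     # each thread throttles itself; no lock needed
        q.task_done()

for _ in range(num_workers):
    Thread(target=worker, daemon=True).start()

start = time.time()
for i in range(6):
    q.put(i)
q.join()
elapsed = time.time() - start

# 6 jobs across 2 self-throttled workers take roughly 3 * per_thread overall.
print(len(done))
```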

Just a guess, but I think you want to set last_time[0] to time.time() (not current_time) after the sleep.
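A sketch of that fix applied to the question's worker: keep the lock held across the sleep so the threads pass through one at a time, and stamp last_time[0] after the sleep rather than before it. (The rate is shortened to 0.1s here so the demo runs quickly; the starts list only exists to check the spacing.)

```python
import time
from queue import Queue
from threading import Thread, Lock

rate = 0.1
q = Queue()
lock = Lock()
last_time = [0.0]
starts = []  # start times of each job, for illustration

def worker():
    while True:
        item = q.get()
        with lock:  # hold the lock across the sleep so threads pass one per interval
            wait = rate - (time.time() - last_time[0])
            if wait > 0:
                time.sleep(wait)
            last_time[0] = time.time()  # stamp *after* the sleep, as suggested
            starts.append(last_time[0])
        q.task_done()

for _ in range(2):
    Thread(target=worker, daemon=True).start()

for i in range(5):
    q.put(i)
q.join()

gaps = [b - a for a, b in zip(starts, starts[1:])]
print(all(g >= rate * 0.9 for g in gaps))
```

With the stamp taken after the sleep, the second thread measures its interval from the moment the first thread actually started its job, so successive jobs are at least `rate` apart instead of starting in pairs.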
