Does multiprocessing.Queue work with gevent? [duplicate]

Developer https://www.devze.com 2023-04-07 01:37 Source: the web
This question already has answers here: Passing a multiprocessing queue/dictionary/etc.. to green threads (3 answers) Closed 3 years ago.

Anyone know what is wrong with this code? It simply "loads" forever. No output. "Sites" is a list of a few dozen strings.

import gevent
import multiprocessing

num_worker_threads = 30

def mwRegisterWorker():
    while True:
        try:
            print(q.get())
        finally:
            pass

q = multiprocessing.JoinableQueue()
for i in range(num_worker_threads):
    gevent.spawn(mwRegisterWorker)

for site in sites:
    q.put(site)

q.join()  # block until all tasks are done


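gevent.spawn() creates greenlets, not processes; in fact, all greenlets run in a single OS thread. So multiprocessing.JoinableQueue is not appropriate here: its q.join() blocks that whole thread without ever yielding to gevent's event loop, so the spawned workers never get a chance to run.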

gevent is based on cooperative multitasking, i.e., other greenlets won't run until you call a blocking function that switches to gevent's event loop. For example, conn below uses socket methods patched for gevent, which let other greenlets run while they wait for a reply from the site. And without pool.join(), which gives up control to the greenlet that runs the event loop, no connections would be made.
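As a minimal sketch of that scheduling behaviour: spawned greenlets do not start until the main greenlet itself yields, here via gevent.joinall(). The names demo and worker are made up for illustration, and gevent.sleep(0) stands in for any call that hands control back to the event loop.

```python
import gevent


def demo():
    """Record the order in which the main greenlet and two workers run."""
    log = []

    def worker(name):
        log.append("start " + name)
        gevent.sleep(0)  # cooperative yield: lets other greenlets run
        log.append("end " + name)

    greenlets = [gevent.spawn(worker, name) for name in ("a", "b")]
    log.append("spawned")      # no worker has run yet at this point
    gevent.joinall(greenlets)  # main greenlet yields; the workers run now
    return log


if __name__ == "__main__":
    print(demo())  # "spawned" is always the first entry
```

If gevent.joinall() were replaced by a call that blocks the OS thread without yielding, neither worker would ever be started.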

To limit concurrency while making requests to several sites you could use gevent.pool.Pool:

#!/usr/bin/env python3
from gevent import monkey; monkey.patch_socket()  # patch before importing http.client
from gevent.pool import Pool
import http.client  # now it can be used from multiple greenlets

import logging
info = logging.getLogger().info

def process(site):
    """Make HEAD request to the `site`."""
    conn = http.client.HTTPConnection(site)
    try:
        conn.request("HEAD", "/")
        res = conn.getresponse()
    except OSError as e:
        info("error %s reason: %s" % (site, e))
    else:
        info("%s %s %s" % (site, res.status, res.reason))
    finally:
        conn.close()

def main():
    logging.basicConfig(level=logging.INFO, format="%(asctime)s %(message)s")

    num_worker_threads = 2  # upper bound on concurrent requests
    pool = Pool(num_worker_threads)
    sites = ["google.com", "bing.com", "duckduckgo.com", "stackoverflow.com"]*3
    for site in sites:
        pool.apply_async(process, args=(site,))
    pool.join()  # yield to the event loop until all requests are done

if __name__=="__main__":
    main()


Use gevent.queue.JoinableQueue instead. Greenlets (the green threads gevent uses internally) are neither OS threads nor processes; they are coroutines with user-level scheduling.
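A minimal sketch of the question's worker pattern on top of gevent.queue.JoinableQueue might look like the following. The helper run_workers and the append to results are hypothetical stand-ins for the real per-site work. Note that each worker calls q.task_done(); the question's code never does, and without it q.join() would not return even with the gevent queue.

```python
import gevent
from gevent.queue import JoinableQueue


def run_workers(sites, num_workers=3):
    """Feed `sites` through greenlet workers and return the processed items."""
    q = JoinableQueue()
    results = []

    def worker():
        while True:
            site = q.get()  # yields to the event loop while the queue is empty
            try:
                results.append(site)  # placeholder for the real work
            finally:
                q.task_done()  # lets q.join() account for this item

    workers = [gevent.spawn(worker) for _ in range(num_workers)]
    for site in sites:
        q.put(site)
    q.join()                 # cooperative wait until every item is marked done
    gevent.killall(workers)  # workers loop forever, so stop them explicitly
    return results


if __name__ == "__main__":
    print(run_workers(["a.example", "b.example", "c.example"]))
```

Because q.get() and q.join() both switch to gevent's event loop instead of blocking the OS thread, the workers get to run while the main greenlet waits.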
