Here's the issue: I have a thread which runs a for-loop reading from a generator and doing some processing on that data. The generator always has data coming in, so it never raises a StopIteration exception. I would like to stop this thread cleanly from the main thread (i.e., exit the for-loop which is processing data from the generator). Below is an example of this scenario. It gives the correct result, but only in the limited sense I'll describe below:
import threading
import time
import random

def add():
    # Endless generator: yields an increasing counter after a random delay
    r = random.Random()
    i = 0
    while True:
        sleep_time = r.randint(0, 3)
        time.sleep(sleep_time)
        yield i
        i = i + 1

class Test(object):
    def __init__(self):
        self.func = add
        self.stopped = False

    def stop(self):
        self.stopped = True

    def run(self):
        self.generator = self.func()
        for x in self.generator:
            print(x)
            # Only checked after the generator has produced a value
            if self.stopped is True:
                break
        print('DONE')

tester = Test()
thread = threading.Thread(target=tester.run)
thread.daemon = True
thread.start()
time.sleep(10)
print('Stopping thread')
tester.stop()
print('Complete, but should stop immediately!')
Now, while this works in the above example (obviously the above doesn't prevent race conditions on self.stopped, but that's not the problem at hand so I left that code out), the problem I have is that the generator in my real code does not always have data immediately, so there can be a long pause between when self.stopped is set and when the break statement is actually executed. So, the gist of my problem is that I would like to be able to cleanly exit the for-loop as soon as possible, rather than waiting for data from the generator before being able to exit, and obviously the above solution does not do that.
Is there any hope? It's a pretty out-there problem, which likely has no clean solution, but any help would be greatly appreciated.
EDIT: To clarify, in my real application I have a generator (let's denote it as G) which grabs data from a kernel driver. This data is to be sent out to a server, but while the socket is attempting to connect to the server (which may not always be running) I want to process the data from the driver (once connected this processing does not occur). So I launched a thread to grab data from G (and process it) while the main thread attempts to connect to the server. Once connected, ideally the following should occur:
I pause the execution of G, exit the thread, and pass the same G instance to another function which sends the data straight to the server.
From the answers/comments below, I believe this is impossible without destroying G, because there is no way to cleanly pause a currently executing generator.
Sorry for the confusion.
Sounds like what you really want is a coroutine, not a generator. See David Beazley's mind-bending A Curious Course on Coroutines and Concurrency, which, while being more information than you require and then some, should give you some clarity on what you're trying to do.
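To make the distinction concrete, here is a minimal sketch of a generator-based coroutine in the style that course uses: instead of a loop pulling data out of a generator, the caller pushes data in with send() and can end it at any point with close(). The names coroutine, processor, and results are made up for illustration and are not from the question.

```python
import functools

def coroutine(func):
    """Prime a generator-based coroutine so it is ready to receive send()."""
    @functools.wraps(func)
    def start(*args, **kwargs):
        cr = func(*args, **kwargs)
        next(cr)  # advance to the first (yield)
        return cr
    return start

@coroutine
def processor(results):
    """Receive items via send() and record a processed value (hypothetical)."""
    try:
        while True:
            item = (yield)            # execution pauses here until send()
            results.append(item * 2)  # stand-in for the real processing
    except GeneratorExit:
        # Raised inside the coroutine when the caller invokes close().
        results.append("closed")

results = []
p = processor(results)
p.send(1)
p.send(2)
p.close()
```

Because the caller drives each step, it decides when processing stops; nothing blocks inside the coroutine waiting for the next item.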
Couldn't you just close the generator? Something like:
def stop(self):
    self.generator.close()

def run(self):
    self.generator = self.func()
    try:
        for x in self.generator:
            print(x)
            time.sleep(1)
    except GeneratorExit:
        pass
    print('DONE')
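One thing worth checking before relying on this: as far as I know, CPython refuses to close() a generator whose body is currently running in another thread and raises ValueError ("generator already executing") instead of interrupting it. The sketch below tries to show that deterministically. blocking_gen and the two events are hypothetical stand-ins for a generator stuck in a blocking read.

```python
import threading

started = threading.Event()   # set once the generator body is running
release = threading.Event()   # lets the generator body finish

def blocking_gen():
    started.set()
    release.wait()            # stands in for a blocking read
    yield 1

gen = blocking_gen()
received = []
worker = threading.Thread(target=lambda: received.append(next(gen)))
worker.start()

started.wait()                # the generator is now executing in the worker
try:
    gen.close()               # attempted from the main thread
    close_error = None
except ValueError as exc:
    close_error = exc         # the generator was not interrupted
finally:
    release.set()             # unblock the generator so the worker can exit
    worker.join()
```

If that holds for your interpreter, close() only helps while the generator is suspended at a yield, which is not the case while it is waiting for data.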
You need self.generator to have a timeout capability. Conceptually:
wait(1 sec);
rather than just
wait();
I don't know if that's possible (show us your generator code). For example, if you were reading from a pipe or a socket, don't code
giveMeSomeBytes(buffer); // wait indefinitely
Instead, code
giveMeSomeBytesOrTimeout(buffer, howLongToWait); // wait for a while and
                                                 // then go see if we should die
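In Python terms, the same idea might look like the sketch below. It assumes the data source can be waited on with a timeout; a queue.Queue stands in for the pipe or socket here, and poll_source, stop_event, and the 0.05 s timeout are illustrative choices, not anything from the question.

```python
import queue
import threading

def poll_source(source, stop_event, timeout=0.05):
    """Yield items from `source`, re-checking `stop_event` after each timeout."""
    while not stop_event.is_set():
        try:
            item = source.get(timeout=timeout)  # wait for a while...
        except queue.Empty:
            continue                            # ...then go see if we should stop
        yield item

source = queue.Queue()        # stand-in for the real data source
stop_event = threading.Event()
seen = []

def consume():
    for item in poll_source(source, stop_event):
        seen.append(item)
        source.task_done()    # lets the main thread know the item was handled

worker = threading.Thread(target=consume)
worker.start()
source.put("a")
source.put("b")
source.join()                 # wait until both items have been processed
stop_event.set()              # no more data is coming; ask the loop to exit
worker.join(timeout=2.0)
```

The loop exits within roughly one timeout period of stop_event being set, even though no further data arrives.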
First, generators are probably a red herring; don't worry about them.
The canonical way to solve this kind of producer-consumer problem in Python is to use the standard-library queue module. It acts as an intermediary, allowing your producer thread to keep grabbing/processing data from the kernel into the queue, and your consumer thread to send queue data to the server, without their respective blocking I/O calls interfering with one another.
Here's a sketch of the basic idea, without the details filled in:
from queue import Queue
from threading import Thread

class Application(object):
    def __init__(self):
        self.q = Queue()
        self.running = False

    # From kernel to queue
    def produce(self):
        while self.running:
            data = read_from_kernel()
            self.q.put(data)

    # From queue to server
    def consume(self):
        while self.running:
            data = self.q.get()
            send_to_server(data)

    # Start producer thread, then consume
    def run(self):
        try:
            self.running = True
            producer = Thread(target=self.produce)
            producer.start()
            self.consume()
        finally:
            self.running = False
If self.running is set to False, the above code's produce method will still block inside read_from_kernel until its next return before exiting, but there's little Python can do about that. Whatever system call you use must support this somehow: if it's an actual read, for example, your options would include:
- A short timeout, plus retry handling
- Non-blocking I/O (but in this case you might want to investigate a framework based around this, like Twisted Python)
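As a rough, runnable illustration of the first option, here is a sketch that assumes the read can be given a short timeout. fake_read, the sentinel object, and the 0.05 s timeout are stand-ins I made up; the real kernel read and server send would replace them.

```python
import queue
import threading

_SENTINEL = object()  # hypothetical marker telling the consumer to stop

def produce(q, stop_event, read, timeout=0.05):
    """Move data from `read` into the queue until `stop_event` is set."""
    while not stop_event.is_set():
        data = read(timeout)      # assumed to return None on timeout
        if data is not None:
            q.put(data)
    q.put(_SENTINEL)              # wake the consumer so it can exit too

def consume(q, send):
    """Forward queued data to `send` until the sentinel arrives."""
    while True:
        data = q.get()
        if data is _SENTINEL:
            break
        send(data)

# Stand-ins for the real kernel read and server send (assumptions).
pending = queue.Queue()
for n in range(3):
    pending.put(n)

def fake_read(timeout):
    try:
        item = pending.get(timeout=timeout)
    except queue.Empty:
        return None               # timed out: the caller re-checks the stop flag
    pending.task_done()
    return item

sent = []
q = queue.Queue()
stop_event = threading.Event()
producer = threading.Thread(target=produce, args=(q, stop_event, fake_read))
consumer = threading.Thread(target=consume, args=(q, sent.append))
producer.start()
consumer.start()

pending.join()                    # all fake data has been read
stop_event.set()                  # request shutdown
producer.join()
consumer.join()
```

The sentinel is there because the consumer would otherwise stay blocked in q.get() after the producer stops; a threading.Event is used instead of a plain boolean only to make the cross-thread signal explicit.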