Is the result of itertools.tee() thread-safe (Python)

Developer https://www.devze.com 2023-03-20 17:19 Source: Internet

Suppose I have this Python code:

from itertools import count, tee
original = count()     # just an example, can be another iterable
a, b = tee(original)

The question is, will there be any problem if I start iterating "a" in one thread and, at the same time, iterate "b" in another thread? Clearly, a and b share some data (the original iterable, plus some additional stuff, internal buffers or something). So, will a.next() and b.next() do the appropriate locking when they access this shared data?


Update! Segfaults caused by tee have been fixed in later versions of Python 2.7, 3.7, 3.8, and anything above. You still need to manage concurrent access yourself for thread safety, and you can use my solution below.
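As a hedged illustration of what that fix looks like from Python code: on interpreters that include it, a second call into the same tee buffer while the first call is still fetching from the source is rejected with an exception instead of corrupting memory. The sketch below provokes that situation in a single thread, so it does not depend on thread timing. The class name `Reentrant` is made up for this example, and the exact exception text is an assumption that may differ between versions.

```python
from itertools import tee

class Reentrant:
    """Hypothetical source whose first __next__ re-enters the tee machinery."""
    def __init__(self):
        self.n = 0
        self.other = None  # set to the second tee branch below

    def __iter__(self):
        return self

    def __next__(self):
        self.n += 1
        if self.other is not None:
            other, self.other = self.other, None
            # Stand-in for "another thread calls next(b) right now".
            next(other)
        return self.n

src = Reentrant()
a, b = tee(src)
src.other = b

try:
    next(a)
    outcome = "no guard: the nested call went through"
except RuntimeError as exc:
    outcome = "RuntimeError: " + str(exc)

print(outcome)
```

Getting an exception here is the "safe failure" mode. It still means two threads cannot share tee branches without external locking.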

tl;dr

In CPython, `itertools.tee` is thread safe *if and only if* the original iterator is implemented in C/C++, i.e. doesn't use **any** python.

If the original iterator `it` was written in Python, like a class instance or a generator, then `itertools.tee(it)` is not thread-safe. In the best case you'll only get an exception (which you will), and in the worst case Python will crash.

Instead of using tee, here is a wrapper class and function that are thread-safe:

from itertools import tee
from threading import Lock

class safeteeobject(object):
    """tee object wrapped to make it thread-safe"""
    def __init__(self, teeobj, lock):
        self.teeobj = teeobj
        self.lock = lock
    def __iter__(self):
        return self
    def __next__(self):
        with self.lock:
            return next(self.teeobj)
    def __copy__(self):
        return safeteeobject(self.teeobj.__copy__(), self.lock)

def safetee(iterable, n=2):
    """tuple of n independent thread-safe iterators"""
    lock = Lock()
    return tuple(safeteeobject(teeobj, lock) for teeobj in tee(iterable, n))

I'll now expand (a lot) on when tee is and isn't thread-safe, and why.

Example where it's ok

Let's run some code (this is python 3 code, for python 2 use `itertools.izip` instead of `zip` to have same behavior):
>>> from itertools import tee, count
>>> from threading import Thread

>>> def limited_sum(it):
...     s = 0
...     for elem, _ in zip(it, range(1000000)):
...         s += elem
...     print(s)

>>> a, b = tee(count())
>>> [Thread(target=limited_sum, args=(it,)).start() for it in [a, b]]
# prints 499999500000 twice, which is indeed 0+1+...+999999

itertools.count is written entirely in C in the file Modules/itertoolsmodule.c of the CPython project, so it works just fine.

The same is true for: lists, tuples, sets, range, dictionaries (keys, values and items), collections.defaultdict (keys, values and items), and a few others.

Example where it doesn't work - Generators

A very short example is using a generator:
>>> gen = (i for i in range(1000000))
>>> a, b = tee(gen)
>>> [Thread(target=sum, args=(it,)).start() for it in [a, b]]

Exception in thread Thread-10:
Traceback (most recent call last):
  File "/usr/lib/python3.4/threading.py", line 920, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.4/threading.py", line 868, in run
    self._target(*self._args, **self._kwargs)
ValueError: generator already executing

Yes, tee is written in C, and it is true that the GIL executes one bytecode at a time. But the above example shows that this is not enough to ensure thread safety. Somewhere along the line this is what happened:

  1. The two threads have called next on their tee_object instances the same number of times,
  2. Thread 1 calls next(a),
  3. It needs to get a new element, so thread 1 now calls next(gen),
  4. gen is written in python. On, say, the first byte code of gen.__next__ CPython decides to switch threads,
  5. Thread 2 resumes and calls next(b),
  6. It needs to get a new element, so it calls next(gen)
  7. Since gen.__next__ is already running in thread 1, we get an exception.
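The error in the last step is not specific to threads: a generator refuses any call to next while its frame is already running. As a minimal sketch, the same ValueError can be provoked in a single thread by letting a generator call next on itself. The names `reentrant` and `me` are made up for this illustration.

```python
def reentrant():
    yield "first"
    # The generator frame is running at this point, just as gen's frame
    # is still running when thread 2 calls next(gen) in the steps above.
    next(me)
    yield "never reached"

me = reentrant()
first = next(me)

try:
    next(me)
    error = None
except ValueError as exc:
    error = str(exc)

print(first, "/", error)
```

With threads, the nested call simply comes from thread 2 instead of from inside the generator.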

Example where it doesn't work - Iterator object

Ok, maybe it's just not thread-safe to use generators inside `tee`. Then we run a variant of the above code that uses an iterator object:
>>> from itertools import tee
>>> from threading import Thread
>>> class countdown(object):
...     def __init__(self, n):
...         self.i = n
...     def __iter__(self):
...         return self
...     def __next__(self):
...         self.i -= 1
...         if self.i < 0:
...             raise StopIteration
...         return self.i
... 
>>> a, b = tee(countdown(100000))
>>> [Thread(target=sum, args=(it,)).start() for it in [a, b]]
Segmentation fault (core dumped)

The above code crashes in Python 2.7.13 and 3.6 (and probably all CPython versions), on Ubuntu, Windows 7 and OSX. There is one more step before I reveal the reason.

What if I use locks inside my iterator?

Maybe the above code crashes because our iterator itself was not thread-safe. Let's add a lock and see what happens:
>>> from itertools import tee
>>> from threading import Thread, Lock
>>> class countdown(object):
...     def __init__(self, n):
...         self.i = n
...         self.lock = Lock()
...     def __iter__(self):
...         return self
...     def __next__(self):
...         with self.lock:
...             self.i -= 1
...             if self.i < 0:
...                 raise StopIteration
...             return self.i
... 
>>> a, b = tee(countdown(100000))
>>> [Thread(target=sum, args=(it,)).start() for it in [a, b]]
Segmentation fault (core dumped)

Adding a lock inside our iterator is not enough to make tee thread-safe.

Why tee is not thread-safe

The crux of the matter is the getitem method of teedataobject in the file Modules/itertoolsmodule.c of CPython. The implementation of tee is really cool, with an optimization that saves RAM calls: tee returns "tee objects", each of which saves a reference to a head teedataobject. These in turn are like links in a linked list, but instead of holding a single element - they hold 57. This isn't really important for our purposes, but it is what it is. Here is the getitem function of teedataobject:

static PyObject *
teedataobject_getitem(teedataobject *tdo, int i)
{
    PyObject *value;

    assert(i < LINKCELLS);
    if (i < tdo->numread)
        value = tdo->values[i];
    else {
        /* this is the lead iterator, so fetch more data */
        assert(i == tdo->numread);
        value = PyIter_Next(tdo->it);
        if (value == NULL)
            return NULL;
        tdo->numread++;
        tdo->values[i] = value;
    }
    Py_INCREF(value);
    return value;
}

When asked for an element, teedataobject checks if it has one prepared. If it does then it returns it. If it doesn't then it calls next on the original iterator. This is where, if the iterator is written in Python, the thread can be suspended in the middle of the call. So here's the problem:

  1. Two threads have called next the same number of times,
  2. Thread 1 calls next(a), and the C code gets to the PyIter_Next call above. On, say, the first bytecode of next(gen), CPython decides to switch threads.
  3. Thread 2 calls next(b), and since it still needs a new element, the C code gets to the PyIter_Next call.

At this point both threads are in the same place, with the same values for i and tdo->numread. Note that tdo->numread is simply a variable to keep track of where in the 57-cells link the teedataobject should write to next.

  4. Thread 2 finishes its call to PyIter_Next and returns an element. At some point CPython decides to switch threads again,

  5. Thread 1 resumes, finishes its call to PyIter_Next, and then runs the two lines:

         tdo->numread++;
         tdo->values[i] = value;

  6. But thread 2 has already set tdo->values[i]!

This is already enough to show that tee is not thread-safe since we lose the value that thread 2 put in tdo->values[i]. But this doesn't explain the crashing.
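The interleaving described above can be replayed deterministically with a simplified pure-Python model of the getitem logic. This is only a sketch under stated assumptions: `TeeData` and `SwitchingSource` are hypothetical names, the model covers a single block with no reference counting, and the "thread switch" is simulated by a callback that runs in the middle of the source's `__next__`.

```python
LINKCELLS = 57  # cells per block, as in the C source quoted above

class TeeData:
    """Simplified pure-Python stand-in for teedataobject (one block, no guard)."""
    def __init__(self, it):
        self.it = it
        self.numread = 0
        self.values = [None] * LINKCELLS

    def getitem(self, i):
        if i < self.numread:
            return self.values[i]
        value = next(self.it)   # PyIter_Next: the switch happens inside here
        self.numread += 1
        self.values[i] = value
        return value

class SwitchingSource:
    """Iterator that runs a callback during its first __next__,
    standing in for CPython switching to thread 2 at that moment."""
    def __init__(self):
        self.n = 0
        self.switch_to = None

    def __iter__(self):
        return self

    def __next__(self):
        self.n += 1
        mine = self.n
        hook, self.switch_to = self.switch_to, None
        if hook is not None:
            hook()              # "thread 2" runs to completion here
        return mine

src = SwitchingSource()
tdo = TeeData(src)
seen_by_thread2 = []
src.switch_to = lambda: seen_by_thread2.append(tdo.getitem(0))

seen_by_thread1 = tdo.getitem(0)

print(seen_by_thread1, seen_by_thread2)  # each "thread" got a different element
print(tdo.values[0], tdo.numread)        # slot 0 was overwritten, numread jumped by 2
print(tdo.values[1])                     # slot 1 counts as read but was never filled
```

In the model, slot 1 is merely `None`. In the C code the corresponding slot holds whatever pointer happened to be there, which is one way to end up with a crash.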

Say i was 56. Since both threads call tdo->numread++, it now gets to 58 - above 57, the allocated size of tdo->values. After thread 1 moves on as well, the object tdo has no more references and is ready to be deleted. This is the clear function for teedataobject:

static int
teedataobject_clear(teedataobject *tdo)
{
    int i;
    PyObject *tmp;

    Py_CLEAR(tdo->it);
    for (i=0 ; i<tdo->numread ; i++)
        Py_CLEAR(tdo->values[i]); // <----- PROBLEM!!!
    tmp = tdo->nextlink;
    tdo->nextlink = NULL;
    teedataobject_safe_decref(tmp);
    return 0;
}

At the line marked "PROBLEM", CPython will try to clear tdo->values[57]. This is where the crash happens. Well, some of the time. There's more than one place for crashing, I just wanted to show one.
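The index arithmetic can be checked with a few lines of Python. This is only an illustration of the off-by-one: a Python list raises IndexError at the point where the C loop reads past the end of the array, and the variable names are chosen to mirror the C fields.

```python
LINKCELLS = 57
values = [object()] * LINKCELLS  # valid slots are 0..56
i = 56
numread = i                      # both threads saw i == numread

numread += 1                     # thread 2: tdo->numread++
numread += 1                     # thread 1: tdo->numread++

try:
    for k in range(numread):     # the loop in teedataobject_clear
        values[k]
    overrun_at = None
except IndexError:
    overrun_at = k

print(numread, overrun_at)
```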

Now you know - itertools.tee is not thread safe.

One solution - External lock

Instead of locking inside our iterator's __next__, we can put a lock around tee.__next__. This means that the entire teedataobject.__getitem__ method will be called by a single thread each time. I gave a short implementation at the beginning of this answer. It is a drop-in replacement for tee that is thread-safe. The only thing it doesn't implement which tee does - is pickling. Since locks aren't picklable, it's not trivial to add this. But, of course, it can be done.
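As a usage sketch of the external-lock idea, the block below feeds a plain generator (the kind of source that fails with bare tee) to two threads and checks that both see the full sequence. To stay self-contained it uses a compact generator-based variant named `locked_tee`, which is a hypothetical helper for this example and not the wrapper class from the top of the answer. It assumes each branch is consumed by one thread only and does not support `__copy__`.

```python
from itertools import tee
from threading import Lock, Thread

def locked_tee(iterable, n=2):
    """n tee branches whose every next() on the shared tee runs under one lock."""
    lock = Lock()

    def guarded(branch):
        while True:
            with lock:
                try:
                    value = next(branch)
                except StopIteration:
                    return
            # The lock is released before yielding, so a slow consumer
            # never blocks the other branches.
            yield value

    return tuple(guarded(branch) for branch in tee(iterable, n))

def squares(limit):
    for i in range(limit):
        yield i * i

results = {}

def consume(name, it):
    results[name] = list(it)

a, b = locked_tee(squares(20000))
threads = [Thread(target=consume, args=(name, it))
           for name, it in (("a", a), ("b", b))]
for t in threads:
    t.start()
for t in threads:
    t.join()

expected = [i * i for i in range(20000)]
print(results["a"] == expected and results["b"] == expected)
```

One shared lock for all branches is the important design choice: a lock per branch would still let two threads enter the shared buffer at the same time.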


If the equivalent code shown in the documentation, here:

  • http://docs.python.org/library/itertools.html#itertools.tee

is correct, then no, it will not be thread-safe.

Note that though deque is documented to have thread-safe append and pop, it does not make any guarantees for the code that uses it.

Since the main code could end up asking the underlying iterator for elements on multiple threads, you need to have a thread-safe collection and iterator as input in order for tee to be safe.


In CPython, itertools.tee() and the iterator it returns are implemented in C code. That means the GIL should protect it against being called simultaneously by multiple threads. It will probably work correctly, and it won't crash the interpreter, but it is not guaranteed to be thread-safe.

Simply put, don't take the risk.


I would like to share my experience using itertools.tee to split a large flat text file into multiple CSV files from/to S3, in Python 3.6.9 and 3.7.4 environments.

My data flow is: S3 zipfile, s3fs read iterator, map iterator for the dataclass transform, tee iterator, map iterator for the dataclass filter, then a loop over the iterator that captures the data and writes it to S3 in CSV format, using s3fs write and/or a local write followed by an s3fs put to S3.

itertools.tee failed in the zipfile processing stack.

Dror Speiser's safetee above worked fine, but memory usage grew whenever the tee objects became unbalanced, either because the dataset was not evenly distributed or because of processing delays. It also did not work properly with multiprocessing-logging, which might be related to this bug: https://bugs.python.org/issue34410

The code below just adds simple flow control between the tee objects to prevent memory growth and an OOM killer situation.

I hope it is helpful for future reference.

import time
import threading
import logging
from itertools import tee
from collections import Counter

logger = logging.getLogger(__name__)


FLOW_WAIT_GAP = 1000  # flow gap for waiting
FLOW_WAIT_TIMEOUT = 60.0  # flow wait timeout


class Safetee:
    """tee object wrapped to make it thread-safe and flow controlled"""

    def __init__(self, teeobj, lock, flows, teeidx):
        self.teeobj = teeobj
        self.lock = lock
        self.flows = flows
        self.mykey = teeidx
        self.logcnt = 0

    def __iter__(self):
        return self

    def __next__(self):
        waitsec = 0.0
        while True:
            with self.lock:
                flowgap = self.flows[self.mykey] - self.flows[len(self.flows) - 1]
                if flowgap < FLOW_WAIT_GAP or waitsec > FLOW_WAIT_TIMEOUT:
                    nextdata = next(self.teeobj)
                    self.flows[self.mykey] += 1
                    return nextdata

            waitthis = min(flowgap / FLOW_WAIT_GAP, FLOW_WAIT_TIMEOUT / 3)
            waitsec += waitthis

            time.sleep(waitthis)

            if waitsec > FLOW_WAIT_TIMEOUT and self.logcnt < 5:
                self.logcnt += 1
                logger.debug(f'tee wait seconds={waitsec:.2f}, mykey={self.mykey}, flows={self.flows}')

    def __copy__(self):
        # The index is stored as self.mykey; self.teeidx does not exist.
        return Safetee(self.teeobj.__copy__(), self.lock, self.flows, self.mykey)


def safetee(iterable, n=2):
    """tuple of n independent thread-safe and flow controlled iterators"""
    lock = threading.Lock()
    flows = Counter()
    return tuple(Safetee(teeobj, lock, flows, teeidx) for teeidx, teeobj in enumerate(tee(iterable, n)))

