开发者

MultiThreading: limit the concurrent threads

开发者 https://www.devze.com 2023-03-06 00:37 出处:网络
I need to develop an app that is using multithreading. Basicly, I have a DataTable that contains around 200k rows.

I need to develop an app that is using multithreading.

Basicly, I have a DataTable that contains around 200k rows. From each row, I need to take a field, compare it to a webpage, and then remove it from the datatable.

The thing is, the server serving those pages has a limit on concurrent requests. so at max I can ask for 3 pages at the same time.

I want to do this by using the threadpool, I even managed building a simple app that does that ( locks the datatable ) but I couldn't limit the concurrent threads ( even with SetMaxThreads ) it seems like it just ignored the limit.

does anyone have something ready made that does something similar ? I would love to see.

i have tried using semaphores, but got into problems:

        static SemaphoreSlim _sem = new SemaphoreSlim(3);    // Capacity of 3
    static List<string> records = new List<string>();

    static void Main()
    {
        records.Add("aaa");
        records.Add("bbb");
        records.Add("ccc");
        records.Add("ddd");
        records.Add("eee");
        records.Add("fff");
        records.Add("ggg");
        records.Add("iii");
        records.Add("jjj");

        for (int i = 0; i < records.Count; i++ )
        {
            new Thread(ThreadJob).Start(records[i]);
        }

        Console.WriteLine(records.Count);
        Console.ReadLine();
    }

    static void ThreadJob(object id)
    {
        Console.WriteLine(id + " wants to enter");
        _sem.Wait();
        Console.WriteLine(id + " is in!");           // Only three threads
        //Thread.Sleep(1000 * (int)id);               // can be here at
        Console.WriteLine(id + " is leaving");       // a time.

        lock (records)
        {
            records.Remove((string)id);
        }

        _sem.Release();
    }

this runs quite nicely, the only problem is,

Console.WriteLine(records.count);

returns diffrent results. even due i understand that it happens since not all the threads have finished (开发者_运维百科 an i a m calling the records.count before all records have been removed) i couldnt find how to wait for all to finish.


To wait for multiple threads to finish, you can use multiple EventWaitHandle's and then call WaitHandle.WaitAll to block the main thread until all events are signalled:

// we need to keep a list of synchronization events
var finishEvents = new List<EventWaitHandle>();

for (int i = 0; i < records.Count; i++ )
{
    // for each job, create an event and add it to the list
    var signal = new EventWaitHandle(false, EventResetMode.ManualReset);
    finishEvents.Add(signal);

    // we need to catch the id in a separate variable
    // for the closure to work as expected
    var id = records[i];

    var thread = new Thread(() =>
        {
            // do the job
            ThreadJob(id);

            // signal the main thread
            signal.Set();
        });
}

WaitHandle.WaitAll(finishEvents.ToArray());

Since most of these threads would end up suspended most of the time, it would be better to use ThreadPool in this case, so you can replace new Thread with:

    ThreadPool.QueueUserWorkItem(s =>
    {
        ThreadJob(id);
        signal.Set();
    });

When you are done with the events, don't forget to Dispose them:

foreach (var evt in finishEvents)
{
    evt.Dispose();
}

[Edit]

To put it all in one place, here is what your example code should look like:

static Semaphore _sem = new Semaphore(3, 3);    // Capacity of 3
static List<string> _records = new List<string>(new string[] { "aaa", "bbb", "ccc", "ddd", "eee", "fff", "ggg", "hhh" });

static void Main()
{
    var finishEvents = new List<EventWaitHandle>();

    for (int i = 0; i < _records.Count; i++)
    {
        var signal = new EventWaitHandle(false, EventResetMode.ManualReset);
        finishEvents.Add(signal);

        var id = _records[i];
        var t = new Thread(() =>
        {
            ThreadJob(id);
            signal.Set();
        });

        t.Start();
    }

    WaitHandle.WaitAll(finishEvents.ToArray());

    Console.WriteLine(_records.Count);
    Console.ReadLine();
}

static void ThreadJob(object id)
{
    Console.WriteLine(id + " wants to enter");
    _sem.WaitOne();

    Console.WriteLine(id + " is in!");
    Thread.Sleep(1000);
    Console.WriteLine(id + " is leaving");

    lock (_records)
    {
        _records.Remove((string)id);
    }

    _sem.Release();
}

(note that I've used Semaphore instead of SemaphoreSlim because I don't have .NET 4 on this machine and I wanted to test the code before updating the answer)


Why not use the Parallel Extensions - That would make things a lot easier.

Anyway, what you probably want to look at is something like Semaphores. I wrote a blog post on this subject a month or two back that you might find useful: https://colinmackay.scot/2011/03/30/using-semaphores-to-restrict-access-to-resources/


you can use Semaphore if you are under .net 3.5

or

SemaphoreSlim in .net 4.0


First, should Console.WriteLine(id + " is leaving"); not be a bit later, after the lock and just before it releases the semaphore?

As to the actual waiting for all of the threads to finish, Groo's answer looks better and more robust in the long term, but as a quicker/simpler solution to this specific piece of code, I think you could also get away with just calling .Join() on all of the threads you want to wait for, in sequence.

static List<Thread> ThreadList = new List<Thread>(); // To keep track of them

then when starting the threads, replace the current new Thread line with:

ThreadList.Add(new Thread(ThreadJob).Start(records[i]));

and then just before the Console.WriteLine:

foreach( Thread t in ThreadList )
{
    t.Join();
}

This will lock up if any of the threads don't terminate though, and if you ever want to know -which- threads haven't finished, this method won't work.

0

精彩评论

暂无评论...
验证码 换一张
取 消

关注公众号