发布
社区首页 >问答首页 >在生产者/消费者ConcurrentDictionary C#中获取重复对象

在生产者/消费者ConcurrentDictionary C#中获取重复对象
EN

Stack Overflow用户
提问于 2014-07-28 18:08:36
回答 2查看 501关注 0票数 0

我被一个问题困住了,我想知道我是不是把什么代码写错了。应用程序每隔几秒钟轮询一次,并从表中抓取每条记录,该表的唯一目的是指示要对哪些记录进行操作。

请注意,我忽略了空间和可读性方面的错误处理代码

代码语言:javascript
代码运行次数:0
复制
    //Producing Thread, this is triggered every 5 seconds... UGH, I hate timers

    foreach (var Record in GetRecordsFromDataBase())  // returns a dictionary
    {
        if (!ConcurrentDictionary.Contains(Record.Key))
            ConcurrentDictionary.TryAdd(Record.Key, Record.Value);
    }

这段代码工作得很好,它可能/将多次选择相同的记录,直到对所述记录进行/处理,这一事实令人恼火。经过处理,每个选定的记录都被写入到自己新创建的唯一命名文件中。然后,为该记录的键调用一个存储过程,将其从数据库中移除,此时该特定的键将从ConcurrentDictionary中删除。

代码语言:javascript
代码运行次数:0
复制
    // Consuming Thread, located within another loop to allow
    // the below code to continue to cycle until instructed
    // to terminate

    while (!ConcurrentDictionary.IsEmpty)
    {
        var Record = ConcurrentDictionary.Take(1).First();
        WriteToNewFile(Record.Value);
        RemoveFromDatabase(Record.Key);
        ConcurrentDictionary.TryRemove(Record.Key);
    }

对于吞吐量测试,我将20k+记录添加到表中,然后将应用程序松开。当我注意到22k+文件继续扩展到100k+领域时,我感到非常惊讶。

我做错什么了?我是否完全误解了并发字典的用途?我是不是忘了个分号?

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2014-07-28 18:33:29

首先,消除要包含的调用。TryAdd已经检查了重复项,如果项目已经存在,则返回false。

代码语言:javascript
代码运行次数:0
复制
foreach (var Record in GetRecordsFromDataBase())  // returns a dictionary
{
        ConcurrentDictionary.TryAdd(Record.Key, Record.Value);
}

我看到的下一个问题是,我不认为ConcurrentDictionary.Take(1).First()是从字典中获取条目的好方法,因为它不是原子的。我想你想用BlockingCollection()代替。它是专门为实现生产者-消费者模式而设计的。

最后,我认为您的问题与字典无关,而与数据库有关。字典本身是线程安全的,但是您的二分法与数据库不是原子的。假设记录A在数据库中。GetRecordsFromDataBase()提取它并将其添加到字典中。然后它开始处理记录A(我假设这是在另一个线程中)。然后,第一个循环再次调用GetRecordsFromDataBase()并再次获取记录A。同时,记录A被处理并从数据库中删除。但已经太迟了!GetRecordsFromDataBase()已经抓住它了!因此,初始循环在删除后再次将其添加到字典中。

我认为您可能需要获取要处理的记录,并将它们完全移到另一个表中。那样的话,他们就不会再被抓到了。在C#级别(而不是在数据库级别)这样做将是一个问题。或者,您不希望在处理记录时将记录添加到队列中。

票数 0
EN

Stack Overflow用户

发布于 2014-07-28 18:51:30

我做错什么了?

foreach ( add )循环试图将数据库中没有的任何记录添加到字典中。

while (remove)循环是从数据库中移除项,然后从字典中删除项,并将它们写入文件。

这个逻辑看起来是正确的。但是有一场比赛:

代码语言:javascript
代码运行次数:0
复制
GetRecordsFromDataBase(); // returns records 1 through 10.

切换上下文以删除循环。

代码语言:javascript
代码运行次数:0
复制
    WriteToNewFile(Record.Value);    // write record 5
    RemoveFromDatabase(Record.Key);  // remove record 5 from db
    ConcurrentDictionary.TryRemove(Record.Key); // remove record 5 from dictionary

切换回添加循环

代码语言:javascript
代码运行次数:0
复制
 ConcurrentDictionary.TryAdd(Record.Key, Record.Value); // add record 5 even though it is not in the DB becuase it was part of the records returned by ConcurrentDictionary.TryAdd(Record.Key, Record.Value);;

删除项后,foreach循环再次添加它。这就是文件计数倍增的原因。

代码语言:javascript
代码运行次数:0
复制
foreach (var Record in GetRecordsFromDataBase())  // returns a dictionary
    {
        if (!ConcurrentDictionary.Contains(Record.Key)) // this if is not required. try add will do.
            ConcurrentDictionary.TryAdd(Record.Key, Record.Value);
    }

尝试如下所示:添加循环:

代码语言:javascript
代码运行次数:0
复制
   foreach (var Record in GetRecordsFromDataBase())  // returns a dictionary
            {
               if (ConcurrentDictionary.TryAdd(Record.Key, false)) // only adds the record if it has not been processed.
               {
                   ConcurrentQueue.Enque(record) // enqueue the record
               } 
            }

移除回路

代码语言:javascript
代码运行次数:0
复制
var record;//   you will need to specify the type

    if (ConcurrentQueue.TryDequeue(record))
    {
         if (ConcurrentDictionary.TryUpdate(record.key,true,false)) // update the value from true to false
         {
            WriteToNewFile(Record.Value);    // write record 5
            RemoveFromDatabase(Record.Key);  // remove record 5 from db
         }
    }

这将在字典中保留每个已处理记录的项。您最终可以将它们从字典中删除,但是涉及db的多线程处理可能很棘手。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/25001506

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档