我被一个问题困住了,我想知道我是不是把什么代码写错了。应用程序每隔几秒钟轮询一次,并从表中抓取每条记录,该表的唯一目的是指示要对哪些记录进行操作。
请注意,我忽略了空间和可读性方面的错误处理代码
//Producing Thread, this is triggered every 5 seconds... UGH, I hate timers
foreach (var Record in GetRecordsFromDataBase()) // returns a dictionary
{
if (!ConcurrentDictionary.Contains(Record.Key))
ConcurrentDictionary.TryAdd(Record.Key, Record.Value);
}
这段代码工作得很好,它可能/将多次选择相同的记录,直到对所述记录进行/处理,这一事实令人恼火。经过处理,每个选定的记录都被写入到自己新创建的唯一命名文件中。然后,为该记录的键调用一个存储过程,将其从数据库中移除,此时该特定的键将从ConcurrentDictionary中删除。
// Consuming Thread, located within another loop to allow
// the below code to continue to cycle until instructed
// to terminate
while (!ConcurrentDictionary.IsEmpty)
{
var Record = ConcurrentDictionary.Take(1).First();
WriteToNewFile(Record.Value);
RemoveFromDatabase(Record.Key);
ConcurrentDictionary.TryRemove(Record.Key);
}
对于吞吐量测试,我将20k+记录添加到表中,然后将应用程序松开。当我注意到22k+文件继续扩展到100k+领域时,我感到非常惊讶。
我做错什么了?我是否完全误解了并发字典的用途?我是不是忘了个分号?
发布于 2014-07-28 18:33:29
首先,消除要包含的调用。TryAdd已经检查了重复项,如果项目已经存在,则返回false。
foreach (var Record in GetRecordsFromDataBase()) // returns a dictionary
{
ConcurrentDictionary.TryAdd(Record.Key, Record.Value);
}
我看到的下一个问题是,我不认为ConcurrentDictionary.Take(1).First()是从字典中获取条目的好方法,因为它不是原子的。我想你想用BlockingCollection()代替。它是专门为实现生产者-消费者模式而设计的。
最后,我认为您的问题与字典无关,而与数据库有关。字典本身是线程安全的,但是您的二分法与数据库不是原子的。假设记录A在数据库中。GetRecordsFromDataBase()提取它并将其添加到字典中。然后它开始处理记录A(我假设这是在另一个线程中)。然后,第一个循环再次调用GetRecordsFromDataBase()并再次获取记录A。同时,记录A被处理并从数据库中删除。但已经太迟了!GetRecordsFromDataBase()已经抓住它了!因此,初始循环在删除后再次将其添加到字典中。
我认为您可能需要获取要处理的记录,并将它们完全移到另一个表中。那样的话,他们就不会再被抓到了。在C#级别(而不是在数据库级别)这样做将是一个问题。或者,您不希望在处理记录时将记录添加到队列中。
发布于 2014-07-28 18:51:30
我做错什么了?
foreach ( add )循环试图将数据库中没有的任何记录添加到字典中。
while (remove)循环是从数据库中移除项,然后从字典中删除项,并将它们写入文件。
这个逻辑看起来是正确的。但是有一场比赛:
GetRecordsFromDataBase(); // returns records 1 through 10.
切换上下文以删除循环。
WriteToNewFile(Record.Value); // write record 5
RemoveFromDatabase(Record.Key); // remove record 5 from db
ConcurrentDictionary.TryRemove(Record.Key); // remove record 5 from dictionary
切换回添加循环
ConcurrentDictionary.TryAdd(Record.Key, Record.Value); // add record 5 even though it is not in the DB becuase it was part of the records returned by ConcurrentDictionary.TryAdd(Record.Key, Record.Value);;
删除项后,foreach循环再次添加它。这就是文件计数倍增的原因。
foreach (var Record in GetRecordsFromDataBase()) // returns a dictionary
{
if (!ConcurrentDictionary.Contains(Record.Key)) // this if is not required. try add will do.
ConcurrentDictionary.TryAdd(Record.Key, Record.Value);
}
尝试如下所示:添加循环:
foreach (var Record in GetRecordsFromDataBase()) // returns a dictionary
{
if (ConcurrentDictionary.TryAdd(Record.Key, false)) // only adds the record if it has not been processed.
{
ConcurrentQueue.Enque(record) // enqueue the record
}
}
移除回路
var record;// you will need to specify the type
if (ConcurrentQueue.TryDequeue(record))
{
if (ConcurrentDictionary.TryUpdate(record.key,true,false)) // update the value from true to false
{
WriteToNewFile(Record.Value); // write record 5
RemoveFromDatabase(Record.Key); // remove record 5 from db
}
}
这将在字典中保留每个已处理记录的项。您最终可以将它们从字典中删除,但是涉及db的多线程处理可能很棘手。
https://stackoverflow.com/questions/25001506
复制相似问题