BlockingQueue
线程安全的队列, 作为caffe
训练时数据同步的重要数据结构,本文做简要分析。
template<typename T>
class BlockingQueue {
public:
explicit BlockingQueue();
void push(const T& t);
bool try_pop(T* t);// non-blocking
// This logs a message if the threads needs to be blocked
// useful for detecting e.g. when data feeding is too slow
T pop(const string& log_on_wait = "");
bool try_peek(T* t);
// Return element without removing it
T peek();
size_t size() const;
protected:
class sync; // class froward decalration
std::queue<T> queue_;
shared_ptr<sync> sync_;
DISABLE_COPY_AND_ASSIGN(BlockingQueue);
};
线程同步的条件变量:
template<typename T>
class BlockingQueue<T>::
sync {
public:
mutable boost::mutex mutex_;
boost::condition_variable condition_;
};
构造函数:
template<typename T>
BlockingQueue<T>::BlockingQueue()
: sync_(new sync()) {
}
下面仅仅给出阻塞pop与非阻塞的try_pop,说明BlockingQueue的使用:
template<typename T>
bool BlockingQueue<T>::try_pop(T* t) {
boost::mutex::scoped_lock lock(sync_->mutex_);
if (queue_.empty()) {
return false; // 立即返回
}
*t = queue_.front();
queue_.pop();
return true;
}
template<typename T>
T BlockingQueue<T>::pop(const string& log_on_wait) {
boost::mutex::scoped_lock lock(sync_->mutex_);
while (queue_.empty()) {
if (!log_on_wait.empty()) {
LOG_EVERY_N(INFO, 1000)<< log_on_wait;
}
sync_->condition_.wait(lock); //阻塞等待条件变量
}
T t = queue_.front();
queue_.pop();
return t;
}
模板特化:
template class BlockingQueue<Batch<float>*>;
template class BlockingQueue<Batch<double>*>;
template class BlockingQueue<Datum*>;
template class BlockingQueue<shared_ptr<DataReader::QueuePair> >;
其中:
template <typename Dtype>
class Batch {
public:
Blob<Dtype> data_, label_;
};
DataLayer中使用线程读取Batch(image,label)push到队列中,然后pop出来前向传播:
template <typename Dtype>
void BasePrefetchingDataLayer<Dtype>::Forward_cpu(
const vector<Blob<Dtype>*>& bottom, const vector<Blob<Dtype>*>& top) {
Batch<Dtype>* batch = prefetch_full_.pop("Data layer prefetch queue empty");
// Reshape to loaded data.
top[0]->ReshapeLike(batch->data_);
// Copy the data
caffe_copy(batch->data_.count(), batch->data_.cpu_data(),
top[0]->mutable_cpu_data());
if (this->output_labels_) {
// Reshape to loaded labels.
top[1]->ReshapeLike(batch->label_);
// Copy the labels.
caffe_copy(batch->label_.count(), batch->label_.cpu_data(),
top[1]->mutable_cpu_data());
}
prefetch_free_.push(batch);
}
BlockingQueue成员:
template <typename Dtype>
class BasePrefetchingDataLayer :
public BaseDataLayer<Dtype>, public InternalThread {
//.......
protected:
Batch<Dtype> prefetch_[PREFETCH_COUNT];
BlockingQueue<Batch<Dtype>*> prefetch_free_;
BlockingQueue<Batch<Dtype>*> prefetch_full_;
};
caffe
系列源码分析介绍本系列深度学习框架caffe
源码分析主要内容如下:
caffe源码分析-cmake 工程构建主要内容:
自己从头构建一遍工程,这样能让我更好的了解大型的项目的构建。当然原始的caffe的构建感觉还是比较复杂(主要是cmake),我这里仅仅使用cmake构建,而且简化点,当然最重要的是支持CLion直接运行调试(如果需要这个工程可以评论留下你的邮箱,我给你发送过去)。
SyncedMemory
, 以及类Blob
数据传输的媒介.主要内容:
其中Blob
分析给出了其直接与opencv的图片相互转化以及操作,可以使得我们更好的理解Blob
.
layer
的源码分析,包括从整体上说明了layer
类别以及其proto定义与核心函数.内容如下:
首先分析了最简单的layer
Relu
,然后在是inner_product_layer全连接层
, 最后是layer_factory
caffe中 以此工厂模式create各种Layer.
内容如下:
内容如下:
内容如下:
caffe c++示例(mnist 多层感知机c++训练,测试)
类似与caffe
一样按照layer、solver、loss、net
等模块构建的神经网络实现可以见下面这篇blog,相信看懂了这个python的代码理解caffe框架会更简单点.
最后如果需要**cmake
** + CLion
**直接运行调试**caffe
**的代码工程,可以评论留下你的邮箱,我给你发送过去.**