我想实现一个流,目的是把数据从从一个地方搬运到另一个地方,实现类似cin的效果,
比如在两个线程中互相传递数据
thread1 ostream<<"Data"<<std::endl;
thread2 istream>>data;
目前有想法,是实现一个streambuf,但是同步出现了问题,不知道怎么搞
//template <typename T>
using T = bool;
using _Elem = char;
using _Traits = std::char_traits<_Elem>;
using _Alloc = std::allocator<_Elem>;
class PromiseAFuture {
public:
std::promise<T> Pro;
std::future<T> ip;
std::mutex lock;
std::atomic_bool canset = false;
PromiseAFuture() {
Pro = std::promise<T>();
ip = Pro.get_future();
canset = true;
}
bool set(T it) {
if (canset)
{
Pro.set_value(it);
canset = false;
return true;
}
return false;
}
template <class _Rep, class _Period>
std::future_status wait_for(T& getc, const std::chrono::duration<_Rep, _Period>& _Rel_time) {
auto icp = ip.wait_for(_Rel_time);
if (icp == std::future_status::ready)
{
getc = ip.get();
Pro = std::promise<T>();
ip = Pro.get_future();
canset = true;
}
return icp;
}
};
//template<class _Elem, class _Traits = std::char_traits<_Elem>, class _Alloc = std::allocator<_Elem>>
class Syncstreambuf : public std::basic_streambuf<_Elem, _Traits> {
public:
using char_type = _Elem;
using traits_type = _Traits;
using int_type = typename _Traits::int_type;
using off_type = typename _Traits::off_type;
using Buftype = std::basic_streambuf<_Elem, _Traits>;
using StringTpye = std::vector<_Elem, _Alloc>;
private:
StringTpye buffer;//输出区与输入区公用
std::mutex lock;
PromiseAFuture needallread;//需要输入区全部被读
PromiseAFuture needallwriter;//需要输出区被写入
public:
~Syncstreambuf()override {
}
protected:
//输入区已经读到结尾
int_type underflow()override {
std::unique_lock<std::mutex> lk(lock);
if (gptr() == epptr()) {
//输入区已经读到输出区结尾 需要输出区更新(需要输出区被写入)
auto statc = std::future_status::timeout; bool var = false;
while (statc == std::future_status::timeout) {
lk.unlock();
//等待 输出区被重新写入
statc = needallwriter.wait_for(var,1ms);
lk.lock();
}
//输入区指针重新设置 当前已经读到结尾 输入区当前指针设置为buf开始 结束指针为输出区当前指针
setg(&buffer.front(), &buffer.front(), pptr());
}
else {
while ((pptr() - gptr()) <= 0) {
//等待输出区pptr移动 (输出区更新)
lk.unlock();
std::this_thread::sleep_for(1ms);
lk.lock();
}
//输入区指针重新设置
setg(&buffer.front(), gptr(), pptr());
}
// 输入区已全部被读
needallread.set(true);
return *gptr();
}
std::streamsize xsputn(const char_type* s, std::streamsize count)override {
std::unique_lock<std::mutex> lk(lock);
const char_type* __s = s;
std::streamsize __i = 0;
std::streamsize __n = count;
while (__i < __n)
{
if (pptr() >= epptr())
{
//输出区满,等待输入区
auto statc = std::future_status::timeout; bool vaar = false;
while (statc == std::future_status::timeout) {
lk.unlock();
//等待 输入区全部被读
statc = needallread.wait_for(vaar,1ms);
lk.lock();
}
//输出区 设置为buf开始 重新写入
setp(&buffer.front(), &buffer.back());
}
else
{
std::streamsize __chunk_size = std::min(epptr() - pptr(), __n - __i);
traits_type::copy(pptr(), __s, __chunk_size);
pbump(__chunk_size);
__s += __chunk_size;
__i += __chunk_size;
//输出区指针已更新
needallwriter.set(true);
}
}
return __i;
}
int_type overflow(int_type ch)override {
std::unique_lock<std::mutex> lk(lock);
auto statc = std::future_status::timeout; bool var = false;
//输出区满,等待输入区
while (statc == std::future_status::timeout) {
lk.unlock();
statc = needallread.wait_for(var,1ms);
lk.lock();
}
//输出区 设置为buf开始 重新写入
setp(&buffer.front(), &buffer.back());
*pptr() = ch;
pbump(1);
//输出区指针已更新
needallwriter.set(true);
return 100;//不为eof即可
}
private:
public:
inline const StringTpye& CurrentMemory() const { return buffer; }
Syncstreambuf(size_t buffersize)
{
buffer.resize(buffersize, 0);
Buftype::setg(&buffer.front(), &buffer.front(), &buffer.front());//输入区设置
Buftype::setp(&buffer.front(), &buffer.back());//输出区设置
}
};
int main() {
std::locale::global(std::locale(""));
Syncstreambuf bufferc(4);
std::thread thoutput([&bufferc]() {
std::istream iut(&bufferc);
while (true)
{
std::string sadasd;
iut >> sadasd;
std::cout << "输出::" << sadasd.length() << ";context:" << sadasd << std::endl;
}
}
);
std::thread thread([&bufferc]() {
std::ostream out(&bufferc);
std::string sadasd = "1234567890TESTZXCVBNMPOL12345";
while (true) {
std::cout << "输入::" << sadasd.length() << ";context:" << sadasd <<
std::endl;
out << sadasd << std::endl;
std::this_thread::sleep_for(1200.0ms);
}
}
);
thread.detach();
thoutput.join();
}
有没有大佬帮忙一下,困扰几天了
相似问题