boost :: asio多线程异步接受并阻塞读/写服务器
||
我的想法是创建X个线程,使用KeepRunning方法运行该线程,该方法具有无限循环调用_io_service.run(),并在async_accept处理程序中使用_io_service.poll()接收到新连接时将任务发送到_io_service。
我使用以下代码运行服务器:
oh::msg::OHServer s(\"0.0.0.0\", \"9999\", 200);
ConsoleStopServer = boost::bind(&oh::msg::OHServer::Stop, &s);
SetConsoleCtrlHandler(bConsoleHandler, TRUE);
s.Run();
但是当我收到一个连接时,然后使用MsgWorker类中的阻止读/写操作在Post()方法中提供服务,则所有线程都被关闭。
我有下面的代码(这是来自http server3 asio示例和我的混合):
OHServer::OHServer(const std::string& sAddress, const std::string& sPort, std::size_t tps)
: _nThreadPoolSize(tps), _acceptor(_io_service), _sockClient(new boost::asio::ip::tcp::socket(_io_service))
{
// Open the acceptor with the option to reuse the address (i.e. SO_REUSEADDR).
boost::asio::ip::tcp::resolver resolver(_io_service);
boost::asio::ip::tcp::resolver::query query(sAddress, sPort);
boost::asio::ip::tcp::endpoint endpoint = *resolver.resolve(query);
_acceptor.open(endpoint.protocol());
_acceptor.set_option(boost::asio::ip::tcp::acceptor::reuse_address(true));
_acceptor.bind(endpoint);
_acceptor.listen();
_acceptor.async_accept(
*_sockClient,
boost::bind(
&OHServer::AcceptConnection,
this,
boost::asio::placeholders::error
)
);
}
void OHServer::KeepRunning()
{
global_stream_lock.lock();
std::cout << \"[\" << boost::this_thread::get_id()
<< \"] Thread Start\" << std::endl;
global_stream_lock.unlock();
while( true )
{
try
{
boost::system::error_code ec;
_io_service.run( ec );
if( ec )
{
global_stream_lock.lock();
std::cout << \"[\" << boost::this_thread::get_id()
<< \"] Error: \" << ec << std::endl;
global_stream_lock.unlock();
}
break;
}
catch( std::exception & ex )
{
global_stream_lock.lock();
std::cout << \"[\" << boost::this_thread::get_id()
<< \"] Exception: \" << ex.what() << std::endl;
global_stream_lock.unlock();
}
}
global_stream_lock.lock();
std::cout << \"[\" << boost::this_thread::get_id()
<< \"] Thread Finish\" << std::endl;
global_stream_lock.unlock();
}
void OHServer::Run()
{
// Create a pool of threads to run all of the io_services.
for (std::size_t i = 0; i < _nThreadPoolSize; ++i)
{
boost::shared_ptr<boost::thread> thread(new boost::thread(
boost::bind(&OHServer::KeepRunning, this)));
threads.push_back(thread);
}
cout << \"Hit enter to close server\" << endl;
cin.get();
}
void OHServer::Stop()
{
boost::system::error_code ec;
_acceptor.close(ec);
_sockClient->shutdown( boost::asio::ip::tcp::socket::shutdown_both, ec );
_sockClient->close( ec );
_io_service.stop();
// Wait for all threads in the pool to exit.
for (std::size_t i = 0; i < threads.size(); ++i)
{
threads[i]->join();
cout << \"threads[ \"<< i << \"]->join();\" << endl;
}
}
void OHServer::Post()
{
std::cout << \"Accepted new connection.\" << std::endl;
CMsgWorker *msgWorker = new CMsgWorker(_sockClient);
msgWorker->Start();
delete msgWorker;
}
void OHServer::AcceptConnection(const boost::system::error_code& e)
{
if (!e)
{
_io_service.post(boost::bind(&OHServer::Post, this));
_acceptor.async_accept(
*_sockClient,
boost::bind(
&OHServer::AcceptConnection,
this,
boost::asio::placeholders::error
)
);
}
}
我应该怎么做才能使线程仍在等待_io_service的工作?
谢谢你的帮助!
没有找到相关结果
已邀请:
2 个回复
古擅坛犯
有关如何执行此操作的想法,请参见此处的timer.cc示例:https://github.com/sean-/Boost.Examples/tree/master/asio/timer
扑北爱
TCPServer.hpp文件:
TCPServer.cpp文件:
修改使其与IPv6兼容也非常容易。 它已经过测试并且运行良好。只需复制并使用!