boost::asio async_receive_from UDP endpoint shared between threads?
Question
Boost asio specifically allows multiple threads to call the run() method on an io_service. This seems like a great way to create a multithreaded UDP server. However, I've hit a snag that I'm struggling to get an answer to.
Looking at a typical async_receive_from call:
m_socket->async_receive_from(
    boost::asio::buffer(m_recv_buffer),
    m_remote_endpoint,
    boost::bind(
        &udp_server::handle_receive,
        this,
        boost::asio::placeholders::error,
        boost::asio::placeholders::bytes_transferred));
The remote endpoint and message buffer are not passed through to the handler, but are at a higher scope level (member variable in my example). The code to handle the UDP message when it arrives will look something like:
void dns_server::handle_receive(const boost::system::error_code& error, std::size_t size)
{
    // process message
    blah(m_recv_buffer, size);

    // send something back
    respond(m_remote_endpoint);
}
If there are multiple threads running, how does the synchronisation work? Having a single end point and receive buffer shared between the threads implies that asio waits for a handler to complete within a single thread before calling the handler in another thread in the case that a message arrived in the meantime. That seems to negate the point of allowing multiple threads to call run in the first place.
If I want to get concurrent serving of requests, it looks like I need to hand off the work packets, along with a copy of the end point, to a separate thread allowing the handler method to return immediately so that asio can get on and pass another message in parallel to another one of the threads that called run().
That seems more than somewhat nasty. What am I missing here?
Answer
Having a single end point and receive buffer shared between the threads implies that asio waits for a handler to complete within a single thread
If you mean "when running the service with a single thread" then this is correct.
Otherwise, this isn't the case. Instead, Asio simply says the behaviour is "undefined" when you invoke operations on a single service object (i.e. the socket, not the io_service) concurrently.
That seems to negate the point of allowing multiple threads to call run in the first place.
Not unless processing takes a considerable amount of time.
The first paragraphs of the introduction to the Timer.5 sample read like a good exposition of your topic.
To separate the request-specific data (buffer and endpoint) you want some notion of a session. A popular mechanism in Asio is either bound shared_ptrs or a shared-from-this session class (boost::bind supports binding to boost::shared_ptr instances directly).
To avoid concurrent, unsynchronized access to the m_socket member you can either add locks or use the strand approach as documented in the Timer.5 sample linked above.
Here for your enjoyment is the Daytime.6 asynchronous UDP daytime server, modified to work with many service IO threads.
Note that, logically, there's still only a single IO thread (the strand), so we don't violate the socket class's documented thread safety.
However, unlike the official sample, the responses may get queued out of order, depending on the time taken by the actual processing in udp_session::handle_request.
Note:
- a udp_session class to hold the buffers and remote endpoint per request
- a pool of threads, which can spread the load of the actual processing (not the IO) over multiple cores.
#include <ctime>
#include <iostream>
#include <string>
#include <boost/array.hpp>
#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/make_shared.hpp>
#include <boost/asio.hpp>
#include <boost/thread.hpp>

using namespace boost;
using asio::ip::udp;
using system::error_code;

std::string make_daytime_string()
{
    using namespace std; // For time_t, time and ctime;
    time_t now = time(0);
    return ctime(&now);
}

class udp_server; // forward declaration

struct udp_session : enable_shared_from_this<udp_session> {

    udp_session(udp_server* server) : server_(server) {}

    void handle_request(const error_code& error);

    void handle_sent(const error_code& ec, std::size_t) {
        // here the response has been sent
        if (ec) {
            std::cout << "Error sending response to " << remote_endpoint_
                      << ": " << ec.message() << "\n";
        }
    }

    udp::endpoint remote_endpoint_;
    array<char, 100> recv_buffer_;

    std::string message;
    udp_server* server_;
};

class udp_server
{
    typedef shared_ptr<udp_session> shared_session;
  public:
    udp_server(asio::io_service& io_service)
        : socket_(io_service, udp::endpoint(udp::v4(), 1313)),
          strand_(io_service)
    {
        receive_session();
    }

  private:
    void receive_session()
    {
        // our session to hold the buffer + endpoint
        auto session = make_shared<udp_session>(this);

        socket_.async_receive_from(
            asio::buffer(session->recv_buffer_),
            session->remote_endpoint_,
            strand_.wrap(
                bind(&udp_server::handle_receive, this,
                     session, // keep-alive of buffer/endpoint
                     asio::placeholders::error,
                     asio::placeholders::bytes_transferred)));
    }

    void handle_receive(shared_session session, const error_code& ec, std::size_t /*bytes_transferred*/) {
        // now, handle the current session on any available pool thread
        socket_.get_io_service().post(bind(&udp_session::handle_request, session, ec));

        // immediately accept new datagrams
        receive_session();
    }

    void enqueue_response(shared_session const& session) {
        socket_.async_send_to(asio::buffer(session->message), session->remote_endpoint_,
            strand_.wrap(bind(&udp_session::handle_sent,
                              session, // keep-alive of buffer/endpoint
                              asio::placeholders::error,
                              asio::placeholders::bytes_transferred)));
    }

    udp::socket  socket_;
    asio::strand strand_;

    friend struct udp_session;
};

void udp_session::handle_request(const error_code& error)
{
    if (!error || error == asio::error::message_size)
    {
        message = make_daytime_string(); // let's assume this might be slow

        // let the server coordinate the actual IO
        server_->enqueue_response(shared_from_this());
    }
}

int main()
{
    try {
        asio::io_service io_service;
        udp_server server(io_service);

        thread_group group;
        for (unsigned i = 0; i < thread::hardware_concurrency(); ++i)
            group.create_thread(bind(&asio::io_service::run, ref(io_service)));

        group.join_all();
    } catch (std::exception& e) {
        std::cerr << e.what() << std::endl;
    }
}
Closing thoughts
Interestingly, in most cases you'll see the single-thread version performing just as well, and there's no reason to complicate the design.
Alternatively, if the processing is indeed the CPU-intensive part, you can use a single-threaded io_service dedicated to the IO and an old-fashioned worker pool for the background processing of the requests. Firstly, this simplifies the design; secondly, it might improve the throughput on the IO tasks because there is no longer any need to coordinate the tasks posted on the strand.