Do not pass large message payloads by value
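In some cases, the runtime creates a copy of every message that it passes from one message buffer to another. For example, the concurrency::overwrite_buffer class offers a copy of every message that it receives to each of its targets. The runtime also creates a copy of the message data when you use message-passing functions such as concurrency::send and concurrency::receive to write messages to and read messages from a message buffer. Although this mechanism helps eliminate the risk of concurrent writes to shared data, it can lead to poor memory performance when the message payload is relatively large.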
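As a rough illustration of this per-hop copy cost, the following sketch uses only standard C++ rather than the Concurrency Runtime. `simple_buffer` and `large_payload` are hypothetical types written for this sketch: the buffer copies a value in on `send` and back out on `receive`, loosely mirroring the copy behavior described above (the real runtime may copy a different number of times), and the payload counts its own copies.

```cpp
#include <cstring>
#include <deque>
#include <mutex>

// Hypothetical payload with a large inline array; it counts every copy made of it.
struct large_payload
{
   static int copies;
   unsigned char bytes[32768] = {};

   large_payload() = default;
   large_payload(const large_payload& other)
   {
      ++copies;
      std::memcpy(bytes, other.bytes, sizeof bytes);
   }
   large_payload& operator=(const large_payload& other)
   {
      ++copies;
      if (this != &other)
      {
         std::memcpy(bytes, other.bytes, sizeof bytes);
      }
      return *this;
   }
};
int large_payload::copies = 0;

// Hypothetical stand-in for a message buffer (NOT the Agents Library API).
// It copies a value in on send() and copies it back out on receive().
template <typename T>
class simple_buffer
{
public:
   void send(const T& value)
   {
      std::lock_guard<std::mutex> lock(mutex_);
      queue_.push_back(value);          // copy into the buffer
   }

   // Precondition: the buffer is not empty (this sketch never blocks).
   void receive(T& out)
   {
      std::lock_guard<std::mutex> lock(mutex_);
      out = queue_.front();             // copy out of the buffer
      queue_.pop_front();
   }

private:
   std::mutex mutex_;
   std::deque<T> queue_;
};

// Passes `count` payloads by value; returns the number of payload copies made.
int copies_when_passing_by_value(int count)
{
   large_payload::copies = 0;
   simple_buffer<large_payload> buffer;
   large_payload received;
   for (int i = 0; i < count; ++i)
   {
      large_payload message;
      buffer.send(message);
      buffer.receive(received);
   }
   return large_payload::copies;
}

// Passes `count` payloads by pointer; the receiver takes ownership and deletes each one.
int copies_when_passing_by_pointer(int count)
{
   large_payload::copies = 0;
   simple_buffer<large_payload*> buffer;
   for (int i = 0; i < count; ++i)
   {
      buffer.send(new large_payload);   // only the pointer value is copied
      large_payload* received = nullptr;
      buffer.receive(received);
      delete received;
   }
   return large_payload::copies;
}
```

With 10 messages, the by-value function in this sketch copies the 32 KB array 20 times (once in, once out per message), while the pointer function copies it zero times, because only the pointer itself passes through the buffer.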
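You can use pointers or references to improve memory performance when you pass messages that have a large payload. The following example compares passing large messages by value with passing pointers to the same message type. The example defines two agent types, producer and consumer, that act on message_data objects. It compares the time that the producer agent needs to send several message_data objects to the consumer with the time that it needs to send the same number of pointers to message_data objects. In the pointer version, the producer allocates each message with new, and the consumer deletes it after use.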
// message-payloads.cpp
// compile with: /EHsc
#include <Windows.h>
#include <agents.h>
#include <iostream>
#include <string>

using namespace concurrency;
using namespace std;

// Calls the provided work function and returns the number of milliseconds
// that it takes to call that function.
template <class Function>
__int64 time_call(Function&& f)
{
   __int64 begin = GetTickCount();
   f();
   return GetTickCount() - begin;
}

// A message structure that contains large payload data.
struct message_data
{
   int id;
   string source;
   unsigned char binary_data[32768];
};

// A basic agent that produces values.
template <typename T>
class producer : public agent
{
public:
   explicit producer(ITarget<T>& target, unsigned int message_count)
      : _target(target)
      , _message_count(message_count)
   {
   }

protected:
   void run();

private:
   // The target buffer to write to.
   ITarget<T>& _target;
   // The number of messages to send.
   unsigned int _message_count;
};

// Template specialization for message_data.
template <>
void producer<message_data>::run()
{
   // Send a number of messages to the target buffer.
   while (_message_count > 0)
   {
      message_data message;
      message.id = _message_count;
      message.source = "Application";

      send(_target, message);
      --_message_count;
   }

   // Set the agent to the finished state.
   done();
}

// Template specialization for message_data*.
template <>
void producer<message_data*>::run()
{
   // Send a number of messages to the target buffer.
   while (_message_count > 0)
   {
      message_data* message = new message_data;
      message->id = _message_count;
      message->source = "Application";

      send(_target, message);
      --_message_count;
   }

   // Set the agent to the finished state.
   done();
}

// A basic agent that consumes values.
template <typename T>
class consumer : public agent
{
public:
   explicit consumer(ISource<T>& source, unsigned int message_count)
      : _source(source)
      , _message_count(message_count)
   {
   }

protected:
   void run();

private:
   // The source buffer to read from.
   ISource<T>& _source;
   // The number of messages to receive.
   unsigned int _message_count;
};

// Template specialization for message_data.
template <>
void consumer<message_data>::run()
{
   // Receive a number of messages from the source buffer.
   while (_message_count > 0)
   {
      message_data message = receive(_source);
      --_message_count;

      // TODO: Do something with the message.
      // ...
   }

   // Set the agent to the finished state.
   done();
}

// Template specialization for message_data*.
template <>
void consumer<message_data*>::run()
{
   // Receive a number of messages from the source buffer.
   while (_message_count > 0)
   {
      message_data* message = receive(_source);
      --_message_count;

      // TODO: Do something with the message.
      // ...

      // Release the memory for the message.
      delete message;
   }

   // Set the agent to the finished state.
   done();
}

int wmain()
{
   // The number of values for the producer agent to send.
   const unsigned int count = 10000;

   __int64 elapsed;

   // Run the producer and consumer agents.
   // This version uses message_data as the message payload type.

   wcout << L"Using message_data..." << endl;
   elapsed = time_call([count] {
      // A message buffer that is shared by the agents.
      unbounded_buffer<message_data> buffer;

      // Create and start the producer and consumer agents.
      producer<message_data> prod(buffer, count);
      consumer<message_data> cons(buffer, count);
      prod.start();
      cons.start();

      // Wait for the agents to finish.
      agent::wait(&prod);
      agent::wait(&cons);
   });
   wcout << L"took " << elapsed << L"ms." << endl;

   // Run the producer and consumer agents a second time.
   // This version uses message_data* as the message payload type.

   wcout << L"Using message_data*..." << endl;
   elapsed = time_call([count] {
      // A message buffer that is shared by the agents.
      unbounded_buffer<message_data*> buffer;

      // Create and start the producer and consumer agents.
      producer<message_data*> prod(buffer, count);
      consumer<message_data*> cons(buffer, count);
      prod.start();
      cons.start();

      // Wait for the agents to finish.
      agent::wait(&prod);
      agent::wait(&cons);
   });
   wcout << L"took " << elapsed << L"ms." << endl;
}
This example produces the following sample output (actual timings depend on the hardware):
Using message_data...
took 437ms.
Using message_data*...
took 47ms.
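The version that uses pointers performs better because it eliminates the need for the runtime to create a full copy of every message_data object that it passes from the producer to the consumer; only the pointer value is copied.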
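Passing raw pointers shifts responsibility for ownership to your code: in the example, the consumer must delete every message, and a message leaks if no consumer ever receives it. When one message can reach several targets (as with overwrite_buffer, which offers the message to each of its targets), a single owning raw pointer is also ambiguous. One way to keep the cheap copy while making ownership explicit is a reference-counted, read-only handle such as `std::shared_ptr<const message_data>`; because the buffer copies whatever payload type it carries, the handle must be copyable, which is why this sketch does not use `std::unique_ptr`. The sketch below is plain standard C++ rather than the Agents Library, and `broadcast_to_targets`, `message_handle`, and `inbox` are hypothetical names.

```cpp
#include <memory>
#include <string>
#include <vector>

// Same layout as message_data in the agents example.
struct message_data
{
   int id;
   std::string source;
   unsigned char binary_data[32768];
};

// Shared, read-only handle: copying it copies a pointer and bumps a
// reference count; the 32 KB payload itself is never copied.
using message_handle = std::shared_ptr<const message_data>;

// Hypothetical per-target queue used only for this sketch.
using inbox = std::vector<message_handle>;

// Hypothetical broadcast: hands the same message to every target.
void broadcast_to_targets(const message_handle& message, std::vector<inbox>& targets)
{
   for (inbox& target : targets)
   {
      target.push_back(message);
   }
}

// Returns true if both targets received the same object and the payload
// was freed only after the last handle was released.
bool shared_payload_demo()
{
   std::vector<inbox> targets(2);
   std::weak_ptr<const message_data> observer;
   {
      auto message = std::make_shared<message_data>();
      message->id = 1;
      message->source = "Application";
      observer = message;
      broadcast_to_targets(message, targets);
   } // The producer's handle is released here.

   const bool same_object = targets[0][0].get() == targets[1][0].get();
   const bool alive_while_shared = !observer.expired();

   targets.clear(); // Both consumers are done with the message.
   return same_object && alive_while_shared && observer.expired();
}
```

With the Agents Library, the same idea would mean a buffer element type such as `std::shared_ptr<const message_data>` instead of `message_data*`; the `const` also prevents receivers from writing to data that other receivers may be reading.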
Do not perform fine-grained work in a data pipeline
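The Agents Library is most useful when the work that a data pipeline performs is fairly coarse-grained. For example, one application component might read data from a file or a network connection and occasionally send that data to another component. The protocol that the Agents Library uses to propagate messages gives the message-passing mechanism more overhead than the task-parallel constructs that the Parallel Patterns Library (PPL) provides. Therefore, make sure that each unit of work in a data pipeline runs long enough to offset this overhead.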
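One common way to give each message enough work to offset the per-message overhead is to batch many small items into one message. The following sketch is standard C++, not the Agents Library: `locked_queue` is a hypothetical stand-in whose lock acquisition models a fixed per-message cost, and `batch_size` is an illustrative parameter. The sketch is single-threaded for clarity and only counts how many messages each approach sends for the same data.

```cpp
#include <algorithm>
#include <cstddef>
#include <deque>
#include <mutex>
#include <numeric>
#include <utility>
#include <vector>

// Hypothetical queue: each send and receive takes a lock, standing in for the
// fixed cost that every message pays as it moves through a pipeline.
template <typename T>
class locked_queue
{
public:
   void send(T value)
   {
      std::lock_guard<std::mutex> lock(mutex_);
      items_.push_back(std::move(value));
      ++messages_sent_;
   }

   // Precondition: the queue is not empty (this sketch never blocks).
   T receive()
   {
      std::lock_guard<std::mutex> lock(mutex_);
      T value = std::move(items_.front());
      items_.pop_front();
      return value;
   }

   std::size_t messages_sent() const
   {
      std::lock_guard<std::mutex> lock(mutex_);
      return messages_sent_;
   }

private:
   mutable std::mutex mutex_;
   std::deque<T> items_;
   std::size_t messages_sent_ = 0;
};

// Fine-grained: one message per item, so every item pays the per-message cost.
long long sum_fine_grained(const std::vector<int>& data, std::size_t& messages)
{
   locked_queue<int> queue;
   long long sum = 0;
   for (int item : data)
   {
      queue.send(item);
      sum += queue.receive();
   }
   messages = queue.messages_sent();
   return sum;
}

// Coarse-grained: one message per batch, so the cost is paid once per batch.
// Precondition: batch_size > 0.
long long sum_batched(const std::vector<int>& data, std::size_t batch_size, std::size_t& messages)
{
   locked_queue<std::vector<int>> queue;
   long long sum = 0;
   for (std::size_t begin = 0; begin < data.size(); begin += batch_size)
   {
      const std::size_t end = std::min(begin + batch_size, data.size());
      queue.send(std::vector<int>(data.begin() + static_cast<std::ptrdiff_t>(begin),
                                  data.begin() + static_cast<std::ptrdiff_t>(end)));
      const std::vector<int> batch = queue.receive();
      sum += std::accumulate(batch.begin(), batch.end(), 0LL);
   }
   messages = queue.messages_sent();
   return sum;
}
```

For 1,000 items and a batch_size of 100, both functions compute the same sum, but the batched version sends 10 messages instead of 1,000. Larger batches amortize more overhead at the cost of higher latency for the items at the start of each batch.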
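Although a data pipeline is most effective when its tasks are coarse-grained, each stage of the pipeline can use PPL constructs such as task groups and parallel algorithms to perform more fine-grained work.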
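The following portable sketch shows the shape of such a stage: it receives one coarse-grained chunk as a single message and then splits the per-element work across tasks. It uses `std::async` from the C++ standard library as a stand-in for PPL constructs such as `concurrency::parallel_for` or `concurrency::task_group`, so that it builds without the Concurrency Runtime; `process_chunk` and `worker_count` are hypothetical names, and squaring each element is only a placeholder for real per-element work.

```cpp
#include <algorithm>
#include <cstddef>
#include <future>
#include <vector>

// One pipeline stage: it receives a whole chunk as a single coarse-grained
// message, then splits the fine-grained per-element work (squaring, as a
// placeholder) across worker_count asynchronous tasks.
std::vector<long long> process_chunk(const std::vector<int>& chunk, std::size_t worker_count)
{
   std::vector<long long> results(chunk.size());
   if (worker_count == 0)
   {
      worker_count = 1;
   }
   const std::size_t per_worker = (chunk.size() + worker_count - 1) / worker_count;

   std::vector<std::future<void>> workers;
   for (std::size_t begin = 0; begin < chunk.size(); begin += per_worker)
   {
      const std::size_t end = std::min(begin + per_worker, chunk.size());
      // Each task writes a disjoint range of results, so no locking is needed.
      workers.push_back(std::async(std::launch::async, [&chunk, &results, begin, end] {
         for (std::size_t i = begin; i < end; ++i)
         {
            results[i] = static_cast<long long>(chunk[i]) * chunk[i];
         }
      }));
   }

   // Wait for every task; get() also rethrows any exception a task threw.
   for (std::future<void>& worker : workers)
   {
      worker.get();
   }
   return results;
}
```

In a PPL-based stage, the inner loop would typically be a single call to concurrency::parallel_for over the chunk, which lets the runtime choose the partitioning instead of the fixed worker_count used here.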