ZMQ之异步管家模式
创始人
2024-03-09 00:20:33
0

        上文那种实现管家模式的方法比较简单,client还是简单海盗模式中的,仅仅是用API重写了一下。我在测试机上运行了程序,处理10万条请求大约需要14秒的时间,这和代码也有一些关系,因为复制消息帧的时间浪费了CPU处理时间。但真正的问题在于,我们总是逐个循环进行处理(round-trip),即发送-接收-发送-接收……ZMQ内部禁用了TCP发包优化算法(Nagle's algorithm),但逐个处理循环还是比较浪费。

        理论归理论,还是需要由实践来检验。我们用一个简单的测试程序来看看逐个处理循环是否真的耗时。这个测试程序会发送一组消息,第一次它发一条收一条,第二次则一起发送再一起接收。两次结果应该是一样的,但速度截然不同。

        tripping: Round-trip demonstrator in C

//
//  Round-trip 模拟
//
//  本示例程序使用多线程的方式启动client、worker、以及代理,
//  当client处理完毕时会发送信号给主程序。
//
#include "czmq.h"static void
client_task (void *args, zctx_t *ctx, void *pipe)
{void *client = zsocket_new (ctx, ZMQ_DEALER);zmq_setsockopt (client, ZMQ_IDENTITY, "C", 1);zsocket_connect (client, "tcp://localhost:5555");printf ("开始测试...\n");zclock_sleep (100);int requests;int64_t start;printf ("同步 round-trip 测试...\n");start = zclock_time ();for (requests = 0; requests < 10000; requests++) {zstr_send (client, "hello");char *reply = zstr_recv (client);free (reply);}printf (" %d 次/秒\n",(1000 * 10000) / (int) (zclock_time () - start));printf ("异步 round-trip 测试...\n");start = zclock_time ();for (requests = 0; requests < 100000; requests++)zstr_send (client, "hello");for (requests = 0; requests < 100000; requests++) {char *reply = zstr_recv (client);free (reply);}printf (" %d 次/秒\n",(1000 * 100000) / (int) (zclock_time () - start));zstr_send (pipe, "完成");
}static void *
worker_task (void *args)
{zctx_t *ctx = zctx_new ();void *worker = zsocket_new (ctx, ZMQ_DEALER);zmq_setsockopt (worker, ZMQ_IDENTITY, "W", 1);zsocket_connect (worker, "tcp://localhost:5556");while (1) {zmsg_t *msg = zmsg_recv (worker);zmsg_send (&msg, worker);}zctx_destroy (&ctx);return NULL;
}static void *
broker_task (void *args)
{//  准备上下文和套接字zctx_t *ctx = zctx_new ();void *frontend = zsocket_new (ctx, ZMQ_ROUTER);void *backend = zsocket_new (ctx, ZMQ_ROUTER);zsocket_bind (frontend, "tcp://*:5555");zsocket_bind (backend,  "tcp://*:5556");//  初始化轮询对象zmq_pollitem_t items [] = {{ frontend, 0, ZMQ_POLLIN, 0 },{ backend,  0, ZMQ_POLLIN, 0 }};while (1) {int rc = zmq_poll (items, 2, -1);if (rc == -1)break;              //  中断if (items [0].revents & ZMQ_POLLIN) {zmsg_t *msg = zmsg_recv (frontend);zframe_t *address = zmsg_pop (msg);zframe_destroy (&address);zmsg_pushstr (msg, "W");zmsg_send (&msg, backend);}if (items [1].revents & ZMQ_POLLIN) {zmsg_t *msg = zmsg_recv (backend);zframe_t *address = zmsg_pop (msg);zframe_destroy (&address);zmsg_pushstr (msg, "C");zmsg_send (&msg, frontend);}}zctx_destroy (&ctx);return NULL;
}int main (void)
{//  创建线程zctx_t *ctx = zctx_new ();void *client = zthread_fork (ctx, client_task, NULL);zthread_new (ctx, worker_task, NULL);zthread_new (ctx, broker_task, NULL);//  等待client端管道的信号char *signal = zstr_recv (client);free (signal);zctx_destroy (&ctx);return 0;
}

        在我的开发环境中运行结果如下:

Setting up test...
Synchronous round-trip test...9057 calls/second
Asynchronous round-trip test...173010 calls/second

        需要注意的是client在运行开始会暂停一段时间,这是因为在向ROUTER套接字发送消息时,若指定标识的套接字没有连接,那么ROUTER会直接丢弃该消息。这个示例中我们没有使用LRU算法,所以当worker连接速度稍慢时就有可能丢失数据,影响测试结果。

        我们可以看到,逐个处理循环比异步处理要慢将近20倍,让我们把它应用到管家模式中去。

        首先,让我们修改client的API,添加独立的发送和接收方法:

mdcli_t *mdcli_new     (char *broker);
void     mdcli_destroy (mdcli_t **self_p);
int      mdcli_send    (mdcli_t *self, char *service, zmsg_t **request_p);
zmsg_t  *mdcli_recv    (mdcli_t *self);

        然后花很短的时间就能将同步的client API改造成异步的API:

        mdcliapi2: Majordomo asynchronous client API in C

/*  =====================================================================mdcliapi2.cMajordomo Protocol Client API (async version)Implements the MDP/Worker spec at http://rfc.zeromq.org/spec:7.---------------------------------------------------------------------Copyright (c) 1991-2011 iMatix Corporation Copyright other contributors as noted in the AUTHORS file.This file is part of the ZeroMQ Guide: http://zguide.zeromq.orgThis is free software; you can redistribute it and/or modify it underthe terms of the GNU Lesser General Public License as published bythe Free Software Foundation; either version 3 of the License, or (atyour option) any later version.This software is distributed in the hope that it will be useful, butWITHOUT ANY WARRANTY; without even the implied warranty ofMERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNULesser General Public License for more details.You should have received a copy of the GNU Lesser General PublicLicense along with this program. If not, see.=====================================================================
*/#include "mdcliapi2.h"//  类结构
//  使用成员函数访问属性struct _mdcli_t {zctx_t *ctx;                //  上下文char *broker;void *client;               //  连接至代理的套接字int verbose;                //  在标准输出打印运行状态int timeout;                //  请求超时时间
};//  ---------------------------------------------------------------------
//  连接或重连代理void s_mdcli_connect_to_broker (mdcli_t *self)
{if (self->client)zsocket_destroy (self->ctx, self->client);self->client = zsocket_new (self->ctx, ZMQ_DEALER);zmq_connect (self->client, self->broker);if (self->verbose)zclock_log ("I: 正在连接代理 %s...", self->broker);
}//  ---------------------------------------------------------------------
//  构造函数mdcli_t *
mdcli_new (char *broker, int verbose)
{assert (broker);mdcli_t *self = (mdcli_t *) zmalloc (sizeof (mdcli_t));self->ctx = zctx_new ();self->broker = strdup (broker);self->verbose = verbose;self->timeout = 2500;           //  毫秒s_mdcli_connect_to_broker (self);return self;
}//  ---------------------------------------------------------------------
//  析构函数void
mdcli_destroy (mdcli_t **self_p)
{assert (self_p);if (*self_p) {mdcli_t *self = *self_p;zctx_destroy (&self->ctx);free (self->broker);free (self);*self_p = NULL;}
}//  ---------------------------------------------------------------------
//  设置请求超时时间void
mdcli_set_timeout (mdcli_t *self, int timeout)
{assert (self);self->timeout = timeout;
}//  ---------------------------------------------------------------------
//  发送请求给代理
//  取得请求消息的所有权,发送后销毁int
mdcli_send (mdcli_t *self, char *service, zmsg_t **request_p)
{assert (self);assert (request_p);zmsg_t *request = *request_p;//  在消息顶部加入协议规定的帧//  Frame 0: empty (模拟REQ套接字的行为)//  Frame 1: "MDPCxy" (6个字节, MDP/Client x.y)//  Frame 2: Service name (看打印字符串)zmsg_pushstr (request, service);zmsg_pushstr (request, MDPC_CLIENT);zmsg_pushstr (request, "");if (self->verbose) {zclock_log ("I: 发送请求给 '%s' 服务:", service);zmsg_dump (request);}zmsg_send (&request, self->client);return 0;
}//  ---------------------------------------------------------------------
//  获取应答消息,若无则返回NULL;
//  该函数不会尝试从代理的崩溃中恢复,
//  因为我们没有记录那些未收到应答的请求,所以也无法重发。zmsg_t *
mdcli_recv (mdcli_t *self)
{assert (self);//  轮询套接字以获取应答zmq_pollitem_t items [] = { { self->client, 0, ZMQ_POLLIN, 0 } };int rc = zmq_poll (items, 1, self->timeout * ZMQ_POLL_MSEC);if (rc == -1)return NULL;            //  中断//  收到应答后进行处理if (items [0].revents & ZMQ_POLLIN) {zmsg_t *msg = zmsg_recv (self->client);if (self->verbose) {zclock_log ("I: received reply:");zmsg_dump (msg);}//  不要处理错误,直接报出assert (zmsg_size (msg) >= 4);zframe_t *empty = zmsg_pop (msg);assert (zframe_streq (empty, ""));zframe_destroy (&empty);zframe_t *header = zmsg_pop (msg);assert (zframe_streq (header, MDPC_CLIENT));zframe_destroy (&header);zframe_t *service = zmsg_pop (msg);zframe_destroy (&service);return msg;     //  Success}if (zctx_interrupted)printf ("W: 收到中断消息,正在中止client...\n");elseif (self->verbose)zclock_log ("W: 严重错误,放弃请求");return NULL;
}

        下面是对应的测试代码:

        mdclient2: Majordomo client application in C

//
//  异步管家模式 - client示例程序
//  使用mdcli API隐藏MDP协议的具体实现
//
//  直接编译源码,而不创建类库
#include "mdcliapi2.c"int main (int argc, char *argv [])
{int verbose = (argc > 1 && streq (argv [1], "-v"));mdcli_t *session = mdcli_new ("tcp://localhost:5555", verbose);int count;for (count = 0; count < 100000; count++) {zmsg_t *request = zmsg_new ();zmsg_pushstr (request, "Hello world");mdcli_send (session, "echo", &request);}for (count = 0; count < 100000; count++) {zmsg_t *reply = mdcli_recv (session);if (reply)zmsg_destroy (&reply);elsebreak;              //  使用Ctrl-C中断}printf ("收到 %d 个应答\n", count);mdcli_destroy (&session);return 0;
}

        代理和worker的代码没有变,因为我们并没有改变MDP协议。经过对client的改造,我们可以明显看到速度的提升。如以下是同步状况下处理10万条请求的时间:

$ time mdclient
100000 requests/replies processedreal    0m14.088s
user    0m1.310s
sys     0m2.670s

        以下是异步请求的情况:

$ time mdclient2
100000 replies receivedreal    0m8.730s
user    0m0.920s
sys     0m1.550s

        让我们建立10个worker,看看效果如何:

$ time mdclient2
100000 replies receivedreal    0m3.863s
user    0m0.730s
sys     0m0.470s

        由于worker获得消息需要通过LRU队列机制,所以并不能做到完全的异步。但是,worker越多其效果也会越好。在我的测试机上,当worker的数量达到8个时,速度就不再提升了——四核处理器只能做这么多。但是,我们仍然获得了近四倍的速度提升,而改造过程只有几分钟而已。此外,代理其实还没有进行优化,它仍会复制消息,而没有实现零拷贝。不过,我们已经做到每秒处理2.5万次请求-应答,已经很不错了。

        当然,异步的管家模式也并不完美,有一个显著的缺点:它无法从代理的崩溃中恢复。可以看到mdcliapi2的代码中并没有恢复连接的代码,重新连接需要有以下几点作为前提:

                1、每个请求都做了编号,每次应答也含有相应的编号,这就需要修改协议,明确定义。

                2、client的API需要保留并跟踪所有已发送、但仍未收到应答的请求。

                3、如果代理发生崩溃,client会重发所有消息。

        可以看到,高可靠性往往和复杂度成正比,值得在管家模式中应用这一机制吗?这就要看应用场景了。如果是一个名称查询服务,每次会话会调用一次,那不需要应用这一机制;如果是一个位于前端的网页服务,有数千个客户端相连,那可能就需要了。

相关内容

热门资讯

政策护航,智能建造企业“出海”... 长沙晚报掌上长沙12月21日讯(全媒体记者 刘嘉)近日,长沙市智能建造产业链推进办公室印发了《关于推...
男子一家三口被发小抢劫杀害!律... 河南中牟男子一家三口被发小抢劫杀害案12月23日将开庭。 受害者家属梁先生称,2025年7月,他的...
尹锡悦辩称对妻子涉嫌受贿“不知... 当地时间12月21日,韩国“金建希特检组”称已完成对前总统尹锡悦及其妻子金建希的面对面讯问,将在一周...
原创 古... 如果你穿越回古代,又穷又病还没家人,会不会直接凉凉? 不要着急,说不定当时的朝廷会给你分房住、发米粮...
湖北一男子当街拦车砸玻璃,警方... 新京报记者 贺俊怡 编辑 罗伟伟 ▲新京报我们视频出品(ID:wevideo) 12月20日,湖北大...
企业眼中的“海南封关”:积极抓... “现在去海南能做些什么?” 近半年来,凯撒旅业(SZ000796)副总经理卢晓蕾频繁接到这样的咨询电...
湖北一男子当街拦车砸玻璃,警方... 来源:新京报 编辑:王伟 版权归原作者所有 如有侵权请及时联系
员工偷拍单位需要担责吗?被偷拍... 近日,互联网上出现“天津一医院女护士被偷拍”的视频,引发社会关注。12月19日,天津市公安局西青分局...
俄大使:俄罗斯不再将伯尔尼视为... 【俄罗斯驻瑞士大使谢尔盖·加尔莫宁向卫星通讯社表示,瑞士完全采取亲乌克兰的立场,俄罗斯不再将伯尔尼视...