QMCS消息服务是千米开放平台(QMOP)为了提高应用的API调用效率而推出的一种主动推送服务(以下简称QMCS),基于QMCS,应用获取千米的数据不需要不停的轮询API,仅需要在收到QMCS主动推送的消息时调用API获取数据即可,大大提高了API的调用效率并且节省了应用的API流量包。
推送的内容包括:E生活话费、水电煤、固话宽带、流量、加油卡、银行转账、信用卡还款等充值订单充值状态变更消息;火车票、汽车票、飞机票等票务订单预订状态变更消息;新零售会员、商品、交易、退款等消息。
那么如何使用QMCS呢?下面将基于SDK方式接受消息和接口方式接受消息做详细说明:
登录千米开放平台ISV控制台,在"我的应用-消息服务-订阅消息"页面,选择需要订阅的消息类型,点击消息后面的"订阅"即可。
消息订阅成功以后,可以在"我的订阅"中看到已经成功订阅的消息。如果需要取消消息的订阅,直接点击"取消订阅"即可。消息取消以后,应用将不会接受到该主题的消息推送。
点击消息主题名称,可以查看每个消息返回的详细字段信息。
调用qianmi.qmcs.user.permit接口给用户(E生活直销商,新零售商家)开通,可以选择只给用户开通部分消息主题,也可以全部开通,具体请看该API的详细入参说明。
备注:应用订阅消息以后,并且为用户开通了消息,就可以获取消息内容进行处理了,获取消息有两种方式:通过SDK方式和API接口方式,下面将对这两种方式的使用情况详细说明:
通过SDK接受消息目前仅支持JAVA和.NET语言,其他语言建议采用API接受消息。通过SDK接受消息,只需要关注接受消息后的业务处理,不需要关心消息重发,确认,长连接的心跳,重连接等操作,SDK内部会自动处理好这一切,开放平台推荐使用SDK接受消息。
public interface MessageHandler { /** * 消息通道客户端收到消息后,会回调该方法处理具体的业务,处理结果可以通过以下两种方式来表述: * 1.抛出异常或设置status.fail()表明消息处理失败,需要消息通道服务端重发 * 2.不抛出异常,也没有设置status信息,则表明消息处理成功,消息通道服务端不会再投递此消息 * * @param message 消息内容 * @param status 处理结果,如果调用status.fail(),通道将会择机重发消息;否则,通道认为消息处理成功 * @throws Exception 消息处理失败,消息通道将会择机重发消息 */ public void onMessage(Message message, MessageStatus status) throws Exception; }
//创建链接客户端,使用默认分组 QmcsClient client = new QmcsClient(appkey,appSecret,"default"); //消息处理 client.setMessageHandler(new MessageHandler(){ @Override public void onMessage(Message message, MessageStatus status) throws Exception { try { //以下仅为示例代码,实际业务处理根据自身需要开发 System.out.println(message.getTopic()); System.out.println(message.getContent()); } catch (Exception e){ e.printStackTrace(); status.fail();//消息处理失败,需要服务端重新推送 } } }); client.connect("ws://mc.api.qianmi.com");//发起连接请求
备注: 采用Java main方法在Eclipse或idea里面运行上面的代码测试时,需要让线程保持一段时间,请在client.connect()后面加上Thread.sleep或者System.in.read让main线程暂时不结束,以便观察消息的实时接收情况,否则main线程结束后,QMCS长连接也会跟着断开。如果是在web服务器上运行上面的代码,则不用在 client.connect()后面加任何线程等待代码,也不需要在外面包一层while(true)循环,因为web服务器上的主线程只要服务器不关闭都是不会结束的,QMCS的长连接会一直保持。
.NET-SDK代码示例//创建链接客户端,使用默认分组 QmcsClient client = new QmcsClient(appkey,appSecret,"default"); //消息处理 client.OnMessage += (s,e) => { try { //以下仅为示例代码,实际业务处理根据自身需要开发 Console.WriteLine(e.Message.Topic); Console.WriteLine(e.Message.Content); } catch (Exception exp) { Console.WriteLine(exp.StackTrace); e.Fail(); //消息处理失败,需要服务端重新推送 } }; client.Connect("ws://mc.api.qianmi.com");//发起连接请求
备注: 采用C# Main方法在VS控制台工程里面运行上面的代码测试时,请在client.Connect后面加上Console.Read()或 Thread.Sleep让main线程暂时不结束,以便观察消息的实时接收情况,否则Main线程结束后,QMCS长连接也会跟着断开。如果是在IIS服务器或C#应用程序里面运行上面的代码,则不用在client.Connect后面加任何线程等待的代码,也不需要在外面包一层while(true)循环,只要保持IIS服务器或C#应用程序不关闭,QMCS的长连接会一直保持。
通过API接受消息由于很多语言无法保障多线程和长连接处理,或者处理起来非常不方便,比如PHP,Python等,这些语言官方暂时未提供SDK接受消息方式,可以通过API方式来消费和确认消息,下面将对如何使用API方式获取消息做详细说明:
OpenClient client = new DefaultOpenClient(url,appkey,appSecret); do { Integer quantity = 100; String groupName = "default"; QmcsMessagesConsumeResponse rsp = null; do { QmcsMessagesConsumeRequest req = new QmcsMessagesConsumeRequest(); req.setQuantity(quantity); req.setGroupName(groupName); rsp = client.execute(req); if (rsp.isSuccess() && rsp.getQmcs_messages() != null) { for (QmcsMessage msg : rsp.getQmcs_messages()) { // 处理消息,这里仅为示例代码,实际业务根据自身需要开发 System.out.println(msg.getContent()); System.out.println(msg.getTopic()); // 确认消费状态 QmcsMessagesConfirmRequest cReq = new QmcsMessagesConfirmRequest(); cReq.setSMessageIds(msg.getId()); QmcsMessagesConfirmResponse cRsp = client.execute(cReq); System.out.println(cRsp.isSuccess()); } } System.out.println(rsp.getBody()); } while (rsp != null && rsp.isSuccess() && rsp.getQmcs_messages() != null ); Thread.sleep(5000L); //根据自身服务器处理能力合理设置轮训时间 } while (true);
IOpenClient client = new DefaultOpenClient(url,appkey,appSecret); do { int quantity = 100; string groupName = "default"; QmcsMessagesConsumeResponse rsp = null; do { QmcsMessagesConsumeRequest req = new QmcsMessagesConsumeRequest(); req.setQuantity(quantity); req.setGroupName(groupName); rsp = client.execute(req); if (!rsp.IsError && rsp.QmcsMessages != null) { foreach (QmcsMessage msg in rsp.QmcsMessages) { // 处理消息,这里仅为示例代码,实际业务根据自身需要开发 Console.WriteLine(msg.Topic); Console.WriteLine(msg.Content); // 确认消费状态 QmcsMessagesConfirmRequest cReq = new QmcsMessagesConfirmRequest(); cReq.setSMessageIds(msg.Id); QmcsMessagesConfirmResponse cRsp = client.execute(cReq); Console.WriteLine(cRsp.IsError); } } Console.WriteLine(rsp.Body); } while (rsp != null && !rsp.IsError && rsp.QmcsMessages != null ); Thread.sleep(new TimeSpan(0, 0, 5)); //根据自身服务器处理能力合理设置轮训时间 } while (true);
备注:通过API拉取消息的平均RT在网络好的情况下能达到100毫秒之内,请根据自身业务情况,以及自身服务器处理消息的能力,合理设置消息消费接口的轮训时间,如果设置轮训时间过短,可能会经常性的获取消息列表为空,产生很多无谓的请求,不仅浪费了服务端的资源,还浪费了应用自身的API流量包。
消息分组说明一般在线订购应用,如果用户很多的情况,应用部署在多台服务器上面,组成了集群来接受消息,或者需要对当前用户进行消息隔离,分别处理,都可以通过设置分组的方式通过多连接来获取消息。
如果消息通过SDK主动推送方式获取,多连接有两种方式:1.创建多个用户分组,为每个分组创建一个连接通道;2.为同一个分组创建多个连接通道(同一应用使用同一分组最多可以创建5个连接通道)。
为了方便ISV解析消息,下面对获取到的完整的消息内容示例进行数据解析,以E生活充值订单状态变更消息举例:消息topic为"qianmi_elife_rechargeStateChange"。
{ "id":"MU20150417111623368", "topic":"qianmi_elife_rechargeStateChange", "pub_app_key":"10000000", "user_id":"A854000", "pub_time":"2015-04-17 11:16:23", "content":"{\"recharge_state\": \"1\", \"tid\": \"S00022112544\", \"timestamp\": \"2015-04-17 11:16:23\", \"user_id\": \"A854000\"}", }
消息体字段说明 | |||
名称 | 示例 | 描述 | |
id | MU20150417111623368 | 消息ID | |
topic | qianmi_elife_rechargeStateChange | 消息类型topic | |
pub_app_key | 10000000 | 消息所属appKey | |
user_id | A854000 | 消息所属的用户编号 | |
pub_time | 2015-04-17 11:16:23 | 消息发布时间 | |
content | {\"recharge_state\": \"1\", \"tid\": \"S00022112544\", \"timestamp\": \"2015-04-17 11:16:23\", \"user_id\": \"A854000\"} | 消息业务内容,不同topic请参考消息文档说明 |
QMCS是千米开放平台推出的一种消息主动推送服务,之所以推出QMCS,典型的场景如下:
1:E生活充值缴费订单充值,在调用支付接口以后,由于充值订单的特殊性,例如话费充值订单成功到账有一定的时差,并不是立即返回到账的,需要供货商回调千米以后订单充值状态才会变更(一般在1分钟内),在消息服务推出之前开发者只能定时轮询订单详情接口,这种方式有很大的局限性:商家不知道订单何时成功到账,只能不停轮询订单接口,并且只能单个订单处理,在订单量很大的情况下,不仅浪费了流量包,而且满足不了业务处理需要的,消息服务推出以后,如果开发者使用JAVA和C#等语言就可以使用SDK接受主动推送的消息,其他语言可以基于API定时轮询消费消息接口 qianmi.qmcs.messages.consume,根据消息的TOPIC类型,解析消息的内容,更新相应的订单充值状态。
2:新零售商品上下架通知、订单状态变更通知消息等,通过消息服务推送,可以获取实时的数据变更消息,不仅可以减少减轻千米开放平台服务器压力,而且可以减少应用自身的API流量包消耗。
2:什么是分组,是否需要添加分组?消息分组是进行消息隔离的手段,同一个分组内用户的消息只会通过分组获取信息,同一个分组支持多连接,随机发送到组内的某一个连接上面,例如在线订购应用,如果用户需要对不同用户的消息进行区别对待,比如,优先保证VIP缴费用户,然后在保证免费用户,就可以通过不同的分组进行消费消息,每个应用最多只能建50个自定义分组,每个分组用户数量不限,某个授权用户只能属于某一个分组,新的分组建立将会导致旧的分组失效。
E生活商家后台应用如果存在多直销商共用一个appKey情况,建议各直销商为自己建一个单独的分组,分组名称建议用各直销商A编号,以免消息被其他用户消费到。
3:什么是多连接,如何建立多连接?多连接收消息是指同对一分组,ISV服务器与千米开放平台的消息服务器建立多个连接来收消息。多链接是对同一个分组而言,消息在下发时随机选择从分组内的多个连接中选择一个连接下发消息。多链接有的随机下发消息的功能,可以用同一分组多连接来实现集群,负载功能。
建立多链接只需要用相同的代码重新启动一个QmcsClient实例。可在同一个ISV服务器上,也可在不同的ISV服务器上,建立同一个分组的多链接。
4:在什么情况下需要建立多连接?如果消息量很多,ISV客户端在单负载情况下处理不了堆积的消息,就可以建立多连接,一般情况下,单个分组是不需要建立多个连接的,单个连接就能把网卡泡满,消息服务的多连接,一般是应用在多个分组的情况下,或者做多负载集群部署的情况下。
5:如果用户Token过期,消息会不会推送?消息推送是有2个条件的,1:授权用户的accessToken在有效期内,2:用户有没有开通该主题的消息,只有2个条件都满足的情况下消息才会推送,如果accessToken过期一个月后还没有进行刷新,那么消息开通的关系将会被删除。如果在一个月内重新授权,那么就不需要为这个用户再次开通消息服务。
6:消息服务有延时吗?一般来说,开通消息接受功能以后1S之内便生效,使用中,如果消息产生,基本不会超过1秒就会收到消息,如果消息发送堆积或者程序处理不及时,就会有消息延迟。取消接受消息功能1秒内生效,已经产生的消息可以继续消费,新的消息将不会推送。
7:消息重新推送是什么场景?一般来说,通过SDK获取消息时,如果客户端断开了与服务端的连接,服务端的消息就会堆积,等应用重新连接以后,服务端会把已经堆积的消息按时间顺序推送给客户端,一条消息从发布之日起,服务端只会保留3天,如果客户端一直不接受消息,超过3天,服务端会自动清除,对于正常连接的客户端,如果消息处理失败,即调用status.fail(),那么服务端会隔10分钟进行第一次重新推送,如果一直处理失败,服务端会定时每隔10分钟重新推送,直到消息被清除。
对于API方式获取消息来说,如果客户端在获取消息处理以后不调用消息确认接口进行确认,那么服务端会每隔10分钟重新推送一次消息,直至消息被确认消费或者过期被清除。
8:消息断开和心跳检测?客户端如果需要断开与服务端的连接,则需:QmcsClient.close(),判断心跳检测是否正常,则需:QmcsClient.isOnLine()