Topic.php
6.69 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
<?php
namespace App\Common\TencentMQ;
/**
 * Thin wrapper around a CMQ client object for operations on one named topic.
 *
 * Every public method builds a parameter array keyed by the request field
 * names and forwards it to the matching method on the injected client.
 */
class Topic
{
    /** @var string Name of the topic sent as 'topicName' in every request. */
    private $topic_name;

    /** @var object Client object all requests are delegated to. */
    private $cmq_client;

    /** @var bool When truthy, message bodies are base64-encoded before publishing. */
    private $encoding;

    /**
     * @param string $topic_name name of the topic this object operates on
     * @param object $cmq_client client exposing create_topic(), publish_message(), etc.
     * @param bool   $encoding   whether message bodies are base64-encoded before sending
     */
    public function __construct($topic_name, $cmq_client, $encoding=false){
        $this->topic_name = $topic_name;
        $this->cmq_client = $cmq_client;
        $this->encoding = $encoding;
    }

    /**
     * Switch base64 encoding of published message bodies on or off.
     *
     * @param bool $encoding
     */
    public function set_encoding($encoding){
        $this->encoding = $encoding;
    }

    /**
     * Create the topic.
     *
     * 'maxMsgSize' is only sent when the meta object carries a positive value.
     *
     * @param TopicMeta $topic_meta attributes for the new topic (filterType, maxMsgSize)
     */
    public function create($topic_meta){
        $params = array(
            'topicName' => $this->topic_name,
            'filterType' => $topic_meta->filterType,
        );
        if ($topic_meta->maxMsgSize > 0) {
            $params['maxMsgSize'] = $topic_meta->maxMsgSize;
        }
        $this->cmq_client->create_topic($params);
    }

    /**
     * Fetch the topic attributes.
     *
     * @return TopicMeta a fresh meta object overlaid with the fields present in the response
     */
    public function get_attributes(){
        $params = array(
            'topicName' => $this->topic_name,
        );
        $resp = $this->cmq_client->get_topic_attributes($params);
        $topic_meta = new TopicMeta();
        $this->__resp2meta($topic_meta, $resp);
        return $topic_meta;
    }

    /**
     * Update the topic attributes. Only maxMsgSize is sent, as a string.
     *
     * @param TopicMeta $topic_meta object carrying the new maxMsgSize
     */
    public function set_attributes($topic_meta){
        $params = array(
            'topicName' => $this->topic_name,
            'maxMsgSize' => strval($topic_meta->maxMsgSize),
        );
        $this->cmq_client->set_topic_attributes($params);
    }

    /**
     * Delete the topic.
     */
    public function delete(){
        $params = array(
            'topicName' => $this->topic_name,
        );
        $this->cmq_client->delete_topic($params);
    }

    /**
     * Publish a single message (non-batch).
     *
     * The body is base64-encoded when encoding is enabled, matching what
     * batch_publish_message() does; previously this method sent the body raw
     * regardless of the encoding flag.
     *
     * @param string     $message    message body
     * @param array|null $vTagList   tags, sent as msgTag.1, msgTag.2, ...
     * @param mixed      $routingKey routing key; omitted when it loosely equals null
     *
     * @return mixed whatever the client's publish_message() returns
     *               (the original notes describe it as the message handle)
     */
    public function publish_message($message ,$vTagList= null , $routingKey = null){
        $params = array(
            'topicName' => $this->topic_name,
            'msgBody' => $this->encode_body($message),
        );
        if ($routingKey != null) {
            $params['routingKey'] = $routingKey;
        }
        $params += $this->indexed_params('msgTag', $vTagList);

        return $this->cmq_client->publish_message($params);
    }

    /**
     * Publish several messages in one request.
     *
     * @param array      $vmessageList message bodies, sent as msgBody.1, msgBody.2, ...
     * @param array|null $vtagList     tags, sent as msgTag.1, msgTag.2, ...
     * @param mixed      $routingKey   routing key; omitted when it loosely equals null
     *
     * @return array the 'msgId' of every reply entry that has one; empty when the
     *               client reply is not an array
     */
    public function batch_publish_message($vmessageList, $vtagList = null ,$routingKey=null){
        $params = array(
            'topicName' => $this->topic_name,
        );
        if ($routingKey != null) {
            $params['routingKey'] = $routingKey;
        }

        $bodies = array();
        if (is_array($vmessageList)) {
            foreach ($vmessageList as $body) {
                $bodies[] = $this->encode_body($body);
            }
        }
        $params += $this->indexed_params('msgBody', $bodies);
        $params += $this->indexed_params('msgTag', $vtagList);

        $msgList = $this->cmq_client->batch_publish_message($params);

        $retMessageList = array();
        // Guard against a null/scalar reply so the loop cannot raise a warning.
        if (is_array($msgList)) {
            foreach ($msgList as $msg) {
                if (isset($msg['msgId'])) {
                    $retMessageList[] = $msg['msgId'];
                }
            }
        }
        return $retMessageList;
    }

    /**
     * List the subscriptions of this topic.
     *
     * @param string     $searchWord subscription keyword filter; omitted when empty
     * @param int        $limit      maximum number of subscriptions to return; omitted when -1
     * @param string|int $offset     start position, i.e. the next_offset returned by the
     *                               previous call; omitted when empty
     *
     * @return array with keys:
     *               - 'totalCount'       total reported by the client (0 when absent)
     *               - 'totalCoult'       same value; misspelled legacy key kept so
     *                                    existing callers keep working
     *               - 'subscriptionList' entries of this page (empty array when absent)
     *               - 'next_offset'      start position for the next call, or "" once
     *                                    every subscription has been listed
     */
    public function list_subscription($searchWord ="", $limit = -1, $offset =""){
        $params = array('topicName' => $this->topic_name);
        if ($searchWord != "") {
            $params['searchWord'] = $searchWord;
        }
        if ($limit != -1) {
            $params['limit'] = $limit;
        }
        if ($offset != "") {
            $params['offset'] = $offset;
        }

        $resp = $this->cmq_client->list_subscription($params);

        // Tolerate replies that lack either field instead of failing in count().
        $subscriptionList = (isset($resp['subscriptionList']) && is_array($resp['subscriptionList']))
            ? $resp['subscriptionList']
            : array();
        $totalCount = isset($resp['totalCount']) ? $resp['totalCount'] : 0;

        // An empty offset means "start from the beginning", i.e. 0.
        $next_offset = (int) $offset + count($subscriptionList);
        if ($next_offset >= $totalCount) {
            $next_offset = "";
        }

        return array(
            "totalCount" => $totalCount,
            "totalCoult" => $totalCount,
            "subscriptionList" => $subscriptionList,
            "next_offset" => $next_offset,
        );
    }

    /**
     * Copy the attribute fields present in a response onto a meta object.
     * Fields missing from the response leave the meta object's value untouched.
     *
     * @param TopicMeta $topic_meta object to fill in
     * @param array     $resp       response from get_topic_attributes()
     */
    protected function __resp2meta($topic_meta, $resp){
        $fields = array(
            'maxMsgSize',
            'msgRetentionSeconds',
            'createTime',
            'lastModifyTime',
            'filterType',
        );
        foreach ($fields as $field) {
            if (isset($resp[$field])) {
                $topic_meta->$field = $resp[$field];
            }
        }
    }

    /**
     * Return the body as it should go on the wire: base64 when encoding is on,
     * unchanged otherwise.
     */
    private function encode_body($body){
        return $this->encoding ? base64_encode($body) : $body;
    }

    /**
     * Build the "<prefix>.1", "<prefix>.2", ... request parameters for a list
     * of values. Anything that is not an array yields no parameters.
     */
    private function indexed_params($prefix, $values){
        $indexed = array();
        if (!is_array($values)) {
            return $indexed;
        }
        $n = 1;
        foreach ($values as $value) {
            $indexed[$prefix . '.' . $n] = $value;
            $n += 1;
        }
        return $indexed;
    }
}
/**
 * Plain value holder for topic attributes.
 *
 * Defaults set by the constructor: maxMsgSize 65536, msgRetentionSeconds
 * 86400 (one day), createTime 0, lastModifyTime 0, filterType 1.
 *
 * According to the original notes:
 *   modifiable     :: maxMsgSize           maximum message size
 *   not modifiable :: msgRetentionSeconds  maximum message retention time
 *                  :: createTime           creation time
 *                  :: lastModifyTime       last modification time
 */
class TopicMeta
{
    /** @var int Maximum message size. */
    public $maxMsgSize;

    /** @var int Maximum message retention time in seconds. */
    public $msgRetentionSeconds;

    /** @var int Creation time. */
    public $createTime;

    /** @var int Last modification time. */
    public $lastModifyTime;

    /**
     * Declared explicitly: it used to be created as a dynamic property in the
     * constructor, which is deprecated since PHP 8.2.
     *
     * @var int Filter type. NOTE(review): the meaning of the values is not
     *          visible in this file — confirm against the CMQ API docs.
     */
    public $filterType;

    /**
     * Initialise every attribute with its default value.
     */
    public function __construct(){
        $this->maxMsgSize = 65536;
        $this->msgRetentionSeconds = 86400;
        $this->createTime = 0;
        $this->lastModifyTime = 0;
        $this->filterType = 1;
    }

    /**
     * @return string JSON object containing all five attributes
     */
    public function __toString(){
        $info = array(
            "maxMsgSize" => $this->maxMsgSize,
            "msgRetentionSeconds" => $this->msgRetentionSeconds,
            "createTime" => $this->createTime,
            "lastModifyTime" => $this->lastModifyTime,
            "filterType" => $this->filterType,
        );
        return json_encode($info);
    }
}