123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580 |
- function hashSize(obj) {
- var size = 0, key;
- for (key in obj) {
- if (obj.hasOwnProperty(key)) size++;
- }
- return size;
- }
- function uuid() {
- //http://stackoverflow.com/questions/105034/how-to-create-a-guid-uuid-in-javascript
- return 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'.replace(/[xy]/g, function (c) {
- var r = Math.random() * 16 | 0, v = c == 'x' ? r : (r & 0x3 | 0x8);
- return v.toString(16);
- });
- }
- function string2Buffer(str) {
- var buf = new ArrayBuffer(str.length * 2); // 2 bytes for each char
- var bufView = new Int16Array(buf);
- for (var i = 0, strLen = str.length; i < strLen; i++) {
- bufView[i] = str.charCodeAt(i);
- }
- return bufView;
- }
- function buffer2String(buf) {
- return String.fromCharCode.apply(null, buf);
- }
- //First, checks if it isn't implemented yet.
- if (!String.prototype.format) {
- String.prototype.format = function () {
- var args = arguments;
- return this.replace(/{(\d+)}/g, function (match, number) {
- return typeof args[number] != 'undefined'
- ? args[number]
- : match
- ;
- });
- };
- }
- function inherits(ctor, superCtor) {
- ctor.super_ = superCtor;
- ctor.prototype = Object.create(superCtor.prototype, {
- constructor: {
- value: ctor,
- enumerable: false,
- writable: true,
- configurable: true
- }
- });
- };
- // /////////////////////////////////////////////////////////////////
- function Meta(meta) {
- this.status = null;
- this.method = "GET";
- this.url = "/";
- this.path = null;
- this.params = null;
- if (!meta || meta == "") return;
- var blocks = meta.split(" ");
- var method = blocks[0];
- if (Meta.HttpMethod.indexOf(method) == -1) {
- this.status = blocks[1];
- return;
- }
- this.url = blocks[1];
- this.decodeUrl(this.url);
- }
- Meta.HttpMethod = ["GET", "POST", "PUT", "DELETE", "HEAD", "OPTIONS"];
- Meta.HttpStatus = {
- "200": "OK",
- "201": "Created",
- "202": "Accepted",
- "204": "No Content",
- "206": "Partial Content",
- "301": "Moved Permanently",
- "304": "Not Modified",
- "400": "Bad Request",
- "401": "Unauthorized",
- "403": "Forbidden",
- "404": "Not Found",
- "405": "Method Not Allowed",
- "416": "Requested Range Not Satisfiable",
- "500": "Internal Server Error"
- };
- Meta.prototype.toString = function () {
- if (this.status) {
- var desc = Meta.HttpStatus[this.status];
- if (!desc) desc = "Unknown Status";
- return "HTTP/1.1 {0} {1}".format(this.status, desc);
- }
- return "{0} {1} HTTP/1.1".format(this.method, this.url);
- };
- Meta.prototype.getParam = function (key) {
- if (!this.params) {
- return undefined;
- }
- return this.params[key];
- };
- Meta.prototype.setParam = function (key, val) {
- if (!this.params) {
- this.params = {};
- }
- this.params[key] = val;
- };
- Meta.prototype.setUrl = function (url) {
- this.url = url;
- this.decodeUrl(url);
- }
- Meta.prototype.decodeUrl = function (cmdStr) {
- var idx = cmdStr.indexOf("?");
- if (idx < 0) {
- this.path = cmdStr;
- } else {
- this.path = cmdStr.substring(0, idx);
- }
- if (this.path.charAt(0) == '/') {
- this.path = this.path.substring(1);
- }
- if (idx < 0) return;
- var paramStr = cmdStr.substring(idx + 1);
- this.params = {};
- var kvs = paramStr.split("&");
- for (var i in kvs) {
- var kv = kvs[i];
- idx = kv.indexOf("=");
- if (idx < 0) {
- util.debug("omit: " + kv);
- continue;
- }
- var key = kv.substring(0, idx);
- var val = kv.substring(idx + 1);
- this.params[key] = val;
- }
- };
- //HTTP Message
- function Message(body) {
- this.meta = new Meta();
- this.head = {};
- this.setBody(body);
- }
- Message.HEARTBEAT = "heartbeat";
- Message.REMOTE_ADDR = "remote-addr";
- Message.ENCODING = "encoding";
- Message.CMD = "cmd";
- Message.BROKER = "broker";
- Message.TOPIC = "topic";
- Message.MQ = "mq";
- Message.ID = "id";
- Message.ACK = "ack";
- Message.SENDER = "sender";
- Message.RECVER = "recver";
- Message.ORIGIN_URL = "origin_url";
- Message.ORIGIN_ID = "rawid";
- Message.ORIGIN_STATUS = "reply_code"
- Message.prototype.getHead = function (key) {
- return this.head[key];
- };
- Message.prototype.setHead = function (key, val) {
- this.head[key] = val;
- };
- Message.prototype.removeHead = function (key) {
- delete this.head[key];
- };
- Message.prototype.getMq = function () {
- return this.getHead(Message.MQ);
- };
- Message.prototype.setMq = function (val) {
- this.setHead(Message.MQ, val);
- };
- Message.prototype.getId = function () {
- return this.getHead(Message.ID);
- };
- Message.prototype.setId = function (val) {
- this.setHead(Message.ID, val);
- };
- Message.prototype.getTopic = function () {
- return this.getHead(Message.TOPIC);
- };
- Message.prototype.setTopic = function (val) {
- this.setHead(Message.TOPIC, val);
- };
- Message.prototype.getEncoding = function () {
- return this.getHead(Message.ENCODING);
- };
- Message.prototype.setEncoding = function (val) {
- this.setHead(Message.ENCODING, val);
- };
- Message.prototype.isAck = function () {
- var ack = this.getHead(Message.ACK);
- if (!ack) return true;//default to true
- return ack == '1';
- };
- Message.prototype.setAck = function (val) {
- this.setHead(Message.ACK, val);
- };
- Message.prototype.getCmd = function () {
- return this.getHead(Message.CMD);
- };
- Message.prototype.setCmd = function (val) {
- this.setHead(Message.CMD, val);
- };
- Message.prototype.getSender = function () {
- return this.getHead(Message.SENDER);
- };
- Message.prototype.setSender = function (val) {
- this.setHead(Message.SENDER, val);
- };
- Message.prototype.getRecver = function () {
- return this.getHead(Message.RECVER);
- };
- Message.prototype.setRecver = function (val) {
- this.setHead(Message.RECVER, val);
- };
- Message.prototype.getOriginUrl = function () {
- return this.getHead(Message.ORIGIN_URL);
- };
- Message.prototype.setOriginUrl = function (val) {
- this.setHead(Message.ORIGIN_URL, val);
- };
- Message.prototype.getOriginStatus = function () {
- return this.getHead(Message.ORIGIN_STATUS);
- };
- Message.prototype.setOriginStatus = function (val) {
- this.setHead(Message.ORIGIN_STATUS, val);
- };
- Message.prototype.getOriginId = function () {
- return this.getHead(Message.ORIGIN_ID);
- };
- Message.prototype.setOriginId = function (val) {
- this.setHead(Message.ORIGIN_ID, val);
- };
- Message.prototype.getPath = function () {
- return this.meta.path;
- };
- Message.prototype.getUrl = function () {
- return this.meta.url;
- };
- Message.prototype.setUrl = function (url) {
- this.meta.status = null;
- return this.meta.setUrl(url);
- };
- Message.prototype.getStatus = function () {
- return this.meta.status;
- };
- Message.prototype.setStatus = function (val) {
- this.meta.status = val;
- };
- Message.prototype.getBodyString = function () {
- if (!this.body) return null;
- return buffer2String(this.body);
- };
- Message.prototype.getBody = function () {
- if (!this.body) return null;
- return this.body;
- };
- Message.prototype.setBody = function (val) {
- if (val === undefined) return;
- if (val instanceof Int16Array) {
- this.body = val;
- } else {
- this.body = string2Buffer(val);
- }
- this.setHead('content-length', this.body.length);
- };
- Message.prototype.setJsonBody = function (json) {
- this.setBody(json);
- this.setHead('content-type', 'application/json');
- }
- Message.prototype.setBodyFormat = function (format) {
- var args = Array.prototype.slice.call(arguments, 1);
- var body = format.replace(/{(\d+)}/g, function (match, number) {
- return typeof args[number] != 'undefined'
- ? args[number]
- : match
- ;
- });
- this.setBody(body);
- };
- Message.prototype.isStatus200 = function () {
- return "200" == this.getStatus();
- };
- Message.prototype.isStatus404 = function () {
- return "404" == this.getStatus();
- };
- Message.prototype.isStatus500 = function () {
- return "500" == this.getStatus();
- };
- Message.prototype.toString = function () {
- var lines = new Array();
- lines.push("{0}".format(this.meta.toString()));
- for (var key in this.head) {
- lines.push("{0}: {1}".format(key, this.head[key]));
- }
- var bodyLen = 0;
- if (this.body) {
- bodyLen = this.body.length;
- }
- var lenKey = "content-length";
- if (!(lenKey in this.head)) {
- lines.push("{0}: {1}".format(lenKey, bodyLen));
- }
- var bodyString = "\r\n";
- if (bodyLen > 0) {
- bodyString += buffer2String(this.body);
- }
- lines.push(bodyString);
- return lines.join("\r\n");
- };
- Message.parse = function (str) {
- var blocks = str.split("\r\n");
- var lines = [];
- for (var i in blocks) {
- var line = blocks[i];
- if (line == '') continue;
- lines.push(line);
- }
- var lenKey = "content-length";
- var lenVal = 0;
- var msg = new Message();
- msg.meta = new Meta(lines[0]);
- for (var i = 1; i < lines.length; i++) {
- var line = lines[i];
- if (i == lines.length - 1) {
- if (lenVal > 0) {
- msg.setBody(line);
- continue;
- }
- }
- var p = line.indexOf(":");
- if (p == -1) continue;
- var key = line.substring(0, p).trim().toLowerCase();
- var val = line.substring(p + 1).trim();
- if (key == lenKey) {
- lenVal = val;
- }
- msg.setHead(key, val);
- }
- return msg;
- };
- function Ticket(reqMsg, callback) {
- this.id = uuid();
- this.request = reqMsg;
- this.response = null;
- this.callback = callback;
- reqMsg.setId(this.id);
- }
- var WebSocket = window.WebSocket;
- if (!WebSocket) {
- WebSocket = window.MozWebSocket;
- }
- function MessageClient(address) {
- this.address = address;
- this.autoReconnect = true;
- this.reconnectInterval = 3000;
- this.ticketTable = {};
- }
- MessageClient.prototype.connect = function (connectedHandler) {
- console.log("Trying to connect to " + this.address);
- this.socket = new WebSocket(this.address);
- var client = this;
- this.socket.onopen = function (event) {
- console.log("Connected to " + client.address);
- if (connectedHandler) {
- connectedHandler(event);
- }
- client.heartbeatInterval = setInterval(function () {
- var msg = new Message();
- msg.setCmd(Message.HEARTBEAT);
- client.invokeAsync(msg);
- }, 300 * 1000);
- };
- this.socket.onclose = function (event) {
- clearInterval(client.heartbeatInterval);
- setTimeout(function () {
- try {
- client.connect(connectedHandler);
- } catch (e) {
- }//ignore
- }, client.reconnectInterval);
- };
- this.socket.onmessage = function (event) {
- var msg = Message.parse(event.data);
- var msgid = msg.getId();
- var ticket = client.ticketTable[msgid];
- if (ticket) {
- ticket.response = msg;
- if (ticket.callback) {
- ticket.callback(msg);
- }
- delete client.ticketTable[msgid];
- } else {
- console.log("Warn: drop message\n" + msg.toString());
- }
- }
- this.socket.onerror = function (data) {
- console.log("Error: " + data);
- }
- }
- MessageClient.prototype.invokeAsync = function (msg, callback) {
- if (this.socket.readyState != WebSocket.OPEN) {
- console.log("socket is not open, invalid");
- return;
- }
- if (callback) {
- var ticket = new Ticket(msg, callback);
- this.ticketTable[ticket.id] = ticket;
- }
- this.socket.send(msg);
- };
- function Proto() {
- }
- Proto.Produce = "produce";
- Proto.Consume = "consume";
- Proto.Route = "route";
- Proto.Heartbeat = "heartbeat";
- Proto.Admin = "admin";
- Proto.CreateMQ = "create_mq";
- function MqMode() {
- }
- MqMode.MQ = 1 << 0;
- MqMode.PubSub = 1 << 1;
- MqMode.Memory = 1 << 2;
- var Broker = MessageClient;
- //define more brokers
- function MqAdmin(broker, mq) {
- this.broker = broker;
- this.mq = mq;
- this.mode = 0;
- var args = Array.prototype.slice.call(arguments, 2);
- for (var i in args) {
- this.mode |= args[i];
- }
- }
- MqAdmin.prototype.createMq = function (callback) {
- var params = {};
- params["mq_name"] = this.mq;
- params["mq_mode"] = "" + this.mode;
- var msg = new Message();
- msg.setCmd(Proto.CreateMQ);
- msg.setHead("mq_name", this.mq);
- msg.setHead("mq_mode", "" + this.mode);
- this.broker.invokeAsync(msg, callback);
- };
- function Producer(broker, mq) {
- MqAdmin.call(this, broker, mq);
- }
- inherits(Producer, MqAdmin)
- Producer.prototype.sendAsync = function (msg, callback) {
- msg.setCmd(Proto.Produce);
- msg.setMq(this.mq);
- this.broker.invokeAsync(msg, callback);
- };
- function Consumer(broker, mq) {
- MqAdmin.call(this, broker, mq);
- }
- inherits(Consumer, MqAdmin);
- Consumer.prototype.take = function (callback) {
- var msg = new Message();
- msg.setCmd(Proto.Consume);
- msg.setMq(this.mq);
- if (this.topic) msg.setTopic(this.topic);
- var consumer = this;
- this.broker.invokeAsync(msg, function (res) {
- if (res.isStatus404()) {
- consumer.createMq(function (res) {
- if (res.isStatus200()) {
- console.log(consumer.mq + " created");
- }
- consumer.take(callback);
- });
- return;
- }
- if (res.isStatus200()) {
- var originUrl = res.getOriginUrl();
- var id = res.getOriginId();
- res.removeHead(Message.ORIGIN_ID);
- if (originUrl == null) {
- originUrl = "/";
- } else {
- res.removeHead(Message.ORIGIN_URL);
- }
- res.setId(id);
- res.setUrl(originUrl);
- try {
- callback(res);
- } catch (error) {
- console.log(error);
- }
- }
- return consumer.take(callback);
- });
- };
- Consumer.prototype.route = function (msg) {
- msg.setCmd(Proto.Route);
- msg.setAck(false);
- this.broker.invokeAsync(msg);
- };
|