connection.js 9.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387
  1. /*!
  2. * Module dependencies.
  3. */
  4. var MongooseConnection = require('../../connection');
  5. var mongo = require('mongodb');
  6. var Db = mongo.Db;
  7. var Server = mongo.Server;
  8. var Mongos = mongo.Mongos;
  9. var STATES = require('../../connectionstate');
  10. var ReplSetServers = mongo.ReplSet;
  11. var DisconnectedError = require('../../error/disconnected');
  12. /**
  13. * A [node-mongodb-native](https://github.com/mongodb/node-mongodb-native) connection implementation.
  14. *
  15. * @inherits Connection
  16. * @api private
  17. */
  18. function NativeConnection() {
  19. MongooseConnection.apply(this, arguments);
  20. this._listening = false;
  21. }
  22. /**
  23. * Expose the possible connection states.
  24. * @api public
  25. */
  26. NativeConnection.STATES = STATES;
  27. /*!
  28. * Inherits from Connection.
  29. */
  30. NativeConnection.prototype.__proto__ = MongooseConnection.prototype;
  31. /**
  32. * Opens the connection to MongoDB.
  33. *
  34. * @param {Function} fn
  35. * @return {Connection} this
  36. * @api private
  37. */
  38. NativeConnection.prototype.doOpen = function(fn) {
  39. var _this = this;
  40. var server = new Server(this.host, this.port, this.options.server);
  41. if (this.options && this.options.mongos) {
  42. var mongos = new Mongos([server], this.options.mongos);
  43. this.db = new Db(this.name, mongos, this.options.db);
  44. } else {
  45. this.db = new Db(this.name, server, this.options.db);
  46. }
  47. this.db.open(function(err) {
  48. listen(_this);
  49. if (!mongos) {
  50. server.s.server.on('error', function(error) {
  51. if (/after \d+ attempts/.test(error.message)) {
  52. _this.emit('error', new DisconnectedError(server.s.server.name));
  53. }
  54. });
  55. }
  56. if (err) return fn(err);
  57. fn();
  58. });
  59. return this;
  60. };
  61. /**
  62. * Switches to a different database using the same connection pool.
  63. *
  64. * Returns a new connection object, with the new db.
  65. *
  66. * @param {String} name The database name
  67. * @return {Connection} New Connection Object
  68. * @api public
  69. */
  70. NativeConnection.prototype.useDb = function(name) {
  71. // we have to manually copy all of the attributes...
  72. var newConn = new this.constructor();
  73. newConn.name = name;
  74. newConn.base = this.base;
  75. newConn.collections = {};
  76. newConn.models = {};
  77. newConn.replica = this.replica;
  78. newConn.hosts = this.hosts;
  79. newConn.host = this.host;
  80. newConn.port = this.port;
  81. newConn.user = this.user;
  82. newConn.pass = this.pass;
  83. newConn.options = this.options;
  84. newConn._readyState = this._readyState;
  85. newConn._closeCalled = this._closeCalled;
  86. newConn._hasOpened = this._hasOpened;
  87. newConn._listening = false;
  88. // First, when we create another db object, we are not guaranteed to have a
  89. // db object to work with. So, in the case where we have a db object and it
  90. // is connected, we can just proceed with setting everything up. However, if
  91. // we do not have a db or the state is not connected, then we need to wait on
  92. // the 'open' event of the connection before doing the rest of the setup
  93. // the 'connected' event is the first time we'll have access to the db object
  94. var _this = this;
  95. if (this.db && this._readyState === STATES.connected) {
  96. wireup();
  97. } else {
  98. this.once('connected', wireup);
  99. }
  100. function wireup() {
  101. newConn.db = _this.db.db(name);
  102. newConn.onOpen();
  103. // setup the events appropriately
  104. listen(newConn);
  105. }
  106. newConn.name = name;
  107. // push onto the otherDbs stack, this is used when state changes
  108. this.otherDbs.push(newConn);
  109. newConn.otherDbs.push(this);
  110. return newConn;
  111. };
  112. /*!
  113. * Register listeners for important events and bubble appropriately.
  114. */
  115. function listen(conn) {
  116. if (conn.db._listening) {
  117. return;
  118. }
  119. conn.db._listening = true;
  120. conn.db.on('close', function() {
  121. if (conn._closeCalled) return;
  122. // the driver never emits an `open` event. auto_reconnect still
  123. // emits a `close` event but since we never get another
  124. // `open` we can't emit close
  125. if (conn.db.serverConfig.autoReconnect) {
  126. conn.readyState = STATES.disconnected;
  127. conn.emit('close');
  128. return;
  129. }
  130. conn.onClose();
  131. });
  132. conn.db.on('error', function(err) {
  133. conn.emit('error', err);
  134. });
  135. conn.db.on('reconnect', function() {
  136. conn.readyState = STATES.connected;
  137. conn.emit('reconnected');
  138. conn.onOpen();
  139. });
  140. conn.db.on('timeout', function(err) {
  141. var error = new Error(err && err.err || 'connection timeout');
  142. conn.emit('error', error);
  143. });
  144. conn.db.on('open', function(err, db) {
  145. if (STATES.disconnected === conn.readyState && db && db.databaseName) {
  146. conn.readyState = STATES.connected;
  147. conn.emit('reconnected');
  148. }
  149. });
  150. conn.db.on('parseError', function(err) {
  151. conn.emit('parseError', err);
  152. });
  153. }
  154. /**
  155. * Opens a connection to a MongoDB ReplicaSet.
  156. *
  157. * See description of [doOpen](#NativeConnection-doOpen) for server options. In this case `options.replset` is also passed to ReplSetServers.
  158. *
  159. * @param {Function} fn
  160. * @api private
  161. * @return {Connection} this
  162. */
  163. NativeConnection.prototype.doOpenSet = function(fn) {
  164. var servers = [],
  165. _this = this;
  166. this.hosts.forEach(function(server) {
  167. var host = server.host || server.ipc;
  168. var port = server.port || 27017;
  169. servers.push(new Server(host, port, _this.options.server));
  170. });
  171. var server = this.options.mongos
  172. ? new Mongos(servers, this.options.mongos)
  173. : new ReplSetServers(servers, this.options.replset || this.options.replSet);
  174. this.db = new Db(this.name, server, this.options.db);
  175. this.db.on('fullsetup', function() {
  176. _this.emit('fullsetup');
  177. });
  178. this.db.on('all', function() {
  179. _this.emit('all');
  180. });
  181. this.db.open(function(err) {
  182. if (err) return fn(err);
  183. fn();
  184. listen(_this);
  185. });
  186. return this;
  187. };
  188. /**
  189. * Closes the connection
  190. *
  191. * @param {Function} fn
  192. * @return {Connection} this
  193. * @api private
  194. */
  195. NativeConnection.prototype.doClose = function(fn) {
  196. this.db.close(fn);
  197. return this;
  198. };
  199. /**
  200. * Prepares default connection options for the node-mongodb-native driver.
  201. *
  202. * _NOTE: `passed` options take precedence over connection string options._
  203. *
  204. * @param {Object} passed options that were passed directly during connection
  205. * @param {Object} [connStrOptions] options that were passed in the connection string
  206. * @api private
  207. */
  208. NativeConnection.prototype.parseOptions = function(passed, connStrOpts) {
  209. var o = passed || {};
  210. o.db || (o.db = {});
  211. o.auth || (o.auth = {});
  212. o.server || (o.server = {});
  213. o.replset || (o.replset = o.replSet) || (o.replset = {});
  214. o.server.socketOptions || (o.server.socketOptions = {});
  215. o.replset.socketOptions || (o.replset.socketOptions = {});
  216. o.mongos || (o.mongos = (connStrOpts && connStrOpts.mongos));
  217. (o.mongos === true) && (o.mongos = {});
  218. var opts = connStrOpts || {};
  219. Object.keys(opts).forEach(function(name) {
  220. switch (name) {
  221. case 'ssl':
  222. o.server.ssl = opts.ssl;
  223. o.replset.ssl = opts.ssl;
  224. o.mongos && (o.mongos.ssl = opts.ssl);
  225. break;
  226. case 'poolSize':
  227. if (typeof o.server[name] === 'undefined') {
  228. o.server[name] = o.replset[name] = opts[name];
  229. }
  230. break;
  231. case 'slaveOk':
  232. if (typeof o.server.slave_ok === 'undefined') {
  233. o.server.slave_ok = opts[name];
  234. }
  235. break;
  236. case 'autoReconnect':
  237. if (typeof o.server.auto_reconnect === 'undefined') {
  238. o.server.auto_reconnect = opts[name];
  239. }
  240. break;
  241. case 'socketTimeoutMS':
  242. case 'connectTimeoutMS':
  243. if (typeof o.server.socketOptions[name] === 'undefined') {
  244. o.server.socketOptions[name] = o.replset.socketOptions[name] = opts[name];
  245. }
  246. break;
  247. case 'authdb':
  248. if (typeof o.auth.authdb === 'undefined') {
  249. o.auth.authdb = opts[name];
  250. }
  251. break;
  252. case 'authSource':
  253. if (typeof o.auth.authSource === 'undefined') {
  254. o.auth.authSource = opts[name];
  255. }
  256. break;
  257. case 'retries':
  258. case 'reconnectWait':
  259. case 'rs_name':
  260. if (typeof o.replset[name] === 'undefined') {
  261. o.replset[name] = opts[name];
  262. }
  263. break;
  264. case 'replicaSet':
  265. if (typeof o.replset.rs_name === 'undefined') {
  266. o.replset.rs_name = opts[name];
  267. }
  268. break;
  269. case 'readSecondary':
  270. if (typeof o.replset.read_secondary === 'undefined') {
  271. o.replset.read_secondary = opts[name];
  272. }
  273. break;
  274. case 'nativeParser':
  275. if (typeof o.db.native_parser === 'undefined') {
  276. o.db.native_parser = opts[name];
  277. }
  278. break;
  279. case 'w':
  280. case 'safe':
  281. case 'fsync':
  282. case 'journal':
  283. case 'wtimeoutMS':
  284. if (typeof o.db[name] === 'undefined') {
  285. o.db[name] = opts[name];
  286. }
  287. break;
  288. case 'readPreference':
  289. if (typeof o.db.readPreference === 'undefined') {
  290. o.db.readPreference = opts[name];
  291. }
  292. break;
  293. case 'readPreferenceTags':
  294. if (typeof o.db.read_preference_tags === 'undefined') {
  295. o.db.read_preference_tags = opts[name];
  296. }
  297. break;
  298. case 'sslValidate':
  299. o.server.sslValidate = opts.sslValidate;
  300. o.replset.sslValidate = opts.sslValidate;
  301. o.mongos && (o.mongos.sslValidate = opts.sslValidate);
  302. }
  303. });
  304. if (!('auto_reconnect' in o.server)) {
  305. o.server.auto_reconnect = true;
  306. }
  307. // mongoose creates its own ObjectIds
  308. o.db.forceServerObjectId = false;
  309. // default safe using new nomenclature
  310. if (!('journal' in o.db || 'j' in o.db ||
  311. 'fsync' in o.db || 'safe' in o.db || 'w' in o.db)) {
  312. o.db.w = 1;
  313. }
  314. if (o.promiseLibrary) {
  315. o.db.promiseLibrary = o.promiseLibrary;
  316. }
  317. validate(o);
  318. return o;
  319. };
  320. /*!
  321. * Validates the driver db options.
  322. *
  323. * @param {Object} o
  324. */
  325. function validate(o) {
  326. if (o.db.w === -1 || o.db.w === 0) {
  327. if (o.db.journal || o.db.fsync || o.db.safe) {
  328. throw new Error(
  329. 'Invalid writeConcern: '
  330. + 'w set to -1 or 0 cannot be combined with safe|fsync|journal');
  331. }
  332. }
  333. }
  334. /*!
  335. * Module exports.
  336. */
  337. module.exports = NativeConnection;