-
Notifications
You must be signed in to change notification settings - Fork 2
/
index.js
111 lines (96 loc) · 3.32 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
var Q = require("q");
var MonetDB = require("monetdb")();
module.exports = function MonetDBPool(poolOptions, connOptions) {
var self = this;
var _connections = [];
if(typeof(poolOptions) != "object" || typeof(connOptions) != "object") {
throw new Error("Need two options objects to construct a MonetDBPool object");
}
if(!poolOptions.nrConnections || parseInt(poolOptions.nrConnections) <= 0) {
throw new Error("Need a valid nrConnections argument");
}
for(var i=0; i<parseInt(poolOptions.nrConnections); ++i) {
_connections.push(new MonetDB(connOptions));
}
_connections.forEach(function(conn) {
conn._runningQueries = 0;
conn._reserved = false;
conn.free = function() { conn._reserved = false; }
});
self.nextConnection = function(reserve) {
var available = _connections
.filter(function(d) { return !d._reserved; })
.map(function(d) { return d._runningQueries; });
if(!available.length) {
return null;
}
var minRunning = Math.min.apply(null, available);
for(var i=0; i<_connections.length; ++i) {
var conn = _connections[i];
if(!conn._reserved && conn._runningQueries == minRunning) {
if(reserve) _connections[i]._reserved = true;
return _connections[i];
}
}
};
["connect", "close"].forEach(function(d) {
self[d] = function() {
var args = arguments;
return Q.all(
_connections.map(function (conn) {
return conn[d].apply(conn[d], args);
})
);
};
});
["query", "querystream", "prepare"].forEach(function(d) {
self[d] = function() {
var args = arguments;
var nextConn = self.nextConnection();
if(!nextConn) {
var deferred = Q.defer();
deferred.reject(new Error("No available connection"));
return deferred.promise;
}
++nextConn._runningQueries;
var conn = nextConn[d].apply(nextConn[d], args);
conn.release = function (){
--nextConn._runningQueries;
}
//if streaming, we decrease the runningQueries number on the end event
if (conn.on){
conn.on('end', function() {
conn.release();
});
//if promise, we decrease the runningQueries number on the resolve
} else {
conn = conn.fin(function() {
--nextConn._runningQueries;
});
}
return conn;
/*Legacy
return nextConn[d].apply(nextConn[d], args).fin(function() {
--nextConn._runningQueries;
});*/
};
});
// destroy is separated from the above two functions generators because it does not return a promise
self.destroy = function(msg) {
_connections.forEach(function(d) {
d.destroy(msg);
});
};
//function to set the load of the pool
self.getRunningQueries = function() {
var available = _connections
.filter(function(d) { return !d._reserved; })
.map(function(d) { return d._runningQueries; });
return available;
};
if(poolOptions.testing) {
self.getConnections = function() {
return _connections;
}
}
};