Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1from bson import Array 

2from bson import Boolean 

3from bson import BsonError 

4from bson import Document 

5from bson import Int32 

6from bson import Int64 

7from bson import String 

8from bson import decode as bson_decode 

9from bson import encode as bson_encode 

10from net.tcp.client import Client as TcpClient 

11 

class MongodbError(Error):
    """Raised when the connection is closed unexpectedly, when a response
    cannot be handled, or when the server reports that a command failed.

    """

    # Human readable description of what went wrong.
    message: string

14 

func _pack_u32(data: bytes, offset: i64, value: u32):
    """Store given value in given data as four bytes, least significant
    byte first, starting at given offset.

    """

    # Split the value into its four bytes before touching the buffer.
    byte_0 = u8(value)
    byte_1 = u8(value >> 8)
    byte_2 = u8(value >> 16)
    byte_3 = u8(value >> 24)

    data[offset] = byte_0
    data[offset + 1] = byte_1
    data[offset + 2] = byte_2
    data[offset + 3] = byte_3

20 

func _unpack_u32(data: bytes, offset: i64) -> u32:
    """Read four bytes from given data, starting at given offset, and
    return them combined into one value with the least significant byte
    first.

    """

    byte_0 = u32(data[offset])
    byte_1 = u32(data[offset + 1])
    byte_2 = u32(data[offset + 2])
    byte_3 = u32(data[offset + 3])

    return byte_0 | (byte_1 << 8) | (byte_2 << 16) | (byte_3 << 24)

26 

class Client:
    """A MongoDB client.

    Every command is sent as one message with opcode 2013 (OP_MSG) over
    a TCP connection, and the response is read back before the call
    returns. Call connect() before running any command.

    """

    _client: TcpClient

    func __init__(self):
        self._client = TcpClient()

    func connect(self, host: string = "127.0.0.1", port: i64 = 27017):
        """Connect to the server at given host and port.

        """

        self._client.connect(host, port)

    func disconnect(self):
        """Disconnect from the server.

        """

        self._client.disconnect()

    func list_databases(self) -> [string]:
        """Return the names of all databases, as reported by the
        listDatabases command.

        """

        document = Document([
            ("listDatabases", Int32(1)),
            ("nameOnly", Boolean(True)),
            ("$db", String("admin"))
        ])

        return [
            element.get("name").string()
            for element in self.run_command(document).get("databases").array()
        ]

    func list_collections(self, database: string) -> [string]:
        """Return the names of the collections in given database that are
        found in the first batch of the listCollections response.

        """

        document = Document([
            ("listCollections", Int32(1)),
            ("nameOnly", Boolean(True)),
            ("$db", String(database))
        ])

        return [
            element.get("name").string()
            for element in (self
                            .run_command(document)
                            .get("cursor")
                            .get("firstBatch").array())
        ]

    func drop(self, database: string, collection: string):
        """Drop given collection in given database.

        Raises MongodbError if the server rejects the command.

        """

        document = Document([
            ("drop", String(collection)),
            ("$db", String(database))
        ])
        self.run_command(document)

    func insert(self, database: string, collection: string, documents: Array):
        """Insert given documents in given collection in given database.

        The response is only checked for command failure. It is not
        returned.

        """

        document = Document([
            ("insert", String(collection)),
            ("documents", documents),
            ("$db", String(database))
        ])
        self.run_command(document)

    func delete(self, database: string, collection: string, deletes: Array):
        """Delete documents in given collection in given database.

        Given deletes are passed as is as the "deletes" element of the
        delete command.

        """

        document = Document([
            ("delete", String(collection)),
            ("deletes", deletes),
            ("$db", String(database))
        ])
        self.run_command(document)

    func find_many(self,
                   database: string,
                   collection: string,
                   filter: Document? = None,
                   sort: Document? = None,
                   projection: Document? = None) -> [Document]:
        """Find all documents in given collection in given database
        matching given filter.

        Fetches the first batch with find() and then calls get_more()
        until the returned cursor id is zero.

        """

        batch = self.find(database, collection, filter, sort, projection)
        documents: [Document] = []
        documents.extend(batch.get("cursor").get("firstBatch").array())
        cursor_id = batch.get("cursor").get("id")

        # Cursor id zero means that there are no more batches to fetch.
        while cursor_id.int64() != 0:
            batch = self.get_more(database, collection, cursor_id)
            cursor_id = batch.get("cursor").get("id")
            documents.extend(batch.get("cursor").get("nextBatch").array())

        return documents

    func find(self,
              database: string,
              collection: string,
              filter: Document? = None,
              sort: Document? = None,
              projection: Document? = None) -> Document:
        """Find first batch of documents in given collection in given
        database matching given filter.

        Returns the complete command response. Pass its cursor id to
        get_more() to get the next batch.

        """

        document = Document([
            ("find", String(collection)),
            ("batchSize", Int64(100)),
            ("$db", String(database))
        ])

        # Optional elements are only part of the command if given.
        if filter is not None:
            document.elements.append(("filter", filter))

        if sort is not None:
            document.elements.append(("sort", sort))

        if projection is not None:
            document.elements.append(("projection", projection))

        return self.run_command(document)

    func get_more(self,
                  database: string,
                  collection: string,
                  cursor_id: Int64) -> Document:
        """Get next batch of documents at given cursor id in given
        collection in given database. Call find() to get the first batch
        of documents and the first cursor id.

        Returns the complete command response.

        """

        document = Document([
            ("getMore", cursor_id),
            ("batchSize", Int64(100)),
            ("collection", String(collection)),
            ("$db", String(database))
        ])

        return self.run_command(document)

    func run_command(self, command: Document) -> Document:
        """Run given command and return its response.

        Raises MongodbError if the connection is closed before a complete
        response is received, if the response has an unsupported message
        type or a malformed length, or if the response has "ok" set to
        zero.

        """

        encoded = bson_encode(command)

        # Message header (16 bytes), flag bits (4 bytes) and the kind of
        # the first section (1 byte).
        header = bytes(21)
        _pack_u32(header, 0, u32(header.length() + encoded.length()))
        _pack_u32(header, 4, 1)
        _pack_u32(header, 8, 0)
        _pack_u32(header, 12, 2013)
        _pack_u32(header, 16, 0)
        header[20] = 0
        self._client.write(header)
        self._client.write(encoded)

        # The buffer is reused for the first 21 bytes of the response.
        if self._client.read_into(header, 0, header.length()) != header.length():
            raise MongodbError("Connection closed.")

        length = i64(_unpack_u32(header, 0))

        if _unpack_u32(header, 12) != 2013:
            raise MongodbError("Unsupported message type.")

        length -= header.length()

        # A total length smaller than the part already read would give
        # a negative number of bytes to read.
        if length < 0:
            raise MongodbError("Malformed message length.")

        # NOTE(review): flag bits and section kind of the response are
        # not inspected. The rest of the message is presumably a single
        # BSON document - confirm.
        data = self._client.read(length)

        if data.length() != length:
            raise MongodbError("Connection closed.")

        response = bson_decode(data)

        if response.get("ok").double() == 0.0:
            message = response.get("errmsg").string()
            code = response.get("code").int32()
            code_name = response.get("codeName").string()

            raise MongodbError(f"Command error: {message} ({code_name}({code}))")

        return response

231 

test basic():
    database = "mys-mongodb-test"
    collection = "foo"

    client = Client()
    client.connect()

    # Start without the collection. The drop fails if the collection
    # does not exist, which is fine here.
    try:
        client.drop(database, collection)
    except MongodbError:
        pass

    # The collection is gone by now, so dropping it again must fail.
    try:
        client.drop(database, collection)
        assert False
    except MongodbError as error:
        assert "ns not found" in error.message

    # More than 100 documents (batch size).
    for _ in range(55):
        pair = Array([
            Document([("x", Boolean(True))]),
            Document([("x", Boolean(False))])
        ])
        client.insert(database, collection, pair)

    databases = client.list_databases()
    assert database in databases
    collections = client.list_collections(database)
    assert collection in collections

    everything = client.find_many(database, collection)
    assert everything.length() == 110

    # Only documents with x set to false, and without the _id element.
    only_false = Document([("x", Document([("$eq", Boolean(False))]))])
    without_id = Document([("_id", Int32(0))])
    matching = client.find_many(database,
                                collection,
                                filter=only_false,
                                projection=without_id)
    assert matching.length() == 55
    first = matching[0]
    assert not first.get("x").boolean()

    try:
        first.get("_id")
        assert False
    except BsonError:
        pass

    assert client.find_many(database, collection).length() == 110

    # Delete every document with x set to true.
    delete_true = Document([
        ("q", Document([("x", Boolean(True))])),
        ("limit", Int32(0))
    ])
    client.delete(database, collection, Array([delete_true]))
    assert client.find_many(database, collection).length() == 55

    client.disconnect()