Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1from fiber import Fiber 

2from fiber import current 

3from fiber import suspend 

4from io import Reader 

5from .. import NetError 

6 

7c""" 

8class WriteRequest { 

9public: 

10 uv_buf_t m_buf; 

11 uv_write_t m_request; 

12 Bytes m_data; 

13 Client *m_client; 

14}; 

15 

16static void on_client_connected(uv_stream_t *socket_p, int status) 

17{ 

18 Server *server_p = (Server *)(socket_p->data); 

19 

20 if (status != 0) { 

21 return; 

22 } 

23 

24 if (server_p->_accepted_clients->length() == 10) { 

25 return; 

26 } 

27 

28 mys::shared_ptr<Client> client = mys::make_shared<Client>(); 

29 

30 if (uv_accept(socket_p, (uv_stream_t *)&client->m_socket) == 0) { 

31 server_p->_accepted_clients->append(client); 

32 

33 if (server_p->_fiber) { 

34 resume(server_p->_fiber); 

35 server_p->_fiber = nullptr; 

36 } 

37 } 

38} 

39 

40static void on_read_complete(uv_stream_t *request_p, 

41 ssize_t nread, 

42 const uv_buf_t* buf_p) 

43{ 

44 Client *client_p = (Client *)(request_p->data); 

45 bool completed = false; 

46 

47 if (nread > 0) { 

48 client_p->_read_offset += nread; 

49 

50 if (client_p->_read_offset == client_p->_read_end) { 

51 completed = true; 

52 } 

53 } else if (nread < 0) { 

54 completed = true; 

55 client_p->_read_data.m_bytes->resize(client_p->_read_offset); 

56 client_p->_error = true; 

57 } 

58 

59 if (completed) { 

60 uv_read_stop(request_p); 

61 resume(client_p->_fiber); 

62 } 

63} 

64 

65static void read_alloc(uv_handle_t *handle_p, size_t size, uv_buf_t *buf_p) 

66{ 

67 Client *client_p = (Client *)(handle_p->data); 

68 

69 buf_p->base = ((char *)client_p->_read_data.m_bytes->data() 

70 + client_p->_read_offset); 

71 buf_p->len = client_p->_read_data.m_bytes->size() - client_p->_read_offset; 

72} 

73 

74static void on_try_read_into_complete(uv_stream_t *request_p, 

75 ssize_t nread, 

76 const uv_buf_t* buf_p) 

77{ 

78 Client *client_p = (Client *)(request_p->data); 

79 

80 if (nread > 0) { 

81 client_p->_read_offset += nread; 

82 } else if (nread < 0) { 

83 client_p->_error = true; 

84 } 

85 

86 uv_read_stop(request_p); 

87 resume(client_p->_fiber); 

88} 

89 

90static void try_read_into_alloc(uv_handle_t *handle_p, size_t size, uv_buf_t *buf_p) 

91{ 

92 Client *client_p = (Client *)(handle_p->data); 

93 

94 buf_p->base = ((char *)client_p->_read_data.m_bytes->data() 

95 + client_p->_read_offset); 

96 buf_p->len = client_p->_read_end - client_p->_read_offset; 

97} 

98 

99static void on_write_complete(uv_write_t *request_p, int status) 

100{ 

101 WriteRequest *write_request_p = (WriteRequest *)(request_p->data); 

102 

103 if (status != 0) { 

104 write_request_p->m_client->_error = true; 

105 } 

106 

107 delete write_request_p; 

108} 

109 

110static void on_close_complete(uv_handle_t *handle_p) 

111{ 

112 Client *client_p = (Client *)(handle_p->data); 

113 

114 resume(client_p->_fiber); 

115} 

116 

117static void on_shutdown_complete(uv_shutdown_t *request_p, int status) 

118{ 

119 Client *client_p = (Client *)(request_p->data); 

120 

121 uv_close((uv_handle_t*)&client_p->m_socket, on_close_complete); 

122} 

123""" 

124 

125class Client(Reader): 

126 c""" 

127 uv_tcp_t m_socket; 

128 uv_buf_t m_buf; 

129 uv_shutdown_t m_shutdown; 

130 """ 

131 _read_data: bytes? 

132 _read_offset: i64 

133 _read_end: i64 

134 _fiber: Fiber? 

135 _connected: bool 

136 _error: bool 

137 

138 func __init__(self): 

139 c""" 

140 uv_tcp_init(uv_default_loop(), &m_socket); 

141 m_socket.data = this; 

142 m_shutdown.data = this; 

143 """ 

144 

145 self._fiber = None 

146 self._connected = True 

147 self._error = False 

148 

149 func __del__(self): 

150 self.disconnect() 

151 

152 func _ensure_one_caller(self): 

153 if self._fiber is not None: 

154 raise NetError("Only one fiber may perform blocking operations.") 

155 

156 func _wait_for_completion(self): 

157 self._fiber = current() 

158 suspend() 

159 self._fiber = None 

160 

161 func is_connected(self) -> bool: 

162 """Returns true if conencted to the server, false otherwise. 

163 

164 """ 

165 

166 return self._connected and not self._error 

167 

168 func disconnect(self): 

169 """Disconnect from the server. 

170 

171 """ 

172 

173 if not self._connected: 

174 return 

175 

176 self._ensure_one_caller() 

177 

178 c""" 

179 uv_shutdown(&m_shutdown, (uv_stream_t *)&m_socket, on_shutdown_complete); 

180 """ 

181 

182 self._wait_for_completion() 

183 self._connected = False 

184 self._error = False 

185 

186 func write(self, data: bytes): 

187 """Write given data to the server. Never blocks. Raises an error if 

188 disconnected. 

189 

190 """ 

191 

192 if not self.is_connected(): 

193 raise NetError("Not connected.") 

194 

195 c""" 

196 WriteRequest *request_p = new WriteRequest(); 

197 request_p->m_buf = uv_buf_init((char *)data.m_bytes->data(), 

198 data.m_bytes->size()); 

199 request_p->m_request.data = request_p; 

200 request_p->m_data = data; 

201 request_p->m_client = this; 

202 uv_write(&request_p->m_request, 

203 (uv_stream_s *)&m_socket, 

204 &request_p->m_buf, 

205 1, 

206 on_write_complete); 

207 """ 

208 

209 func read(self, size: i64) -> bytes: 

210 """Read data from the server. Always returns size number of bytes, 

211 unless the connection was closed, in which case the remaining 

212 data is returned. 

213 

214 """ 

215 

216 self._ensure_one_caller() 

217 

218 self._read_offset = 0 

219 self._read_end = size 

220 self._read_data = bytes(size) 

221 

222 c""" 

223 if (uv_read_start((uv_stream_t*)&m_socket, 

224 read_alloc, 

225 on_read_complete) != 0) { 

226 std::cout << "uv_read_start failed" << std::endl; 

227 exit(1); 

228 } 

229 """ 

230 

231 self._wait_for_completion() 

232 

233 data = self._read_data 

234 self._read_data = None 

235 

236 return data 

237 

238 func try_read_into(self, data: bytes, offset: i64, size: i64) -> i64: 

239 """Try to read data from the server into given buffer. Returns number 

240 of read bytes, which is at least one and at most given size 

241 bytes, unless the connection was closed, in which case the 

242 remaining number of bytes is returned. 

243 

244 """ 

245 

246 self._ensure_one_caller() 

247 

248 self._read_offset = offset 

249 self._read_end = offset + size 

250 self._read_data = data 

251 

252 c""" 

253 if (uv_read_start((uv_stream_t*)&m_socket, 

254 try_read_into_alloc, 

255 on_try_read_into_complete) != 0) { 

256 std::cout << "uv_read_start failed" << std::endl; 

257 exit(1); 

258 } 

259 """ 

260 

261 self._wait_for_completion() 

262 

263 self._read_data = None 

264 

265 return self._read_offset - offset 

266 

267class Server: 

268 """A TCP server, listening for clients to connect. 

269 

270 """ 

271 

272 c"uv_tcp_t m_socket;" 

273 c"uv_connect_t m_listen;" 

274 c"uv_buf_t m_buf;" 

275 _fiber: Fiber? 

276 _status: i32 

277 _accepted_clients: [Client] 

278 

279 func __init__(self): 

280 c""" 

281 uv_tcp_init(uv_default_loop(), &m_socket); 

282 m_socket.data = this; 

283 """ 

284 

285 self._fiber = None 

286 self._accepted_clients = [] 

287 

288 func listen(self, port: i64): 

289 """Start listening for clients to connect to given `port` on any 

290 interface. 

291 

292 """ 

293 

294 c""" 

295 struct sockaddr_in address; 

296 

297 address.sin_family = AF_INET; 

298 address.sin_addr.s_addr = INADDR_ANY; 

299 address.sin_port = htons(port); 

300 

301 uv_tcp_bind(&m_socket, (const struct sockaddr*)&address, 0); 

302 uv_listen((uv_stream_t *)&m_socket, 10, on_client_connected); 

303 """ 

304 

305 func accept(self) -> Client: 

306 """Wait for a client to connect and return it. 

307 

308 """ 

309 

310 if self._accepted_clients.length() == 0: 

311 self._fiber = current() 

312 suspend() 

313 

314 return self._accepted_clients.pop()