Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1from fiber import Fiber
2from fiber import current
3from fiber import suspend
4from io import Reader
5from .. import NetError
7c"""
8class WriteRequest {
9public:
10 uv_buf_t m_buf;
11 uv_write_t m_request;
12 Bytes m_data;
13 Client *m_client;
14};
16static void on_client_connected(uv_stream_t *socket_p, int status)
17{
18 Server *server_p = (Server *)(socket_p->data);
20 if (status != 0) {
21 return;
22 }
24 if (server_p->_accepted_clients->length() == 10) {
25 return;
26 }
28 mys::shared_ptr<Client> client = mys::make_shared<Client>();
30 if (uv_accept(socket_p, (uv_stream_t *)&client->m_socket) == 0) {
31 server_p->_accepted_clients->append(client);
33 if (server_p->_fiber) {
34 resume(server_p->_fiber);
35 server_p->_fiber = nullptr;
36 }
37 }
38}
40static void on_read_complete(uv_stream_t *request_p,
41 ssize_t nread,
42 const uv_buf_t* buf_p)
43{
44 Client *client_p = (Client *)(request_p->data);
45 bool completed = false;
47 if (nread > 0) {
48 client_p->_read_offset += nread;
50 if (client_p->_read_offset == client_p->_read_end) {
51 completed = true;
52 }
53 } else if (nread < 0) {
54 completed = true;
55 client_p->_read_data.m_bytes->resize(client_p->_read_offset);
56 client_p->_error = true;
57 }
59 if (completed) {
60 uv_read_stop(request_p);
61 resume(client_p->_fiber);
62 }
63}
65static void read_alloc(uv_handle_t *handle_p, size_t size, uv_buf_t *buf_p)
66{
67 Client *client_p = (Client *)(handle_p->data);
69 buf_p->base = ((char *)client_p->_read_data.m_bytes->data()
70 + client_p->_read_offset);
71 buf_p->len = client_p->_read_data.m_bytes->size() - client_p->_read_offset;
72}
74static void on_try_read_into_complete(uv_stream_t *request_p,
75 ssize_t nread,
76 const uv_buf_t* buf_p)
77{
78 Client *client_p = (Client *)(request_p->data);
80 if (nread > 0) {
81 client_p->_read_offset += nread;
82 } else if (nread < 0) {
83 client_p->_error = true;
84 }
86 uv_read_stop(request_p);
87 resume(client_p->_fiber);
88}
90static void try_read_into_alloc(uv_handle_t *handle_p, size_t size, uv_buf_t *buf_p)
91{
92 Client *client_p = (Client *)(handle_p->data);
94 buf_p->base = ((char *)client_p->_read_data.m_bytes->data()
95 + client_p->_read_offset);
96 buf_p->len = client_p->_read_end - client_p->_read_offset;
97}
99static void on_write_complete(uv_write_t *request_p, int status)
100{
101 WriteRequest *write_request_p = (WriteRequest *)(request_p->data);
103 if (status != 0) {
104 write_request_p->m_client->_error = true;
105 }
107 delete write_request_p;
108}
110static void on_close_complete(uv_handle_t *handle_p)
111{
112 Client *client_p = (Client *)(handle_p->data);
114 resume(client_p->_fiber);
115}
117static void on_shutdown_complete(uv_shutdown_t *request_p, int status)
118{
119 Client *client_p = (Client *)(request_p->data);
121 uv_close((uv_handle_t*)&client_p->m_socket, on_close_complete);
122}
123"""
125class Client(Reader):
126 c"""
127 uv_tcp_t m_socket;
128 uv_buf_t m_buf;
129 uv_shutdown_t m_shutdown;
130 """
131 _read_data: bytes?
132 _read_offset: i64
133 _read_end: i64
134 _fiber: Fiber?
135 _connected: bool
136 _error: bool
138 func __init__(self):
139 c"""
140 uv_tcp_init(uv_default_loop(), &m_socket);
141 m_socket.data = this;
142 m_shutdown.data = this;
143 """
145 self._fiber = None
146 self._connected = True
147 self._error = False
149 func __del__(self):
150 self.disconnect()
152 func _ensure_one_caller(self):
153 if self._fiber is not None:
154 raise NetError("Only one fiber may perform blocking operations.")
156 func _wait_for_completion(self):
157 self._fiber = current()
158 suspend()
159 self._fiber = None
161 func is_connected(self) -> bool:
162 """Returns true if conencted to the server, false otherwise.
164 """
166 return self._connected and not self._error
168 func disconnect(self):
169 """Disconnect from the server.
171 """
173 if not self._connected:
174 return
176 self._ensure_one_caller()
178 c"""
179 uv_shutdown(&m_shutdown, (uv_stream_t *)&m_socket, on_shutdown_complete);
180 """
182 self._wait_for_completion()
183 self._connected = False
184 self._error = False
186 func write(self, data: bytes):
187 """Write given data to the server. Never blocks. Raises an error if
188 disconnected.
190 """
192 if not self.is_connected():
193 raise NetError("Not connected.")
195 c"""
196 WriteRequest *request_p = new WriteRequest();
197 request_p->m_buf = uv_buf_init((char *)data.m_bytes->data(),
198 data.m_bytes->size());
199 request_p->m_request.data = request_p;
200 request_p->m_data = data;
201 request_p->m_client = this;
202 uv_write(&request_p->m_request,
203 (uv_stream_s *)&m_socket,
204 &request_p->m_buf,
205 1,
206 on_write_complete);
207 """
209 func read(self, size: i64) -> bytes:
210 """Read data from the server. Always returns size number of bytes,
211 unless the connection was closed, in which case the remaining
212 data is returned.
214 """
216 self._ensure_one_caller()
218 self._read_offset = 0
219 self._read_end = size
220 self._read_data = bytes(size)
222 c"""
223 if (uv_read_start((uv_stream_t*)&m_socket,
224 read_alloc,
225 on_read_complete) != 0) {
226 std::cout << "uv_read_start failed" << std::endl;
227 exit(1);
228 }
229 """
231 self._wait_for_completion()
233 data = self._read_data
234 self._read_data = None
236 return data
238 func try_read_into(self, data: bytes, offset: i64, size: i64) -> i64:
239 """Try to read data from the server into given buffer. Returns number
240 of read bytes, which is at least one and at most given size
241 bytes, unless the connection was closed, in which case the
242 remaining number of bytes is returned.
244 """
246 self._ensure_one_caller()
248 self._read_offset = offset
249 self._read_end = offset + size
250 self._read_data = data
252 c"""
253 if (uv_read_start((uv_stream_t*)&m_socket,
254 try_read_into_alloc,
255 on_try_read_into_complete) != 0) {
256 std::cout << "uv_read_start failed" << std::endl;
257 exit(1);
258 }
259 """
261 self._wait_for_completion()
263 self._read_data = None
265 return self._read_offset - offset
267class Server:
268 """A TCP server, listening for clients to connect.
270 """
272 c"uv_tcp_t m_socket;"
273 c"uv_connect_t m_listen;"
274 c"uv_buf_t m_buf;"
275 _fiber: Fiber?
276 _status: i32
277 _accepted_clients: [Client]
279 func __init__(self):
280 c"""
281 uv_tcp_init(uv_default_loop(), &m_socket);
282 m_socket.data = this;
283 """
285 self._fiber = None
286 self._accepted_clients = []
288 func listen(self, port: i64):
289 """Start listening for clients to connect to given `port` on any
290 interface.
292 """
294 c"""
295 struct sockaddr_in address;
297 address.sin_family = AF_INET;
298 address.sin_addr.s_addr = INADDR_ANY;
299 address.sin_port = htons(port);
301 uv_tcp_bind(&m_socket, (const struct sockaddr*)&address, 0);
302 uv_listen((uv_stream_t *)&m_socket, 10, on_client_connected);
303 """
305 func accept(self) -> Client:
306 """Wait for a client to connect and return it.
308 """
310 if self._accepted_clients.length() == 0:
311 self._fiber = current()
312 suspend()
314 return self._accepted_clients.pop()