Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1from net.stcp.client import Client as StcpClient 

2from net.tcp.client import Client as TcpClient 

3from io.buffered_reader import BufferedReader 

4 

# Error raised by this module when a response cannot be parsed or, when
# status checking is enabled, when the status code is not the expected one.
class HttpError(Error):
    # Human readable description of what went wrong.
    message: string

7 

# A received HTTP response. Instances are created positionally as
# Response(status_code, headers, content), so the member order matters.
class Response:
    # Numeric status code taken from the status line.
    status_code: i64
    # Response header fields, keyed by lower-cased field name.
    headers: {string: string}
    # Response body.
    content: bytes

12 

# Minimal transport interface used by the HTTP client. Implementations
# provide write(), read() and read_line(); read_headers() is shared and built
# on top of read_line().
trait _Io:

    # Send given data to the peer.
    func write(self, data: bytes):
        pass

    # Read up to size bytes from the peer.
    func read(self, size: i64) -> bytes:
        pass

    # Read one line from the peer, without its line terminator.
    func read_line(self) -> string:
        pass

    func read_headers(self) -> {string: string}:
        """Read header lines until the first empty line and return them as
        a dictionary keyed by lower-cased field name.

        Whitespace around the field value is optional and is not part
        of the returned value, so both "Name: value" and "Name:value"
        are accepted. Lines that do not look like a header field are
        skipped. If a field name occurs more than once, the last
        occurrence wins.

        """

        headers: {string: string} = {}

        while True:
            line = self.read_line()

            # An empty line terminates the header section.
            if line.length() == 0:
                break

            # Field name, colon, optional whitespace, value, optional
            # trailing whitespace. The value is matched lazily so that
            # trailing whitespace ends up outside of the group.
            mo = line.match(re"^([^:]+):\s*(.*?)\s*$")

            if mo is not None:
                headers[mo.group(1).lower()] = mo.group(2)

        return headers

39 

func _read_line(buffered_reader: BufferedReader) -> string:
    """Read up to the next CRLF from given reader and return the data
    before it as a string. The CRLF itself is not included.

    Returns an empty string if the reader returns None (presumably
    because no more data is available).

    """

    raw = buffered_reader.read_until(b"\r\n", keep_pattern=False)

    if raw is not None:
        return string(raw)

    return ""

47 

# _Io implementation on top of a TCP client. Writes go directly to the
# client, reads go through a buffered reader that wraps the same client.
class _TcpIo(_Io):
    _client: TcpClient
    _buffered_reader: BufferedReader

    func __init__(self, client: TcpClient):
        self._buffered_reader = BufferedReader(client)
        self._client = client

    func write(self, data: bytes):
        # Writes are not buffered.
        self._client.write(data)

    func read(self, size: i64) -> bytes:
        chunk = self._buffered_reader.read(size)

        return chunk

    func read_line(self) -> string:
        # Lines are terminated by CRLF, see _read_line().
        line = _read_line(self._buffered_reader)

        return line

64 

# _Io implementation on top of a secure TCP client. Writes go directly to
# the client, reads go through a buffered reader that wraps the same client.
class _StcpIo(_Io):
    _client: StcpClient
    _buffered_reader: BufferedReader

    func __init__(self, client: StcpClient):
        self._buffered_reader = BufferedReader(client)
        self._client = client

    func write(self, data: bytes):
        # Writes are not buffered.
        self._client.write(data)

    func read(self, size: i64) -> bytes:
        chunk = self._buffered_reader.read(size)

        return chunk

    func read_line(self) -> string:
        # Lines are terminated by CRLF, see _read_line().
        line = _read_line(self._buffered_reader)

        return line

81 

func _build_request(method: string,
                    host: string,
                    path: string,
                    headers: {string: string},
                    data: bytes?) -> bytes:
    """Serialize the request line, the header fields and the optional body
    into the bytes to send.

    Given headers are copied, never modified. Field names are
    lower-cased in the copy so that the defaults below are not added
    next to a caller supplied "Host", "User-Agent" or "Accept" that
    only differs in case.

    """

    fields: {string: string} = {}

    for name, value in headers:
        fields[name.lower()] = value

    if "host" not in fields:
        fields["host"] = host

    if "user-agent" not in fields:
        fields["user-agent"] = "mys/0.1.0"

    if "accept" not in fields:
        fields["accept"] = "*/*"

    # The body length always wins over any caller supplied value.
    if data is not None:
        fields["content-length"] = f"{data.length()}"

    request = f"{method} {path} HTTP/1.1\r\n"

    for field_name, field_value in fields:
        request += f"{field_name}: {field_value}\r\n"

    request += "\r\n"
    request_bytes = request.to_utf8()

    if data is not None:
        request_bytes += data

    return request_bytes

func _execute(method: string,
              host: string,
              port: i64,
              path: string,
              secure: bool,
              headers: {string: string},
              data: bytes? = None,
              check: bool = True) -> Response:
    """Connect to given host and port, send one request and read the
    response.

    A secure TCP client is used if secure is True, otherwise a plain
    TCP client. Given headers are sent together with default "host",
    "user-agent" and "accept" fields (unless already given) and, if
    data is not None, a "content-length" field. The headers
    dictionary passed by the caller is not modified.

    Raises HttpError if the status line cannot be parsed, if check is
    True and the status code is not 200, or if chunked content is
    malformed.

    Returns the response with its header field names lower-cased.

    NOTE(review): The connection is not explicitly closed here.

    """

    io: _Io? = None

    if secure:
        client = StcpClient()
        client.connect(host, port)
        io = _StcpIo(client)
    else:
        client = TcpClient()
        client.connect(host, port)
        io = _TcpIo(client)

    io.write(_build_request(method, host, path, headers, data))
    status_line = io.read_line()
    # Accept both HTTP/1.0 and HTTP/1.1 responses. The status code is
    # followed by a space, or by end of line if the reason phrase and
    # its separator are missing.
    status_mo = status_line.match(re"^HTTP/1\.[01] (\d+)( |$)")

    if status_mo is None:
        raise HttpError(f"Invalid response '{status_line}'.")

    status_code = i64(status_mo.group(1))

    if check and status_code != 200:
        raise HttpError(f"Unsuccessful status code in response ({status_code}).")

    response_headers = io.read_headers()

    # These responses never carry a body. Trying to read one could
    # block until the peer closes the connection.
    if (method == "HEAD"
        or status_code < 200
        or status_code == 204
        or status_code == 304):
        return Response(status_code, response_headers, b"")

    content_length = response_headers.get("content-length", None)
    transfer_encoding = response_headers.get("transfer-encoding", None)

    if content_length is None:
        content = b""

        if transfer_encoding is None:
            # No length information at all. The body is everything up
            # to end of input.
            while True:
                octet = io.read(1)

                if octet.length() == 0:
                    break

                content += octet
        elif "chunked" in transfer_encoding:
            while True:
                size_line = io.read_line()
                # Only the leading hexadecimal chunk size is of
                # interest. Chunk extensions after it are ignored. An
                # empty line (for example at end of input) is an error.
                size_mo = size_line.match(re"^([0-9a-fA-F]+)")

                if size_mo is None:
                    raise HttpError("Bad chunked content in response.")

                chunk_length = i64(size_mo.group(1), 16)

                # A chunk of size zero terminates the body.
                if chunk_length == 0:
                    break

                content += io.read(chunk_length)

                # Every chunk is followed by an empty line.
                if io.read_line().length() != 0:
                    raise HttpError("Bad chunked content in response.")
    else:
        content = io.read(i64(content_length))

    return Response(status_code, response_headers, content)

166 

func get(host: string,
         port: i64 = 80,
         path: string = "/",
         secure: bool = False,
         headers: {string: string} = {},
         check: bool = True) -> Response:
    """Send a GET request for given path to given host and port and
    return the response.

    A secure connection is used if secure is True. Given headers are
    sent as part of the request. If check is True, HttpError is
    raised unless the response status code is 200.

    """

    response = _execute("GET", host, port, path, secure, headers, None, check)

    return response

178 

func post(host: string,
          content: bytes?,
          port: i64 = 80,
          path: string = "/",
          secure: bool = False,
          headers: {string: string} = {},
          check: bool = True) -> Response:
    """Send a POST request with given content as body for given path to
    given host and port, and return the response.

    No body is sent if content is None. A secure connection is used
    if secure is True. Given headers are sent as part of the
    request. If check is True, HttpError is raised unless the
    response status code is 200.

    """

    response = _execute("POST", host, port, path, secure, headers, content, check)

    return response

190 return _execute("POST", host, port, path, secure, headers, content, check)