Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1from . import HttpError 

2 

# Matches an HTTP/1.1 request line: "<method> <target> HTTP/1.1". The
# target is split URI-style; the groups used by the parser are 1 (method),
# 6 (path), 8 (query without '?') and 10 (fragment without '#'). The dot
# in the version is escaped so that only a literal "HTTP/1.1" is accepted
# (an unescaped dot would also match e.g. "HTTP/1x1").
_RE_REQUEST: regex = re"^(\w+) (([^:/?#]+):)?(//([^/?#]*))?([^?#]*)(\?([^#]*))?(#(.*))? HTTP/1\.1$"

# Matches a "Name: value" header line. Group 1 is the name and group 2
# the value.
_RE_HEADER_LINE: regex = re"^([^:]+): (.*)$"

# Line terminator (CRLF) that ends the request line and every header line.
_HEADER_END: bytes = b"\r\n"

6 

class Request:
    """A parsed HTTP request: the pieces of the request line plus the
    header fields.

    """

    # Request method, for example "GET".
    method: string
    # Path part of the request target.
    path: string
    # Query string without the leading '?'. Optional.
    query: string?
    # Key/value pairs of the query string.
    params: {string: string}
    # Fragment without the leading '#'. Optional.
    fragment: string?
    # Header fields, name to value.
    headers: {string: string}

18 

class _HeaderParser:
    """Parses an HTTP request header, that is a request line followed by
    header lines and a terminating blank line, from a byte buffer.

    """

    # Buffer holding the raw header bytes.
    _header: bytes
    # Index in _header of the next byte to read.
    _offset: i64
    # Number of bytes, counted from the start of _header, that belong to
    # the request currently being parsed.
    _size: i64

    func __init__(self, header: bytes):
        """Initialize the parser with given header buffer. Nothing is
        parsed until parse_request() is called, which also resets the
        read position.

        """

        self._header = header
        self._offset = 0
        self._size = 0

    func parse_request(self, size: i64) -> Request:
        """Parse a request of given size from start of header buffer.

        Only the first size bytes of the buffer are considered. Raises
        HttpError if the request line or a header line is malformed, if
        a line is not terminated by CRLF, or if data remains after the
        blank line that ends the headers.

        """

        self._offset = 0
        self._size = size

        line = self._read_line()
        mo = line.match(_RE_REQUEST)

        if mo is None:
            raise HttpError(f"invalid header line '{line}'")

        # Group 8 is the query without '?'. It is None when the request
        # target has no query part.
        query = mo.group(8)

        if query is None:
            params: {string: string} = {}
        else:
            params = parse_query(query)

        # Groups: 1 = method, 6 = path, 10 = fragment without '#'.
        request = Request(mo.group(1),
                          mo.group(6),
                          query,
                          params,
                          mo.group(10),
                          self._parse_headers())

        # The blank line must be the very last thing in the request.
        if self._offset != self._size:
            raise HttpError("remaining header data")

        return request

    func _read_line(self) -> string:
        """Returns the next header line without its CRLF terminator and
        moves the read position past the terminator. The empty string is
        returned for a blank line. Raises HttpError if no CRLF is found
        between the read position and the end of the request.

        """

        pos = self._header.find(_HEADER_END, self._offset, self._size)

        # Without this check a missing terminator would move the read
        # position to 1 and build the line from a slice ending at -1.
        if pos == -1:
            raise HttpError("missing header line terminator")

        offset = self._offset
        self._offset = pos + _HEADER_END.length()

        return string(self._header, offset, pos)

    func _parse_headers(self) -> {string: string}:
        """Reads header lines up to and including the terminating blank
        line and returns them as a dictionary. Header names are lower
        cased. If a name occurs more than once the last value is kept.
        Raises HttpError on a line that is not of the form "Name: value".

        """

        headers: {string: string} = {}

        while True:
            line = self._read_line()

            # A blank line ends the header section.
            if line.length() == 0:
                break

            mo = line.match(_RE_HEADER_LINE)

            if mo is None:
                raise HttpError(f"invalid header line '{line}'")

            headers[mo.group(1).lower()] = mo.group(2)

        return headers

100 

func parse_request(header: bytes) -> Request:
    """Parse the request found in given header buffer, using the whole
    buffer.

    """

    size = header.length()

    return parse_request(header, size)

107 

func parse_request(header: bytes, size: i64) -> Request:
    """Parse the request found in the first size bytes of given header
    buffer.

    """

    return _HeaderParser(header).parse_request(size)

116 

func parse_query(value: string) -> {string: string}:
    """Split given query string on '&' and return its parameters as a
    dictionary. Each parameter is split at its first '='. A parameter
    without '=' gets the empty string as value. Empty parameters are
    ignored.

    """

    parsed: {string: string} = {}

    for item in value.split('&'):
        if item.length() > 0:
            separator = item.find('=')

            if separator != -1:
                key = item[0:separator]
                parsed[key] = item[separator + 1:]
            else:
                parsed[item] = ""

    return parsed

136 

test request():
    # A request with query, fragment and several header lines.
    header = (b"GET /statistics.html?foo#bar HTTP/1.1\r\n"
              b"Host: mys-lang.org\r\n"
              b"Cache-Control: no-cache\r\n"
              b"Referer: https://mys-lang.org/\r\n"
              b"Accept-Encoding: gzip, deflate, br\r\n"
              b"\r\n")
    request = parse_request(header)
    assert request.method == "GET"
    assert request.path == "/statistics.html"
    assert request.query == "foo"
    # A query parameter without '=' is stored with an empty value.
    assert request.params == {"foo": ""}
    assert request.fragment == "bar"
    # Header names are stored lower cased.
    assert request.headers == {"accept-encoding": "gzip, deflate, br",
                               "referer": "https://mys-lang.org/",
                               "cache-control": "no-cache",
                               "host": "mys-lang.org"}

153 

test request_no_query_nor_fragment():
    # Request line only, directly followed by the blank line.
    data = b"GET / HTTP/1.1\r\n\r\n"
    parsed = parse_request(data)
    assert parsed.method == "GET"
    assert parsed.path == "/"
    assert parsed.query is None
    assert parsed.params == {}
    assert parsed.fragment is None
    assert parsed.headers == {}

164 

test request_with_fragment():
    # The target has a fragment but no query.
    data = b"GET /#foo HTTP/1.1\r\n\r\n"
    parsed = parse_request(data)
    assert parsed.method == "GET"
    assert parsed.path == "/"
    assert parsed.query is None
    assert parsed.params == {}
    assert parsed.fragment == "foo"
    assert parsed.headers == {}

175 

test request_with_query():
    # The target has a query but no fragment.
    data = b"GET /?foo HTTP/1.1\r\n\r\n"
    parsed = parse_request(data)
    assert parsed.method == "GET"
    assert parsed.path == "/"
    assert parsed.query == "foo"
    assert parsed.params == {"foo": ""}
    assert parsed.fragment is None
    assert parsed.headers == {}

186 

test request_invalid_request_line():
    # The leading space makes the request line invalid.
    data = b" GET /statistics.html HTTP/1.1\r\n\r\n"
    message = ""

    try:
        parse_request(data)
    except HttpError as error:
        message = error.message

    assert message == "invalid header line ' GET /statistics.html HTTP/1.1'"

198 

test request_invalid_header_line():
    # The header line has no ": " between name and value.
    data = (b"GET / HTTP/1.1\r\n"
            b"Host mys-lang.org\r\n"
            b"\r\n")
    message = ""

    try:
        parse_request(data)
    except HttpError as error:
        message = error.message

    assert message == "invalid header line 'Host mys-lang.org'"

211 

test request_remaining_data():
    # A second blank line leaves two unread bytes after the headers.
    data = (b"GET / HTTP/1.1\r\n"
            b"Host: mys-lang.org\r\n"
            b"\r\n"
            b"\r\n")
    message = ""

    try:
        parse_request(data)
    except HttpError as error:
        message = error.message

    assert message == "remaining header data"

225 

test request_with_size():
    # Same buffer as in the remaining data test, but the given size
    # excludes the trailing extra CRLF.
    data = (b"GET / HTTP/1.1\r\n"
            b"Host: mys-lang.org\r\n"
            b"\r\n"
            b"\r\n")
    size = data.length() - 2
    parsed = parse_request(data, size)
    assert parsed.method == "GET"
    assert parsed.path == "/"
    assert parsed.query is None
    assert parsed.params == {}
    assert parsed.fragment is None
    assert parsed.headers == {"host": "mys-lang.org"}

238 

test parse_query():
    # Empty input gives no parameters.
    assert parse_query("") == {}

    # One and two ordinary key=value pairs.
    assert parse_query("a=10") == {"a": "10"}
    assert parse_query("a=10&foo=bar") == {"a": "10", "foo": "bar"}

    # Empty parameter, empty value, missing '=', and an empty key whose
    # value itself contains '='.
    mixed = parse_query("a=3&bbb=123ffds&&s=&1&=2=3")
    assert mixed == {"1": "",
                     "s": "",
                     "bbb": "123ffds",
                     "a": "3",
                     "": "2=3"}