Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1from fiber import Fiber 

2from fiber import Queue 

3from http import get as http_get 

4from http.header_parser import Request 

5from json import decode as json_decode 

6from json import Value as JsonValue 

7from time import LocalDateTime 

8from collections.fifo import Fifo 

9from . import Status 

10from .activities import Activities 

11 

# Matches a whole string made of two comma-separated runs of digits, dots
# and minus signs. It is applied to the "loc" value of the ipinfo.io
# response; group 1 is parsed as the latitude and group 2 as the longitude.
12RE_LOCATION: regex = re"^([\d.-]+),([\d.-]+)$" 

# Applied to the user-agent header; requests whose user agent matches are
# left out of the statistics. The trailing `i` flag presumably makes the
# match case-insensitive - TODO confirm against the Mys regex documentation.
13RE_BOT: regex = re"bot"i 

14 

# Coordinates resolved for one client IP address, together with the status
# of the latest response recorded for that address.
15class Location: 

 # Parsed from the first group of the ipinfo.io "loc" value.
16 latitude: f64 

 # Parsed from the second group of the ipinfo.io "loc" value.
17 longitude: f64 

 # Set when the location is created and overwritten every time the same
 # client IP address is queued for lookup again.
18 response_status: Status 

19 

class _ClientIpLookupFiber(Fiber):
    # Fiber that takes (client IP address, response status) pairs from its
    # queue, resolves each unknown address to coordinates using ipinfo.io
    # and stores the result in the dictionary given to the constructor.
    #
    # The insertion order of the stored addresses is tracked in a Fifo
    # created with the argument 100 (presumably its capacity); once it
    # reports full, the oldest address is dropped before a new one is
    # stored.
    token: string?
    queue: Queue[(string, i64)]
    locations: {string: Location}
    _locations_order: Fifo[string]

    func __init__(self, token: string?, locations: {string: Location}):
        # token: ipinfo.io access token, or None to disable lookups.
        # locations: dictionary that receives the resolved locations. It
        #            is stored as given, not copied.
        self.locations = locations
        self._locations_order = Fifo[string](100)
        self.token = token
        # The response status is carried as an i64 instead of a Status, as
        # generics only work with common types.
        self.queue = Queue[(string, i64)]()

    func run(self):
        # Serves the queue forever. Errors raised during a lookup are
        # printed and the fiber carries on with the next queued item.
        while True:
            client_ip_address, status_code = self.queue.get()

            # Without a token no lookup is possible; the item is dropped.
            if self.token is None:
                continue

            known = self.locations.get(client_ip_address, None)

            # Already resolved: only refresh the stored response status.
            if known is not None:
                known.response_status = Status(status_code)
                continue

            try:
                # NOTE(review): the token is sent over plain HTTP (port 80,
                # secure=False) - confirm that this is intended.
                response = http_get("ipinfo.io",
                                    80,
                                    f"/{client_ip_address}?token={self.token}",
                                    secure=False)
                body = string(response.content)
                loc = json_decode(body).get("loc").string()
                mo = loc.match(RE_LOCATION)

                # A "loc" value of unexpected format is silently ignored.
                if mo is not None:
                    latitude = f64(mo.group(1))
                    longitude = f64(mo.group(2))

                    # Make room by forgetting the oldest stored address.
                    if self._locations_order.is_full():
                        oldest = self._locations_order.pop()
                        self.locations.pop(oldest, None)

                    self.locations[client_ip_address] = Location(
                        latitude,
                        longitude,
                        Status(status_code))
                    self._locations_order.push(client_ip_address)
            except Error as e:
                print(e)

70 

class OrderedCounter:
    # Counts occurrences per name while remembering the order in which the
    # names were first seen. The order is kept in a Fifo created with the
    # argument 100 (presumably its capacity); once it reports full, the
    # oldest name and its count are dropped to make room for a new name.
    order: Fifo[string]
    count: {string: i64}

    func __init__(self):
        self.count = {}
        self.order = Fifo[string](100)

    func increment(self, name: string):
        # Adds one to the count of name, starting at 1 for a new name.

        # Known name: bump its count and leave the order untouched.
        if name in self.count:
            self.count[name] += 1

            return

        # New name: evict the oldest tracked name first if there is no
        # room left.
        if self.order.is_full():
            evicted = self.order.pop()
            self.count.pop(evicted, 0)

        self.order.push(name)
        self.count[name] = 1

88 

class Statistics:
    # Collects usage statistics for served pages: per-path request counts,
    # external referrers, a capped set of client IP addresses and, through
    # a background lookup fiber, the locations of those clients.
    start_date_time: LocalDateTime
    requests: OrderedCounter
    number_of_requests: i64
    next_number_of_requests_activity: i64
    locations: {string: Location}
    clients_ip_addresses: {string}
    client_ip_lookup_fiber: _ClientIpLookupFiber
    referrers: OrderedCounter
    no_idle_client_handlers: i64
    number_of_graphql_requests: i64
    activities: Activities?

    func __init__(self, ipinfo_token: string?, activities: Activities?):
        # ipinfo_token: token handed to the lookup fiber, or None to
        #               disable location lookups.
        # activities: receiver of request count milestones, or None if
        #             milestones should not be reported.
        self.activities = activities
        self.start_date_time = LocalDateTime()
        self.requests = OrderedCounter()
        self.number_of_requests = 0
        # The first milestone is at 10 requests; it is multiplied by 10
        # every time it is reached.
        self.next_number_of_requests_activity = 10
        self.locations = {}
        self.clients_ip_addresses = {}
        self.referrers = OrderedCounter()
        self.no_idle_client_handlers = 0
        self.number_of_graphql_requests = 0
        # The fiber is given the same dictionary object as self.locations
        # and stores resolved locations in it.
        self.client_ip_lookup_fiber = _ClientIpLookupFiber(ipinfo_token,
                                                           self.locations)
        self.client_ip_lookup_fiber.start()

    func handle_request(self, request: Request, response_status: Status):
        # Updates the statistics for one handled request.
        #
        # Only GET requests for "/" or paths ending with ".html" are
        # counted, and only if the user agent does not match RE_BOT.
        if request.method != "GET":
            return

        path = request.path

        if not (path.ends_with(".html") or path == "/"):
            return

        headers = request.headers

        # A missing user-agent header defaults to "bot", so such requests
        # are ignored as well.
        if headers.get("user-agent", "bot").match(RE_BOT) is not None:
            return

        self.requests.increment(path)
        self.increment_number_of_requests()
        referrer = headers.get("referer", None)

        # Referrers from the site itself are not counted.
        if referrer is not None:
            if not referrer.starts_with("https://mys-lang.org/"):
                self.referrers.increment(referrer)

        client_ip_address = headers.get("x-forwarded-for", None)

        if client_ip_address is not None:
            # Stop adding new addresses once 100 have been collected.
            if self.clients_ip_addresses.length() < 100:
                self.clients_ip_addresses.add(client_ip_address)

            # The status is sent as an i64, which is what the fiber's
            # queue carries.
            self.client_ip_lookup_fiber.queue.put((client_ip_address,
                                                   i64(response_status)))

    func increment_number_of_requests(self):
        # Counts one request and reports an activity each time the total
        # reaches the next milestone (10, 100, 1000, ...).
        self.number_of_requests += 1

        if self.number_of_requests == self.next_number_of_requests_activity:
            # activities is optional. Without this check the tenth counted
            # request would dereference None when no Activities object was
            # given to the constructor.
            if self.activities is not None:
                self.activities.add("🔥",
                                    f"{self.number_of_requests} requests served.")

            # Advance the milestone even when nothing was reported.
            self.next_number_of_requests_activity *= 10

    func unique_clients(self) -> string:
        # Returns the number of collected client IP addresses as text,
        # marked as capped once the limit of 100 has been reached.
        count = self.clients_ip_addresses.length()

        if count < 100:
            return str(count)
        else:
            return f"{count} (capped)"

162 

test bot_user_agent():
    # Neither an ipinfo token nor an activities receiver is given.
    stats = Statistics(None, None)

    # A user agent containing "bot" must not be counted.
    bot_request = Request(
        "GET",
        "/",
        None,
        {},
        None,
        {
            "user-agent": ("Mozilla/5.0 (compatible; bingbot/2.0; "
                           "+http://www.bing.com/bingbot.htm)")
        })
    stats.handle_request(bot_request, Status.Ok)
    assert stats.requests.count.length() == 0

    # An ordinary user agent is counted.
    browser_request = Request(
        "GET",
        "/",
        None,
        {},
        None,
        {
            "user-agent": "Mozilla/5.0"
        })
    stats.handle_request(browser_request, Status.Ok)
    assert stats.requests.count.length() == 1

    # A request without a user-agent header is not counted either.
    anonymous_request = Request(
        "GET",
        "/",
        None,
        {},
        None,
        {
        })
    stats.handle_request(anonymous_request, Status.Ok)
    assert stats.requests.count.length() == 1