Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1from fiber import Fiber
2from fiber import Queue
3from http import get as http_get
4from http.header_parser import Request
5from json import decode as json_decode
6from json import Value as JsonValue
7from time import LocalDateTime
8from collections.fifo import Fifo
9from . import Status
10from .activities import Activities
12RE_LOCATION: regex = re"^([\d.-]+),([\d.-]+)$"
13RE_BOT: regex = re"bot"i
15class Location:
16 latitude: f64
17 longitude: f64
18 response_status: Status
20class _ClientIpLookupFiber(Fiber):
21 token: string?
22 queue: Queue[(string, i64)]
23 locations: {string: Location}
24 _locations_order: Fifo[string]
26 func __init__(self, token: string?, locations: {string: Location}):
27 self.token = token
28 # i64 instead of Status as generics does only work with common
29 # types.
30 self.queue = Queue[(string, i64)]()
31 self.locations = locations
32 self._locations_order = Fifo[string](100)
34 func run(self):
35 while True:
36 client_ip_address, response_status = self.queue.get()
38 if self.token is None:
39 continue
41 location = self.locations.get(client_ip_address, None)
43 if location is not None:
44 location.response_status = Status(response_status)
45 continue
47 try:
48 content = string(http_get("ipinfo.io",
49 80,
50 f"/{client_ip_address}?token={self.token}",
51 secure=False).content)
53 mo = json_decode(content).get("loc").string().match(RE_LOCATION)
55 if mo is None:
56 continue
58 latitude = f64(mo.group(1))
59 longitude = f64(mo.group(2))
61 if self._locations_order.is_full():
62 self.locations.pop(self._locations_order.pop(), None)
64 self.locations[client_ip_address] = Location(latitude,
65 longitude,
66 Status(response_status))
67 self._locations_order.push(client_ip_address)
68 except Error as e:
69 print(e)
71class OrderedCounter:
72 order: Fifo[string]
73 count: {string: i64}
75 func __init__(self):
76 self.order = Fifo[string](100)
77 self.count = {}
79 func increment(self, name: string):
80 if name in self.count:
81 self.count[name] += 1
82 else:
83 if self.order.is_full():
84 self.count.pop(self.order.pop(), 0)
86 self.count[name] = 1
87 self.order.push(name)
89class Statistics:
90 start_date_time: LocalDateTime
91 requests: OrderedCounter
92 number_of_requests: i64
93 next_number_of_requests_activity: i64
94 locations: {string: Location}
95 clients_ip_addresses: {string}
96 client_ip_lookup_fiber: _ClientIpLookupFiber
97 referrers: OrderedCounter
98 no_idle_client_handlers: i64
99 number_of_graphql_requests: i64
100 activities: Activities?
102 func __init__(self, ipinfo_token: string?, activities: Activities?):
103 self.activities = activities
104 self.start_date_time = LocalDateTime()
105 self.requests = OrderedCounter()
106 self.number_of_requests = 0
107 self.next_number_of_requests_activity = 10
108 self.locations = {}
109 self.clients_ip_addresses = {}
110 self.referrers = OrderedCounter()
111 self.no_idle_client_handlers = 0
112 self.number_of_graphql_requests = 0
113 self.client_ip_lookup_fiber = _ClientIpLookupFiber(ipinfo_token,
114 self.locations)
115 self.client_ip_lookup_fiber.start()
117 func handle_request(self, request: Request, response_status: Status):
118 if request.method != "GET":
119 return
121 path = request.path
123 if not (path.ends_with(".html") or path == "/"):
124 return
126 headers = request.headers
128 if headers.get("user-agent", "bot").match(RE_BOT) is not None:
129 return
131 self.requests.increment(path)
132 self.increment_number_of_requests()
133 referrer = headers.get("referer", None)
135 if referrer is not None:
136 if not referrer.starts_with("https://mys-lang.org/"):
137 self.referrers.increment(referrer)
139 client_ip_address = headers.get("x-forwarded-for", None)
141 if client_ip_address is not None:
142 if self.clients_ip_addresses.length() < 100:
143 self.clients_ip_addresses.add(client_ip_address)
145 self.client_ip_lookup_fiber.queue.put((client_ip_address,
146 i64(response_status)))
148 func increment_number_of_requests(self):
149 self.number_of_requests += 1
151 if self.number_of_requests == self.next_number_of_requests_activity:
152 self.activities.add("🔥", f"{self.number_of_requests} requests served.")
153 self.next_number_of_requests_activity *= 10
155 func unique_clients(self) -> string:
156 count = self.clients_ip_addresses.length()
158 if count < 100:
159 return str(count)
160 else:
161 return f"{count} (capped)"
163test bot_user_agent():
164 statistics = Statistics(None, None)
166 statistics.handle_request(
167 Request("GET",
168 "/",
169 None,
170 {},
171 None,
172 {
173 "user-agent": ("Mozilla/5.0 (compatible; bingbot/2.0; "
174 "+http://www.bing.com/bingbot.htm)")
175 }),
176 Status.Ok)
177 assert statistics.requests.count.length() == 0
179 statistics.handle_request(
180 Request("GET",
181 "/",
182 None,
183 {},
184 None,
185 {
186 "user-agent": "Mozilla/5.0"
187 }),
188 Status.Ok)
189 assert statistics.requests.count.length() == 1
191 statistics.handle_request(
192 Request("GET",
193 "/",
194 None,
195 {},
196 None,
197 {
198 }),
199 Status.Ok)
200 assert statistics.requests.count.length() == 1