Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1from os.path import Path
3c"""header-before-namespace
4#include <sqlite3.h>
5#include <string.h>
6"""
# The SQLite result codes this module compares against. Each member takes
# its value from the corresponding macro in sqlite3.h.
enum _Result:

    Ok = c"SQLITE_OK"
    Row = c"SQLITE_ROW"
    Done = c"SQLITE_DONE"
enum Type(u8):
    """Type of a column value, as returned by Statement.column_type().
    Each member takes its value from the matching SQLITE_* macro.

    """

    Integer = c"SQLITE_INTEGER"
    Float = c"SQLITE_FLOAT"
    String = c"SQLITE_TEXT"
    Null = c"SQLITE_NULL"
    Bytes = c"SQLITE_BLOB"
# Raised by Database and Statement when an SQLite call fails or when a
# column index is out of range.
class SqlError(Error):

    # Human readable description of what went wrong.
    message: string
func _to_utf8(value: string) -> bytes:
    """Encode given string as UTF-8 and append a zero byte, so that the
    buffer can be handed to C functions expecting a NUL-terminated
    string.

    """

    terminated = value.to_utf8()
    terminated += 0

    return terminated
# A connection to an SQLite database. The connection is opened when the
# object is created and closed when it is destroyed.
class Database:
    c"sqlite3 *database_p;"

    func __init__(self, path: Path):
        """Create or open a database at given path. Raises SqlError if
        the database cannot be opened.

        """

        path_utf8 = _to_utf8(str(path))
        res = _Result.Ok

        c"""
        res = sqlite3_open((const char *)path_utf8.m_bytes->data(),
                           &this->database_p);
        """

        if res != _Result.Ok:
            # SQLite normally hands back a handle even when the open
            # fails, and it has to be released with sqlite3_close().
            # Do it here since the object is never fully constructed.
            # Closing a NULL handle is a harmless no-op.
            c"""
            sqlite3_close(this->database_p);
            this->database_p = NULL;
            """

            raise SqlError("failed to open the database")

    func __del__(self):
        c"sqlite3_close(this->database_p);"

    func execute(self, sql: string):
        """Execute given statement. No result rows are made available to
        the caller; use prepare() and Statement.fetch() for queries.
        Raises SqlError with SQLite's error message on failure.

        """

        sql_utf8 = _to_utf8(sql)
        res = _Result.Ok

        # No row callback and no error message out-parameter are given;
        # the error text is read from the connection instead.
        c"""
        res = sqlite3_exec(this->database_p,
                           (const char *)sql_utf8.m_bytes->data(),
                           NULL,
                           NULL,
                           NULL);
        """

        if res != _Result.Ok:
            raise SqlError(_create_error_message(self))

    func prepare(self, sql: string) -> Statement:
        """Prepare a statement. Safer than execute(), and faster if used more
        than once.

        """

        return Statement(sql, self)
func _create_error_message(database: Database) -> string:
    """Return the text sqlite3_errmsg() currently reports for given
    database connection.

    """

    text: string? = None

    c"text = String(sqlite3_errmsg(database->database_p));"

    return text
# A prepared statement. Bind values with the bind_*() methods, then run it
# with execute() or step through result rows with fetch() and read values
# with the column_*() methods.
class Statement:
    database: Database
    # Number of columns in the most recently fetched row. Zero when no row
    # is available, which makes every column_*() call fail its range check.
    _number_of_columns: u32
    c"sqlite3_stmt *stmt_p;"

    func __init__(self, sql: string, database: Database):
        """Prepare given SQL on given database. Raises SqlError with
        SQLite's error message if the SQL cannot be prepared.

        """

        self.database = database
        self._number_of_columns = 0
        sql_utf8 = _to_utf8(sql)
        res = _Result.Ok

        # sqlite3_prepare_v2() is the interface SQLite recommends over the
        # legacy sqlite3_prepare(). It takes the same arguments.
        c"""
        res = sqlite3_prepare_v2(database->database_p,
                                 (const char *)sql_utf8.m_bytes->data(),
                                 -1,
                                 &this->stmt_p,
                                 NULL);
        """

        if res != _Result.Ok:
            raise SqlError(_create_error_message(database))

    func __del__(self):
        c"sqlite3_finalize(this->stmt_p);"

    func bind_int(self, column: u32, value: i64):
        """Bind given integer to given column. The column is handed to
        SQLite as the parameter index, which starts at 1.

        """

        res = _Result.Ok

        c"res = sqlite3_bind_int64(this->stmt_p, column, value);"

        if res != _Result.Ok:
            raise SqlError(_create_error_message(self.database))

    func bind_float(self, column: u32, value: f64):
        """Bind given float to given column.

        """

        res = _Result.Ok

        c"res = sqlite3_bind_double(this->stmt_p, column, value);"

        if res != _Result.Ok:
            raise SqlError(_create_error_message(self.database))

    func bind_string(self, column: u32, value: string):
        """Bind given string to given column.

        """

        value_utf8 = _to_utf8(value)
        res = _Result.Ok

        # SQLITE_TRANSIENT makes SQLite take its own copy of the buffer,
        # so value_utf8 does not have to outlive this call. The length -1
        # tells SQLite to read up to the terminating zero byte.
        c"""
        res = sqlite3_bind_text(this->stmt_p,
                                column,
                                (const char *)value_utf8.m_bytes->data(),
                                -1,
                                SQLITE_TRANSIENT);
        """

        if res != _Result.Ok:
            raise SqlError(_create_error_message(self.database))

    func bind_bytes(self, column: u32, value: bytes):
        """Bind given bytes to given column.

        """

        res = _Result.Ok

        # SQLITE_TRANSIENT makes SQLite take its own copy of the buffer.
        c"""
        res = sqlite3_bind_blob(this->stmt_p,
                                column,
                                (const char *)value.m_bytes->data(),
                                value.m_bytes->size(),
                                SQLITE_TRANSIENT);
        """

        if res != _Result.Ok:
            raise SqlError(_create_error_message(self.database))

    func bind_null(self, column: u32):
        """Bind null to given column.

        """

        res = _Result.Ok

        c"res = sqlite3_bind_null(this->stmt_p, column);"

        if res != _Result.Ok:
            raise SqlError(_create_error_message(self.database))

    func execute(self):
        """Execute the statement. Bind any values to columns before executing
        it. Calls reset() once complete. Raises SqlError if the statement
        does not run to completion in a single step.

        """

        result = self._step()

        try:
            if result != _Result.Done:
                raise SqlError(_create_error_message(self.database))
        finally:
            # Always leave the statement ready to be bound and run again.
            self.reset()

    func fetch(self) -> bool:
        """Fetch the next row from the database. Returns True if a row was
        fetched, or calls reset() and returns False when there are no
        more rows available. If fetched, get column values with
        column_*() methods. Raises SqlError with SQLite's error message,
        after calling reset(), on any other outcome.

        """

        result = self._step()

        if result == _Result.Row:
            c"this->_number_of_columns = sqlite3_data_count(this->stmt_p);"

            return True

        if result == _Result.Done:
            self.reset()

            return False

        # Read the error text before resetting so that it describes the
        # failed step.
        message = _create_error_message(self.database)
        self.reset()

        raise SqlError(message)

    func _step(self) -> _Result:
        res = 0

        c"res = sqlite3_step(this->stmt_p);"

        # NOTE(review): result codes other than Ok, Row and Done are not
        # members of _Result - confirm how this conversion behaves for
        # them (for example when the step fails).
        return _Result(res)

    func reset(self):
        """Reset the statement so it can be used again.

        """

        self._number_of_columns = 0

        c"sqlite3_reset(this->stmt_p);"

    func _check_column(self, column: u32):
        # Raise SqlError unless given zero-based column index is part of
        # the most recently fetched row.
        if column >= self._number_of_columns:
            raise SqlError(f"bad column {column}")

    func column_type(self, column: u32) -> Type:
        """Get the type of given column. Raises SqlError if the column is
        not part of the fetched row.

        """

        self._check_column(column)
        ctype: u8 = 0

        c"ctype = sqlite3_column_type(this->stmt_p, column);"

        return Type(ctype)

    func column_int(self, column: u32) -> i64:
        """Get the value of given column as an integer. Raises SqlError if
        the column is not part of the fetched row.

        """

        self._check_column(column)
        value = 0

        c"value = sqlite3_column_int64(this->stmt_p, column);"

        return value

    func column_float(self, column: u32) -> f64:
        """Get the value of given column as a float. Raises SqlError if
        the column is not part of the fetched row.

        """

        self._check_column(column)
        value = 0.0

        c"value = sqlite3_column_double(this->stmt_p, column);"

        return value

    func column_string(self, column: u32) -> string?:
        """Get the value of given column as a string. Returns None if
        SQLite has no text for the column. Raises SqlError if the column
        is not part of the fetched row.

        """

        self._check_column(column)
        value: bytes? = b""

        # The length is computed once, before the loop, instead of calling
        # strlen() for every copied byte.
        c"""
        const unsigned char *value_p = sqlite3_column_text(this->stmt_p, column);

        if (value_p != NULL) {
            const size_t length = strlen((const char *)value_p);

            for (size_t i = 0; i < length; i++) {
                value += value_p[i];
            }
        } else {
            value = nullptr;
        }
        """

        if value is None:
            return None

        return string(value)

    func column_bytes(self, column: u32) -> bytes:
        """Get the value of given column as bytes. Raises SqlError if the
        column is not part of the fetched row.

        """

        self._check_column(column)
        value = b""

        # The pointer is read before the size, which is the order SQLite
        # recommends. sqlite3_column_blob() returns NULL for a zero-length
        # blob, so only copy when there is something to copy.
        c"""
        const void *blob_p = sqlite3_column_blob(this->stmt_p, column);
        int size = sqlite3_column_bytes(this->stmt_p, column);

        value = Bytes(size);

        if ((blob_p != NULL) && (size > 0)) {
            memcpy(value.m_bytes->data(), blob_p, value.m_bytes->size());
        }
        """

        return value

    func column_value_string(self, column: u32) -> string:
        """Get given columns value as a string. Strings are wrapped in
        double quotes and null becomes "null".

        """

        column_type = self.column_type(column)

        if column_type == Type.Integer:
            return str(self.column_int(column))
        elif column_type == Type.Float:
            return str(self.column_float(column))
        elif column_type == Type.String:
            return f"\"{self.column_string(column)}\""
        elif column_type == Type.Bytes:
            return str(self.column_bytes(column))
        elif column_type == Type.Null:
            return "null"
        else:
            raise SqlError(f"invalid column type {column_type}")
test basics():
    # Start from a clean file in case an earlier run left one behind.
    path = Path("the.db")
    path.rm(force=True)

    database = Database(path)

    # One row per value type in the second column.
    database.execute("CREATE TABLE tab(foo, bar, baz)")
    database.execute("INSERT INTO tab VALUES(1, 'one', null)")
    database.execute("INSERT INTO tab VALUES(2, 2.2, 'two')")
    database.execute("INSERT INTO tab VALUES(3, 'three', null)")
    database.execute("INSERT INTO tab VALUES(4, X'89', null)")

    # Skip the first row by only selecting foo >= 2.
    statement = database.prepare("SELECT * FROM tab WHERE foo >= ? ORDER BY foo")
    statement.bind_int(1, 2)

    # Row with foo = 2.
    assert statement.fetch()
    assert statement.column_type(0) == Type.Integer
    assert statement.column_int(0) == 2
    assert statement.column_type(1) == Type.Float
    assert statement.column_float(1) == 2.2
    assert statement.column_type(2) == Type.String
    assert statement.column_string(2) == "two"

    # Row with foo = 3.
    assert statement.fetch()
    assert statement.column_type(0) == Type.Integer
    assert statement.column_int(0) == 3
    assert statement.column_type(1) == Type.String
    assert statement.column_string(1) == "three"
    assert statement.column_type(2) == Type.Null

    # Row with foo = 4.
    assert statement.fetch()
    assert statement.column_type(0) == Type.Integer
    assert statement.column_int(0) == 4
    assert statement.column_type(1) == Type.Bytes
    assert statement.column_bytes(1) == b"\x89"
    assert statement.column_type(2) == Type.Null

    # No more rows.
    assert not statement.fetch()

    path.rm(force=True)
test advanced():
    # Start from a clean file in case an earlier run left one behind.
    path = Path("the.db")
    path.rm(force=True)

    database = Database(path)

    database.execute("CREATE TABLE tab(foo, bar, baz)")
    database.execute("INSERT INTO tab VALUES(1, 'one', null)")
    database.execute("INSERT INTO tab VALUES(2, 2.2, 'two')")

    # Insert three more rows by reusing a single prepared statement.
    statement = database.prepare("INSERT INTO tab VALUES(?, ?, ?)")

    statement.bind_int(1, 3)
    statement.bind_string(2, "three")
    statement.bind_int(3, 333)
    statement.execute()

    statement.bind_int(1, 4)
    statement.bind_string(2, "four")
    statement.bind_null(3)
    statement.execute()

    statement.bind_int(1, 5)
    statement.bind_bytes(2, b"\x12\x34")
    statement.bind_null(3)
    statement.execute()

    # Read back everything except the first row.
    statement = database.prepare("SELECT * FROM tab WHERE foo >= ? ORDER BY foo")
    statement.bind_int(1, 2)

    # Row with foo = 2.
    assert statement.fetch()
    assert statement.column_type(0) == Type.Integer
    assert statement.column_int(0) == 2
    assert statement.column_type(1) == Type.Float
    assert statement.column_float(1) == 2.2
    assert statement.column_type(2) == Type.String
    assert statement.column_string(2) == "two"

    # Row with foo = 3.
    assert statement.fetch()
    assert statement.column_type(0) == Type.Integer
    assert statement.column_int(0) == 3
    assert statement.column_type(1) == Type.String
    assert statement.column_string(1) == "three"
    assert statement.column_type(2) == Type.Integer
    assert statement.column_int(2) == 333

    # Row with foo = 4.
    assert statement.fetch()
    assert statement.column_type(0) == Type.Integer
    assert statement.column_int(0) == 4
    assert statement.column_type(1) == Type.String
    assert statement.column_string(1) == "four"
    assert statement.column_type(2) == Type.Null

    # Row with foo = 5.
    assert statement.fetch()
    assert statement.column_type(0) == Type.Integer
    assert statement.column_int(0) == 5
    assert statement.column_type(1) == Type.Bytes
    assert statement.column_bytes(1) == b"\x12\x34"
    assert statement.column_type(2) == Type.Null

    # No more rows.
    assert not statement.fetch()

    path.rm(force=True)
test try_to_create_existing_table():
    # Start from a clean file in case an earlier run left one behind.
    path = Path("the.db")
    path.rm(force=True)

    database = Database(path)
    database.execute("CREATE TABLE tab(foo, bar, baz)")

    # Creating the same table a second time must fail with SQLite's own
    # error text.
    message: string? = None

    try:
        database.execute("CREATE TABLE tab(foo, bar, baz)")
    except SqlError as error:
        message = error.message

    assert message == "table tab already exists"

    path.rm(force=True)
test prepare_bad_statement():
    # Start from a clean file in case an earlier run left one behind.
    path = Path("the.db")
    path.rm(force=True)

    database = Database(path)

    # Preparing invalid SQL must fail with SQLite's own error text.
    message = ""

    try:
        database.prepare("FOOBAR 123")
    except SqlError as error:
        message = error.message

    assert message == "near \"FOOBAR\": syntax error"

    path.rm(force=True)
test bad_column():
    # Start from a clean file in case an earlier run left one behind.
    path = Path("the.db")
    path.rm(force=True)

    database = Database(path)

    database.execute("CREATE TABLE tab(foo, bar, baz)")
    database.execute("INSERT INTO tab VALUES(1, 'one', null)")

    statement = database.prepare("SELECT * FROM tab")

    # The row has three columns, so index 2 is the last valid one.
    assert statement.fetch()
    assert statement.column_type(2) == Type.Null

    # Every getter must reject an index outside the fetched row.
    message = ""

    try:
        statement.column_type(3)
    except SqlError as error:
        message = error.message

    assert message == "bad column 3"

    message = ""

    try:
        statement.column_int(4)
    except SqlError as error:
        message = error.message

    assert message == "bad column 4"

    message = ""

    try:
        statement.column_string(10)
    except SqlError as error:
        message = error.message

    assert message == "bad column 10"

    message = ""

    try:
        statement.column_float(3)
    except SqlError as error:
        message = error.message

    assert message == "bad column 3"

    message = ""

    try:
        statement.column_bytes(3)
    except SqlError as error:
        message = error.message

    assert message == "bad column 3"

    path.rm(force=True)
test column_value_string():
    # Start from a clean file in case an earlier run left one behind.
    Path("the.db").rm(force=True)

    database = Database(Path("the.db"))

    # One column per value type.
    database.execute("CREATE TABLE tab(foo, bar, baz, bak, bat)")
    database.execute("INSERT INTO tab VALUES(1, 'one', null, 1.0, X'012345')")

    statement = database.prepare("SELECT * FROM tab")

    assert statement.fetch()
    assert statement.column_value_string(0) == "1"
    assert statement.column_value_string(1) == "\"one\""
    assert statement.column_value_string(2) == "null"
    assert statement.column_value_string(3) == "1.000000"
    assert statement.column_value_string(4) == "b\"\\x01#E\""

    # Remove the database file again, like the other tests do.
    Path("the.db").rm(force=True)
test statement():
    # Start from a clean file in case an earlier run left one behind.
    path = Path("the.db")
    path.rm(force=True)

    database = Database(path)

    database.execute("CREATE TABLE tab(x)")
    database.execute("INSERT INTO tab VALUES(5)")
    database.execute("INSERT INTO tab VALUES(6)")
    database.execute("INSERT INTO tab VALUES(7)")

    statement = database.prepare("SELECT * FROM tab WHERE x >= ? ORDER BY x")

    # Get all.
    statement.bind_int(1, 2)
    assert statement.fetch()
    assert statement.column_int(0) == 5
    assert statement.fetch()
    assert statement.column_int(0) == 6
    assert statement.fetch()
    assert not statement.fetch()

    # Get one then reset.
    statement.bind_int(1, 2)
    assert statement.fetch()
    assert statement.column_int(0) == 5
    statement.reset()
    assert statement.fetch()
    assert statement.column_int(0) == 5

    # Bind before reset. A row has been fetched and the statement has not
    # been reset, so binding is expected to raise.
    ok = False

    try:
        statement.bind_int(1, 2)
    except SqlError:
        ok = True

    assert ok
    statement.reset()

    # Fetch from beginning once all rows read.
    statement.bind_int(1, 2)
    assert statement.fetch()
    assert statement.column_int(0) == 5
    assert statement.fetch()
    assert statement.fetch()
    assert not statement.fetch()
    assert statement.fetch()
    assert statement.column_int(0) == 5

    path.rm(force=True)
test two_database_connections():
    # Start from a clean file in case an earlier run left one behind.
    path = Path("the.db")
    path.rm(force=True)

    # Two independent connections to the same file.
    database_1 = Database(path)
    database_2 = Database(path)

    database_1.execute("CREATE TABLE tab(x)")
    database_1.execute("INSERT INTO tab VALUES(5)")
    database_1.execute("INSERT INTO tab VALUES(6)")

    statement_1 = database_1.prepare("SELECT * FROM tab WHERE x = ?")
    statement_2 = database_2.prepare("SELECT * FROM tab WHERE x = ?")

    # Fetch interleaved.
    statement_1.bind_int(1, 5)
    statement_2.bind_int(1, 6)
    assert statement_2.fetch()
    assert statement_2.column_int(0) == 6
    assert statement_1.fetch()
    assert statement_1.column_int(0) == 5
    assert not statement_1.fetch()
    assert not statement_2.fetch()

    path.rm(force=True)
test string():
    # Start from a clean file in case an earlier run left one behind.
    path = Path("the.db")
    path.rm(force=True)
    database = Database(path)
    database.execute("CREATE TABLE tab(x)")

    # Store a string containing a character outside the ASCII range...
    statement = database.prepare("INSERT INTO tab VALUES(?)")
    statement.bind_string(1, "📦")
    statement.execute()

    # ...and check that it is read back unchanged.
    statement = database.prepare("SELECT * FROM tab")
    assert statement.fetch()
    assert statement.column_string(0) == "📦"

    # Remove the database file again, like the other tests do.
    path.rm(force=True)