Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1from fiber import Fiber
2from fiber import current
3from fiber import suspend
4from .path import Path
5from .subprocess import run
6from .utils import to_utf8
c"""source-before-namespace
#include <fstream>
#include <filesystem>
#include <cstdlib>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <pwd.h>
#include <unistd.h>
#include <string.h>
#include <stdio.h>

// <fcntl.h> declares the O_* open flags and <pwd.h> declares struct passwd,
// getpwuid() and getpwnam(). Both are used by this file and are included
// explicitly instead of relying on another header to pull them in.

extern char **environ;

// Thin wrappers around the libc functions of the same name.
// NOTE(review): presumably needed because this module defines Mys functions
// called setenv, unsetenv and geteuid, which would otherwise hide the libc
// ones inside the module namespace - confirm.

// Sets the environment variable, overwriting any existing value.
static inline void setenv_wrapper(const char *name_p, const char *value_p)
{
    setenv(name_p, value_p, 1);
}

// Removes the environment variable.
static inline void unsetenv_wrapper(const char *name_p)
{
    unsetenv(name_p);
}

// Returns the effective user ID of the calling process.
static inline i64 geteuid_wrapper()
{
    return geteuid();
}

// Returns the passwd entry of given user ID, or NULL.
static inline struct passwd *getpwuid_wrapper(i64 uid)
{
    return getpwuid(uid);
}

// Returns the passwd entry of given user name, or NULL.
static inline struct passwd *getpwnam_wrapper(const char *name_p)
{
    return getpwnam(name_p);
}
"""
class Stdin:
    """Reads bytes from standard input (stdin).

    """

    func get(self) -> i32:
        """Returns the next byte from standard input (stdin), or -1 if end of
        input is reached.

        """

        result: i32 = -1

        c"""
        int character = getchar();

        if (character != EOF) {
            result = character;
        }
        """

        return result

    func read(self) -> bytes:
        """Reads from standard input until end of file is reached.

        """

        data = b""
        value = self.get()

        # get() returns -1 once end of input is reached.
        while value != -1:
            data += u8(value)
            value = self.get()

        return data
class Stdout:
    """Writes bytes to standard output (stdout).

    """

    func put(self, value: u8):
        """Writes given byte to standard output (stdout).

        """

        c"putchar(value);"

    func write(self, data: bytes):
        """Writes given bytes to standard output (stdout).

        """

        c"fwrite(data.m_bytes->data(), 1, data.m_bytes->size(), stdout);"

    func write(self, data: bytes, offset: i64, count: i64):
        """Writes at most count bytes of given data, starting at given
        offset, to standard output (stdout). Offset and count are
        clamped to the data, so nothing outside of it is ever written.

        """

        length = data.length()
        # A negative offset would make fwrite() read before the start
        # of the buffer, so clamp it in both directions.
        offset = max(0, min(offset, length))
        count = min(count, length - offset)

        if count > 0:
            c"fwrite(&data.m_bytes->data()[offset], 1, count, stdout);"

    func flush(self):
        """Flush output buffer.

        """

        c"fflush(stdout);"
# Module-wide standard input and output objects.
STDIN: Stdin = Stdin()
STDOUT: Stdout = Stdout()

# Environment variables as read by _load_env() when this module is
# initialized. setenv() and unsetenv() keep it up to date.
_ENV: {string: string} = _load_env()

func _load_env() -> {string: string}:
    """Returns the variables found in the C `environ` array as a
    dictionary. Entries without a `=` are skipped.

    """

    environment: {string: string} = {}
    entry: string? = None

    # The C loop is opened here and closed below, so the statements
    # in between run once per entry.
    c"""
    for (char **entry_pp = environ; *entry_pp != NULL; entry_pp++) {
        entry = String(*entry_pp);
    """

    name, separator, value = entry.partition("=")

    if separator == "=":
        environment[name] = value

    c"""
    }
    """

    return environment
class OsError(Error):
    """Error raised by the functions and classes in this module.

    """

    # Human readable description of what went wrong.
    message: string
c"""
static void read_data(BinaryFile *binary_file_p);

// Completion callback for uv_fs_open(). Stores the file descriptor, or the
// negative libuv error code, in m_file and wakes the fiber waiting in
// __init__().
static void on_open_complete(uv_fs_t *request_p)
{
    BinaryFile *binary_file_p = (BinaryFile *)(request_p->data);

    binary_file_p->m_file = request_p->result;
    uv_fs_req_cleanup(request_p);
    resume(binary_file_p->_fiber);
}

// Completion callback for uv_fs_read(). Appends the chunk that was read and
// requests the next one, until end of file or an error is seen.
static void on_read_complete(uv_fs_t *request_p)
{
    BinaryFile *binary_file_p = (BinaryFile *)(request_p->data);
    ssize_t result = request_p->result;

    uv_fs_req_cleanup(request_p);

    if (result > 0) {
        for (ssize_t i = 0; i < result; i++) {
            binary_file_p->_data += binary_file_p->m_data[i];
        }

        read_data(binary_file_p);
    } else {
        // End of file (0) or error (negative). The fiber must be resumed in
        // both cases, otherwise read() never returns. read() inspects
        // m_result to tell the two apart.
        binary_file_p->m_result = result;
        resume(binary_file_p->_fiber);
    }
}

// Requests the next chunk from the current file position (offset -1).
static void read_data(BinaryFile *binary_file_p)
{
    uv_fs_read(uv_default_loop(),
               &binary_file_p->m_request,
               binary_file_p->m_file,
               &binary_file_p->m_buf,
               1,
               -1,
               on_read_complete);
}

// Completion callback for uv_fs_write(). Stores the result for write() to
// inspect and wakes the waiting fiber.
static void on_write_complete(uv_fs_t *request_p)
{
    BinaryFile *binary_file_p = (BinaryFile *)(request_p->data);

    binary_file_p->m_result = request_p->result;
    uv_fs_req_cleanup(request_p);
    resume(binary_file_p->_fiber);
}
"""

class BinaryFile:
    """A binary file.

    All operations are performed with libuv on the default loop. The
    calling fiber is suspended until the operation has completed. Only
    one fiber can use the file at a time.

    """

    # libuv request shared by all operations on this file.
    c"uv_fs_t m_request;"
    # Buffer descriptor handed to libuv for reads and writes.
    c"uv_buf_t m_buf;"
    # Chunk buffer used when reading.
    c"char m_data[256];"
    # File descriptor. Negative if the file is not open.
    c"uv_file m_file;"
    # Result of the last read or write. Negative libuv error code on
    # failure.
    c"ssize_t m_result;"
    # Data collected so far by an ongoing read().
    _data: bytes
    # Fiber waiting for the ongoing operation, or None if idle.
    _fiber: Fiber?

    func __init__(self, path: Path, mode: string = "r"):
        """Opens given file. Mode can be `r` or `w`.

        `w` creates the file if it does not exist and truncates it if
        it does. Any other mode opens the file with flags 0.

        Raises OsError if the file cannot be opened.

        """

        path_utf8 = to_utf8(str(path))
        self._fiber = current()

        c"""
        m_request.data = this;
        m_result = 0;
        int flags;

        if (mode == String("r")) {
            flags = O_RDONLY;
        } else if (mode == String("w")) {
            flags = O_WRONLY | O_CREAT | O_TRUNC;
        } else {
            flags = 0;
        }

        uv_fs_open(uv_default_loop(),
                   &m_request,
                   (const char *)path_utf8.m_bytes->data(),
                   flags,
                   0664,
                   on_open_complete);
        """

        # Resumed by on_open_complete().
        suspend()
        message: string? = None

        c"""
        if (m_file < 0) {
            message = String(uv_strerror(m_file));
        }
        """

        if message is not None:
            raise OsError(f"failed to open '{path}': {message}")

        self._fiber = None

    func __del__(self):
        self.close()

    func close(self):
        """Close the file. The file is closed automatically in the destructor
        if not already closed.

        """

        # m_file holds a negative libuv error code after a failed open,
        # so every negative value means "nothing to close". No callback
        # is given, which makes the close synchronous.
        c"""
        if (m_file >= 0) {
            uv_fs_close(uv_default_loop(), &m_request, m_file, NULL);
            uv_fs_req_cleanup(&m_request);
            m_file = -1;
        }
        """

    func read(self) -> bytes:
        """Read from the file. Returns everything from the current file
        position to the end of the file.

        Raises OsError if another fiber is using the file or if the
        read fails.

        """

        if self._fiber is not None:
            raise OsError("Only one fiber can use the file.")

        self._fiber = current()
        self._data = b""

        c"""
        m_result = 0;
        m_buf = uv_buf_init(m_data, sizeof(m_data));
        uv_fs_read(uv_default_loop(),
                   &m_request,
                   m_file,
                   &m_buf,
                   1,
                   -1,
                   on_read_complete);
        """

        # Resumed by on_read_complete() at end of file or on error.
        suspend()
        self._fiber = None
        message: string? = None

        c"""
        if (m_result < 0) {
            message = String(uv_strerror((int)m_result));
        }
        """

        if message is not None:
            raise OsError(f"failed to read file: {message}")

        return self._data

    func write(self, data: bytes):
        """Write to the file, at the current file position.

        Raises OsError if another fiber is using the file or if the
        write fails.

        """

        if self._fiber is not None:
            raise OsError("Only one fiber can use the file.")

        self._fiber = current()

        c"""
        m_result = 0;
        m_buf = uv_buf_init((char *)data.m_bytes->data(),
                            data.m_bytes->size());
        uv_fs_write(uv_default_loop(),
                    &m_request,
                    m_file,
                    &m_buf,
                    1,
                    -1,
                    on_write_complete);
        """

        # Resumed by on_write_complete().
        suspend()
        self._fiber = None
        message: string? = None

        c"""
        if (m_result < 0) {
            message = String(uv_strerror((int)m_result));
        }
        """

        if message is not None:
            raise OsError(f"failed to write file: {message}")
class TextFile:
    """A text file. Implemented on top of BinaryFile.

    """

    _binary_file: BinaryFile

    func __init__(self, path: Path, mode: string = "r"):
        """Opens given file. Mode can be `r` or `w`.

        """

        self._binary_file = BinaryFile(path, mode)

    func close(self):
        """Close the file. The file is closed automatically in the destructor
        if not already closed.

        """

        self._binary_file.close()

    func read(self) -> string:
        """Read from the file. The bytes read by the underlying binary
        file are converted to a string.

        """

        content = self._binary_file.read()

        return string(content)

    func write(self, data: string):
        """Write to the file. The string is written UTF-8 encoded.

        """

        encoded = data.to_utf8()
        self._binary_file.write(encoded)
func which(executable: string) -> Path:
    """Locate given executable in the user's path.

    Returns the first path in the colon separated PATH environment
    variable that exists. Raises OsError if there is none, or if PATH
    is not set.

    """

    search_path = getenv("PATH")

    # getenv() returns None if PATH is not set. There is nothing to
    # search in that case.
    if search_path is not None:
        for directory in search_path.split(":"):
            full_path = Path(directory).join(executable)

            if full_path.exists():
                return full_path

    raise OsError(f"Executable '{executable}' not found.")
func cwd() -> Path:
    """Returns the current working directory.

    """

    current_directory: Path? = None

    c"""
    current_directory =
        mys::make_shared<path::Path>(String(std::filesystem::current_path()));
    """

    return current_directory
func env() -> {string: string}:
    """Returns all environment variables, as the dictionary this module
    keeps them in (not a copy).

    """

    return _ENV
func getenv(name: string, default: string? = None) -> string?:
    """Returns the value of given environment variable, or given default
    value if the environment variable is not set.

    """

    if name in _ENV:
        return _ENV[name]

    return default
func setenv(name: string, value: string):
    """Sets given environment variable to given value, both for the
    process and in the dictionary returned by env(). An existing value
    is overwritten.

    """

    c_name = to_utf8(name)
    c_value = to_utf8(value)

    c"""
    setenv_wrapper((const char *)c_name.m_bytes->data(),
                   (const char *)c_value.m_bytes->data());
    """

    _ENV[name] = value
func unsetenv(name: string):
    """Removes given environment variable, both from the process and
    from the dictionary returned by env().

    """

    c_name = to_utf8(name)

    c"""
    unsetenv_wrapper((const char *)c_name.m_bytes->data());
    """

    # A default is given to pop() so that the variable does not have
    # to be set.
    _ENV.pop(name, None)
func tar(archive: Path,
         extract: bool = False,
         create: bool = False,
         strip_components: i64 = 0,
         output_directory: Path? = None,
         path: Path? = None):
    """Extracts or creates given archive by running the `tar` command.

    Extract (takes precedence if both extract and create are given):
    extracts path, or everything if path is not given, from the archive
    into output_directory, or the current directory if not given.
    strip_components leading path components are removed from the
    extracted names.

    Create: creates a gzip compressed archive of path, which must be
    given.

    Raises OsError if no operation is given, or if create is given
    without a path.

    """

    # NOTE(review): the arguments are interpolated into the command
    # unquoted. Confirm how run() tokenizes paths with spaces.
    if extract:
        if path is None:
            path = Path("")

        if output_directory is None:
            output_directory_option = ""
        else:
            output_directory_option = f"-C {output_directory}"

        run(f"tar --strip-components={strip_components} -x -f {archive} "
            f"{output_directory_option} {path}")
    elif create:
        # Without this check None would be formatted into the command.
        if path is None:
            raise OsError("No path to archive given.")

        run(f"tar czf {archive} {path}")
    else:
        raise OsError("No tar operation given.")
func nproc(all: bool = False, ignore: i64 = 0) -> i64:
    """Returns the number of processing units, but never less than one.

    The number of configured processing units is used if all is True,
    otherwise the number of processing units currently online. ignore
    is subtracted from that number.

    Raises OsError if the number cannot be read.

    """

    available = 0

    if all:
        c"available = sysconf(_SC_NPROCESSORS_CONF);"
    else:
        c"available = sysconf(_SC_NPROCESSORS_ONLN);"

    if available == -1:
        raise OsError("Failed to get number of processing units.")

    return max(1, available - ignore)
func geteuid() -> i64:
    """Returns the effective user ID of the calling process.

    """

    effective_uid = -1

    c"effective_uid = geteuid_wrapper();"

    return effective_uid
class Passwd:
    """A user's passwd entry, as returned by getpasswd().

    """

    # User name (pw_name).
    name: string
    # User ID (pw_uid).
    uid: i64
    # Group ID (pw_gid).
    gid: i64
    # Home directory (pw_dir).
    dir: string
    # Login shell (pw_shell).
    shell: string
func getpasswd(uid: i64) -> Passwd:
    """Returns the passwd entry of the user with given user ID.

    Raises OsError if there is no such entry.

    """

    user_name: string? = None
    group_id: i64 = -1
    home_directory: string? = None
    login_shell: string? = None

    c"""
    struct passwd *entry_p = getpwuid_wrapper(uid);

    if (entry_p != NULL) {
        user_name = String(entry_p->pw_name);
        uid = entry_p->pw_uid;
        group_id = entry_p->pw_gid;
        home_directory = String(entry_p->pw_dir);
        login_shell = String(entry_p->pw_shell);
    }
    """

    # The name is only set if an entry was found.
    if user_name is not None:
        return Passwd(user_name, uid, group_id, home_directory, login_shell)

    raise OsError(f"Failed to get passwd for UID {uid}")
func getpasswd(username: string) -> Passwd:
    """Returns the passwd entry of the user with given user name.

    Raises OsError if there is no such entry.

    """

    c_username = to_utf8(username)
    user_name: string? = None
    user_id: i64 = -1
    group_id: i64 = -1
    home_directory: string? = None
    login_shell: string? = None

    c"""
    struct passwd *entry_p =
        getpwnam_wrapper((const char *)c_username.m_bytes->data());

    if (entry_p != NULL) {
        user_name = String(entry_p->pw_name);
        user_id = entry_p->pw_uid;
        group_id = entry_p->pw_gid;
        home_directory = String(entry_p->pw_dir);
        login_shell = String(entry_p->pw_shell);
    }
    """

    # The name is only set if an entry was found.
    if user_name is not None:
        return Passwd(user_name,
                      user_id,
                      group_id,
                      home_directory,
                      login_shell)

    raise OsError(f"Failed to get passwd for user {username}")
func whoami() -> string:
    """Returns the name of the effective user of the calling process.

    """

    passwd = getpasswd(geteuid())

    return passwd.name
test nproc():
    # At least one processing unit is always reported, also when all
    # of them are ignored.
    online = nproc()

    assert online > 0
    assert nproc(all=True) >= online
    assert nproc(ignore=1) in [1, online - 1]
    assert nproc(ignore=online) == 1
test env_get_pwd():
    # PWD is expected to be set and to contain at least one slash.
    pwd = getenv("PWD")

    assert "/" in pwd
test env_get_set_unset():
    # Not set: the default is returned.
    fallback = getenv("FOOBAR", "apple")
    assert fallback == "apple"

    # Set: the new value is returned.
    setenv("FOOBAR", "banan")
    current_value = getenv("FOOBAR")
    assert current_value == "banan"

    # Unset again: back to the default.
    unsetenv("FOOBAR")
    assert getenv("FOOBAR", None) is None
test all():
    # PWD is expected to be among the variables returned by env().
    variables = env()

    assert "PWD" in variables
test cwd():
    # Enter a scratch directory and check that cwd() follows.
    scratch = Path("cmd")
    scratch.mkdir(exists_ok=True)
    scratch.cd()
    assert cwd().name() == scratch.name()

    # Leave the scratch directory before removing it.
    Path("..").cd()
    scratch.rm()
test which():
    # An executable that does not exist must raise OsError.
    raised = False

    try:
        which("mys-xyz123")
    except OsError:
        raised = True

    assert raised

    # An executable that is expected to exist.
    location = which("ls")
    assert "ls" in location.to_string()
test binary_file():
    # README.rst is expected to exist and to be non-empty.
    readme = BinaryFile(Path("README.rst"))
    content = readme.read()
    assert content.length() > 0
    readme.close()
test binary_file_not_found():
    missing = Path("test_binary_file_not_found.bin")
    missing.rm(force=True)

    # Opening a file that does not exist for reading must fail.
    failed = False

    try:
        BinaryFile(missing)
    except OsError:
        failed = True

    assert failed

    missing.rm(force=True)
test create_binary_file():
    filename = Path("test_create_binary_file.bin")
    filename.rm(force=True)

    # Opening for writing creates an empty file.
    bf = BinaryFile(filename, "w")
    bf.close()

    bf = BinaryFile(filename)
    assert bf.read() == b""
    # Close before the file is removed, as the other file tests do.
    bf.close()

    filename.rm(force=True)
test write_binary_file():
    target = Path("test_write_binary_file.bin")
    target.rm(force=True)

    # Write a few bytes...
    writer = BinaryFile(target, "w")
    writer.write(b"123")
    writer.close()

    # ...and read them back.
    reader = BinaryFile(target)
    content = reader.read()
    assert content == b"123"
    reader.close()

    target.rm(force=True)
test read_binary_file_with_slash_slash():
    target = Path("test_read_binary_file_with_slash_slash.bin")
    target.rm(force=True)

    writer = BinaryFile(target, "w")
    writer.write(b"123")
    writer.close()

    # Same file, reached through a path with repeated slashes and a
    # parent directory reference.
    detour = Path("src///..//test_read_binary_file_with_slash_slash.bin")
    reader = BinaryFile(detour)
    assert reader.read() == b"123"
    reader.close()

    target.rm(force=True)
test text_file():
    # README.rst is expected to mention "Os".
    readme = TextFile(Path("README.rst"))
    text = readme.read()
    assert "Os" in text
    readme.close()
test write_text_file():
    target = Path("test_write_text_file.txt")
    target.rm(force=True)

    # Write some text...
    writer = TextFile(target, "w")
    writer.write("123")
    writer.close()

    # ...and read it back.
    reader = TextFile(target)
    text = reader.read()
    assert text == "123"
    reader.close()

    target.rm(force=True)
test text_file_does_not_exist():
    # The error message must name the file that could not be opened.
    message = ""

    try:
        TextFile(Path("flapp.flopp"))
    except OsError as error:
        message = error.message

    assert message.starts_with("failed to open 'flapp.flopp': ")
test tar():
    source = Path("test_tar")
    tarball = Path("test_tar.tar.gz")

    # Start from a clean slate.
    source.rm(recursive=True, force=True)
    tarball.rm(recursive=True, force=True)

    # Create an archive of a directory with a single file in it.
    source.mkdir()
    hello = BinaryFile(Path("test_tar/hello.txt"), "w")
    hello.write(b"hi\n")
    tar(tarball, create=True, path=source)

    # Remove the directory and restore it from the archive.
    source.rm(recursive=True, force=True)
    tar(tarball, extract=True)
    restored = BinaryFile(Path("test_tar/hello.txt"))
    assert restored.read() == b"hi\n"

    source.rm(recursive=True, force=True)
    tarball.rm(recursive=True, force=True)
test geteuid():
    # A user ID is never negative.
    effective_uid = geteuid()

    assert effective_uid >= 0
test getpasswd():
    # Look the current user up by user ID...
    by_uid = getpasswd(geteuid())
    assert by_uid.uid >= 0
    assert by_uid.gid >= 0

    # ...and by name, which must give the same entry.
    by_name = getpasswd(by_uid.name)
    assert by_name == by_uid
test whoami():
    # Only checks that the call completes without raising.
    whoami()