Coverage for src/buffered_writer.mys : 0%

Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1from . import Writer
3class BufferedWriter(Writer):
4 """Efficient writing of any amount of bytes. Small writes are using an
5 internal buffer for fewer calls to the underlying writer, while
6 big writes are written directly.
8 """
10 _writer: Writer
11 _buffer: bytes
12 _size: i64
14 func __init__(self, writer: Writer, size: i64 = 64):
15 """Create a buffered writer that writes to given writer. Size is the
16 size of the internal buffer.
18 """
20 self._writer = writer
21 self._buffer = bytes(size)
22 self._size = 0
24 func write(self, data: bytes) -> i64:
25 """Write given data. Never blocks.
27 """
29 size = data.length()
31 if size <= self._buffer.length():
32 if size <= self._buffer.length() - self._size:
33 self._buffer.copy_into(data, 0, size, self._size)
34 self._size += size
35 else:
36 self.flush()
38 if self._size == 0:
39 self._buffer.copy_into(data, 0, size, 0)
40 self._size = size
41 else:
42 size = 0
43 else:
44 self.flush()
46 if self._size == 0:
47 size = self._writer.write(data)
48 else:
49 size = 0
51 return size
53 func flush(self):
54 """Write all buffered data to the underlying writer.
56 """
58 if self._size > 0:
59 self._size -= self._writer.write_from(self._buffer, 0, self._size)