Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1from . import Reader 

2 

class BufferedReader(Reader):
    """Reader wrapper that serves small reads from an internal buffer, so
    that fewer calls are made to the wrapped reader, while reads that do
    not fit in the buffer go straight into the returned bytes object.

    """

    # The wrapped reader all data comes from.
    _reader: Reader
    # Internal buffer, allocated once in __init__().
    _buffer: bytes
    # Index in _buffer of the next byte that has not been returned yet.
    _offset: i64
    # Number of not yet returned bytes in _buffer, starting at _offset.
    _size: i64

    func __init__(self, reader: Reader, size: i64 = 64):
        """Wrap given reader. Size is the number of bytes allocated for
        the internal buffer.

        """

        self._reader = reader
        self._buffer = bytes(size)
        self._offset = 0
        self._size = 0

    func clear(self):
        """Discard all buffered bytes that have not been returned yet.

        """

        self._offset = 0
        self._size = 0

    func read(self, size: i64) -> bytes:
        """Read size bytes. Fewer bytes are returned only if the wrapped
        reader runs out of data before size bytes have been collected.

        """

        result = bytes(size)
        filled = 0

        if self._size > 0:
            if size > self._size:
                # Not enough buffered. Hand out everything there is and
                # get the rest from the wrapped reader below.
                result.copy_into(self._buffer,
                                 self._offset,
                                 self._offset + self._size,
                                 0)
                filled = self._size
                self._offset = 0
                self._size = 0
            else:
                # The whole request can be served from the buffer.
                end = self._offset + size
                result.copy_into(self._buffer, self._offset, end, 0)
                self._offset = end
                self._size -= size

                return result

        missing = size - filled

        if missing >= self._buffer.length():
            # Big read. Bypass the buffer and read directly into the
            # result.
            direct = self._reader.read_into(result, filled, missing)
            result.resize(filled + direct)

            return result

        # Small read. Fill the buffer with as much as the wrapped reader
        # gives, until at least the missing bytes are available.
        buffered = 0

        while buffered < missing:
            chunk = self._reader.try_read_into(
                self._buffer,
                buffered,
                self._buffer.length() - buffered)

            if chunk == 0:
                # No more data. Return what was collected so far.
                result.copy_into(self._buffer, 0, buffered, filled)
                result.resize(filled + buffered)

                return result

            buffered += chunk

        # Hand out the missing bytes and keep any surplus buffered for
        # later reads.
        result.copy_into(self._buffer, 0, missing, filled)
        self._offset = missing
        self._size = buffered - missing

        return result

    func read_until(self, pattern: bytes, keep_pattern: bool = True) -> bytes?:
        """Read one byte at a time until the read data ends with given
        pattern. The pattern is stripped from the returned data if
        keep_pattern is False. Returns None if no more data could be read
        before the pattern was found.

        """

        collected = b""

        while not collected.ends_with(pattern):
            byte = self.read(1)

            if byte.length() == 0:
                return None

            collected += byte

        if keep_pattern:
            return collected

        collected.resize(collected.length() - pattern.length())

        return collected

108 

class _Reader(Reader):
    """In-memory reader used by the tests below. Hands out the bytes in
    data from front to back.

    """

    # The bytes to hand out.
    data: bytes
    # Index in data of the next byte to hand out.
    _offset: i64

    func _readable(self, size: i64) -> i64:
        """Returns size, limited to the number of bytes left in data.

        """

        available = self.data.length() - self._offset

        if available < size:
            return available

        return size

    func read(self, size: i64) -> bytes:
        """Returns up to size bytes from the current position and advances
        past them.

        """

        count = self._readable(size)
        chunk = b""
        position = self._offset
        end = position + count

        while position < end:
            chunk += self.data[position]
            position += 1

        self._offset = end

        return chunk

    func read_into(self, data: bytes, start: i64, size: i64) -> i64:
        """Copies up to size bytes into given data, beginning at index
        start. Returns the number of bytes copied.

        """

        count = self._readable(size)

        for i in range(count):
            data[start + i] = self.read(1)[0]

        return count

    func try_read_into(self, data: bytes, start: i64, size: i64) -> i64:
        """Same as read_into() for this reader, as all data is always
        available.

        """

        allowed = self._readable(size)

        return self.read_into(data, start, allowed)

142 

test mixed():
    # 90 bytes of repeating digits, read with requests both smaller and
    # larger than the default 64 bytes buffer.
    reader = BufferedReader(
        _Reader(
            b"012345678901234567890123456789012345678901234567890123456789"
            b"012345678901234567890123456789"))

    # Plain small reads.
    assert reader.read(1) == b"0"
    assert reader.read(2) == b"12"

    # Pattern reads, with and without the pattern in the result.
    assert reader.read_until(b"5") == b"345"
    assert reader.read_until(b"6", keep_pattern=False) == b""
    assert reader.read_until(b"8", keep_pattern=True) == b"78"

    # A read bigger than the buffer.
    assert reader.read(70) == (
        b"9012345678901234567890123456789012345678901234567890123456789012345678")
    assert reader.read(1) == b"9"

    # Asking for more than what is left gives the remaining bytes, and
    # then nothing.
    assert reader.read(100) == b"0123456789"
    assert reader.read(1) == b""

158 

test read_until_error():
    # The pattern is not part of the data, so the reader runs out of
    # bytes before finding it.
    reader = BufferedReader(_Reader(b"0123456789"))

    assert reader.read_until(b"g") is None

162 

test read_into():
    # NOTE(review): read_into() and try_read_into() are not defined by
    # BufferedReader in this file. Presumably they are provided by
    # Reader. Confirm.
    buffered = BufferedReader(
        _Reader(
            b"\x12\x34\x56\x78\x9a\xbc\xde\x01\x23\x45\x67\x89\xab\xcd\xef"))
    target = b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"

    # Each call writes at given index in the target and leaves all other
    # bytes untouched.
    assert buffered.read_into(target, 0, 1) == 1
    assert target == b"\x12\x00\x00\x00\x00\x00\x00\x00\x00\x00"
    assert buffered.read_into(target, 1, 3) == 3
    assert target == b"\x12\x34\x56\x78\x00\x00\x00\x00\x00\x00"
    assert buffered.read_into(target, 3, 2) == 2
    assert target == b"\x12\x34\x56\x9a\xbc\x00\x00\x00\x00\x00"
    assert buffered.try_read_into(target, 9, 1) == 1
    assert target == b"\x12\x34\x56\x9a\xbc\x00\x00\x00\x00\xde"

    # Only 8 bytes are left in the source, although 10 are requested.
    assert buffered.read_into(target, 1, 10) == 8
    assert target == b"\x12\x01\x23\x45\x67\x89\xab\xcd\xef\xde"

178 

test clear():
    # With a 4 bytes buffer the first read fetches "0123" from the
    # source.
    reader = BufferedReader(_Reader(b"0123456789"), size=4)

    assert reader.read(1) == b"0"
    assert reader.read(1) == b"1"

    # Clearing drops the buffered but not yet returned "23".
    reader.clear()

    assert reader.read(1) == b"4"
    assert reader.read(6) == b"56789"
    assert reader.read(1) == b""