| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101 |
- # Copyright 2016 The Brotli Authors. All rights reserved.
- #
- # Distributed under MIT license.
- # See file LICENSE for detail or copy at https://opensource.org/licenses/MIT
- import functools
- import os
- import unittest
- from . import _test_utils
- import brotli
- def _get_original_name(test_data):
- return test_data.split('.compressed')[0]
- class TestDecompressor(_test_utils.TestCase):
- CHUNK_SIZE = 1
- def setUp(self):
- self.decompressor = brotli.Decompressor()
- def tearDown(self):
- self.decompressor = None
- def _check_decompression(self, test_data):
- # Verify decompression matches the original.
- temp_uncompressed = _test_utils.get_temp_uncompressed_name(test_data)
- original = _get_original_name(test_data)
- self.assertFilesMatch(temp_uncompressed, original)
- def _decompress(self, test_data):
- temp_uncompressed = _test_utils.get_temp_uncompressed_name(test_data)
- with open(temp_uncompressed, 'wb') as out_file:
- with open(test_data, 'rb') as in_file:
- read_chunk = functools.partial(in_file.read, self.CHUNK_SIZE)
- for data in iter(read_chunk, b''):
- out_file.write(self.decompressor.process(data))
- self.assertTrue(self.decompressor.is_finished())
- def _decompress_with_limit(self, test_data, max_output_length):
- temp_uncompressed = _test_utils.get_temp_uncompressed_name(test_data)
- with open(temp_uncompressed, 'wb') as out_file:
- with open(test_data, 'rb') as in_file:
- chunk_iter = iter(functools.partial(in_file.read, 10 * 1024), b'')
- while not self.decompressor.is_finished():
- data = b''
- if self.decompressor.can_accept_more_data():
- data = next(chunk_iter, b'')
- decompressed_data = self.decompressor.process(data, max_output_length=max_output_length)
- self.assertTrue(len(decompressed_data) <= max_output_length)
- out_file.write(decompressed_data)
- self.assertTrue(next(chunk_iter, None) == None)
- def _test_decompress(self, test_data):
- self._decompress(test_data)
- self._check_decompression(test_data)
- def _test_decompress_with_limit(self, test_data):
- self._decompress_with_limit(test_data, max_output_length=20)
- self._check_decompression(test_data)
- def test_too_much_input(self):
- with open(os.path.join(_test_utils.TESTDATA_DIR, "zerosukkanooa.compressed"), 'rb') as in_file:
- compressed = in_file.read()
- self.decompressor.process(compressed[:-1], max_output_length=1)
- # the following assertion checks whether the test setup is correct
- self.assertTrue(not self.decompressor.can_accept_more_data())
- with self.assertRaises(brotli.error):
- self.decompressor.process(compressed[-1:])
- def test_changing_limit(self):
- test_data = os.path.join(_test_utils.TESTDATA_DIR, "zerosukkanooa.compressed")
- temp_uncompressed = _test_utils.get_temp_uncompressed_name(test_data)
- with open(temp_uncompressed, 'wb') as out_file:
- with open(test_data, 'rb') as in_file:
- compressed = in_file.read()
- uncompressed = self.decompressor.process(compressed[:-1], max_output_length=1)
- self.assertTrue(len(uncompressed) <= 1)
- out_file.write(uncompressed)
- while not self.decompressor.can_accept_more_data():
- out_file.write(self.decompressor.process(b''))
- out_file.write(self.decompressor.process(compressed[-1:]))
- self._check_decompression(test_data)
- def test_garbage_appended(self):
- with self.assertRaises(brotli.error):
- self.decompressor.process(brotli.compress(b'a') + b'a')
- def test_already_finished(self):
- self.decompressor.process(brotli.compress(b'a'))
- with self.assertRaises(brotli.error):
- self.decompressor.process(b'a')
- _test_utils.generate_test_methods(TestDecompressor, for_decompression=True)
- if __name__ == '__main__':
- unittest.main()
|