decompressor_test.py 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101
  1. # Copyright 2016 The Brotli Authors. All rights reserved.
  2. #
  3. # Distributed under MIT license.
  4. # See file LICENSE for detail or copy at https://opensource.org/licenses/MIT
  5. import functools
  6. import os
  7. import unittest
  8. from . import _test_utils
  9. import brotli
  10. def _get_original_name(test_data):
  11. return test_data.split('.compressed')[0]
  12. class TestDecompressor(_test_utils.TestCase):
  13. CHUNK_SIZE = 1
  14. def setUp(self):
  15. self.decompressor = brotli.Decompressor()
  16. def tearDown(self):
  17. self.decompressor = None
  18. def _check_decompression(self, test_data):
  19. # Verify decompression matches the original.
  20. temp_uncompressed = _test_utils.get_temp_uncompressed_name(test_data)
  21. original = _get_original_name(test_data)
  22. self.assertFilesMatch(temp_uncompressed, original)
  23. def _decompress(self, test_data):
  24. temp_uncompressed = _test_utils.get_temp_uncompressed_name(test_data)
  25. with open(temp_uncompressed, 'wb') as out_file:
  26. with open(test_data, 'rb') as in_file:
  27. read_chunk = functools.partial(in_file.read, self.CHUNK_SIZE)
  28. for data in iter(read_chunk, b''):
  29. out_file.write(self.decompressor.process(data))
  30. self.assertTrue(self.decompressor.is_finished())
  31. def _decompress_with_limit(self, test_data, max_output_length):
  32. temp_uncompressed = _test_utils.get_temp_uncompressed_name(test_data)
  33. with open(temp_uncompressed, 'wb') as out_file:
  34. with open(test_data, 'rb') as in_file:
  35. chunk_iter = iter(functools.partial(in_file.read, 10 * 1024), b'')
  36. while not self.decompressor.is_finished():
  37. data = b''
  38. if self.decompressor.can_accept_more_data():
  39. data = next(chunk_iter, b'')
  40. decompressed_data = self.decompressor.process(data, max_output_length=max_output_length)
  41. self.assertTrue(len(decompressed_data) <= max_output_length)
  42. out_file.write(decompressed_data)
  43. self.assertTrue(next(chunk_iter, None) == None)
  44. def _test_decompress(self, test_data):
  45. self._decompress(test_data)
  46. self._check_decompression(test_data)
  47. def _test_decompress_with_limit(self, test_data):
  48. self._decompress_with_limit(test_data, max_output_length=20)
  49. self._check_decompression(test_data)
  50. def test_too_much_input(self):
  51. with open(os.path.join(_test_utils.TESTDATA_DIR, "zerosukkanooa.compressed"), 'rb') as in_file:
  52. compressed = in_file.read()
  53. self.decompressor.process(compressed[:-1], max_output_length=1)
  54. # the following assertion checks whether the test setup is correct
  55. self.assertTrue(not self.decompressor.can_accept_more_data())
  56. with self.assertRaises(brotli.error):
  57. self.decompressor.process(compressed[-1:])
  58. def test_changing_limit(self):
  59. test_data = os.path.join(_test_utils.TESTDATA_DIR, "zerosukkanooa.compressed")
  60. temp_uncompressed = _test_utils.get_temp_uncompressed_name(test_data)
  61. with open(temp_uncompressed, 'wb') as out_file:
  62. with open(test_data, 'rb') as in_file:
  63. compressed = in_file.read()
  64. uncompressed = self.decompressor.process(compressed[:-1], max_output_length=1)
  65. self.assertTrue(len(uncompressed) <= 1)
  66. out_file.write(uncompressed)
  67. while not self.decompressor.can_accept_more_data():
  68. out_file.write(self.decompressor.process(b''))
  69. out_file.write(self.decompressor.process(compressed[-1:]))
  70. self._check_decompression(test_data)
  71. def test_garbage_appended(self):
  72. with self.assertRaises(brotli.error):
  73. self.decompressor.process(brotli.compress(b'a') + b'a')
  74. def test_already_finished(self):
  75. self.decompressor.process(brotli.compress(b'a'))
  76. with self.assertRaises(brotli.error):
  77. self.decompressor.process(b'a')
  78. _test_utils.generate_test_methods(TestDecompressor, for_decompression=True)
  79. if __name__ == '__main__':
  80. unittest.main()