compressor_test.py 3.4 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394
  1. # Copyright 2016 The Brotli Authors. All rights reserved.
  2. #
  3. # Distributed under MIT license.
  4. # See file LICENSE for detail or copy at https://opensource.org/licenses/MIT
  5. import functools
  6. import unittest
  7. from . import _test_utils
  8. import brotli
  9. # Do not inherit from TestCase here to ensure that test methods
  10. # are not run automatically and instead are run as part of a specific
  11. # configuration below.
  12. class _TestCompressor(object):
  13. CHUNK_SIZE = 2048
  14. def tearDown(self):
  15. self.compressor = None
  16. def _check_decompression(self, test_data):
  17. # Write decompression to temp file and verify it matches the original.
  18. temp_uncompressed = _test_utils.get_temp_uncompressed_name(test_data)
  19. temp_compressed = _test_utils.get_temp_compressed_name(test_data)
  20. original = test_data
  21. with open(temp_uncompressed, 'wb') as out_file:
  22. with open(temp_compressed, 'rb') as in_file:
  23. out_file.write(brotli.decompress(in_file.read()))
  24. self.assertFilesMatch(temp_uncompressed, original)
  25. def _test_single_process(self, test_data):
  26. # Write single-shot compression to temp file.
  27. temp_compressed = _test_utils.get_temp_compressed_name(test_data)
  28. with open(temp_compressed, 'wb') as out_file:
  29. with open(test_data, 'rb') as in_file:
  30. out_file.write(self.compressor.process(in_file.read()))
  31. out_file.write(self.compressor.finish())
  32. self._check_decompression(test_data)
  33. def _test_multiple_process(self, test_data):
  34. # Write chunked compression to temp file.
  35. temp_compressed = _test_utils.get_temp_compressed_name(test_data)
  36. with open(temp_compressed, 'wb') as out_file:
  37. with open(test_data, 'rb') as in_file:
  38. read_chunk = functools.partial(in_file.read, self.CHUNK_SIZE)
  39. for data in iter(read_chunk, b''):
  40. out_file.write(self.compressor.process(data))
  41. out_file.write(self.compressor.finish())
  42. self._check_decompression(test_data)
  43. def _test_multiple_process_and_flush(self, test_data):
  44. # Write chunked and flushed compression to temp file.
  45. temp_compressed = _test_utils.get_temp_compressed_name(test_data)
  46. with open(temp_compressed, 'wb') as out_file:
  47. with open(test_data, 'rb') as in_file:
  48. read_chunk = functools.partial(in_file.read, self.CHUNK_SIZE)
  49. for data in iter(read_chunk, b''):
  50. out_file.write(self.compressor.process(data))
  51. out_file.write(self.compressor.flush())
  52. out_file.write(self.compressor.finish())
  53. self._check_decompression(test_data)
  54. _test_utils.generate_test_methods(_TestCompressor)
  55. class TestCompressorQuality1(_TestCompressor, _test_utils.TestCase):
  56. def setUp(self):
  57. self.compressor = brotli.Compressor(quality=1)
  58. class TestCompressorQuality6(_TestCompressor, _test_utils.TestCase):
  59. def setUp(self):
  60. self.compressor = brotli.Compressor(quality=6)
  61. class TestCompressorQuality9(_TestCompressor, _test_utils.TestCase):
  62. def setUp(self):
  63. self.compressor = brotli.Compressor(quality=9)
  64. class TestCompressorQuality11(_TestCompressor, _test_utils.TestCase):
  65. def setUp(self):
  66. self.compressor = brotli.Compressor(quality=11)
  67. if __name__ == '__main__':
  68. unittest.main()