lockfile_test.py 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114
  1. #!/usr/bin/env vpython3
  2. # Copyright 2020 The Chromium Authors. All rights reserved.
  3. # Use of this source code is governed by a BSD-style license that can be
  4. # found in the LICENSE file.
  5. """Unit tests for lockfile.py"""
  6. import logging
  7. import os
  8. import shutil
  9. import sys
  10. import tempfile
  11. import threading
  12. import unittest
  13. from unittest import mock
  14. import queue
  15. DEPOT_TOOLS_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
  16. sys.path.insert(0, DEPOT_TOOLS_ROOT)
  17. from testing_support import coverage_utils
  18. import lockfile
  19. # TODO: Should fix these warnings.
  20. # pylint: disable=line-too-long
  21. class LockTest(unittest.TestCase):
  22. def setUp(self):
  23. self.cache_dir = tempfile.mkdtemp(prefix='lockfile')
  24. self.addCleanup(shutil.rmtree, self.cache_dir, ignore_errors=True)
  25. def testLock(self):
  26. with lockfile.lock(self.cache_dir):
  27. # cached dir locked, attempt to lock it again
  28. with self.assertRaises(lockfile.LockError):
  29. with lockfile.lock(self.cache_dir):
  30. pass
  31. with lockfile.lock(self.cache_dir):
  32. pass
  33. @mock.patch('time.sleep')
  34. def testLockConcurrent(self, sleep_mock):
  35. '''testLockConcurrent simulates what happens when two separate processes try
  36. to acquire the same file lock with timeout.'''
  37. # Queues q_f1 and q_sleep are used to controll execution of individual
  38. # threads.
  39. q_f1 = queue.Queue()
  40. q_sleep = queue.Queue()
  41. results = queue.Queue()
  42. def side_effect(arg):
  43. '''side_effect is called when with l.lock is blocked. In this unit test
  44. case, it comes from f2.'''
  45. logging.debug('sleep: started')
  46. q_sleep.put(True)
  47. logging.debug('sleep: waiting for q_sleep to be consumed')
  48. q_sleep.join()
  49. logging.debug('sleep: waiting for result before exiting')
  50. results.get(timeout=1)
  51. logging.debug('sleep: exiting')
  52. sleep_mock.side_effect = side_effect
  53. def f1():
  54. '''f1 enters first in l.lock (controlled via q_f1). It then waits for
  55. side_effect to put a message in queue q_sleep.'''
  56. logging.debug('f1 started, locking')
  57. with lockfile.lock(self.cache_dir, timeout=1):
  58. logging.debug('f1: locked')
  59. q_f1.put(True)
  60. logging.debug('f1: waiting on q_f1 to be consumed')
  61. q_f1.join()
  62. logging.debug('f1: done waiting on q_f1, getting q_sleep')
  63. q_sleep.get(timeout=1)
  64. results.put(True)
  65. logging.debug('f1: lock released')
  66. q_sleep.task_done()
  67. logging.debug('f1: exiting')
  68. def f2():
  69. '''f2 enters second in l.lock (controlled by q_f1).'''
  70. logging.debug('f2: started, consuming q_f1')
  71. q_f1.get(timeout=1) # wait for f1 to execute lock
  72. q_f1.task_done()
  73. logging.debug('f2: done waiting for q_f1, locking')
  74. with lockfile.lock(self.cache_dir, timeout=1):
  75. logging.debug('f2: locked')
  76. results.put(True)
  77. t1 = threading.Thread(target=f1)
  78. t1.start()
  79. t2 = threading.Thread(target=f2)
  80. t2.start()
  81. t1.join()
  82. t2.join()
  83. # One result was consumed by side_effect, we expect only one in the
  84. # queue.
  85. self.assertEqual(1, results.qsize())
  86. sleep_mock.assert_called_with(0.1)
  87. if __name__ == '__main__':
  88. logging.basicConfig(
  89. level=logging.DEBUG if '-v' in sys.argv else logging.ERROR)
  90. sys.exit(
  91. coverage_utils.covered_main(
  92. (os.path.join(DEPOT_TOOLS_ROOT, 'git_cache.py')),
  93. required_percentage=0))