123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115 |
- #!/usr/bin/env vpython3
- # Copyright 2020 The Chromium Authors. All rights reserved.
- # Use of this source code is governed by a BSD-style license that can be
- # found in the LICENSE file.
- """Unit tests for lockfile.py"""
- import logging
- import os
- import shutil
- import sys
- import tempfile
- import threading
- import unittest
- if sys.version_info.major == 2:
- import mock
- import Queue
- else:
- from unittest import mock
- import queue as Queue
- DEPOT_TOOLS_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
- sys.path.insert(0, DEPOT_TOOLS_ROOT)
- from testing_support import coverage_utils
- import lockfile
class LockTest(unittest.TestCase):
    """Tests for lockfile.lock(), run against a temporary directory."""

    def setUp(self):
        """Create the directory to be locked and schedule its removal."""
        scratch_dir = tempfile.mkdtemp(prefix='lockfile')
        self.cache_dir = scratch_dir
        self.addCleanup(shutil.rmtree, scratch_dir, ignore_errors=True)

    def testLock(self):
        """A held lock rejects a second acquire and can be retaken later."""
        with lockfile.lock(self.cache_dir):
            # The directory is locked at this point, so a second attempt to
            # lock it is expected to raise LockError.
            with self.assertRaises(lockfile.LockError), \
                    lockfile.lock(self.cache_dir):
                pass

        # The first lock has been released; acquiring it again must work.
        with lockfile.lock(self.cache_dir):
            pass

    @mock.patch('time.sleep')
    def testLockConcurrent(self, sleep_mock):
        """Simulate two separate processes competing for one file lock.

        Two threads call lockfile.lock() on the same directory with
        timeout=1. time.sleep is patched so that the moment the second
        thread starts waiting for the lock can be observed and used to
        drive the first thread.
        """
        # q_f1 and q_sleep exist only to control the order in which the
        # individual threads execute; results collects one item per thread
        # that managed to hold the lock.
        q_f1 = Queue.Queue()
        q_sleep = Queue.Queue()
        results = Queue.Queue()

        def fake_sleep(_seconds):
            """Replacement for time.sleep, hit when lockfile.lock is blocked.

            In this test case the blocked caller is f2.
            """
            logging.debug('sleep: started')
            q_sleep.put(True)
            logging.debug('sleep: waiting for q_sleep to be consumed')
            # Returns once f1 has called q_sleep.task_done().
            q_sleep.join()
            logging.debug('sleep: waiting for result before exiting')
            results.get(timeout=1)
            logging.debug('sleep: exiting')

        sleep_mock.side_effect = fake_sleep

        def f1():
            """Lock first (signalled through q_f1), then wait for the sleep.

            The lock is held until fake_sleep has put its message on q_sleep.
            """
            logging.debug('f1 started, locking')
            with lockfile.lock(self.cache_dir, timeout=1):
                logging.debug('f1: locked')
                q_f1.put(True)
                logging.debug('f1: waiting on q_f1 to be consumed')
                q_f1.join()
                logging.debug('f1: done waiting on q_f1, getting q_sleep')
                q_sleep.get(timeout=1)
                results.put(True)
            logging.debug('f1: lock released')
            # Unblocks the q_sleep.join() inside fake_sleep.
            q_sleep.task_done()
            logging.debug('f1: exiting')

        def f2():
            """Lock second: start only after f1 reports holding the lock."""
            logging.debug('f2: started, consuming q_f1')
            # Wait for f1 to take the lock, then release f1's q_f1.join().
            q_f1.get(timeout=1)
            q_f1.task_done()
            logging.debug('f2: done waiting for q_f1, locking')
            with lockfile.lock(self.cache_dir, timeout=1):
                logging.debug('f2: locked')
                results.put(True)

        workers = [threading.Thread(target=body) for body in (f1, f2)]
        for worker in workers:
            worker.start()
        for worker in workers:
            worker.join()

        # Both threads put a result, and fake_sleep consumed one of them, so
        # exactly one should be left in the queue.
        self.assertEqual(1, results.qsize())
        sleep_mock.assert_called_once_with(1)
if __name__ == '__main__':
    # Passing -v on the command line enables the debug-level log messages
    # emitted by the tests; otherwise only errors are logged.
    logging.basicConfig(
        level=logging.DEBUG if '-v' in sys.argv else logging.ERROR)
    # Point the coverage run at lockfile.py, the module these tests
    # exercise (the previous target, git_cache.py, is not imported here).
    # required_percentage=0 means no minimum coverage is demanded.
    sys.exit(
        coverage_utils.covered_main(
            os.path.join(DEPOT_TOOLS_ROOT, 'lockfile.py'),
            required_percentage=0))
|