Skip to content

Commit b86170c

Browse files
author
James William Pye
committed
Improve the portability of .alock.
* In one version COALESCE was trying to cast int4[] to int8[]. Avoid this by COALESCE'ing the results of array_upper(). * Define the alock primitives as methods. The try_* interfaces came later, so referencing all the statements at once yielded a portability problem despite the server's capability of acquiring waiting locks. Add a test actually exercising concurrency. It's not particularly good as it would be successful if ALock() did nothing at all, but, nonetheless, exercise the code path.
1 parent c33598b commit b86170c

4 files changed

Lines changed: 87 additions & 42 deletions

File tree

postgresql/alock.py

Lines changed: 40 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,12 @@
77
from abc import abstractmethod, abstractproperty
88
from .python.element import Element
99

10+
__all__ = [
11+
'ALock',
12+
'ExclusiveLock',
13+
'ShareLock',
14+
]
15+
1016
class ALock(Element):
1117
"""
1218
Advisory Lock class for managing the acquisition and release of a sequence
@@ -33,10 +39,21 @@ def mode(self):
3339
"""
3440

3541
@abstractmethod
36-
def __select_statements__(self):
42+
def _try(self, id_pairs, ids):
3743
"""
38-
ALock subclasses need to implement this in order to specify
39-
the actual statements that try, acquire, and release the locks.
44+
Try and acquire.
45+
"""
46+
47+
@abstractmethod
48+
def _acquire(self, id_pairs, ids):
49+
"""
50+
Acquire and wait if necessary.
51+
"""
52+
53+
@abstractmethod
54+
def _release(self, id_pairs, ids):
55+
"""
56+
Release the locks.
4057
"""
4158

4259
@staticmethod
@@ -60,12 +77,12 @@ def acquire(self, blocking = True, len = len):
6077
# _count is zero, so the locks need to be acquired.
6178
wait = bool(blocking)
6279
if wait:
63-
self._acquire_stmt(self._id_pairs, self._ids)
80+
self._acquire(self._id_pairs, self._ids)
6481
else:
6582
# grab the success of each lock id. if some were
6683
# unsuccessful, then the ones that were successful need to be
6784
# released.
68-
r = self._try_stmt(self._id_pairs, self._ids)
85+
r = self._try(self._id_pairs, self._ids)
6986
# accumulate the identifiers that *did* lock
7087
release_seq = [
7188
id for didlock, id in zip(r, self.identifiers) if didlock[0]
@@ -78,7 +95,7 @@ def acquire(self, blocking = True, len = len):
7895
# on the same seq that it should be able to acquire all of
7996
# them once the contended lock is released.
8097
release_seq.reverse()
81-
self._release_stmt(*self._split_lock_identifiers(release_seq))
98+
self._release(*self._split_lock_identifiers(release_seq))
8299
# unable to acquire all.
83100
return False
84101
self._count = self._count + 1
@@ -97,7 +114,7 @@ def release(self):
97114
if not self.database.closed and self._count > 0:
98115
# if the database has been closed, or the count will
99116
# remain non-zero, there is no need to release.
100-
self._release_stmt(reversed(self._id_pairs), reversed(self._ids))
117+
self._release(reversed(self._id_pairs), reversed(self._ids))
101118
# decrement the count nonetheless.
102119
self._count = self._count - 1
103120

@@ -124,25 +141,25 @@ def __init__(self, database, *identifiers):
124141
self.connection = self.database = database
125142
self.identifiers = identifiers
126143
self._id_pairs, self._ids = self._split_lock_identifiers(identifiers)
127-
self._try_stmt, self._acquire_stmt, self._release_stmt = \
128-
self.__select_statements__()
129144

130145
class ShareLock(ALock):
131146
mode = 'share'
132-
def __select_statements__(self):
133-
sys = self.database.sys
134-
return (
135-
sys.try_advisory_shared,
136-
sys.acquire_advisory_shared,
137-
sys.release_advisory_shared
138-
)
147+
def _try(self, *args):
148+
return self.database.sys.try_advisory_shared(*args)
149+
150+
def _acquire(self, *args):
151+
return self.database.sys.acquire_advisory_shared(*args)
152+
153+
def _release(self, *args):
154+
return self.database.sys.release_advisory_shared(*args)
139155

140156
class ExclusiveLock(ALock):
141157
mode = 'exclusive'
142-
def __select_statements__(self):
143-
sys = self.database.sys
144-
return (
145-
sys.try_advisory_exclusive,
146-
sys.acquire_advisory_exclusive,
147-
sys.release_advisory_exclusive
148-
)
158+
def _try(self, *args):
159+
return self.database.sys.try_advisory_exclusive(*args)
160+
161+
def _acquire(self, *args):
162+
return self.database.sys.acquire_advisory_exclusive(*args)
163+
164+
def _release(self, *args):
165+
return self.database.sys.release_advisory_exclusive(*args)

postgresql/lib/libsys.sql

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -208,7 +208,7 @@ SELECT
208208
pg_catalog.pg_advisory_unlock_shared($2[i])
209209
END AS released
210210
FROM
211-
pg_catalog.generate_series(1, array_upper(COALESCE($1::int4[],$2::int8[]), 1)) AS g(i)
211+
pg_catalog.generate_series(1, COALESCE(array_upper($2::int8[], 1), array_upper($1::int4[], 1))) AS g(i)
212212

213213
[acquire_advisory_shared]
214214
SELECT COUNT((
@@ -220,7 +220,7 @@ SELECT COUNT((
220220
END
221221
) IS NULL) AS acquired
222222
FROM
223-
pg_catalog.generate_series(1, array_upper(COALESCE($1::int4[],$2::int8[]), 1)) AS g(i)
223+
pg_catalog.generate_series(1, COALESCE(array_upper($2::int8[], 1), array_upper($1::int4[], 1))) AS g(i)
224224

225225
[try_advisory_shared]
226226
SELECT
@@ -231,7 +231,7 @@ SELECT
231231
pg_catalog.pg_try_advisory_lock_shared($2[i])
232232
END AS acquired
233233
FROM
234-
pg_catalog.generate_series(1, array_upper(COALESCE($1::int4[],$2::int8[]), 1)) AS g(i)
234+
pg_catalog.generate_series(1, COALESCE(array_upper($2::int8[], 1), array_upper($1::int4[], 1))) AS g(i)
235235

236236
[release_advisory_exclusive]
237237
SELECT
@@ -242,7 +242,7 @@ SELECT
242242
pg_catalog.pg_advisory_unlock($2[i])
243243
END AS released
244244
FROM
245-
pg_catalog.generate_series(1, array_upper(COALESCE($1::int4[],$2::int8[]), 1)) AS g(i)
245+
pg_catalog.generate_series(1, COALESCE(array_upper($2::int8[], 1), array_upper($1::int4[], 1))) AS g(i)
246246

247247
[acquire_advisory_exclusive]
248248
SELECT COUNT((
@@ -254,7 +254,7 @@ SELECT COUNT((
254254
END
255255
) IS NULL) AS acquired -- Guaranteed to be acquired once complete.
256256
FROM
257-
pg_catalog.generate_series(1, array_upper(COALESCE($1::int4[],$2::int8[]), 1)) AS g(i)
257+
pg_catalog.generate_series(1, COALESCE(array_upper($2::int8[], 1), array_upper($1::int4[], 1))) AS g(i)
258258

259259
[try_advisory_exclusive]
260260
SELECT
@@ -265,4 +265,4 @@ SELECT
265265
pg_catalog.pg_try_advisory_lock($2[i])
266266
END AS acquired
267267
FROM
268-
pg_catalog.generate_series(1, array_upper(COALESCE($1::int4[],$2::int8[]), 1)) AS g(i)
268+
pg_catalog.generate_series(1, COALESCE(array_upper($2::int8[], 1), array_upper($1::int4[], 1))) AS g(i)

postgresql/temporal.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,10 @@ def destroy(self):
6161
if os.getpid() == self._init_pid_:
6262
with self:
6363
# Kill all the open connections.
64-
db.sys.terminate_backends()
64+
try:
65+
db.sys.terminate_backends()
66+
except Exception:
67+
pass
6568
cluster = self.cluster
6669
self.cluster = None
6770
self._init_pid_ = None

postgresql/test/test_alock.py

Lines changed: 37 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,44 @@
77
from ..temporal import pg_tmp
88
from .. import alock
99

10+
n_alocks = "select count(*) FROM pg_locks WHERE locktype = 'advisory'"
11+
1012
class test_alock(unittest.TestCase):
13+
@pg_tmp
14+
def testALockWait(self):
15+
# sadly, this is primarily used to exercise the code paths..
16+
ad = prepare(n_alocks).first
17+
self.failUnlessEqual(ad(), 0)
18+
state = [False, False, False]
19+
alt = new()
20+
def concurrent_lock():
21+
pass
22+
with alock.ExclusiveLock(alt, (1,1)):
23+
with alock.ExclusiveLock(alt, (0,0)):
24+
# start it
25+
state[0] = True
26+
while not state[1]:
27+
pass
28+
time.sleep(0.01)
29+
while not state[2]:
30+
time.sleep(0.01)
31+
t = threading.Thread(target = concurrent_lock)
32+
t.start()
33+
while not state[0]:
34+
time.sleep(0.01)
35+
self.failUnlessEqual(ad(), 2)
36+
state[1] = True
37+
with alock.ExclusiveLock(db, (0,0)):
38+
self.failUnlessEqual(ad(), 2)
39+
state[2] = True
40+
with alock.ExclusiveLock(db, (1,1)):
41+
self.failUnlessEqual(ad(), 2)
42+
t.join(timeout = 1)
43+
1144
@pg_tmp
1245
def testALockNoWait(self):
1346
alt = new()
14-
ad = db.prepare(
15-
"select count(*) FROM pg_locks WHERE locktype = 'advisory'"
16-
).first
47+
ad = prepare(n_alocks).first
1748
self.failUnlessEqual(ad(), 0)
1849
with alock.ExclusiveLock(db, (0,0)):
1950
l=alock.ExclusiveLock(alt, (0,0))
@@ -24,9 +55,7 @@ def testALockNoWait(self):
2455

2556
@pg_tmp
2657
def testALock(self):
27-
ad = db.prepare(
28-
"select count(*) FROM pg_locks WHERE locktype = 'advisory'"
29-
).first
58+
ad = prepare(n_alocks).first
3059
self.failUnlessEqual(ad(), 0)
3160
# test a variety..
3261
lockids = [
@@ -64,9 +93,7 @@ def testALock(self):
6493
@pg_tmp
6594
def testPartialALock(self):
6695
# Validates that release is properly cleaning up
67-
ad = db.prepare(
68-
"select count(*) FROM pg_locks WHERE locktype = 'advisory'"
69-
).first
96+
ad = prepare(n_alocks).first
7097
self.failUnlessEqual(ad(), 0)
7198
held = (0,-1234)
7299
wanted = [0, 324, -1232948, 7, held, 1, (2,4), (834,1)]
@@ -89,9 +116,7 @@ def testALockParameterErrors(self):
89116

90117
@pg_tmp
91118
def testALockOnClosed(self):
92-
ad = db.prepare(
93-
"select count(*) FROM pg_locks WHERE locktype = 'advisory'"
94-
).first
119+
ad = prepare(n_alocks).first
95120
self.failUnlessEqual(ad(), 0)
96121
held = (0,-1234)
97122
alt = new()

0 commit comments

Comments
 (0)