Skip to content

Commit a0a05a1

Browse files
author
James William Pye
committed
Use actual Python tuples to represent element3.Tuple objects.
This is done to avoid additional object allocations while processing tuple data. Much of the overhead of parsing tuple data turned out to come from the extra objects allocated while processing D messages. Additionally, aggressively increase the default chunksize from 512 to 1024 * 10. (This may be tuned down later.)
1 parent b5782ca commit a0a05a1

8 files changed

Lines changed: 291 additions & 96 deletions

File tree

postgresql/driver/pq3.py

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -377,10 +377,14 @@ def _init(self,
377377
self._pq_xp_fetchall() + (element.SynchronizeMessage,)
378378
)
379379
self.database._pq_push(self._xact, self)
380+
STEP = self.database._pq_step
380381
while self._xact.state != xact.Complete:
381-
self.database._pq_step()
382+
STEP()
382383
for x in self._xact.messages_received():
383-
if x.type == null:
384+
if x.__class__ is tuple or expect == x.type:
385+
# no need to step once this is seen
386+
return
387+
elif x.type == null:
384388
self.database._pq_complete()
385389
self._xact = None
386390
return
@@ -392,9 +396,6 @@ def _init(self,
392396
# return.
393397
self._xact = None
394398
return
395-
elif x.type == expect:
396-
# no need to step once this is seen
397-
return
398399
elif x.type in (bindcomplete, parsecomplete):
399400
pass
400401
else:
@@ -446,13 +447,13 @@ class SingleXactFetch(FetchAll):
446447
_expect = element.Tuple.type
447448
_process_chunk_ = FetchAll._process_tuple_chunk_Row
448449

449-
def _process_chunk(self, x, tuple_type = element.Tuple.type):
450+
def _process_chunk(self, x, tuple_type = tuple):
450451
return self._process_chunk_((
451-
y for y in x if y.type == tuple_type
452+
y for y in x if y.__class__ is tuple
452453
))
453454

454455
class MultiXactStream(Chunks):
455-
chunksize = 512
456+
chunksize = 1024 * 10
456457
# only tuple streams
457458
_process_chunk = Output._process_tuple_chunk_Row
458459

@@ -483,7 +484,7 @@ def _init(self):
483484
self._xact = self._ins(self._bind() + self._command)
484485
self.database._pq_push(self._xact, self)
485486

486-
def __next__(self, tuple_type = element.Tuple.type):
487+
def __next__(self, tuple_type = tuple):
487488
x = self._xact
488489
if x is None:
489490
raise StopIteration
@@ -493,7 +494,7 @@ def __next__(self, tuple_type = element.Tuple.type):
493494

494495
# get all the element.Tuple messages
495496
chunk = [
496-
y for y in x.messages_received() if y.type == tuple_type
497+
y for y in x.messages_received() if y.__class__ is tuple_type
497498
]
498499
if len(chunk) == self.chunksize:
499500
# there may be more, dispatch the request for the next chunk
@@ -625,7 +626,7 @@ def _fetch(self, direction, quantity):
625626
self.database._pq_push(x, self)
626627
self.database._pq_complete()
627628
return self._process_tuple((
628-
y for y in x.messages_received() if y.type == element.Tuple.type
629+
y for y in x.messages_received() if y.__class__ is tuple
629630
))
630631

631632
def seek(self, offset, whence = 'ABSOLUTE'):
@@ -1093,7 +1094,7 @@ def first(self, *parameters):
10931094
# It returned rows, look for the first tuple.
10941095
tuple_type = element.Tuple.type
10951096
for xt in x.messages_received():
1096-
if xt.type == tuple_type:
1097+
if xt.__class__ is tuple:
10971098
break
10981099
else:
10991100
return None

postgresql/port/optimized/client3.c

Lines changed: 161 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
static PyObject *
1212
cat_messages(PyObject *self, PyObject *messages_in)
1313
{
14+
const static uint32_t null_attribute = 0xFFFFFFFFL;
1415
PyObject *msgs = NULL;
1516
Py_ssize_t nmsgs = 0;
1617
Py_ssize_t cmsg = 0;
@@ -62,7 +63,8 @@ cat_messages(PyObject *self, PyObject *messages_in)
6263
*/
6364

6465
/* realloc the buf for the new copy data */
65-
bufsize = bufsize + (5 * (eofc - cmsg)) + xsize;
66+
xsize = xsize + (5 * (eofc - cmsg));
67+
bufsize = bufsize + xsize;
6668
nbuf = realloc(buf, bufsize);
6769
if (nbuf == NULL)
6870
{
@@ -99,6 +101,164 @@ cat_messages(PyObject *self, PyObject *messages_in)
99101
++cmsg;
100102
}
101103
}
104+
else if (PyTuple_CheckExact(ob))
105+
{
106+
/*
107+
* Handle 'D' tuple data from a raw Python tuple.
108+
*/
109+
Py_ssize_t eofc = cmsg;
110+
Py_ssize_t xsize = 0;
111+
112+
/* find the last of the tuple data (eofc) */
113+
do
114+
{
115+
Py_ssize_t current_item, nitems;
116+
117+
nitems = PyTuple_GET_SIZE(ob);
118+
if (nitems > 0xFFFF)
119+
{
120+
PyErr_SetString(PyExc_OverflowError,
121+
"too many attributes in tuple message");
122+
goto fail;
123+
}
124+
125+
/*
126+
* The items take *at least* 4 bytes each.
127+
* (The attribute count is considered later)
128+
*/
129+
xsize = xsize + (nitems * 4);
130+
131+
for (current_item = 0; current_item < nitems; ++current_item)
132+
{
133+
PyObject *att = PyTuple_GET_ITEM(ob, current_item);
134+
135+
/*
136+
* Attributes are expected to be bytes() or None.
137+
*/
138+
if (PyBytes_CheckExact(att))
139+
xsize = xsize + PyBytes_GET_SIZE(att);
140+
else if (att != Py_None)
141+
{
142+
PyErr_Format(PyExc_TypeError,
143+
"cannot serialize tuple message attribute of type '%s'",
144+
Py_TYPE(att)->tp_name);
145+
goto fail;
146+
}
147+
/*
148+
* else it's Py_None and the size has been specified.
149+
*/
150+
}
151+
152+
++eofc;
153+
if (eofc >= nmsgs)
154+
break; /* end of messages in the list? */
155+
156+
/* Grab the next message. */
157+
ob = PyList_GET_ITEM(msgs, eofc);
158+
} while(PyTuple_CheckExact(ob));
159+
160+
/*
161+
* Either the end of the list or `ob` is not a data object meaning
162+
* that it's the end of the tuple data.
163+
*/
164+
165+
/*
166+
* realloc the buf for the new tuple data
167+
*
168+
* Each D message consumes at least 1 + 4 + 2 bytes:
169+
* 1 for the message type
170+
* 4 for the message size
171+
* 2 for the attribute count
172+
*/
173+
xsize = xsize + (7 * (eofc - cmsg));
174+
bufsize = bufsize + xsize;
175+
nbuf = realloc(buf, bufsize);
176+
if (nbuf == NULL)
177+
{
178+
PyErr_Format(
179+
PyExc_MemoryError,
180+
"failed to allocate %lu bytes of memory for out-going messages",
181+
(unsigned long) bufsize
182+
);
183+
goto fail;
184+
}
185+
else
186+
{
187+
buf = nbuf;
188+
nbuf = NULL;
189+
}
190+
191+
/*
192+
* Make the final pass through the tuple data memcpy'ing the data from
193+
* the bytes() objects.
194+
*
195+
* No type checks are done here as they should have been done while
196+
* gathering the sizes for the realloc().
197+
*/
198+
while (cmsg < eofc)
199+
{
200+
Py_ssize_t current_item, nitems;
201+
uint32_t msg_length, out_msg_len;
202+
uint16_t natts;
203+
char *localbuf = buf + bufpos + 5; /* skipping the header for now */
204+
buf[bufpos] = 'D'; /* Tuple data message type */
205+
206+
ob = PyList_GET_ITEM(msgs, cmsg);
207+
nitems = PyTuple_GET_SIZE(ob);
208+
209+
/*
210+
* 4 bytes for the message length,
211+
* 2 bytes for the attribute count and
212+
* 4 bytes for each item in 'ob'.
213+
*/
214+
msg_length = 4 + 2 + (nitems * 4);
215+
216+
/*
217+
* Set number of attributes.
218+
*/
219+
natts = local_ntohs((uint16_t) nitems);
220+
Py_MEMCPY(localbuf, &natts, 2);
221+
localbuf = localbuf + 2;
222+
223+
for (current_item = 0; current_item < nitems; ++current_item)
224+
{
225+
PyObject *att = PyTuple_GET_ITEM(ob, current_item);
226+
227+
if (att == Py_None)
228+
{
229+
Py_MEMCPY(localbuf, &null_attribute, 4);
230+
localbuf = localbuf + 4;
231+
}
232+
else
233+
{
234+
Py_ssize_t attsize = PyBytes_GET_SIZE(att);
235+
uint32_t n_attsize;
236+
237+
n_attsize = local_ntohl((uint32_t) attsize);
238+
239+
Py_MEMCPY(localbuf, &n_attsize, 4);
240+
localbuf = localbuf + 4;
241+
Py_MEMCPY(localbuf, PyBytes_AS_STRING(att), attsize);
242+
localbuf = localbuf + attsize;
243+
244+
msg_length = msg_length + attsize;
245+
}
246+
}
247+
248+
/*
249+
* Summed up the message size while copying the attributes.
250+
*/
251+
out_msg_len = local_ntohl(msg_length);
252+
Py_MEMCPY(buf + bufpos + 1, &out_msg_len, 4);
253+
254+
/*
255+
* Filled in the data while summing the message size, so
256+
* adjust the buffer position for the next message.
257+
*/
258+
bufpos = bufpos + 1 + msg_length;
259+
++cmsg;
260+
}
261+
}
102262
else
103263
{
104264
PyObject *serialized;

postgresql/port/optimized/element3.c

Lines changed: 7 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -112,12 +112,12 @@ _pack_tuple_data(PyObject *tup)
112112
* dst must be of PyTuple_Type with at least natts items slots.
113113
*/
114114
static int
115-
_unpack_tuple_data(PyObject *dst, uint16_t natts, const char *data, Py_ssize_t data_len)
115+
_unpack_tuple_data(PyObject *dst, uint16_t natts, register const char *data, Py_ssize_t data_len)
116116
{
117117
PyObject *ob;
118118
uint16_t cnatt = 0;
119119
uint32_t attsize = 0;
120-
uint32_t position = 0;
120+
register uint32_t position = 0;
121121

122122
while (cnatt < natts)
123123
{
@@ -203,7 +203,7 @@ _unpack_tuple_data(PyObject *dst, uint16_t natts, const char *data, Py_ssize_t d
203203
static PyObject *
204204
parse_tuple_message(PyObject *self, PyObject *args)
205205
{
206-
PyObject *prerob, *rob, *temp_tup;
206+
PyObject *rob;
207207
PyObject *typ;
208208
const char *data;
209209
Py_ssize_t dlen = 0;
@@ -221,26 +221,16 @@ parse_tuple_message(PyObject *self, PyObject *args)
221221
Py_MEMCPY(&natts, data, 2);
222222
natts = local_ntohs(natts);
223223

224-
prerob = PyTuple_New(natts);
225-
if (prerob == NULL)
224+
rob = PyTuple_New(natts);
225+
if (rob == NULL)
226226
return(NULL);
227227

228-
if (_unpack_tuple_data(prerob, natts, data+2, dlen-2) < 0)
229-
{
230-
Py_DECREF(prerob);
231-
return(NULL);
232-
}
233-
234-
temp_tup = PyTuple_New(1);
235-
if (temp_tup == NULL)
228+
if (_unpack_tuple_data(rob, natts, data+2, dlen-2) < 0)
236229
{
237-
Py_DECREF(prerob);
230+
Py_DECREF(rob);
238231
return(NULL);
239232
}
240-
PyTuple_SET_ITEM(temp_tup, 0, prerob);
241233

242-
rob = PyObject_CallObject(typ, temp_tup);
243-
Py_DECREF(temp_tup);
244234
return(rob);
245235
}
246236

postgresql/protocol/client3.py

Lines changed: 3 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -12,18 +12,6 @@
1212

1313
__all__ = ('Connection',)
1414

15-
try:
16-
from ..port.optimized import cat_messages
17-
except ImportError:
18-
from ..python.structlib import long_pack
19-
def cat_messages(messages, lpack = long_pack, blen = bytes.__len__):
20-
return b''.join([
21-
x.bytes() if x.__class__ is not bytes else (
22-
b'd' + lpack(blen(x) + 4) + x
23-
) for x in messages
24-
])
25-
del long_pack
26-
2715
client_detected_protocol_error = element.ClientError((
2816
(b'S', 'FATAL'),
2917
(b'C', '08P01'),
@@ -330,7 +318,9 @@ def send_message_data(self):
330318
raise
331319
return True
332320

333-
def standard_write_messages(self, messages):
321+
def standard_write_messages(self, messages,
322+
cat_messages = element.cat_messages
323+
):
334324
'protocol message writer'
335325
if self.writing is not self.written:
336326
self.message_data += cat_messages(self.writing)

0 commit comments

Comments
 (0)