Tower of Power

Too sweet to be sour.

Doctrine 1.2 MSSQL Alternative LIMIT/Paging

At work I had been having all sorts of issues with the way Doctrine_Connection_Mssql rewrites queries for LIMIT, which is based on Zend_Db’s code.

The code used the technique that stays compatible with SQL Server 2000: modify the query to SELECT TOP (offset + limit), wrap it in a query that reverses the ORDER BY clause and does SELECT TOP (limit), then finally reverse the returned dataset.
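To make that concrete, here is a minimal sketch of how such a rewrite might be generated for a simple single-table query. The function name `topReversePage` and its parameters are hypothetical; this is not Doctrine's or Zend_Db's actual code, and it assumes a single ascending ORDER BY column.

```php
<?php
// Hypothetical helper: builds the "TOP + reversed ORDER BY" paging query
// for a simple single-table SELECT. Not Doctrine's or Zend_Db's code.
function topReversePage(string $table, string $orderColumn, int $limit, int $offset): string
{
    // Inner query: the first (offset + limit) rows in the requested order.
    $inner = sprintf(
        'SELECT TOP %d * FROM %s ORDER BY %s ASC',
        $offset + $limit,
        $table,
        $orderColumn
    );

    // Outer query: reverse the order and keep only $limit rows,
    // which are the last $limit rows of the inner result.
    return sprintf(
        'SELECT TOP %d * FROM (%s) AS inner_tbl ORDER BY %s DESC',
        $limit,
        $inner,
        $orderColumn
    );
}

// The rows come back in reverse order, so the caller still has to flip them,
// for example with array_reverse() on the fetched result.
echo topReversePage('users', 'userID', 15, 15), "\n";
```

Even in this toy form the weak spots show: the rewrite needs to know the ORDER BY column in order to reverse it, and a last page with fewer than `$limit` rows would return rows that belong to the previous page.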

As ugly as this technique is, it works. The problem is it requires an extreme amount of intelligence or an extreme amount of simplicity in the query in order for an automated system like Doctrine to be usable. The biggest caveat with this technique is good goddamned luck paging your query if it doesn’t have an ORDER BY. And sometimes queries that are complex enough break the modified Zend_Db code.

There exists an easier MSSQL paging technique. Using features first available in SQL Server 2005, with only one subquery you can mimic MySQL’s LIMIT clause with ease.

Basically, Microsoft provided the following special feature to determine the row number in your final resultset:

SELECT Row_Number() OVER (ORDER BY column) AS RowIndex FROM table

Notice the OVER (ORDER BY column) segment? It is there because this construct will most often be used in a subquery. Given that MSSQL does not allow ORDER BY in a subquery (unless TOP is also specified), just move the ordering into the OVER (...) section.

To borrow Ralph Varjabedian’s example, the following MySQL query:

SELECT * FROM users LIMIT 15, 15

Becomes functionally equivalent to the following MSSQL query:

SELECT
    *
FROM
    (
    SELECT
        Row_Number() OVER (ORDER BY userID) AS RowIndex,
        *
    FROM
        users
    ) AS sub
WHERE
    sub.RowIndex > 15
    AND sub.RowIndex <= 30
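The arithmetic behind those two bounds is easy to get off by one, so here is a small sketch of it in PHP. The helper name `rowNumberPage` is made up for illustration, and it only handles a plain single-table SELECT; MySQL's `LIMIT offset, count` maps to `RowIndex > offset AND RowIndex <= offset + count`.

```php
<?php
// Hypothetical helper: wraps a single-table SELECT in a ROW_NUMBER() subquery
// so that it behaves like MySQL's "LIMIT offset, limit".
function rowNumberPage(string $table, ?string $orderBy, int $limit, int $offset): string
{
    // OVER() requires an ORDER BY. Without a real ordering, (SELECT 0)
    // satisfies the syntax but leaves the row order up to the server.
    $over = $orderBy !== null ? $orderBy : '(SELECT 0)';

    return sprintf(
        'SELECT * FROM (SELECT Row_Number() OVER (ORDER BY %s) AS RowIndex, * FROM %s) AS sub'
        . ' WHERE sub.RowIndex > %d AND sub.RowIndex <= %d',
        $over,
        $table,
        $offset,
        $offset + $limit
    );
}

// Same page as the example query: skip 15 rows, return the next 15.
echo rowNumberPage('users', 'userID', 15, 15), "\n";
```

One thing this sketch glosses over: the outer `SELECT *` also returns the RowIndex column, so a real rewrite would want to list the original columns in the outer query instead.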

Now all I needed to do was add the ability for Doctrine to parse a generated MSSQL query and reform it like the one above!
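The parsing step boils down to splitting the query into its top-level clauses without being fooled by keywords inside parentheses. Here is a stripped-down sketch of that idea; `splitClauses` is a hypothetical name, and unlike a real parser it ignores string literals and [bracketed] identifiers and adds word boundaries around the keywords.

```php
<?php
// Simplified, hypothetical sketch: split a SELECT statement into its
// top-level clauses, ignoring keywords that appear inside parentheses.
// String literals and [bracketed] identifiers are NOT handled here.
function splitClauses(string $query): array
{
    $keywords = 'SELECT|FROM|WHERE|GROUP[ ]+BY|HAVING|ORDER[ ]+BY';

    // PREG_SPLIT_DELIM_CAPTURE keeps the keywords and parentheses as tokens.
    $tokens = preg_split(
        "#(\\b(?:{$keywords})\\b|[()])#i",
        trim($query),
        -1,
        PREG_SPLIT_DELIM_CAPTURE | PREG_SPLIT_NO_EMPTY
    );

    $parts   = [];
    $current = null; // clause currently being collected
    $depth   = 0;    // parenthesis nesting level

    foreach ($tokens as $token) {
        $upper = strtoupper(preg_replace('#\s+#', ' ', trim($token)));

        // A keyword only starts a new clause at nesting level 0.
        if ($depth === 0 && preg_match("#^(?:{$keywords})$#i", $upper)) {
            $current = $upper;
            $parts[$current] = '';
            continue;
        }

        if ($token === '(') {
            $depth++;
        } elseif ($token === ')') {
            $depth--;
        }

        if ($current !== null) {
            $parts[$current] .= $token;
        }
    }

    return array_map('trim', $parts);
}

print_r(splitClauses(
    'SELECT id, (SELECT COUNT(*) FROM posts WHERE posts.user_id = u.id) AS n '
    . 'FROM users u WHERE u.active = 1 ORDER BY id'
));
```

The subquery in the select list stays inside the SELECT part, so the ORDER BY part can be lifted out and moved into OVER (...) on its own.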

I’ve provided a copy of the Doctrine connection adapter I wrote. Simply add the following line to wherever you set up Doctrine:

Please note: This was the result of about 6 hours of hacking today. There are certainly places in the code where it can be more robust or improved, especially in my little parser. Use at your own risk (though I have yet to encounter any errors in my application).

<?php
class CaseMan_Doctrine_Connection_Mssql extends Doctrine_Connection_Mssql
{
    /**
     * @var string $driverName                  the name of this connection driver
     */
    protected $driverName = 'Mssql';

    /**
     *
     * @var boolean $is2005OrBetter             cached result determining if server is SQL Server 2005 or better
     */
    protected $is2005OrBetter;

    /**
     * the constructor
     *
     * @param Doctrine_Manager $manager
     * @param PDO $adapter                      database handle
     */
    public function __construct(Doctrine_Manager $manager, $adapter)
    {
        parent::__construct($manager, $adapter);
    }

    /**
     * Adds an adapter-specific LIMIT clause to the SELECT statement.
     *
     * @param string $query
     * @param mixed $limit
     * @param mixed $offset
     * @param boolean $isManip                  passed through unchanged to the parent implementation
     * @return string
     */
    public function modifyLimitQuery($query, $limit = false, $offset = false, $isManip = false)
    {
        if ($limit <= 0 && $offset <= 0)
            return $query;

        if( !$this->is2005OrBetter() ) //Not at least 2005
            return parent::modifyLimitQuery ($query, $limit, $offset, $isManip);

        //SETUP FIELD ALIASES
        $inner_query_name = '_inner_query_';
        $row_count_name = '_inner_query_row_count_';

        //PREPARE TOKENIZER REGEXES
        $escaped = "\\\.";
        $sections = "SELECT|FROM|WHERE|GROUP[ ]+BY|HAVING|ORDER[ ]+BY";
        $open_delimiters = "[\[\(]";
        $close_delimiters = "[\]\)]";
        $string_delimiters = "[\"\']";

        //TOKENIZE QUERY
        $_split_query = preg_split(
            "#({$escaped}|{$sections}|{$open_delimiters}|{$close_delimiters}|{$string_delimiters})#i",
            trim($query),
            -1,
            PREG_SPLIT_DELIM_CAPTURE | PREG_SPLIT_NO_EMPTY
        );

        $query_parts = array();
        $_state = 'BEGIN';
        $_current_part = 'SELECT';
        $_stack = array();
        //COMPARE AGAINST NULL SO A LITERAL "0" TOKEN DOES NOT END THE LOOP EARLY
        while( ($_token = array_shift($_split_query)) !== null )
        {
            switch( $_state )
            {
                case 'BEGIN':

                    if( trim(strtoupper($_token)) == 'SELECT' )
                    {
                        $query_parts['SELECT'] = '';
                        $_current_part = 'SELECT';
                        $_state = 'SECTION';
                    }
                    else
                    {
                        throw new Doctrine_Exception("Invalid query passed to modifyLimitQuery, must begin with SELECT");
                    }

                    break;
                case 'SECTION':

                    if( preg_match("#^({$sections})$#i", trim($_token)) )
                    {
                        $_section = strtoupper(trim($_token));

                        $query_parts[$_section] = '';
                        $_current_part = $_section;
                        $_state = 'SECTION';
                    }
                    else
                    {
                        $query_parts[$_current_part] .= $_token;

                        if( preg_match("#^({$string_delimiters})$#i", trim($_token)) )
                        {
                            array_push($_stack, trim($_token));
                            $_state = 'STRING';
                        } else if( preg_match("#^({$open_delimiters})$#i", trim($_token)) )
                        {
                            array_push($_stack, trim($_token));
                            $_state = 'DELIMITED';
                        }
                    }

                    break;
                case 'DELIMITED':

                    $query_parts[$_current_part] .= $_token;

                    if( preg_match("#^({$close_delimiters})$#i", trim($_token)) )
                    {
                        $_prev_delimiter = array_pop($_stack);
                        switch( trim($_token) )
                        {
                            case ']':
                                if( $_prev_delimiter != '[' )
                                    throw new Doctrine_Exception("Mismatched ]");
                                break;
                            case ')':
                                if( $_prev_delimiter != '(' )
                                    throw new Doctrine_Exception("Mismatched )");
                                break;
                            default:
                                trigger_error("FATAL ERROR: UNRECOGNIZED CLOSE DELIMITER TOKEN '{$_token}' IN " . __METHOD__, E_USER_ERROR);
                        }

                        if( count($_stack) == 0 )
                            $_state = 'SECTION';
                    }
                    elseif( preg_match("#^({$open_delimiters})$#i", trim($_token)) )
                    {
                        array_push($_stack, trim($_token));
                    }

                    break;
                case 'STRING':

                    $query_parts[$_current_part] .= $_token;

                    if( preg_match("#^({$string_delimiters})$#i", trim($_token)) )
                    {
                        if( trim($_token) == end($_stack) )
                        {
                            array_pop($_stack);
                            $_state = 'SECTION';
                        }
                    }

                    break;
            }
        }
        unset($_current_part, $_token, $_stack, $_state, $_section);

        //DIVIDE UP THE SELECT STATEMENT TO PREPARE TO INSERT THE ROW_NUMBER() SELECT FIELD
        $_select_split = array_map('trim', preg_split(
            "#^(DISTINCT|)[ ]*(TOP[ ]+[0-9]+|)#i",
            trim($query_parts['SELECT']),
            -1,
            PREG_SPLIT_DELIM_CAPTURE | PREG_SPLIT_NO_EMPTY
        ));

        $_select_details = array_pop($_select_split);

        $query_parts['SELECT'] = array_merge((array)implode(' ', $_select_split), (array)$_select_details);
        unset($_select_split, $_select_details);



        //SETUP OUTER QUERY SELECT STATEMENT
        $outer_select = array();
        foreach( array_map('trim', explode(',', end($query_parts['SELECT']))) as $_select )
        {
            $matches = array();
            if( preg_match('#AS[ ]+(?<alias>.*)$#i', $_select, $matches) )
            {
                $outer_select[] = "[{$inner_query_name}]." . trim($matches['alias']);
            }
            else if( preg_match('#^(?<table>(\[[^\]]+\]|[^\.]+)\.|)(?<field>.*)#i', $_select, $matches) )
            {
                $outer_select[] = "[{$inner_query_name}]." . trim($matches['field']);
            }
        }

        //SETUP ROW_COUNT OVER() SEGMENT
        if( isset($query_parts['ORDER BY']) )
        {
            $row_count_select = "Row_Number() OVER (ORDER BY " . $query_parts['ORDER BY'] . ") AS [{$row_count_name}]";
            unset($query_parts['ORDER BY']); //ORDER BY NOT ALLOWED IN SUBQUERY, OVER(...) TAKES ITS PLACE
        }
        else
        {
            $row_count_select = "Row_Number() OVER (ORDER BY (SELECT 0)) AS [{$row_count_name}]";
        }

        $query = implode(' ', array(
            'SELECT',
            count($query_parts['SELECT']) > 1 ?
                $query_parts['SELECT'][0] . ' ' . implode(', ', array(
                    $row_count_select,
                    $query_parts['SELECT'][1]
                )) :
                implode(', ', array(
                    $row_count_select,
                    $query_parts['SELECT'][0]
                )),
        ));

        unset($query_parts['SELECT']);
        foreach( $query_parts as $section => $parameters )
            $query .= ' ' . $section . ' ' . $parameters;


        $outer_query = "SELECT " . implode(', ', $outer_select) . " FROM (" . $query . ") AS [{$inner_query_name}]";

        if( $limit || $offset )
        {
            $outer_where = array();
            if( $limit )
                $outer_where[] = "[{$inner_query_name}].[{$row_count_name}] <= " . ($limit + $offset);
            if( $offset )
                $outer_where[] = "[{$inner_query_name}].[{$row_count_name}] > " . $offset;

            $outer_query .= ' WHERE ' . implode(' AND ', $outer_where);
        }

        return $outer_query;
    }

    public function is2005OrBetter()
    {
        if( !isset($this->is2005OrBetter) )
        {
            $version = $this->getServerVersion();

            if( $version['major'] >= 9 )
                $this->is2005OrBetter = true;
            else
                $this->is2005OrBetter = false;
        }

        return $this->is2005OrBetter;
    }

    /**
     * return version information about the server
     *
     * @param bool   $native  determines if the raw version string should be returned
     * @return array    version information
     */
    public function getServerVersion($native = false)
    {
        if ($this->serverInfo) {
            $serverInfo = $this->serverInfo;
        } else {
            $query      = 'SELECT @@VERSION';
            $serverInfo = $this->fetchOne($query);
        }
        // cache server_info
        $this->serverInfo = $serverInfo;
        if ( ! $native) {
            if (preg_match('/([0-9]+)\.([0-9]+)\.([0-9]+)/', $serverInfo, $tmp)) {
                $serverInfo = array(
                    'major' => $tmp[1],
                    'minor' => $tmp[2],
                    'patch' => $tmp[3],
                    'extra' => null,
                    'native' => $serverInfo,
                );
            } else {
                $serverInfo = array(
                    'major' => null,
                    'minor' => null,
                    'patch' => null,
                    'extra' => null,
                    'native' => $serverInfo,
                );
            }
        }
        return $serverInfo;
    }
}
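For anyone wondering what the version check actually sees, here is a standalone sketch of the same regex applied to two sample strings. The function name `isSqlServer2005OrBetter` is hypothetical and the sample strings are abbreviated illustrations of the `SELECT @@VERSION` format, not output captured from a real server.

```php
<?php
// Hypothetical standalone version of the check done by is2005OrBetter().
// The sample strings below are illustrative and abbreviated.
function isSqlServer2005OrBetter(string $versionString): bool
{
    // Same pattern as getServerVersion(): first "major.minor.patch" triple.
    if (!preg_match('/([0-9]+)\.([0-9]+)\.([0-9]+)/', $versionString, $m)) {
        return false; // unknown version: fall back to the older technique
    }

    // Major version 9 is SQL Server 2005; 8 is SQL Server 2000.
    return (int) $m[1] >= 9;
}

var_dump(isSqlServer2005OrBetter('Microsoft SQL Server 2005 - 9.00.1399.06 (Intel X86)'));
var_dump(isSqlServer2005OrBetter('Microsoft SQL Server  2000 - 8.00.194 (Intel X86)'));
```

The product year in the string ("2005", "2000") is skipped because it is not followed by a dot, so the first match is the real build number.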