Fix integer-overflow problems in interval comparison.
When using integer timestamps, the interval-comparison functions tried
to compute the overall magnitude of an interval as an int64 number of
microseconds.  As reported by Frazer McLean, this overflows for intervals
exceeding about 296000 years, which is bad since we nominally allow
intervals many times larger than that.  That results in wrong comparison
results, and possibly in corrupted btree indexes for columns containing
such large interval values.

To fix, compute the magnitude as int128 instead.  Although some compilers
have native support for int128 calculations, many don't, so create our
own support functions that can do 128-bit addition and multiplication
if the compiler support isn't there.  These support functions are designed
with an eye to allowing the int128 code paths in numeric.c to be rewritten
for use on all platforms, although this patch doesn't do that, or even
provide all the int128 primitives that will be needed for it.

Back-patch as far as 9.4.  Earlier releases did not guard against overflow
of interval values at all (commit 146604e fixed that), so it seems not
very exciting to worry about overly-large intervals for them.

Before 9.6, we did not assume that unreferenced "static inline" functions
would not draw compiler warnings, so omit functions not directly referenced
by timestamp.c, the only present consumer of int128.h.  (We could have
omitted these functions in HEAD too, but since they were written and
debugged on the way to the present patch, and they look likely to be needed
by numeric.c, let's keep them in HEAD.)  I did not bother to try to prevent
such warnings in a --disable-integer-datetimes build, though.

Before 9.5, configure will never define HAVE_INT128, so the part of
int128.h that exploits a native int128 implementation is dead code in the
9.4 branch.  I didn't bother to remove it, thinking that keeping the file
looking similar in different branches is more useful.

In HEAD only, add a simple test harness for int128.h in src/tools/.

In back branches, this does not change the float-timestamps code path.
That's not subject to the same kind of overflow risk, since it computes
the interval magnitude as float8.  (No doubt, when this code was originally
written, overflow was disregarded for exactly that reason.)  There is a
precision hazard instead :-(, but we'll avert our eyes from that question,
since no complaints have been reported and that code's deprecated anyway.

Kyotaro Horiguchi and Tom Lane

Discussion: https://postgr.es/m/[email protected]
tglsfdc committed Apr 6, 2017
1 parent 68ea2b7 commit df1a699
Showing 5 changed files with 590 additions and 10 deletions.
50 changes: 40 additions & 10 deletions src/backend/utils/adt/timestamp.c
@@ -24,6 +24,7 @@
#include "access/hash.h"
#include "access/xact.h"
#include "catalog/pg_type.h"
+#include "common/int128.h"
#include "funcapi.h"
#include "libpq/pqformat.h"
#include "miscadmin.h"
@@ -2288,26 +2289,46 @@ timestamptz_cmp_timestamp(PG_FUNCTION_ARGS)

/*
* interval_relop - is interval1 relop interval2
+ *
+ * Interval comparison is based on converting interval values to a linear
+ * representation expressed in the units of the time field (microseconds,
+ * in the case of integer timestamps) with days assumed to be always 24 hours
+ * and months assumed to be always 30 days.  To avoid overflow, we need a
+ * wider-than-int64 datatype for the linear representation, so use INT128.
*/
-static inline TimeOffset
+
+static inline INT128
interval_cmp_value(const Interval *interval)
{
-TimeOffset span;
+INT128 span;
+int64 dayfraction;
+int64 days;
+
+/*
+ * Separate time field into days and dayfraction, then add the month and
+ * day fields to the days part.  We cannot overflow int64 days here.
+ */
+dayfraction = interval->time % USECS_PER_DAY;
+days = interval->time / USECS_PER_DAY;
+days += interval->month * INT64CONST(30);
+days += interval->day;

-span = interval->time;
-span += interval->month * INT64CONST(30) * USECS_PER_DAY;
-span += interval->day * INT64CONST(24) * USECS_PER_HOUR;
+/* Widen dayfraction to 128 bits */
+span = int64_to_int128(dayfraction);
+
+/* Scale up days to microseconds, forming a 128-bit product */
+int128_add_int64_mul_int64(&span, days, USECS_PER_DAY);

return span;
}

static int
interval_cmp_internal(Interval *interval1, Interval *interval2)
{
-TimeOffset span1 = interval_cmp_value(interval1);
-TimeOffset span2 = interval_cmp_value(interval2);
+INT128 span1 = interval_cmp_value(interval1);
+INT128 span2 = interval_cmp_value(interval2);

-return ((span1 < span2) ? -1 : (span1 > span2) ? 1 : 0);
+return int128_compare(span1, span2);
}

Datum
@@ -2384,9 +2405,18 @@ Datum
interval_hash(PG_FUNCTION_ARGS)
{
Interval *interval = PG_GETARG_INTERVAL_P(0);
-TimeOffset span = interval_cmp_value(interval);
+INT128 span = interval_cmp_value(interval);
+int64 span64;
+
+/*
+ * Use only the least significant 64 bits for hashing.  The upper 64 bits
+ * seldom add any useful information, and besides we must do it like this
+ * for compatibility with hashes calculated before use of INT128 was
+ * introduced.
+ */
+span64 = int128_to_int64(span);

-return DirectFunctionCall1(hashint8, Int64GetDatumFast(span));
+return DirectFunctionCall1(hashint8, Int64GetDatumFast(span64));
}

/* overlaps_timestamp() --- implements the SQL OVERLAPS operator.
276 changes: 276 additions & 0 deletions src/include/common/int128.h
@@ -0,0 +1,276 @@
/*-------------------------------------------------------------------------
*
* int128.h
* Roll-our-own 128-bit integer arithmetic.
*
* We make use of the native int128 type if there is one, otherwise
* implement things the hard way based on two int64 halves.
*
* See src/tools/testint128.c for a simple test harness for this file.
*
* Copyright (c) 2017, PostgreSQL Global Development Group
*
* src/include/common/int128.h
*
*-------------------------------------------------------------------------
*/
#ifndef INT128_H
#define INT128_H

/*
* For testing purposes, use of native int128 can be switched on/off by
* predefining USE_NATIVE_INT128.
*/
#ifndef USE_NATIVE_INT128
#ifdef HAVE_INT128
#define USE_NATIVE_INT128 1
#else
#define USE_NATIVE_INT128 0
#endif
#endif


#if USE_NATIVE_INT128

typedef int128 INT128;

/*
* Add an unsigned int64 value into an INT128 variable.
*/
static inline void
int128_add_uint64(INT128 *i128, uint64 v)
{
*i128 += v;
}

/*
* Add a signed int64 value into an INT128 variable.
*/
static inline void
int128_add_int64(INT128 *i128, int64 v)
{
*i128 += v;
}

/*
* Add the 128-bit product of two int64 values into an INT128 variable.
*
* XXX with a stupid compiler, this could actually be less efficient than
* the other implementation; maybe we should do it by hand always?
*/
static inline void
int128_add_int64_mul_int64(INT128 *i128, int64 x, int64 y)
{
*i128 += (int128) x * (int128) y;
}

/*
* Compare two INT128 values, return -1, 0, or +1.
*/
static inline int
int128_compare(INT128 x, INT128 y)
{
if (x < y)
return -1;
if (x > y)
return 1;
return 0;
}

/*
* Widen int64 to INT128.
*/
static inline INT128
int64_to_int128(int64 v)
{
return (INT128) v;
}

/*
* Convert INT128 to int64 (losing any high-order bits).
* This also works fine for casting down to uint64.
*/
static inline int64
int128_to_int64(INT128 val)
{
return (int64) val;
}

#else /* !USE_NATIVE_INT128 */

/*
* We lay out the INT128 structure with the same content and byte ordering
* that a native int128 type would (probably) have. This makes no difference
* for ordinary use of INT128, but allows union'ing INT128 with int128 for
* testing purposes.
*/
typedef struct
{
#ifdef WORDS_BIGENDIAN
int64 hi; /* most significant 64 bits, including sign */
uint64 lo; /* least significant 64 bits, without sign */
#else
uint64 lo; /* least significant 64 bits, without sign */
int64 hi; /* most significant 64 bits, including sign */
#endif
} INT128;

/*
* Add an unsigned int64 value into an INT128 variable.
*/
static inline void
int128_add_uint64(INT128 *i128, uint64 v)
{
/*
* First add the value to the .lo part, then check to see if a carry needs
* to be propagated into the .hi part. A carry is needed if both inputs
* have high bits set, or if just one input has high bit set while the new
* .lo part doesn't. Remember that .lo part is unsigned; we cast to
* signed here just as a cheap way to check the high bit.
*/
uint64 oldlo = i128->lo;

i128->lo += v;
if (((int64) v < 0 && (int64) oldlo < 0) ||
(((int64) v < 0 || (int64) oldlo < 0) && (int64) i128->lo >= 0))
i128->hi++;
}

/*
* Add a signed int64 value into an INT128 variable.
*/
static inline void
int128_add_int64(INT128 *i128, int64 v)
{
/*
* This is much like the above except that the carry logic differs for
* negative v. Ordinarily we'd need to subtract 1 from the .hi part
* (corresponding to adding the sign-extended bits of v to it); but if
* there is a carry out of the .lo part, that cancels and we do nothing.
*/
uint64 oldlo = i128->lo;

i128->lo += v;
if (v >= 0)
{
if ((int64) oldlo < 0 && (int64) i128->lo >= 0)
i128->hi++;
}
else
{
if (!((int64) oldlo < 0 || (int64) i128->lo >= 0))
i128->hi--;
}
}

/*
* INT64_AU32 extracts the most significant 32 bits of int64 as int64, while
* INT64_AL32 extracts the least significant 32 bits as uint64.
*/
#define INT64_AU32(i64) ((i64) >> 32)
#define INT64_AL32(i64) ((i64) & UINT64CONST(0xFFFFFFFF))

/*
* Add the 128-bit product of two int64 values into an INT128 variable.
*/
static inline void
int128_add_int64_mul_int64(INT128 *i128, int64 x, int64 y)
{
/* INT64_AU32 must use arithmetic right shift */
StaticAssertStmt(((int64) -1 >> 1) == (int64) -1,
"arithmetic right shift is needed");

/*----------
* Form the 128-bit product x * y using 64-bit arithmetic.
* Considering each 64-bit input as having 32-bit high and low parts,
* we can compute
*
* x * y = ((x.hi << 32) + x.lo) * ((y.hi << 32) + y.lo)
* = (x.hi * y.hi) << 64 +
* (x.hi * y.lo) << 32 +
* (x.lo * y.hi) << 32 +
* x.lo * y.lo
*
* Each individual product is of 32-bit terms so it won't overflow when
* computed in 64-bit arithmetic. Then we just have to shift it to the
* correct position while adding into the 128-bit result. We must also
* keep in mind that the "lo" parts must be treated as unsigned.
*----------
*/

/* No need to work hard if product must be zero */
if (x != 0 && y != 0)
{
int64 x_u32 = INT64_AU32(x);
uint64 x_l32 = INT64_AL32(x);
int64 y_u32 = INT64_AU32(y);
uint64 y_l32 = INT64_AL32(y);
int64 tmp;

/* the first term */
i128->hi += x_u32 * y_u32;

/* the second term: sign-extend it only if x is negative */
tmp = x_u32 * y_l32;
if (x < 0)
i128->hi += INT64_AU32(tmp);
else
i128->hi += ((uint64) tmp) >> 32;
int128_add_uint64(i128, ((uint64) INT64_AL32(tmp)) << 32);

/* the third term: sign-extend it only if y is negative */
tmp = x_l32 * y_u32;
if (y < 0)
i128->hi += INT64_AU32(tmp);
else
i128->hi += ((uint64) tmp) >> 32;
int128_add_uint64(i128, ((uint64) INT64_AL32(tmp)) << 32);

/* the fourth term: always unsigned */
int128_add_uint64(i128, x_l32 * y_l32);
}
}

/*
* Compare two INT128 values, return -1, 0, or +1.
*/
static inline int
int128_compare(INT128 x, INT128 y)
{
if (x.hi < y.hi)
return -1;
if (x.hi > y.hi)
return 1;
if (x.lo < y.lo)
return -1;
if (x.lo > y.lo)
return 1;
return 0;
}

/*
* Widen int64 to INT128.
*/
static inline INT128
int64_to_int128(int64 v)
{
INT128 val;

val.lo = (uint64) v;
val.hi = (v < 0) ? -INT64CONST(1) : INT64CONST(0);
return val;
}

/*
* Convert INT128 to int64 (losing any high-order bits).
* This also works fine for casting down to uint64.
*/
static inline int64
int128_to_int64(INT128 val)
{
return (int64) val.lo;
}

#endif /* USE_NATIVE_INT128 */

#endif /* INT128_H */
