forked from openrs2/openrs2
parent
ec4f8b59c9
commit
3a39639e3d
@ -0,0 +1,31 @@ |
||||
plugins { |
||||
`maven-publish` |
||||
kotlin("jvm") |
||||
} |
||||
|
||||
dependencies { |
||||
api("org.jetbrains.kotlinx:kotlinx-coroutines-core:${Versions.kotlinCoroutines}") |
||||
|
||||
implementation("com.google.guava:guava:${Versions.guava}") |
||||
|
||||
testImplementation("com.h2database:h2:${Versions.h2}") |
||||
testImplementation("org.jetbrains.kotlinx:kotlinx-coroutines-test:${Versions.kotlinCoroutines}") |
||||
} |
||||
|
||||
publishing { |
||||
publications.create<MavenPublication>("maven") { |
||||
from(components["java"]) |
||||
|
||||
pom { |
||||
packaging = "jar" |
||||
name.set("OpenRS2 Database") |
||||
description.set( |
||||
""" |
||||
A thin layer on top of the JDBC API that enforces the use of |
||||
transactions, automatically retrying them on deadlock, and |
||||
provides coroutine integration. |
||||
""".trimIndent() |
||||
) |
||||
} |
||||
} |
||||
} |
@ -0,0 +1,16 @@ |
||||
package dev.openrs2.db |
||||
|
||||
/** |
||||
* A functional interface for calculating the delay before a transaction is |
||||
* retried due to deadlock. |
||||
*/ |
||||
public fun interface BackoffStrategy { |
||||
/** |
||||
* Returns the delay in milliseconds to wait for after the given |
||||
* transaction [attempt] number. |
||||
* @param attempt the attempt number, starting at 0 to compute the delay |
||||
* after the first failed attempt. |
||||
* @return the delay in milliseconds. |
||||
*/ |
||||
public fun getDelay(attempt: Int): Long |
||||
} |
@ -0,0 +1,35 @@ |
||||
package dev.openrs2.db |
||||
|
||||
import com.google.common.math.IntMath |
||||
import java.util.concurrent.ThreadLocalRandom |
||||
import kotlin.math.min |
||||
|
||||
/** |
||||
* A [BackoffStrategy] that implements binary exponential backoff. It returns a |
||||
* delay between `0` and `2**c - 1` inclusive, where `c` is the attempt |
||||
* number (starting at 1 for the delay after the first attempt). `c` is clamped |
||||
* at [cMax]. The delay is scaled by [scale] milliseconds. |
||||
*/ |
||||
public class BinaryExponentialBackoffStrategy( |
||||
/** |
||||
* The maximum value of `c` (inclusive). Must be greater than zero. |
||||
*/ |
||||
private val cMax: Int, |
||||
|
||||
/** |
||||
* The scale, in milliseconds. Must be greater than zero (use |
||||
* [FixedBackoffStrategy] for a delay that is always zero). |
||||
*/ |
||||
private val scale: Long |
||||
) : BackoffStrategy { |
||||
init { |
||||
require(cMax >= 1 && scale >= 1) |
||||
} |
||||
|
||||
override fun getDelay(attempt: Int): Long { |
||||
require(attempt >= 0) |
||||
|
||||
val bound = IntMath.pow(2, min(attempt + 1, cMax)) |
||||
return ThreadLocalRandom.current().nextInt(bound).toLong() * scale |
||||
} |
||||
} |
@ -0,0 +1,131 @@ |
||||
package dev.openrs2.db |
||||
|
||||
import kotlinx.coroutines.Dispatchers |
||||
import kotlinx.coroutines.delay |
||||
import java.sql.Connection |
||||
import java.sql.SQLException |
||||
import javax.sql.DataSource |
||||
|
||||
/** |
||||
* A thin layer on top of the JDBC API that enforces the use of transactions, |
||||
* automatically retrying them on deadlock, and provides coroutine integration. |
||||
* |
||||
* Connection pooling is not provided by this library. A separate connection |
||||
* pooling library (such as HikariCP or the functionality built into your |
||||
* database driver) should be used in combination with this library. |
||||
*/ |
||||
public class Database( |
||||
/** |
||||
* The [DataSource] used to obtain [Connection] objects. A new connection |
||||
* object is opened for each transaction attempt, so a pooled [DataSource] |
||||
* should be used. |
||||
*/ |
||||
private val dataSource: DataSource, |
||||
|
||||
/** |
||||
* The [DeadlockDetector] used to determine if a transaction should be |
||||
* retried. Defaults to [DefaultDeadlockDetector], which is vendor-neutral |
||||
* but not very efficient (any error causes a transaction to be retried). |
||||
* One of the vendor-specific implementations should be used instead for |
||||
* optimal performance. |
||||
*/ |
||||
private val deadlockDetector: DeadlockDetector = DefaultDeadlockDetector, |
||||
|
||||
/** |
||||
* The [BackoffStrategy] used to determine how long to wait between |
||||
* transaction attempts. Defaults to a [BinaryExponentialBackoffStrategy] |
||||
* with cMax=8 and scale=10 milliseconds. |
||||
*/ |
||||
private val backoffStrategy: BackoffStrategy = DEFAULT_BACKOFF_STRATEGY, |
||||
|
||||
/** |
||||
* The nubmer of times to try executing a transaction before giving up. |
||||
* Defaults to 5. Must be greater than zero. |
||||
*/ |
||||
private val attempts: Int = DEFAULT_ATTEMPTS |
||||
) { |
||||
init { |
||||
require(attempts >= 1) |
||||
} |
||||
|
||||
/** |
||||
* Executes a [Transaction]. If the transaction fails due to deadlock, it |
||||
* is retried up to [attempts] times in total (including the first |
||||
* attempt). |
||||
* |
||||
* The coroutine is suspended for a delay between each attempt. |
||||
* |
||||
* The JDBC calls will block the thread the coroutine is scheduled on. This |
||||
* function should therefore be called within a context that uses the |
||||
* [Dispatchers.IO] dispatcher. |
||||
* @param transaction the transaction. |
||||
* @return the result returned by [Transaction.execute]. |
||||
*/ |
||||
public suspend fun <T> execute(transaction: Transaction<T>): T { |
||||
for (attempt in 0 until attempts) { |
||||
try { |
||||
return executeOnce(transaction) |
||||
} catch (t: Throwable) { |
||||
if (isDeadlock(t) && attempt != attempts - 1) { |
||||
val backoff = backoffStrategy.getDelay(attempt) |
||||
delay(backoff) |
||||
continue |
||||
} |
||||
|
||||
throw t |
||||
} |
||||
} |
||||
|
||||
throw AssertionError() |
||||
} |
||||
|
||||
private fun <T> executeOnce(transaction: Transaction<T>): T { |
||||
dataSource.connection.use { connection -> |
||||
val oldAutoCommit = connection.autoCommit |
||||
connection.autoCommit = false |
||||
|
||||
try { |
||||
try { |
||||
val result = transaction.execute(connection) |
||||
connection.commit() |
||||
return result |
||||
} finally { |
||||
connection.rollback() |
||||
} |
||||
} finally { |
||||
connection.autoCommit = oldAutoCommit |
||||
} |
||||
} |
||||
} |
||||
|
||||
private fun isDeadlock(t: Throwable): Boolean { |
||||
if (t is SQLException) { |
||||
return isDeadlock(t) |
||||
} |
||||
|
||||
val cause = t.cause |
||||
if (cause != null) { |
||||
return isDeadlock(cause) |
||||
} |
||||
|
||||
return false |
||||
} |
||||
|
||||
private fun isDeadlock(ex: SQLException): Boolean { |
||||
if (deadlockDetector.isDeadlock(ex)) { |
||||
return true |
||||
} |
||||
|
||||
val next = ex.nextException |
||||
if (next != null) { |
||||
return isDeadlock(next) |
||||
} |
||||
|
||||
return false |
||||
} |
||||
|
||||
private companion object { |
||||
private const val DEFAULT_ATTEMPTS = 5 |
||||
private val DEFAULT_BACKOFF_STRATEGY = BinaryExponentialBackoffStrategy(8, 10) |
||||
} |
||||
} |
@ -0,0 +1,21 @@ |
||||
package dev.openrs2.db |
||||
|
||||
import java.sql.SQLException |
||||
|
||||
/** |
||||
* A functional interface for checking if an [SQLException] represents a |
||||
* deadlock. There is no standard mechanism for representing deadlock errors, |
||||
* so vendor-specific implementations are required. |
||||
* |
||||
* The [Database] class already examines the entire [Throwable.cause] and |
||||
* [SQLException.next] chain, so implementations only need to examine the |
||||
* individual [SQLException] passed to them. |
||||
*/ |
||||
public fun interface DeadlockDetector { |
||||
/** |
||||
* Determines whether the [SQLException] was caused by a deadlock or not. |
||||
* @param ex the [SQLException]. |
||||
* @return `true` if so, `false` otherwise. |
||||
*/ |
||||
public fun isDeadlock(ex: SQLException): Boolean |
||||
} |
@ -0,0 +1,15 @@ |
||||
package dev.openrs2.db |
||||
|
||||
import java.sql.SQLException |
||||
|
||||
/** |
||||
* A vendor-neutral [DeadlockDetector], which considers every [SQLException] to |
||||
* be a deadlock. One of the vendor-specific implementations should be used if |
||||
* possible, which will prevent non-deadlock errors (such as unique constraint |
||||
* violations) from being needlessly retried. |
||||
*/ |
||||
public object DefaultDeadlockDetector : DeadlockDetector { |
||||
override fun isDeadlock(ex: SQLException): Boolean { |
||||
return true |
||||
} |
||||
} |
@ -0,0 +1,25 @@ |
||||
package dev.openrs2.db |
||||
|
||||
/** |
||||
* A [BackoffStrategy] with a fixed delay. |
||||
* |
||||
* It permits a delay of zero, which is appropriate for use with database |
||||
* servers that allow one of the deadlocked connections to proceed (thus |
||||
* guaranteeing forward progress) and where you only expect a small amount of |
||||
* lock contention. |
||||
*/ |
||||
public class FixedBackoffStrategy( |
||||
/** |
||||
* The delay in milliseconds. Must be zero or positive. |
||||
*/ |
||||
private val delay: Long |
||||
) : BackoffStrategy { |
||||
init { |
||||
delay >= 0 |
||||
} |
||||
|
||||
override fun getDelay(attempt: Int): Long { |
||||
require(attempt >= 0) |
||||
return delay |
||||
} |
||||
} |
@ -0,0 +1,16 @@ |
||||
package dev.openrs2.db |
||||
|
||||
import java.sql.SQLException |
||||
|
||||
/** |
||||
* A [DeadlockDetector] implementation for H2. |
||||
*/ |
||||
public object H2DeadlockDetector : DeadlockDetector { |
||||
private const val DEADLOCK_1 = 40001 |
||||
private const val LOCK_TIMEOUT_1 = 50200 |
||||
|
||||
override fun isDeadlock(ex: SQLException): Boolean { |
||||
// see https://www.h2database.com/javadoc/org/h2/api/ErrorCode.html |
||||
return ex.errorCode == DEADLOCK_1 || ex.errorCode == LOCK_TIMEOUT_1 |
||||
} |
||||
} |
@ -0,0 +1,16 @@ |
||||
package dev.openrs2.db |
||||
|
||||
import java.sql.SQLException |
||||
|
||||
/** |
||||
* A [DeadlockDetector] implementation for MySQL and MariaDB. |
||||
*/ |
||||
public object MysqlDeadlockDetector : DeadlockDetector { |
||||
private const val LOCK_WAIT_TIMEOUT = 1205 |
||||
private const val LOCK_DEADLOCK = 1213 |
||||
|
||||
override fun isDeadlock(ex: SQLException): Boolean { |
||||
// see https://dev.mysql.com/doc/mysql-errors/8.0/en/server-error-reference.html |
||||
return ex.errorCode == LOCK_WAIT_TIMEOUT || ex.errorCode == LOCK_DEADLOCK |
||||
} |
||||
} |
@ -0,0 +1,17 @@ |
||||
package dev.openrs2.db |
||||
|
||||
import java.sql.SQLException |
||||
|
||||
/** |
||||
* A [DeadlockDetector] implementation for PostgreSQL. |
||||
*/ |
||||
public object PostgresDeadlockDetector : DeadlockDetector { |
||||
private const val SERIALIZATION_FAILURE = "40001" |
||||
private const val DEADLOCK_DETECTED = "40P01" |
||||
|
||||
override fun isDeadlock(ex: SQLException): Boolean { |
||||
// see https://www.postgresql.org/docs/current/errcodes-appendix.html |
||||
val sqlState = ex.sqlState ?: return false |
||||
return sqlState == SERIALIZATION_FAILURE || sqlState == DEADLOCK_DETECTED |
||||
} |
||||
} |
@ -0,0 +1,32 @@ |
||||
package dev.openrs2.db |
||||
|
||||
import java.sql.SQLException |
||||
|
||||
/** |
||||
* A [DeadlockDetector] implementation for SQLite. |
||||
*/ |
||||
public object SqliteDeadlockDetector : DeadlockDetector { |
||||
private const val BUSY = 5 |
||||
private const val LOCKED = 6 |
||||
|
||||
override fun isDeadlock(ex: SQLException): Boolean { |
||||
/* |
||||
* https://www.sqlite.org/rescode.html documents the meaning of the |
||||
* error codes. |
||||
* |
||||
* SQLITE_BUSY (5) is similar to a lock wait timeout, so it is |
||||
* desirable to retry if we encounter it. Furthermore, in WAL mode, it |
||||
* can be thrown immediately if a reader and writer deadlock. |
||||
* |
||||
* SQLITE_LOCKED (6) is normally only caused by conflicts within the |
||||
* same connection, which will presumably happen every time we |
||||
* re-attempt the transaction. However, there is an edge case which |
||||
* makes retrying desirable: the error can be caused by a conflict with |
||||
* another connection if a shared cache is used. |
||||
* |
||||
* SQLITE_PROTOCOL (15) has its own built-in retry/backoff logic, so I |
||||
* have omitted it from this check. |
||||
*/ |
||||
return ex.errorCode == BUSY || ex.errorCode == LOCKED |
||||
} |
||||
} |
@ -0,0 +1,23 @@ |
||||
package dev.openrs2.db |
||||
|
||||
import java.sql.Connection |
||||
|
||||
/** |
||||
* A functional interface representing a single database transation. |
||||
* @param T the result type. Use [Unit] if the transaction does not return a |
||||
* result. |
||||
*/ |
||||
public fun interface Transaction<T> { |
||||
/** |
||||
* Executes the transaction on the given [connection]. It is not necessary |
||||
* to implement commit or rollback logic yourself. |
||||
* |
||||
* The transaction may be called multiple times if a deadlock occurs, so |
||||
* care needs to be taken if the transaction has any application-level side |
||||
* effects. |
||||
* @param connection the database connection, which is only valid for the |
||||
* duration of the transaction. |
||||
* @return the result. |
||||
*/ |
||||
public fun execute(connection: Connection): T |
||||
} |
@ -0,0 +1,35 @@ |
||||
package dev.openrs2.db |
||||
|
||||
import org.junit.jupiter.api.assertThrows |
||||
import kotlin.test.Test |
||||
import kotlin.test.assertEquals |
||||
|
||||
object BackoffStrategyTest { |
||||
@Test |
||||
fun testFixedBackoff() { |
||||
val strategy = FixedBackoffStrategy(1000) |
||||
|
||||
assertEquals(1000, strategy.getDelay(0)) |
||||
assertEquals(1000, strategy.getDelay(1)) |
||||
assertEquals(1000, strategy.getDelay(2)) |
||||
|
||||
assertThrows<IllegalArgumentException> { |
||||
strategy.getDelay(-1) |
||||
} |
||||
} |
||||
|
||||
@Test |
||||
fun testBinaryExponentialBackoff() { |
||||
assertThrows<IllegalArgumentException> { |
||||
BinaryExponentialBackoffStrategy(0, 1) |
||||
} |
||||
|
||||
assertThrows<IllegalArgumentException> { |
||||
BinaryExponentialBackoffStrategy(1, 0) |
||||
} |
||||
|
||||
assertThrows<IllegalArgumentException> { |
||||
BinaryExponentialBackoffStrategy(1, 1).getDelay(-1) |
||||
} |
||||
} |
||||
} |
@ -0,0 +1,185 @@ |
||||
package dev.openrs2.db |
||||
|
||||
import kotlinx.coroutines.ExperimentalCoroutinesApi |
||||
import kotlinx.coroutines.test.runBlockingTest |
||||
import org.h2.jdbcx.JdbcDataSource |
||||
import org.junit.jupiter.api.assertThrows |
||||
import java.sql.SQLException |
||||
import kotlin.test.Test |
||||
import kotlin.test.assertEquals |
||||
import kotlin.test.assertTrue |
||||
|
||||
@ExperimentalCoroutinesApi |
||||
object DatabaseTest { |
||||
private class TestException : Exception { |
||||
constructor() : super() |
||||
constructor(cause: Throwable) : super(cause) |
||||
} |
||||
|
||||
private class DeadlockException : SQLException(null, null, 40001) |
||||
private class NonDeadlockException : SQLException() |
||||
|
||||
private const val DELAY = 10L |
||||
private const val ATTEMPTS = 5 |
||||
|
||||
private val dataSource = JdbcDataSource().apply { |
||||
setUrl("jdbc:h2:mem:") |
||||
} |
||||
|
||||
private val database = Database( |
||||
dataSource, |
||||
deadlockDetector = H2DeadlockDetector, |
||||
backoffStrategy = FixedBackoffStrategy(DELAY), |
||||
attempts = ATTEMPTS |
||||
) |
||||
|
||||
@Test |
||||
fun testBounds() { |
||||
assertThrows<IllegalArgumentException> { |
||||
Database(dataSource, attempts = 0) |
||||
} |
||||
} |
||||
|
||||
@Test |
||||
fun testSuccessful() = runBlockingTest { |
||||
val start = currentTime |
||||
|
||||
val result = database.execute { connection -> |
||||
connection.prepareStatement("VALUES 12345").use { stmt -> |
||||
val rows = stmt.executeQuery() |
||||
assertTrue(rows.next()) |
||||
return@execute rows.getInt(1) |
||||
} |
||||
} |
||||
|
||||
assertEquals(12345, result) |
||||
|
||||
val elapsed = currentTime - start |
||||
assertEquals(0, elapsed) |
||||
} |
||||
|
||||
@Test |
||||
fun testDeadlockRetry() = runBlockingTest { |
||||
var attempts = 0 |
||||
val start = currentTime |
||||
|
||||
val result = database.execute { connection -> |
||||
if (attempts++ == 0) { |
||||
throw DeadlockException() |
||||
} |
||||
|
||||
connection.prepareStatement("VALUES 12345").use { stmt -> |
||||
val rows = stmt.executeQuery() |
||||
assertTrue(rows.next()) |
||||
return@execute rows.getInt(1) |
||||
} |
||||
} |
||||
|
||||
assertEquals(12345, result) |
||||
assertEquals(2, attempts) |
||||
|
||||
val elapsed = currentTime - start |
||||
assertEquals(10, elapsed) |
||||
} |
||||
|
||||
@Test |
||||
fun testDeadlockFailure() { |
||||
var attempts = 0 |
||||
|
||||
assertThrows<DeadlockException> { |
||||
runBlockingTest { |
||||
database.execute<Unit> { |
||||
attempts++ |
||||
throw DeadlockException() |
||||
} |
||||
} |
||||
} |
||||
|
||||
assertEquals(ATTEMPTS, attempts) |
||||
} |
||||
|
||||
@Test |
||||
fun testNonDeadlockFailure() { |
||||
var attempts = 0 |
||||
|
||||
assertThrows<TestException> { |
||||
runBlockingTest { |
||||
database.execute<Unit> { |
||||
attempts++ |
||||
throw TestException() |
||||
} |
||||
} |
||||
} |
||||
|
||||
assertEquals(1, attempts) |
||||
} |
||||
|
||||
@Test |
||||
fun testDeadlockCauseChain() = runBlockingTest { |
||||
var attempts = 0 |
||||
val start = currentTime |
||||
|
||||
val result = database.execute { connection -> |
||||
if (attempts++ == 0) { |
||||
throw TestException(DeadlockException()) |
||||
} |
||||
|
||||
connection.prepareStatement("VALUES 12345").use { stmt -> |
||||
val rows = stmt.executeQuery() |
||||
assertTrue(rows.next()) |
||||
return@execute rows.getInt(1) |
||||
} |
||||
} |
||||
|
||||
assertEquals(12345, result) |
||||
assertEquals(2, attempts) |
||||
|
||||
val elapsed = currentTime - start |
||||
assertEquals(10, elapsed) |
||||
} |
||||
|
||||
@Test |
||||
fun testDeadlockNextChain() = runBlockingTest { |
||||
var attempts = 0 |
||||
val start = currentTime |
||||
|
||||
val result = database.execute { connection -> |
||||
if (attempts++ == 0) { |
||||
val ex = SQLException() |
||||
ex.nextException = DeadlockException() |
||||
throw ex |
||||
} |
||||
|
||||
connection.prepareStatement("VALUES 12345").use { stmt -> |
||||
val rows = stmt.executeQuery() |
||||
assertTrue(rows.next()) |
||||
return@execute rows.getInt(1) |
||||
} |
||||
} |
||||
|
||||
assertEquals(12345, result) |
||||
assertEquals(2, attempts) |
||||
|
||||
val elapsed = currentTime - start |
||||
assertEquals(10, elapsed) |
||||
} |
||||
|
||||
@Test |
||||
fun testNonDeadlockNextChain() { |
||||
var attempts = 0 |
||||
|
||||
assertThrows<NonDeadlockException> { |
||||
runBlockingTest { |
||||
database.execute<Unit> { |
||||
attempts++ |
||||
|
||||
val ex = NonDeadlockException() |
||||
ex.nextException = SQLException() |
||||
throw ex |
||||
} |
||||
} |
||||
} |
||||
|
||||
assertEquals(1, attempts) |
||||
} |
||||
} |
@ -0,0 +1,41 @@ |
||||
package dev.openrs2.db |
||||
|
||||
import java.sql.SQLException |
||||
import kotlin.test.Test |
||||
import kotlin.test.assertFalse |
||||
import kotlin.test.assertTrue |
||||
|
||||
object DeadlockDetectorTest { |
||||
@Test |
||||
fun testDefault() { |
||||
assertTrue(DefaultDeadlockDetector.isDeadlock(SQLException())) |
||||
} |
||||
|
||||
@Test |
||||
fun testH2() { |
||||
assertFalse(H2DeadlockDetector.isDeadlock(SQLException())) |
||||
assertTrue(H2DeadlockDetector.isDeadlock(SQLException(null, null, 40001))) |
||||
assertTrue(H2DeadlockDetector.isDeadlock(SQLException(null, null, 50200))) |
||||
} |
||||
|
||||
@Test |
||||
fun testMysql() { |
||||
assertFalse(MysqlDeadlockDetector.isDeadlock(SQLException())) |
||||
assertTrue(MysqlDeadlockDetector.isDeadlock(SQLException(null, null, 1205))) |
||||
assertTrue(MysqlDeadlockDetector.isDeadlock(SQLException(null, null, 1213))) |
||||
} |
||||
|
||||
@Test |
||||
fun testPostgres() { |
||||
assertFalse(PostgresDeadlockDetector.isDeadlock(SQLException())) |
||||
assertTrue(PostgresDeadlockDetector.isDeadlock(SQLException(null, "40001"))) |
||||
assertTrue(PostgresDeadlockDetector.isDeadlock(SQLException(null, "40P01"))) |
||||
} |
||||
|
||||
@Test |
||||
fun testSqlite() { |
||||
assertFalse(SqliteDeadlockDetector.isDeadlock(SQLException())) |
||||
assertTrue(SqliteDeadlockDetector.isDeadlock(SQLException(null, null, 5))) |
||||
assertTrue(SqliteDeadlockDetector.isDeadlock(SQLException(null, null, 6))) |
||||
} |
||||
} |
Loading…
Reference in new issue