Add API endpoint for receiving keys

Keys are now initially imported into a key_queue table, which is never
locked exclusively - allowing the API endpoint to function while the
brute forcer is running. The brute forcer moves all pending keys in the
queue to the keys table before running the actual brute forcing.

Signed-off-by: Graham <gpe@openrs2.org>
bzip2
Graham 3 years ago
parent df55c3ece3
commit 346302fc05
  1. 3
      archive/src/main/kotlin/org/openrs2/archive/key/JsonKeyDownloader.kt
  2. 84
      archive/src/main/kotlin/org/openrs2/archive/key/KeyBruteForcer.kt
  3. 8
      archive/src/main/kotlin/org/openrs2/archive/key/KeyDownloader.kt
  4. 78
      archive/src/main/kotlin/org/openrs2/archive/key/KeyImporter.kt
  5. 9
      archive/src/main/kotlin/org/openrs2/archive/key/KeySource.kt
  6. 2
      archive/src/main/kotlin/org/openrs2/archive/key/OpenOsrsKeyDownloader.kt
  7. 2
      archive/src/main/kotlin/org/openrs2/archive/key/PolarKeyDownloader.kt
  8. 2
      archive/src/main/kotlin/org/openrs2/archive/key/RuneLiteKeyDownloader.kt
  9. 16
      archive/src/main/kotlin/org/openrs2/archive/web/KeysController.kt
  10. 2
      archive/src/main/kotlin/org/openrs2/archive/web/WebServer.kt
  11. 24
      archive/src/main/resources/org/openrs2/archive/migrations/V11__keys.sql

@ -12,9 +12,10 @@ import java.net.http.HttpResponse
import java.time.Duration
public abstract class JsonKeyDownloader(
source: KeySource,
private val client: HttpClient,
private val jsonKeyReader: JsonKeyReader
) : KeyDownloader {
) : KeyDownloader(source) {
override suspend fun download(url: String): Sequence<XteaKey> {
val request = HttpRequest.newBuilder(URI(url))
.GET()

@ -22,6 +22,88 @@ public class KeyBruteForcer @Inject constructor(
val uncompressedChecksum: Int
)
/*
* Copy XTEA keys from key_queue to keys. The queue exists so that we don't
* block the /keys API endpoint from working while the brute forcer is
* running.
*
* This has to be a different transaction as it needs to lock the keys
* table in EXCLUSIVE mode, but we want to downgrade that to SHARE mode as
* soon as possible. Locks can only be released on commit in Postgres.
*/
private suspend fun assignKeyIds() {
database.execute { connection ->
connection.prepareStatement(
"""
LOCK TABLE keys IN EXCLUSIVE MODE
""".trimIndent()
).use { stmt ->
stmt.execute()
}
connection.prepareStatement(
"""
CREATE TEMPORARY TABLE tmp_keys (
key xtea_key NOT NULL,
source key_source NOT NULL,
first_seen TIMESTAMPTZ NOT NULL,
last_seen TIMESTAMPTZ NOT NULL
) ON COMMIT DROP
""".trimIndent()
).use { stmt ->
stmt.execute()
}
connection.prepareStatement(
"""
INSERT INTO tmp_keys (key, source, first_seen, last_seen)
SELECT key, source, first_seen, last_seen
FROM key_queue
FOR UPDATE SKIP LOCKED
""".trimIndent()
).use { stmt ->
stmt.execute()
}
connection.prepareStatement(
"""
INSERT INTO keys (key)
SELECT t.key
FROM tmp_keys t
LEFT JOIN keys k ON k.key = t.key
WHERE k.key IS NULL
ON CONFLICT DO NOTHING
""".trimIndent()
).use { stmt ->
stmt.execute()
}
connection.prepareStatement(
"""
INSERT INTO key_sources AS s (key_id, source, first_seen, last_seen)
SELECT k.id, t.source, t.first_seen, t.last_seen
FROM tmp_keys t
JOIN keys k ON k.key = t.key
ON CONFLICT (key_id, source) DO UPDATE SET
first_seen = LEAST(s.first_seen, EXCLUDED.first_seen),
last_seen = GREATEST(s.last_seen, EXCLUDED.last_seen)
""".trimIndent()
).use { stmt ->
stmt.execute()
}
connection.prepareStatement(
"""
DELETE FROM key_queue k
USING tmp_keys t
WHERE k.key = t.key AND k.source = t.source
""".trimIndent()
).use { stmt ->
stmt.execute()
}
}
}
/*
* The code for writing to the containers and keys tables ensures that the
* row IDs are allocated monotonically (by forbidding any other
@ -64,6 +146,8 @@ public class KeyBruteForcer @Inject constructor(
* from the tables.
*/
public suspend fun bruteForce() {
assignKeyIds()
database.execute { connection ->
connection.prepareStatement(
"""

@ -2,7 +2,9 @@ package org.openrs2.archive.key
import org.openrs2.crypto.XteaKey
public interface KeyDownloader {
public suspend fun getMissingUrls(seenUrls: Set<String>): Set<String>
public suspend fun download(url: String): Sequence<XteaKey>
public abstract class KeyDownloader(
public val source: KeySource
) {
public abstract suspend fun getMissingUrls(seenUrls: Set<String>): Set<String>
public abstract suspend fun download(url: String): Sequence<XteaKey>
}

@ -6,6 +6,9 @@ import org.openrs2.db.Database
import java.nio.file.Files
import java.nio.file.Path
import java.sql.Connection
import java.sql.Types
import java.time.Instant
import java.time.ZoneOffset
import javax.inject.Inject
import javax.inject.Singleton
@ -15,6 +18,8 @@ public class KeyImporter @Inject constructor(
private val jsonKeyReader: JsonKeyReader,
private val downloaders: Set<KeyDownloader>
) {
private data class Key(val key: XteaKey, val source: KeySource)
public suspend fun import(path: Path) {
val keys = mutableSetOf<XteaKey>()
@ -43,10 +48,12 @@ public class KeyImporter @Inject constructor(
logger.info { "Importing ${keys.size} keys" }
import(keys)
import(keys, KeySource.DISK)
}
public suspend fun download() {
val now = Instant.now()
val seenUrls = database.execute { connection ->
connection.prepareStatement(
"""
@ -63,12 +70,14 @@ public class KeyImporter @Inject constructor(
}
}
val keys = mutableSetOf<XteaKey>()
val keys = mutableSetOf<Key>()
val urls = mutableSetOf<String>()
for (downloader in downloaders) {
for (url in downloader.getMissingUrls(seenUrls)) {
keys += downloader.download(url)
keys += downloader.download(url).map { key ->
Key(key, downloader.source)
}
urls += url
}
}
@ -89,68 +98,49 @@ public class KeyImporter @Inject constructor(
stmt.executeBatch()
}
import(connection, keys)
import(connection, keys, now)
}
}
public suspend fun import(keys: Iterable<XteaKey>) {
public suspend fun import(keys: Iterable<XteaKey>, source: KeySource) {
val now = Instant.now()
database.execute { connection ->
import(connection, keys)
import(connection, keys.map { key ->
Key(key, source)
}, now)
}
}
private fun import(connection: Connection, keys: Iterable<XteaKey>) {
connection.prepareStatement(
"""
LOCK TABLE keys IN EXCLUSIVE MODE
""".trimIndent()
).use { stmt ->
stmt.execute()
}
connection.prepareStatement(
"""
CREATE TEMPORARY TABLE tmp_keys (
key xtea_key PRIMARY KEY NOT NULL
) ON COMMIT DROP
""".trimIndent()
).use { stmt ->
stmt.execute()
}
private fun import(connection: Connection, keys: Iterable<Key>, now: Instant) {
val timestamp = now.atOffset(ZoneOffset.UTC)
connection.prepareStatement(
"""
INSERT INTO tmp_keys (key)
VALUES (ROW(?, ?, ?, ?))
INSERT INTO key_queue AS K (key, source, first_seen, last_seen)
VALUES (ROW(?, ?, ?, ?), ?::key_source, ?, ?)
ON CONFLICT (key, source) DO UPDATE SET
first_seen = LEAST(k.first_seen, EXCLUDED.first_seen),
last_seen = GREATEST(k.last_seen, EXCLUDED.last_seen)
""".trimIndent()
).use { stmt ->
for (key in keys) {
if (key.isZero) {
if (key.key.isZero) {
continue
}
stmt.setInt(1, key.k0)
stmt.setInt(2, key.k1)
stmt.setInt(3, key.k2)
stmt.setInt(4, key.k3)
stmt.setInt(1, key.key.k0)
stmt.setInt(2, key.key.k1)
stmt.setInt(3, key.key.k2)
stmt.setInt(4, key.key.k3)
stmt.setString(5, key.source.name.lowercase())
stmt.setObject(6, timestamp, Types.TIMESTAMP_WITH_TIMEZONE)
stmt.setObject(7, timestamp, Types.TIMESTAMP_WITH_TIMEZONE)
stmt.addBatch()
}
stmt.executeBatch()
}
connection.prepareStatement(
"""
INSERT INTO keys (key)
SELECT t.key
FROM tmp_keys t
LEFT JOIN keys k ON k.key = t.key
WHERE k.key IS NULL
ON CONFLICT DO NOTHING
""".trimIndent()
).use { stmt ->
stmt.execute()
}
}
private companion object {

@ -0,0 +1,9 @@
package org.openrs2.archive.key
public enum class KeySource {
API,
DISK,
OPENOSRS,
POLAR,
RUNELITE
}

@ -8,7 +8,7 @@ import javax.inject.Singleton
public class OpenOsrsKeyDownloader @Inject constructor(
client: HttpClient,
jsonKeyReader: JsonKeyReader
) : JsonKeyDownloader(client, jsonKeyReader) {
) : JsonKeyDownloader(KeySource.OPENOSRS, client, jsonKeyReader) {
override suspend fun getMissingUrls(seenUrls: Set<String>): Set<String> {
return setOf(ENDPOINT)
}

@ -18,7 +18,7 @@ import javax.inject.Singleton
public class PolarKeyDownloader @Inject constructor(
private val client: HttpClient,
jsonKeyReader: JsonKeyReader
) : JsonKeyDownloader(client, jsonKeyReader) {
) : JsonKeyDownloader(KeySource.POLAR, client, jsonKeyReader) {
override suspend fun getMissingUrls(seenUrls: Set<String>): Set<String> {
val request = HttpRequest.newBuilder(ENDPOINT)
.GET()

@ -17,7 +17,7 @@ import javax.inject.Singleton
public class RuneLiteKeyDownloader @Inject constructor(
private val client: HttpClient,
jsonKeyReader: JsonKeyReader
) : JsonKeyDownloader(client, jsonKeyReader) {
) : JsonKeyDownloader(KeySource.RUNELITE, client, jsonKeyReader) {
override suspend fun getMissingUrls(seenUrls: Set<String>): Set<String> {
val version = getVersion()
return setOf(getXteaEndpoint(version))

@ -1,14 +1,20 @@
package org.openrs2.archive.web
import io.ktor.application.ApplicationCall
import io.ktor.http.HttpStatusCode
import io.ktor.request.receive
import io.ktor.response.respond
import io.ktor.thymeleaf.ThymeleafContent
import org.openrs2.archive.key.KeyExporter
import org.openrs2.archive.key.KeyImporter
import org.openrs2.archive.key.KeySource
import org.openrs2.crypto.XteaKey
import javax.inject.Inject
import javax.inject.Singleton
@Singleton
public class KeysController @Inject constructor(
private val importer: KeyImporter,
private val exporter: KeyExporter
) {
public suspend fun index(call: ApplicationCall) {
@ -17,6 +23,16 @@ public class KeysController @Inject constructor(
call.respond(ThymeleafContent("keys/index.html", mapOf("stats" to stats, "analysis" to analysis)))
}
public suspend fun import(call: ApplicationCall) {
val keys = call.receive<Array<IntArray>>().mapTo(mutableSetOf(), XteaKey::fromIntArray)
if (keys.isNotEmpty()) {
importer.import(keys, KeySource.API)
}
call.respond(HttpStatusCode.NoContent)
}
public suspend fun exportAll(call: ApplicationCall) {
call.respond(exporter.exportAll())
}

@ -13,6 +13,7 @@ import io.ktor.jackson.JacksonConverter
import io.ktor.response.respond
import io.ktor.response.respondRedirect
import io.ktor.routing.get
import io.ktor.routing.post
import io.ktor.routing.routing
import io.ktor.server.cio.CIO
import io.ktor.server.engine.embeddedServer
@ -83,6 +84,7 @@ public class WebServer @Inject constructor(
get("/caches/{id}/keys.zip") { cachesController.exportKeysZip(call) }
get("/caches/{id}/map.png") { cachesController.renderMap(call) }
get("/keys") { keysController.index(call) }
post("/keys") { keysController.import(call) }
get("/keys/all.json") { keysController.exportAll(call) }
get("/keys/valid.json") { keysController.exportValid(call) }
static("/static") { resources("/org/openrs2/archive/static") }

@ -0,0 +1,24 @@
-- @formatter:off
CREATE TYPE key_source AS ENUM (
'api',
'disk',
'openosrs',
'polar',
'runelite'
);
CREATE TABLE key_sources (
key_id BIGINT NOT NULL REFERENCES keys (id),
source key_source NOT NULL,
first_seen TIMESTAMPTZ NOT NULL,
last_seen TIMESTAMPTZ NOT NULL,
PRIMARY KEY (key_id, source)
);
CREATE TABLE key_queue (
key xtea_key NOT NULL,
source key_source NOT NULL,
first_seen TIMESTAMPTZ NOT NULL,
last_seen TIMESTAMPTZ NOT NULL,
PRIMARY KEY (key, source)
);
Loading…
Cancel
Save