@ -23,7 +23,6 @@ import java.sql.Connection
import java.sql.SQLException
import java.sql.Types
import java.time.Instant
import java.time.OffsetDateTime
import java.time.ZoneOffset
import javax.inject.Inject
import javax.inject.Singleton
@ -74,13 +73,25 @@ public class CacheImporter @Inject constructor(
public val versionTruncated : Boolean
) : Container ( compressed , uncompressed )
private enum class SourceType {
DISK ,
JS5REMOTE
}
public data class MasterIndexResult (
val masterIndexId : Int ,
val sourceId : Int ,
val indexes : List < ByteBuf ? >
)
public suspend fun import (
store : Store ,
game : String ,
build : Int ? ,
timestamp : Instant ? ,
name : String ? ,
description : String ?
description : String ? ,
url : String ?
) {
database . execute { connection ->
prepare ( connection )
@ -89,12 +100,25 @@ public class CacheImporter @Inject constructor(
// import master index
val masterIndex = createMasterIndex ( store )
try {
addMasterIndex ( connection , masterIndex , gameId , build , timestamp , name , description , overwrite = false )
val masterIndexId = try {
addMasterIndex ( connection , masterIndex )
} finally {
masterIndex . release ( )
}
// create source
val sourceId = addSource (
connection ,
SourceType . DISK ,
masterIndexId ,
gameId ,
build ,
timestamp ,
name ,
description ,
url
)
// import indexes
val indexes = arrayOfNulls < Js5Index > ( Js5Archive . ARCHIVESET )
val indexGroups = mutableListOf < Index > ( )
@ -106,7 +130,7 @@ public class CacheImporter @Inject constructor(
}
for ( index in indexGroups ) {
addIndex ( connection , index )
addIndex ( connection , sourceId , index )
}
} finally {
indexGroups . forEach ( Index :: release )
@ -127,7 +151,7 @@ public class CacheImporter @Inject constructor(
groups += group
if ( groups . size >= BATCH _SIZE ) {
addGroups ( connection , groups )
addGroups ( connection , sourceId , groups )
groups . forEach ( Group :: release )
groups . clear ( )
@ -136,7 +160,7 @@ public class CacheImporter @Inject constructor(
}
if ( groups . isNotEmpty ( ) ) {
addGroups ( connection , groups )
addGroups ( connection , sourceId , groups )
}
} finally {
groups . forEach ( Group :: release )
@ -151,7 +175,8 @@ public class CacheImporter @Inject constructor(
build : Int ? ,
timestamp : Instant ? ,
name : String ? ,
description : String ?
description : String ? ,
url : String ?
) {
Js5Compression . uncompress ( buf . slice ( ) ) . use { uncompressed ->
val masterIndex = MasterIndex ( Js5MasterIndex . read ( uncompressed . slice ( ) , format ) , buf , uncompressed )
@ -160,7 +185,8 @@ public class CacheImporter @Inject constructor(
prepare ( connection )
val gameId = getGameId ( connection , game )
addMasterIndex ( connection , masterIndex , gameId , build , timestamp , name , description , overwrite = false )
val masterIndexId = addMasterIndex ( connection , masterIndex )
addSource ( connection , SourceType . DISK , masterIndexId , gameId , build , timestamp , name , description , url )
}
}
}
@ -172,9 +198,8 @@ public class CacheImporter @Inject constructor(
gameId : Int ,
build : Int ,
lastId : Int ? ,
timestamp : Instant ,
name : String ,
) : Pair < Int , List < ByteBuf ? > > {
timestamp : Instant
) : MasterIndexResult {
return database . execute { connection ->
prepare ( connection )
@ -191,15 +216,21 @@ public class CacheImporter @Inject constructor(
stmt . execute ( )
}
val id = addMasterIndex (
val masterIndexId = addMasterIndex (
connection ,
MasterIndex ( masterIndex , buf , uncompressed )
)
val sourceId = addSource (
connection ,
MasterIndex ( masterIndex , buf , uncompressed ) ,
SourceType . JS5REMOTE ,
masterIndexId ,
gameId ,
build ,
timestamp ,
name ,
name = " Original " ,
description = null ,
overwrite = true
url = null
)
/ *
@ -226,7 +257,7 @@ public class CacheImporter @Inject constructor(
""" .trimIndent()
) . use { stmt ->
stmt . setObject ( 1 , lastId , Types . INTEGER )
stmt . setInt ( 2 , i d)
stmt . setInt ( 2 , masterIndexI d)
stmt . executeQuery ( ) . use { rows ->
val indexes = mutableListOf < ByteBuf ? > ( )
@ -241,7 +272,7 @@ public class CacheImporter @Inject constructor(
}
indexes . filterNotNull ( ) . forEach ( ByteBuf :: retain )
return @execute Pair ( i d, indexes )
return @execute MasterIndexResult ( masterIndexId , sourceI d, indexes )
} finally {
indexes . filterNotNull ( ) . forEach ( ByteBuf :: release )
}
@ -251,6 +282,7 @@ public class CacheImporter @Inject constructor(
}
public suspend fun importIndexAndGetMissingGroups (
sourceId : Int ,
archive : Int ,
index : Js5Index ,
buf : ByteBuf ,
@ -259,7 +291,7 @@ public class CacheImporter @Inject constructor(
) : List < Int > {
return database . execute { connection ->
prepare ( connection )
val id = addIndex ( connection , Index ( archive , index , buf , uncompressed ) )
val id = addIndex ( connection , sourceId , Index ( archive , index , buf , uncompressed ) )
/ *
* In order to defend against ( crc32 , version ) collisions , we only
@ -304,14 +336,14 @@ public class CacheImporter @Inject constructor(
}
}
public suspend fun importGroups ( groups : List < Group > ) {
public suspend fun importGroups ( sourceId : Int , groups : List < Group > ) {
if ( groups . isEmpty ( ) ) {
return
}
database . execute { connection ->
prepare ( connection )
addGroups ( connection , groups )
addGroups ( connection , sourceId , groups )
}
}
@ -329,27 +361,15 @@ public class CacheImporter @Inject constructor(
private fun addMasterIndex (
connection : Connection ,
masterIndex : MasterIndex ,
gameId : Int ,
build : Int ? ,
timestamp : Instant ? ,
name : String ? ,
description : String ? ,
overwrite : Boolean
masterIndex : MasterIndex
) : Int {
val containerId = addContainer ( connection , masterIndex )
var masterIndexId : Int ? = null
var newTimestamp : Instant ?
var newName : String ?
var newDescription : String ?
connection . prepareStatement (
"""
SELECT id , game _id , timestamp , name , description
SELECT id
FROM master _indexes
WHERE container _id = ? AND format = ?: : master _index _format
FOR UPDATE
""" .trimIndent()
) . use { stmt ->
stmt . setLong ( 1 , containerId )
@ -357,161 +377,129 @@ public class CacheImporter @Inject constructor(
stmt . executeQuery ( ) . use { rows ->
if ( rows . next ( ) ) {
masterIndexId = rows . getInt ( 1 )
return rows . getInt ( 1 )
}
}
}
if ( masterIndexId != null ) {
val oldGameId = rows . getInt ( 2 )
val oldTimestamp : Instant ? = rows . getTimestamp ( 3 ) ?. toInstant ( )
val oldName : String ? = rows . getString ( 4 )
val oldDescription : String ? = rows . getString ( 5 )
val masterIndexId : Int
check ( oldGameId == gameId )
connection . prepareStatement (
"""
INSERT INTO master _indexes ( container _id , format )
VALUES ( ? , ?: : master _index _format )
RETURNING id
""" .trimIndent()
) . use { stmt ->
stmt . setLong ( 1 , containerId )
stmt . setString ( 2 , masterIndex . index . format . name . toLowerCase ( ) )
if ( oldTimestamp != null && timestamp != null ) {
newTimestamp = if ( oldTimestamp . isBefore ( timestamp ) ) {
oldTimestamp
} else {
timestamp
}
} else if ( oldTimestamp != null ) {
newTimestamp = oldTimestamp
} else {
newTimestamp = timestamp
}
stmt . executeQuery ( ) . use { rows ->
check ( rows . next ( ) )
masterIndexId = rows . getInt ( 1 )
}
}
if ( overwrite ) {
newName = name
} else if ( oldName != null && name != null && oldName != name ) {
newName = " $oldName / $name "
} else if ( oldName != null ) {
newName = oldName
} else {
newName = name
}
connection . prepareStatement (
"""
INSERT INTO master _index _archives (
master _index _id , archive _id , crc32 , version , whirlpool , groups , total _uncompressed _length
)
VALUES ( ? , ? , ? , ? , ? , ? , ? )
""" .trimIndent()
) . use { stmt ->
for ( ( i , entry ) in masterIndex . index . entries . withIndex ( ) ) {
stmt . setInt ( 1 , masterIndexId )
stmt . setInt ( 2 , i )
stmt . setInt ( 3 , entry . checksum )
if ( overwrite ) {
newDescription = description
} else if ( oldDescription != null && description != null && oldDescription != description ) {
newDescription = " $oldDescription \n \n $description "
} else if ( oldDescription != null ) {
newDescription = oldDescription
} else {
newDescription = description
}
if ( masterIndex . index . format >= MasterIndexFormat . VERSIONED ) {
stmt . setInt ( 4 , entry . version )
} else {
newTimestamp = timestamp
newName = name
newDescription = description
stmt . setInt ( 4 , 0 )
}
}
}
if ( masterIndexId != null ) {
connection . prepareStatement (
"""
UPDATE master _indexes
SET timestamp = ? , name = ? , description = ?
WHERE id = ?
""" .trimIndent()
) . use { stmt ->
if ( newTimestamp != null ) {
val offsetDateTime = OffsetDateTime . ofInstant ( newTimestamp , ZoneOffset . UTC )
stmt . setObject ( 1 , offsetDateTime , Types . TIMESTAMP _WITH _TIMEZONE )
if ( masterIndex . index . format >= MasterIndexFormat . DIGESTS ) {
stmt . setBytes ( 5 , entry . digest ?: ByteArray ( Whirlpool . DIGESTBYTES ) )
} else {
stmt . setNull ( 1 , Types . TIMESTAMP _WITH _TIMEZONE )
stmt . setNull ( 5 , Types . BINARY )
}
stmt . setString ( 2 , newName )
stmt . setString ( 3 , newDescription )
stmt . setInt ( 4 , masterIndexId !! )
stmt . execute ( )
}
} else {
connection . prepareStatement (
"""
INSERT INTO master _indexes ( container _id , format , game _id , timestamp , name , description )
VALUES ( ? , ?: : master _index _format , ? , ? , ? , ? )
RETURNING id
""" .trimIndent()
) . use { stmt ->
stmt . setLong ( 1 , containerId )
stmt . setString ( 2 , masterIndex . index . format . name . toLowerCase ( ) )
stmt . setInt ( 3 , gameId )
if ( newTimestamp != null ) {
val offsetDateTime = OffsetDateTime . ofInstant ( newTimestamp , ZoneOffset . UTC )
stmt . setObject ( 4 , offsetDateTime , Types . TIMESTAMP _WITH _TIMEZONE )
if ( masterIndex . index . format >= MasterIndexFormat . LENGTHS ) {
stmt . setInt ( 6 , entry . groups )
stmt . setInt ( 7 , entry . totalUncompressedLength )
} else {
stmt . setNull ( 4 , Types . TIMESTAMP _WITH _TIMEZONE )
stmt . setNull ( 6 , Types . INTEGER )
stmt . setNull ( 7 , Types . INTEGER )
}
stmt . setString ( 5 , newName )
stmt . setString ( 6 , newDescription )
stmt . executeQuery ( ) . use { rows ->
check ( rows . next ( ) )
masterIndexId = rows . getInt ( 1 )
}
stmt . addBatch ( )
}
stmt . executeBatch ( )
}
return masterIndexId
}
private fun addSource (
connection : Connection ,
type : SourceType ,
masterIndexId : Int ,
gameId : Int ,
build : Int ? ,
timestamp : Instant ? ,
name : String ? ,
description : String ? ,
url : String ?
) : Int {
if ( type == SourceType . JS5REMOTE && build != null ) {
connection . prepareStatement (
"""
INSERT INTO master _index _archives (
master _index _id , archive _id , crc32 , version , whirlpool , groups , total _uncompressed _length
)
VALUES ( ? , ? , ? , ? , ? , ? , ? )
SELECT id
FROM sources
WHERE type = ' js5remote ' AND master _index _id = ? AND game _id = ? AND build = ?
""" .trimIndent()
) . use { stmt ->
for ( ( i , entry ) in masterIndex . index . entries . withIndex ( ) ) {
stmt . setInt ( 1 , masterIndexId !! )
stmt . setInt ( 2 , i )
stmt . setInt ( 3 , entry . checksum )
if ( masterIndex . index . format >= MasterIndexFormat . VERSIONED ) {
stmt . setInt ( 4 , entry . version )
} else {
stmt . setInt ( 4 , 0 )
}
if ( masterIndex . index . format >= MasterIndexFormat . DIGESTS ) {
stmt . setBytes ( 5 , entry . digest ?: ByteArray ( Whirlpool . DIGESTBYTES ) )
} else {
stmt . setNull ( 5 , Types . BINARY )
}
stmt . setInt ( 1 , masterIndexId )
stmt . setInt ( 2 , gameId )
stmt . setInt ( 3 , build )
if ( masterIndex . index . format >= MasterIndexFormat . LENGTHS ) {
stmt . setInt ( 6 , entry . groups )
stmt . setInt ( 7 , entry . totalUncompressedLength )
} else {
stmt . setNull ( 6 , Types . INTEGER )
stmt . setNull ( 7 , Types . INTEGER )
stmt . executeQuery ( ) . use { rows ->
if ( rows . next ( ) ) {
return rows . getInt ( 1 )
}
stmt . addBatch ( )
}
stmt . executeBatch ( )
}
}
if ( build != null ) {
connection . prepareStatement (
"""
INSERT INTO master _index _builds ( master _index _id , build )
VALUES ( ? , ? )
ON CONFLICT DO NOTHING
""" .trimIndent()
) . use { stmt ->
stmt . setInt ( 1 , masterIndexId !! )
stmt . setInt ( 2 , build )
stmt . execute ( )
connection . prepareStatement (
"""
INSERT INTO sources ( type , master _index _id , game _id , build , timestamp , name , description , url )
VALUES ( ?: : source _type , ? , ? , ? , ? , ? , ? , ? )
RETURNING id
""" .trimIndent()
) . use { stmt ->
stmt . setString ( 1 , type . toString ( ) . toLowerCase ( ) )
stmt . setInt ( 2 , masterIndexId )
stmt . setInt ( 3 , gameId )
stmt . setObject ( 4 , build , Types . INTEGER )
if ( timestamp != null ) {
stmt . setObject ( 5 , timestamp . atOffset ( ZoneOffset . UTC ) , Types . TIMESTAMP _WITH _TIMEZONE )
} else {
stmt . setNull ( 5 , Types . TIMESTAMP _WITH _TIMEZONE )
}
}
return masterIndexId !!
stmt . setString ( 6 , name )
stmt . setString ( 7 , description )
stmt . setString ( 8 , url )
stmt . executeQuery ( ) . use { rows ->
check ( rows . next ( ) )
return rows . getInt ( 1 )
}
}
}
private fun readGroup ( store : Store , archive : Int , index : Js5Index ? , group : Int ) : Group ? {
@ -547,7 +535,7 @@ public class CacheImporter @Inject constructor(
}
}
private fun addGroups ( connection : Connection , groups : List < Group > ) : List < Long > {
private fun addGroups ( connection : Connection , sourceId : Int , groups : List < Group > ) : List < Long > {
val containerIds = addContainers ( connection , groups )
connection . prepareStatement (
@ -569,11 +557,31 @@ public class CacheImporter @Inject constructor(
stmt . executeBatch ( )
}
connection . prepareStatement (
"""
INSERT INTO source _groups ( source _id , archive _id , group _id , version , version _truncated , container _id )
VALUES ( ? , ? , ? , ? , ? , ? )
ON CONFLICT DO NOTHING
""" .trimIndent()
) . use { stmt ->
for ( ( i , group ) in groups . withIndex ( ) ) {
stmt . setInt ( 1 , sourceId )
stmt . setInt ( 2 , group . archive )
stmt . setInt ( 3 , group . group )
stmt . setInt ( 4 , group . version )
stmt . setBoolean ( 5 , group . versionTruncated )
stmt . setLong ( 6 , containerIds [ i ] )
stmt . addBatch ( )
}
stmt . executeBatch ( )
}
return containerIds
}
private fun addGroup ( connection : Connection , group : Group ) : Long {
return addGroups ( connection , listOf ( group ) ) . single ( )
private fun addGroup ( connection : Connection , sourceId : Int , group : Group ) : Long {
return addGroups ( connection , sourceId , listOf ( group ) ) . single ( )
}
private fun readIndex ( store : Store , archive : Int ) : Index {
@ -584,8 +592,8 @@ public class CacheImporter @Inject constructor(
}
}
private fun addIndex ( connection : Connection , index : Index ) : Long {
val containerId = addGroup ( connection , index )
private fun addIndex ( connection : Connection , sourceId : Int , index : Index ) : Long {
val containerId = addGroup ( connection , sourceId , index )
val savepoint = connection . setSavepoint ( )
connection . prepareStatement (