Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions Sources/SQLiteData/CloudKit/Internal/Metadatabase.swift
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,30 @@
)
.execute(db)
}
migrator.registerMigration("Replace _isDeleted with _pendingStatus") { db in
try #sql(
"""
ALTER TABLE "\(raw: .sqliteDataCloudKitSchemaName)_metadata"
ADD COLUMN "_pendingStatus" INTEGER
"""
)
.execute(db)
try #sql(
"""
UPDATE "\(raw: .sqliteDataCloudKitSchemaName)_metadata"
SET "_pendingStatus" = 0
WHERE "_isDeleted" = 1
"""
)
.execute(db)
try #sql(
"""
ALTER TABLE "\(raw: .sqliteDataCloudKitSchemaName)_metadata"
DROP COLUMN "_isDeleted"
"""
)
.execute(db)
}
#if DEBUG
try metadatabase.read { db in
let hasSchemaChanges = try migrator.hasSchemaChanges(db)
Expand Down
60 changes: 46 additions & 14 deletions Sources/SQLiteData/CloudKit/Internal/MockSyncEngine.swift
Original file line number Diff line number Diff line change
Expand Up @@ -136,12 +136,12 @@
package final class MockSyncEngineState: CKSyncEngineStateProtocol {
package let changeTag = LockIsolated(0)
package let _pendingRecordZoneChanges = LockIsolated<
OrderedSet<CKSyncEngine.PendingRecordZoneChange>
>([]
OrderedDictionary<CKRecord.ID, CKSyncEngine.PendingRecordZoneChange>
>([:]
)
package let _pendingDatabaseChanges = LockIsolated<
OrderedSet<CKSyncEngine.PendingDatabaseChange>
>([])
OrderedDictionary<CKRecordZone.ID, CKSyncEngine.PendingDatabaseChange>
>([:])
private let fileID: StaticString
private let filePath: StaticString
private let line: UInt
Expand All @@ -160,11 +160,11 @@
}

package var pendingRecordZoneChanges: [CKSyncEngine.PendingRecordZoneChange] {
_pendingRecordZoneChanges.withValue { Array($0) }
_pendingRecordZoneChanges.withValue { Array($0.values) }
}

package var pendingDatabaseChanges: [CKSyncEngine.PendingDatabaseChange] {
_pendingDatabaseChanges.withValue { Array($0) }
_pendingDatabaseChanges.withValue { Array($0.values) }
}

package func removePendingChanges() {
Expand All @@ -173,26 +173,58 @@
}

package func add(pendingRecordZoneChanges: [CKSyncEngine.PendingRecordZoneChange]) {
self._pendingRecordZoneChanges.withValue {
$0.append(contentsOf: pendingRecordZoneChanges)
self._pendingRecordZoneChanges.withValue { dict in
for change in pendingRecordZoneChanges {
switch change {
case .saveRecord(let id), .deleteRecord(let id):
dict.updateValue(change, forKey: id)
@unknown default:
fatalError("Unsupported pendingRecordZoneChange: \(change)")
}
}
}
}

package func remove(pendingRecordZoneChanges: [CKSyncEngine.PendingRecordZoneChange]) {
self._pendingRecordZoneChanges.withValue {
$0.subtract(pendingRecordZoneChanges)
self._pendingRecordZoneChanges.withValue { dict in
for change in pendingRecordZoneChanges {
switch change {
case .saveRecord(let id), .deleteRecord(let id):
if dict[id] == change { dict.removeValue(forKey: id) }
@unknown default:
fatalError("Unsupported pendingRecordZoneChange: \(change)")
}
}
}
}

package func add(pendingDatabaseChanges: [CKSyncEngine.PendingDatabaseChange]) {
self._pendingDatabaseChanges.withValue {
$0.append(contentsOf: pendingDatabaseChanges)
self._pendingDatabaseChanges.withValue { dict in
for change in pendingDatabaseChanges {
switch change {
case .saveZone(let zone):
dict.updateValue(change, forKey: zone.zoneID)
case .deleteZone(let zoneID):
dict.updateValue(change, forKey: zoneID)
@unknown default:
fatalError("Unsupported pendingDatabaseChange: \(change)")
}
}
}
}

package func remove(pendingDatabaseChanges: [CKSyncEngine.PendingDatabaseChange]) {
self._pendingDatabaseChanges.withValue {
$0.subtract(pendingDatabaseChanges)
self._pendingDatabaseChanges.withValue { dict in
for change in pendingDatabaseChanges {
switch change {
case .saveZone(let zone):
if dict[zone.zoneID] == change { dict.removeValue(forKey: zone.zoneID) }
case .deleteZone(let zoneID):
if dict[zoneID] == change { dict.removeValue(forKey: zoneID) }
@unknown default:
fatalError("Unsupported pendingDatabaseChange: \(change)")
}
}
}
}
}
Expand Down
48 changes: 43 additions & 5 deletions Sources/SQLiteData/CloudKit/Internal/Triggers.swift
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
$0.recordPrimaryKey.eq(#sql("\(old.primaryKey)"))
&& $0.recordType.eq(tableName)
}
.update { $0._isDeleted = true }
.update { $0._pendingStatus = #bind(.deleted) }
} when: { old, new in
old.primaryKey.neq(new.primaryKey)
}
Expand Down Expand Up @@ -82,6 +82,16 @@
defaultZone: defaultZone,
privateTables: privateTables
)
SyncMetadata
.where {
$0.recordPrimaryKey.eq(#sql("\(new.primaryKey)"))
&& $0.recordType.eq(tableName)
&& $0._pendingStatus.eq(PendingStatus.deleted)
}
.update {
$0._pendingStatus = #bind(.reinserted)
$0.userModificationTime = $currentTime()
}
}
)
}
Expand Down Expand Up @@ -139,7 +149,7 @@
$0.recordPrimaryKey.eq(#sql("\(old.primaryKey)"))
&& $0.recordType.eq(tableName)
}
.update { $0._isDeleted = true }
.update { $0._pendingStatus = #bind(.deleted) }
} when: { _ in
!SyncEngine.$isSynchronizing
}
Expand Down Expand Up @@ -242,6 +252,7 @@
afterZoneUpdateTrigger(),
afterUpdateTrigger(for: syncEngine),
afterSoftDeleteTrigger(for: syncEngine),
afterReinsertTrigger(for: syncEngine),
]
}

Expand Down Expand Up @@ -323,7 +334,7 @@
)
)
} when: { old, new in
old._isDeleted.eq(new._isDeleted) && !SyncEngine.$isSynchronizing
old._pendingStatus.is(new._pendingStatus) && !SyncEngine.$isSynchronizing
}
)
}
Expand All @@ -334,7 +345,7 @@
createTemporaryTrigger(
"\(String.sqliteDataCloudKitSchemaName)_after_delete_on_sqlitedata_icloud_metadata",
ifNotExists: true,
after: .update(of: \._isDeleted) { _, new in
after: .update(of: \._pendingStatus) { _, new in
Values(
syncEngine.$didDelete(
recordName: new.recordName,
Expand All @@ -344,7 +355,34 @@
)
)
} when: { old, new in
!old._isDeleted && new._isDeleted && !SyncEngine.$isSynchronizing
(old._pendingStatus.is(nil) || old._pendingStatus.neq(PendingStatus.deleted))
&& new._pendingStatus.eq(PendingStatus.deleted)
&& !SyncEngine.$isSynchronizing
}
)
}

fileprivate static func afterReinsertTrigger(
for syncEngine: SyncEngine
) -> TemporaryTrigger<Self> {
createTemporaryTrigger(
"\(String.sqliteDataCloudKitSchemaName)_after_reinsert_on_sqlitedata_icloud_metadata",
ifNotExists: true,
after: .update(of: \._pendingStatus) { _, new in
Values(
syncEngine.$didUpdate(
recordName: new.recordName,
zoneName: new.zoneName,
ownerName: new.ownerName,
oldZoneName: new.zoneName,
oldOwnerName: new.ownerName,
descendantRecordNames: #bind(nil)
)
)
} when: { old, new in
old._pendingStatus.eq(PendingStatus.deleted)
&& new._pendingStatus.eq(PendingStatus.reinserted)
&& !SyncEngine.$isSynchronizing
}
)
}
Expand Down
61 changes: 41 additions & 20 deletions Sources/SQLiteData/CloudKit/SyncEngine.swift
Original file line number Diff line number Diff line change
Expand Up @@ -1125,13 +1125,12 @@

let batch = await syncEngine.recordZoneChangeBatch(pendingChanges: changes) { recordID in
guard
let (metadata, allFields) = await withErrorReporting(
let metadata = await withErrorReporting(
.sqliteDataCloudKitFailure,
catching: {
try await metadatabase.read { db in
try SyncMetadata
.find(recordID)
.select { ($0, $0._lastKnownServerRecordAllFields) }
.fetchOne(db)
}
}
Expand Down Expand Up @@ -1197,7 +1196,7 @@
}

let record =
allFields
metadata.allFieldsRecord
?? CKRecord(
recordType: metadata.recordType,
recordID: recordID
Expand All @@ -1220,7 +1219,17 @@
with: T(queryOutput: row),
userModificationTime: metadata.userModificationTime
)
await refreshLastKnownServerRecord(record)
await withErrorReporting(.sqliteDataCloudKitFailure) {
try await userDatabase.write { db in
try refreshLastKnownServerRecord(record, db: db)
if metadata._pendingStatus == .reinserted {
try SyncMetadata
.find(record.recordID)
.update { $0._pendingStatus = #bind(nil) }
.execute(db)
}
}
}
sentRecord = recordID
return record
}
Expand Down Expand Up @@ -1948,9 +1957,12 @@
func open<T>(_ table: some SynchronizableTable<T>) throws {
var columnNames: [String] = T.TableColumns.writableColumns.map(\.name)
if !force,
let allFields = metadata._lastKnownServerRecordAllFields,
let allFields = metadata.allFieldsRecord,
let row = try T.find(#sql("\(bind: metadata.recordPrimaryKey)")).fetchOne(db)
{
if metadata._pendingStatus == .reinserted {
allFields.update(with: T(queryOutput: row), userModificationTime: metadata.userModificationTime)
}
serverRecord.update(
with: allFields,
row: T(queryOutput: row),
Expand All @@ -1970,6 +1982,12 @@
.find(serverRecord.recordID)
.update { $0.setLastKnownServerRecord(serverRecord) }
.execute(db)
if metadata._pendingStatus == .reinserted {
try SyncMetadata
.find(serverRecord.recordID)
.update { $0._pendingStatus = #bind(nil) }
.execute(db)
}
} catch {
guard
let error = error as? DatabaseError,
Expand All @@ -1992,22 +2010,25 @@
private func refreshLastKnownServerRecord(_ record: CKRecord) async {
await withErrorReporting(.sqliteDataCloudKitFailure) {
try await userDatabase.write { db in
let metadata = try SyncMetadata.find(record.recordID).fetchOne(db)
func updateLastKnownServerRecord() throws {
try SyncMetadata
.find(record.recordID)
.update { $0.setLastKnownServerRecord(record) }
.execute(db)
}

if let lastKnownDate = metadata?.lastKnownServerRecord?.modificationDate {
if let recordDate = record.modificationDate, lastKnownDate < recordDate {
try updateLastKnownServerRecord()
}
} else {
try updateLastKnownServerRecord()
}
try refreshLastKnownServerRecord(record, db: db)
}
}
}

private func refreshLastKnownServerRecord(_ record: CKRecord, db: Database) throws {
let metadata = try SyncMetadata.find(record.recordID).fetchOne(db)
func updateLastKnownServerRecord() throws {
try SyncMetadata
.find(record.recordID)
.update { $0.setLastKnownServerRecord(record) }
.execute(db)
}
if let lastKnownDate = metadata?.lastKnownServerRecord?.modificationDate {
if let recordDate = record.modificationDate, lastKnownDate < recordDate {
try updateLastKnownServerRecord()
}
} else {
try updateLastKnownServerRecord()
}
}

Expand Down
33 changes: 29 additions & 4 deletions Sources/SQLiteData/CloudKit/SyncMetadata.swift
Original file line number Diff line number Diff line change
@@ -1,6 +1,17 @@
#if canImport(CloudKit)
import CloudKit

/// Represents the pending synchronization state of a record.
public enum PendingStatus: Int, Hashable, Sendable, QueryBindable, QueryDecodable, QueryRepresentable {
/// Indicates the metadata has been "soft" deleted. It will be fully deleted once the
/// next batch of pending changes is processed.
case deleted = 0
/// Indicates the record has been reinserted after being soft-deleted. This status
/// will be cleared after the next batch of pending changes is processed or server
/// record changes are applied.
case reinserted = 1
}

/// A table that tracks metadata related to synchronized data.
///
/// Each row of this table represents a synchronized record across all tables synchronized with
Expand Down Expand Up @@ -93,9 +104,23 @@
@Column(as: CKShare?.SystemFieldsRepresentation.self)
public let share: CKShare?

/// Determines if the metadata has been "soft" deleted. It will be fully deleted once the
/// next batch of pending changes is processed.
public let _isDeleted: Bool
/// The pending synchronization state of the record which can require special handling.
///
/// `nil` indicates a normal record with standard sync behavior.
public let _pendingStatus: PendingStatus?

/// The appropriate all-fields record to use as a base when building a `CKRecord` for upload.
///
/// For reinserted rows, returns `lastKnownServerRecord` (system fields only)
/// For all others, returns `_lastKnownServerRecordAllFields`.
var allFieldsRecord: CKRecord? {
switch _pendingStatus {
case .reinserted:
lastKnownServerRecord
case nil, .deleted:
_lastKnownServerRecordAllFields
}
}

@Column("hasLastKnownServerRecord", generated: .virtual)
public let _hasLastKnownServerRecord: Bool
Expand Down Expand Up @@ -191,7 +216,7 @@
self._hasLastKnownServerRecord = lastKnownServerRecord != nil
self._isShared = share != nil
self.userModificationTime = userModificationTime
self._isDeleted = false
self._pendingStatus = nil
}

package static func find(_ recordID: CKRecord.ID) -> Where<Self> {
Expand Down
Loading