407 lines
15 KiB
Swift
407 lines
15 KiB
Swift
import AppKit
|
|
import Foundation
|
|
import LilithLogging
|
|
import MacSyncShared
|
|
|
|
private let log = AppLogger.logger(for: "IPhoto.Sync")
|
|
|
|
// MARK: - Stats
|
|
|
|
/// Counters describing the state of the iPhoto sync, plus derived,
/// display-ready figures for the upload session currently in progress.
public struct IPhotoSyncStats: Equatable, Sendable {
    public var photoCount: Int = 0
    public var albumCount: Int = 0
    public var uploadedCount: Int = 0
    public var pendingUpload: Int = 0
    public var failedBatches: Int = 0

    // Per-session upload tracking
    public var currentSessionUploaded: Int = 0
    public var currentSessionFailed: Int = 0
    public var uploadStartTime: Date?
    public var bytesUploaded: Int64 = 0

    public init() {}

    /// Uploads per second for the current session, measured against the
    /// current wall-clock time. Zero until a start time is set and at least
    /// one upload has completed.
    public var uploadRate: Double {
        guard currentSessionUploaded > 0, let startedAt = uploadStartTime else { return 0 }
        let seconds = Date().timeIntervalSince(startedAt)
        if seconds > 0 {
            return Double(currentSessionUploaded) / seconds
        }
        return 0
    }

    /// `uploadRate` formatted per minute when below one per second,
    /// otherwise per second.
    public var uploadRateString: String {
        let perSecond = uploadRate
        if perSecond < 1 {
            return String(format: "%.1f/min", perSecond * 60)
        }
        return String(format: "%.1f/sec", perSecond)
    }

    /// `bytesUploaded` rendered with the `.file` byte-count style.
    public var bytesUploadedString: String {
        ByteCountFormatter.string(fromByteCount: bytesUploaded, countStyle: .file)
    }

    /// Rough time remaining ("42s", "7m", "2h 15m"), or `nil` when there is
    /// no measurable rate or nothing left to upload.
    public var eta: String? {
        let rate = uploadRate
        guard rate > 0, pendingUpload > 0 else { return nil }

        let secondsLeft = Double(pendingUpload) / rate
        switch secondsLeft {
        case ..<60:
            return "\(Int(secondsLeft))s"
        case ..<3600:
            return "\(Int(secondsLeft / 60))m"
        default:
            let hours = Int(secondsLeft / 3600)
            let minutes = Int(secondsLeft.truncatingRemainder(dividingBy: 3600) / 60)
            return "\(hours)h \(minutes)m"
        }
    }
}
|
|
|
|
// MARK: - Sync Error
|
|
|
|
/// Error states the iPhoto sync can surface; `.none` stands for "no error".
public enum IPhotoSyncError: Equatable, Sendable {
    case none
    case photosAccessRequired
    case backendUnreachable
    case connectionFailed(String)

    /// User-facing description of the error; empty for `.none`.
    /// `.connectionFailed` returns its associated string unchanged.
    public var message: String {
        switch self {
        case .connectionFailed(let detail):
            return detail
        case .backendUnreachable:
            return "Cannot connect to backend server"
        case .photosAccessRequired:
            return "Photos access required"
        case .none:
            return ""
        }
    }

    /// `true` for the two backend-connectivity cases, `false` otherwise.
    public var isConnectionError: Bool {
        switch self {
        case .none, .photosAccessRequired:
            return false
        case .backendUnreachable, .connectionFailed:
            return true
        }
    }
}
|
|
|
|
// MARK: - SyncManager
|
|
|
|
/// Orchestrates iPhoto sync: metadata batch push → binary upload for pending assets.
///
/// A sync cycle (`performSync`) fetches photos via `reader.fetchPhotos(since: lastSync)`,
/// pushes their metadata in batches of `metadataBatchSize`, uploads the binaries the
/// server asks for, syncs albums, refreshes stats, and finally drains the server's
/// pending-upload list.
///
/// Upload concurrency is capped at `maxConcurrentUploads` using `withTaskGroup`
/// with explicit batch windows, so no more than that many uploads are started at once.
@MainActor
public final class SyncManager: BaseSyncManager<IPhotoSyncStats, IPhotoSyncError> {
    public static let shared = SyncManager()

    private let reader = PhotosLibraryReader.shared
    private let apiClient = APIClient.shared

    /// Batch size for metadata sync — 100 photos per request.
    private let metadataBatchSize = 100
    /// Maximum concurrent binary uploads — avoids saturating the upload link.
    private let maxConcurrentUploads = 4

    /// Configures the base manager and migrates the legacy (un-namespaced)
    /// UserDefaults watermark keys, removing each legacy key once it has been copied.
    private init() {
        super.init(
            initialStats: IPhotoSyncStats(),
            noError: .none,
            persistenceKey: "iphoto",
            timerInterval: 300
        )

        // Migrate legacy UserDefaults watermark keys (one-time).
        if lastSync == nil, let legacy = UserDefaults.standard.object(forKey: "iphotoLastSync") as? Date {
            setLastSync(legacy)
            UserDefaults.standard.removeObject(forKey: "iphotoLastSync")
        }
        if lastSyncCompletedAt == nil, let legacy = UserDefaults.standard.object(forKey: "iphotoLastSyncCompletedAt") as? Date {
            lastSyncCompletedAt = legacy
            UserDefaults.standard.set(legacy, forKey: "iphoto.lastSyncCompletedAt")
            UserDefaults.standard.removeObject(forKey: "iphotoLastSyncCompletedAt")
        }
    }

    // MARK: - Authorization hooks

    /// Reports whether the Photos library reader currently considers itself accessible.
    public override func isAuthorized() async -> Bool { reader.isAccessible }

    /// Forwards the authorization request to the Photos library reader.
    public override func requestAuthorization() async -> Bool {
        await reader.requestAuthorization()
    }

    /// Surfaces a denied authorization as `.photosAccessRequired`.
    public override func onAuthorizationDenied() {
        syncError = .photosAccessRequired
    }

    // MARK: - Module-specific entry points

    /// Clears the sync watermark and the failed-batch counter, then starts a sync.
    /// Does nothing while a sync is already running.
    public func forceFullResync() {
        guard !isSyncing else { return }
        log.info("Forcing full resync")
        setLastSync(nil)
        stats.failedBatches = 0
        syncNow()
    }

    /// Upload only photos that have metadata but no binary on the server.
    ///
    /// Requests Photos authorization first when the reader is not accessible and
    /// reports `.photosAccessRequired` if that request is refused.
    public func uploadPending() {
        guard !isSyncing else { return }
        Task { [weak self] in
            guard let self else { return }
            if !self.reader.isAccessible {
                let ok = await self.reader.requestAuthorization()
                guard ok else {
                    self.syncError = .photosAccessRequired
                    return
                }
            }
            await self.performPendingUpload()
        }
    }

    /// Opens the Photos privacy pane of System Settings.
    public func openPhotosAccessSettings() {
        if let url = URL(string: "x-apple.systempreferences:com.apple.preference.security?Privacy_Photos") {
            NSWorkspace.shared.open(url)
        }
    }

    // MARK: - Sync cycle

    /// Runs one full sync cycle: fetch → metadata batches → binary uploads →
    /// albums → stats → pending uploads.
    public override func performSync() async {
        log.info("performSync starting")
        currentOperation = "Fetching photos…"

        // Background diagnostic — never blocks sync loop
        Task.detached { self.reader.checkAccessibility() }

        // Phase 1: Fetch metadata
        let photos = reader.fetchPhotos(since: lastSync)
        log.info("Fetched \(photos.count) photos from library")
        currentOperation = "Found \(photos.count) photos"

        if photos.isEmpty {
            // NOTE(review): album sync is skipped on this path, so album-only
            // changes are not pushed when no photos were fetched — confirm intended.
            await updateStats()
            currentOperation = "No new photos"
            await performPendingUpload()
            return
        }

        // Phase 2: Batch metadata sync
        currentOperation = "Syncing metadata…"
        var allNeedsUpload: [String] = []
        let df = ISO8601DateFormatter()

        let batches = stride(from: 0, to: photos.count, by: metadataBatchSize).map {
            Array(photos[$0..<min($0 + metadataBatchSize, photos.count)])
        }

        for (idx, batch) in batches.enumerated() {
            currentOperation = "Syncing metadata… (\(idx + 1)/\(batches.count))"

            let payloads = batch.map { photo in
                SyncPhotoPayload(
                    localIdentifier: photo.localIdentifier,
                    mediaType: photo.mediaTypeString,
                    width: photo.width,
                    height: photo.height,
                    fileSize: photo.fileSize,
                    durationSeconds: photo.duration > 0 ? photo.duration : nil,
                    // A missing creation date is reported as "now".
                    capturedAt: df.string(from: photo.creationDate ?? Date()),
                    modifiedAt: photo.modificationDate.map { df.string(from: $0) },
                    originalFilename: photo.originalFilename ?? reader.getOriginalFilename(localIdentifier: photo.localIdentifier),
                    latitude: photo.latitude,
                    longitude: photo.longitude,
                    isFavorite: photo.isFavorite,
                    isHidden: photo.isHidden,
                    isScreenshot: photo.isScreenshot,
                    isSelfie: photo.isSelfie,
                    isBurst: photo.isBurst,
                    burstIdentifier: photo.burstIdentifier
                )
            }

            do {
                let response = try await apiClient.syncPhotos(payloads)
                allNeedsUpload.append(contentsOf: response.needsUpload)
                // A successful round-trip means an earlier connection error is stale.
                if syncError.isConnectionError { syncError = .none }
                log.info("Batch \(idx + 1)/\(batches.count) synced=\(response.synced) needsUpload=\(response.needsUpload.count)")
            } catch {
                // NOTE(review): a failed batch is counted but not retried here; whether
                // its photos are picked up again depends on how BaseSyncManager advances
                // the watermark — confirm.
                stats.failedBatches += 1
                setConnectionError(error)
                log.warning("Batch \(idx + 1) failed: \(error.localizedDescription)")
            }
        }

        // Phase 3: Binary uploads
        if !allNeedsUpload.isEmpty {
            await uploadBatch(allNeedsUpload, label: "Uploading")
        }

        // Phase 4: Albums
        currentOperation = "Syncing albums…"
        await syncAlbums()

        await updateStats()
        currentOperation = "Sync complete"
        log.info("performSync complete")

        await performPendingUpload()
    }

    /// Asks the server which assets still lack a binary and uploads them.
    ///
    /// Connection failures are surfaced through `syncError` (and cleared again on a
    /// successful request), matching the metadata-batch handling in `performSync`.
    private func performPendingUpload() async {
        log.info("Checking pending uploads")
        currentOperation = "Checking pending uploads…"

        Task.detached { self.reader.checkAccessibility() }

        do {
            let pending = try await apiClient.getPendingUploads()
            // The backend answered, so a previously recorded connection error is stale.
            if syncError.isConnectionError { syncError = .none }
            log.info("Server reports \(pending.count) pending uploads")

            guard !pending.isEmpty else {
                currentOperation = "No pending uploads"
                return
            }

            currentOperation = "Uploading pending…"
            await uploadBatch(pending, label: "Pending upload")
            await updateStats()
        } catch {
            // Previously only logged, which left a user-triggered `uploadPending()`
            // failing silently when the backend was unreachable.
            setConnectionError(error)
            log.warning("getPendingUploads failed: \(error.localizedDescription)")
        }

        currentOperation = ""
    }

    /// Upload `localIdentifiers` in windows of `maxConcurrentUploads`.
    ///
    /// Resets the per-session counters in `stats`, then processes one window at a
    /// time: every upload in a window finishes before the next window starts.
    /// Stops before starting a new window if the surrounding task was cancelled.
    ///
    /// - Parameters:
    ///   - localIdentifiers: Photo identifiers to upload.
    ///   - label: Prefix used for progress text and log lines.
    private func uploadBatch(_ localIdentifiers: [String], label: String) async {
        let total = localIdentifiers.count
        // Emit a progress log line each time this many items have been processed.
        let progressLogInterval = 100

        stats.currentSessionUploaded = 0
        stats.currentSessionFailed = 0
        stats.uploadStartTime = Date()
        stats.bytesUploaded = 0
        stats.pendingUpload = total

        var uploaded = 0
        var failed = 0

        for batchStart in stride(from: 0, to: total, by: maxConcurrentUploads) {
            // Cooperative cancellation: don't start further windows once cancelled.
            if Task.isCancelled {
                log.info("\(label) cancelled: \(uploaded)/\(total) uploaded before cancellation")
                break
            }

            let batchEnd = min(batchStart + maxConcurrentUploads, total)
            let batch = Array(localIdentifiers[batchStart..<batchEnd])

            await withTaskGroup(of: (Bool, Int64).self) { group in
                for localId in batch {
                    group.addTask { await self.uploadSingleAsset(localIdentifier: localId) }
                }
                for await (success, bytes) in group {
                    if success {
                        uploaded += 1
                        stats.bytesUploaded += bytes
                    } else {
                        failed += 1
                    }
                }
            }

            stats.currentSessionUploaded = uploaded
            stats.currentSessionFailed = failed
            // Failed items remain counted as pending.
            stats.pendingUpload = total - uploaded
            currentOperation = "\(label)… \(uploaded)/\(total) (\(stats.uploadRateString))"

            // Log whenever this window crossed a multiple of the interval. Unlike
            // `batchEnd % 100 == 0`, this does not rely on the window size dividing 100.
            let crossedInterval = batchEnd / progressLogInterval != batchStart / progressLogInterval
            if crossedInterval || batchEnd == total {
                log.info("\(label) progress: \(uploaded)/\(total) succeeded, \(failed) failed, \(self.stats.bytesUploadedString)")
            }
        }

        log.info("\(label) complete: \(uploaded) succeeded, \(failed) failed")
    }

    /// Uploads one asset and reports `(success, bytes)`.
    ///
    /// Files with a `mov`/`mp4`/`m4v` extension are uploaded from their file URL;
    /// everything else is uploaded from the data returned by `reader.requestImageData`.
    /// Assets whose original file cannot be located are skipped and reported as failed.
    private func uploadSingleAsset(localIdentifier: String) async -> (Bool, Int64) {
        await withCheckedContinuation { continuation in
            DispatchQueue.global(qos: .userInitiated).async {
                // The lookup key is the part of the identifier before the first "/".
                let uuid = localIdentifier.components(separatedBy: "/").first ?? ""
                guard !uuid.isEmpty else {
                    continuation.resume(returning: (false, 0)); return
                }

                guard let fileURL = self.reader.findOriginalFile(uuid: uuid) else {
                    log.info("Skipping \(localIdentifier) — not available locally (iCloud-only or deleted)")
                    continuation.resume(returning: (false, 0)); return
                }

                let isVideo = ["mov", "mp4", "m4v"].contains(fileURL.pathExtension.lowercased())

                if isVideo {
                    Task {
                        // Compute the outcome first so cleanup and resume each happen
                        // exactly once, in that order, on every path.
                        let outcome: (Bool, Int64)
                        do {
                            let fileSize = (try? FileManager.default.attributesOfItem(atPath: fileURL.path)[.size] as? Int64) ?? 0
                            let mimeType = PhotosLibraryReader.mimeType(fromExtension: fileURL.pathExtension)
                            let success = try await self.apiClient.uploadPhotoFromURL(
                                localIdentifier: localIdentifier,
                                fileURL: fileURL,
                                mimeType: mimeType
                            )
                            outcome = (success, fileSize)
                        } catch {
                            log.warning("Upload failed for video \(localIdentifier): \(error.localizedDescription)")
                            outcome = (false, 0)
                        }
                        self.reader.cleanupVideoTemp(localIdentifier: localIdentifier)
                        continuation.resume(returning: outcome)
                    }
                } else {
                    // NOTE(review): assumes `requestImageData` invokes its completion
                    // exactly once; a second invocation would resume the continuation
                    // twice, and none at all would leave the caller suspended — confirm.
                    self.reader.requestImageData(localIdentifier: localIdentifier) { data, mimeType in
                        Task {
                            guard let data else {
                                log.info("No image data for \(localIdentifier)")
                                continuation.resume(returning: (false, 0)); return
                            }
                            do {
                                let success = try await self.apiClient.uploadPhoto(
                                    localIdentifier: localIdentifier,
                                    data: data,
                                    mimeType: mimeType ?? "image/jpeg"
                                )
                                continuation.resume(returning: (success, Int64(data.count)))
                            } catch {
                                log.warning("Upload failed for image \(localIdentifier): \(error.localizedDescription)")
                                continuation.resume(returning: (false, 0))
                            }
                        }
                    }
                }
            }
        }
    }

    /// Pushes all albums returned by the reader in a single request.
    /// Failures are logged and otherwise ignored.
    private func syncAlbums() async {
        let albums = reader.fetchAlbums()
        guard !albums.isEmpty else { return }
        let df = ISO8601DateFormatter()
        let payloads = albums.map { album in
            SyncAlbumPayload(
                localIdentifier: album.localIdentifier,
                title: album.title,
                albumType: album.albumTypeString,
                photoLocalIdentifiers: album.assetLocalIdentifiers,
                startDate: album.startDate.map { df.string(from: $0) },
                endDate: album.endDate.map { df.string(from: $0) }
            )
        }
        do {
            let synced = try await apiClient.syncAlbums(payloads)
            log.info("Synced \(synced) albums")
        } catch {
            log.warning("Album sync failed: \(error.localizedDescription)")
        }
    }

    /// Refreshes the server-side totals in `stats`; on failure the previous
    /// values are kept and a warning is logged.
    private func updateStats() async {
        do {
            let response = try await apiClient.getStats()
            stats.photoCount = response.totalPhotos
            stats.albumCount = response.totalAlbums
            stats.uploadedCount = response.uploadedPhotos
            stats.pendingUpload = response.pendingUpload
            log.info("Stats: photos=\(response.totalPhotos) albums=\(response.totalAlbums) uploaded=\(response.uploadedPhotos) pending=\(response.pendingUpload)")
        } catch {
            log.warning("updateStats failed: \(error.localizedDescription)")
        }
    }

    /// Sets `.backendUnreachable` when the shared heuristic classifies `error`
    /// as a connection problem; other errors leave `syncError` untouched.
    private func setConnectionError(_ error: Error) {
        if SyncConnectionErrorHeuristic.isConnectionError(error) {
            syncError = .backendUnreachable
        }
    }
}
|