macsync/@packages/iphoto/Sources/IPhotoSync/SyncManager.swift
Natalie 2568866c70 feat(@applications): implement mac-sync identity and photo workflows
Co-Authored-By: Lilith Autocommit <noreply@atlilith.com>
2026-05-17 20:27:05 -07:00

423 lines
16 KiB
Swift

import AppKit
import Foundation
import LilithLogging
import MacSyncShared
private let log = AppLogger.logger(for: "IPhoto.Sync")
// MARK: - Stats
public struct IPhotoSyncStats: Equatable, Sendable {
public var photoCount: Int = 0
public var albumCount: Int = 0
public var uploadedCount: Int = 0
public var pendingUpload: Int = 0
public var failedBatches: Int = 0
// Per-session upload tracking
public var currentSessionUploaded: Int = 0
public var currentSessionFailed: Int = 0
public var uploadStartTime: Date?
public var bytesUploaded: Int64 = 0
public var uploadRate: Double {
guard let start = uploadStartTime, currentSessionUploaded > 0 else { return 0 }
let elapsed = Date().timeIntervalSince(start)
return elapsed > 0 ? Double(currentSessionUploaded) / elapsed : 0
}
public var uploadRateString: String {
let rate = uploadRate
return rate < 1 ? String(format: "%.1f/min", rate * 60) : String(format: "%.1f/sec", rate)
}
public var bytesUploadedString: String {
let f = ByteCountFormatter()
f.countStyle = .file
return f.string(fromByteCount: bytesUploaded)
}
public var eta: String? {
guard uploadRate > 0, pendingUpload > 0 else { return nil }
let remaining = Double(pendingUpload) / uploadRate
if remaining < 60 { return "\(Int(remaining))s" }
if remaining < 3600 { return "\(Int(remaining / 60))m" }
return "\(Int(remaining / 3600))h \(Int((remaining.truncatingRemainder(dividingBy: 3600)) / 60))m"
}
public init() {}
}
// MARK: - Sync Error
public enum IPhotoSyncError: Equatable, Sendable {
case none
case photosAccessRequired
case backendUnreachable
case connectionFailed(String)
public var message: String {
switch self {
case .none: return ""
case .photosAccessRequired: return "Photos access required"
case .backendUnreachable: return "Cannot connect to backend server"
case .connectionFailed(let m): return m
}
}
public var isConnectionError: Bool {
switch self {
case .backendUnreachable, .connectionFailed: return true
default: return false
}
}
}
// MARK: - SyncManager
/// Orchestrates iPhoto sync: metadata batch push binary upload for pending assets.
///
/// Upload concurrency is capped at `maxConcurrentUploads` (4) using `withTaskGroup`
/// with explicit batch windows so we never open more than 4 concurrent HTTP streams.
@MainActor
public final class SyncManager: BaseSyncManager<IPhotoSyncStats, IPhotoSyncError> {
public static let shared = SyncManager()
private let reader = PhotosLibraryReader.shared
private let apiClient = APIClient.shared
/// Batch size for metadata sync 100 photos per request.
private let metadataBatchSize = 100
/// Maximum concurrent binary uploads avoids saturating the upload link.
private let maxConcurrentUploads = 8
/// Max batches drained per `performPendingUpload` cycle. Each batch is up to
/// `pendingFetchLimit` items, so a heavy backlog finishes inside one cycle
/// instead of waiting `timerInterval` between fetches.
private let pendingFetchLimit = 500
private let pendingMaxBatchesPerCycle = 20
private init() {
super.init(
initialStats: IPhotoSyncStats(),
noError: .none,
persistenceKey: "iphoto",
timerInterval: 300
)
// Migrate legacy UserDefaults watermark keys (one-time).
if lastSync == nil, let legacy = UserDefaults.standard.object(forKey: "iphotoLastSync") as? Date {
setLastSync(legacy)
UserDefaults.standard.removeObject(forKey: "iphotoLastSync")
}
if lastSyncCompletedAt == nil, let legacy = UserDefaults.standard.object(forKey: "iphotoLastSyncCompletedAt") as? Date {
lastSyncCompletedAt = legacy
UserDefaults.standard.set(legacy, forKey: "iphoto.lastSyncCompletedAt")
UserDefaults.standard.removeObject(forKey: "iphotoLastSyncCompletedAt")
}
}
// MARK: - Authorization hooks
public override func isAuthorized() async -> Bool { reader.isAccessible }
public override func requestAuthorization() async -> Bool {
await reader.requestAuthorization()
}
public override func onAuthorizationDenied() {
syncError = .photosAccessRequired
}
// MARK: - Module-specific entry points
public func forceFullResync() {
guard !isSyncing else { return }
log.info("Forcing full resync")
setLastSync(nil)
stats.failedBatches = 0
syncNow()
}
/// Upload only photos that have metadata but no binary on the server.
public func uploadPending() {
guard !isSyncing else { return }
Task { [weak self] in
guard let self else { return }
if !self.reader.isAccessible {
let ok = await self.reader.requestAuthorization()
guard ok else {
self.syncError = .photosAccessRequired
return
}
}
await self.performPendingUpload()
}
}
public func openPhotosAccessSettings() {
if let url = URL(string: "x-apple.systempreferences:com.apple.preference.security?Privacy_Photos") {
NSWorkspace.shared.open(url)
}
}
// MARK: - Sync cycle
public override func performSync() async {
log.info("performSync starting")
currentOperation = "Fetching photos…"
// Background diagnostic never blocks sync loop
Task.detached { self.reader.checkAccessibility() }
// Phase 1: Fetch metadata
let photos = reader.fetchPhotos(since: lastSync)
log.info("Fetched \(photos.count) photos from library")
currentOperation = "Found \(photos.count) photos"
if photos.isEmpty {
await updateStats()
currentOperation = "No new photos"
await performPendingUpload()
return
}
// Phase 2: Batch metadata sync
currentOperation = "Syncing metadata…"
var allNeedsUpload: [String] = []
let df = ISO8601DateFormatter()
let batches = stride(from: 0, to: photos.count, by: metadataBatchSize).map {
Array(photos[$0..<min($0 + metadataBatchSize, photos.count)])
}
for (idx, batch) in batches.enumerated() {
currentOperation = "Syncing metadata… (\(idx + 1)/\(batches.count))"
let payloads = batch.map { photo in
SyncPhotoPayload(
localIdentifier: photo.localIdentifier,
mediaType: photo.mediaTypeString,
width: photo.width,
height: photo.height,
fileSize: photo.fileSize,
durationSeconds: photo.duration > 0 ? photo.duration : nil,
capturedAt: df.string(from: photo.creationDate ?? Date()),
modifiedAt: photo.modificationDate.map { df.string(from: $0) },
originalFilename: photo.originalFilename ?? reader.getOriginalFilename(localIdentifier: photo.localIdentifier),
latitude: photo.latitude,
longitude: photo.longitude,
isFavorite: photo.isFavorite,
isHidden: photo.isHidden,
isScreenshot: photo.isScreenshot,
isSelfie: photo.isSelfie,
isBurst: photo.isBurst,
burstIdentifier: photo.burstIdentifier
)
}
do {
let response = try await apiClient.syncPhotos(payloads)
allNeedsUpload.append(contentsOf: response.needsUpload)
if syncError.isConnectionError { syncError = .none }
log.info("Batch \(idx + 1)/\(batches.count) synced=\(response.synced) needsUpload=\(response.needsUpload.count)")
} catch {
stats.failedBatches += 1
setConnectionError(error)
log.warning("Batch \(idx + 1) failed: \(error.localizedDescription)")
}
}
// Phase 3: Binary uploads
if !allNeedsUpload.isEmpty {
await uploadBatch(allNeedsUpload, label: "Uploading")
}
// Phase 4: Albums
currentOperation = "Syncing albums…"
await syncAlbums()
await updateStats()
currentOperation = "Sync complete"
log.info("performSync complete")
await performPendingUpload()
}
private func performPendingUpload() async {
log.info("Checking pending uploads")
currentOperation = "Checking pending uploads…"
Task.detached { self.reader.checkAccessibility() }
// Drain the backlog in back-to-back batches inside one cycle. Without
// this, a large backlog only loses `pendingFetchLimit` per `timerInterval`.
var batchesProcessed = 0
while batchesProcessed < pendingMaxBatchesPerCycle {
do {
let pending = try await apiClient.getPendingUploads(limit: pendingFetchLimit)
log.info("Server reports \(pending.count) pending uploads (batch \(batchesProcessed + 1))")
if pending.isEmpty {
currentOperation = batchesProcessed == 0 ? "No pending uploads" : "Pending drained"
break
}
currentOperation = "Uploading pending…"
await uploadBatch(pending, label: "Pending upload")
await updateStats()
batchesProcessed += 1
// If the server returned fewer than the requested limit, we've
// drained everything currently queued for this device.
if pending.count < pendingFetchLimit { break }
} catch {
log.warning("getPendingUploads failed: \(error.localizedDescription)")
break
}
}
currentOperation = ""
}
/// Upload `localIdentifiers` in groups of `maxConcurrentUploads` (4).
private func uploadBatch(_ localIdentifiers: [String], label: String) async {
let total = localIdentifiers.count
stats.currentSessionUploaded = 0
stats.currentSessionFailed = 0
stats.uploadStartTime = Date()
stats.bytesUploaded = 0
stats.pendingUpload = total
var uploaded = 0
var failed = 0
for batchStart in stride(from: 0, to: total, by: maxConcurrentUploads) {
let batchEnd = min(batchStart + maxConcurrentUploads, total)
let batch = Array(localIdentifiers[batchStart..<batchEnd])
await withTaskGroup(of: (Bool, Int64).self) { group in
for localId in batch {
group.addTask { await self.uploadSingleAsset(localIdentifier: localId) }
}
for await (success, bytes) in group {
if success {
uploaded += 1
stats.bytesUploaded += bytes
} else {
failed += 1
}
}
}
stats.currentSessionUploaded = uploaded
stats.currentSessionFailed = failed
stats.pendingUpload = total - uploaded
currentOperation = "\(label)\(uploaded)/\(total) (\(stats.uploadRateString))"
if batchEnd % 100 == 0 || batchEnd == total {
log.info("\(label) progress: \(uploaded)/\(total) succeeded, \(failed) failed, \(self.stats.bytesUploadedString)")
}
}
log.info("\(label) complete: \(uploaded) succeeded, \(failed) failed")
}
private func uploadSingleAsset(localIdentifier: String) async -> (Bool, Int64) {
await withCheckedContinuation { continuation in
DispatchQueue.global(qos: .userInitiated).async {
let uuid = localIdentifier.components(separatedBy: "/").first ?? ""
guard !uuid.isEmpty else {
continuation.resume(returning: (false, 0)); return
}
guard let fileURL = self.reader.findOriginalFile(uuid: uuid) else {
log.info("Skipping \(localIdentifier) — not available locally (iCloud-only or deleted)")
continuation.resume(returning: (false, 0)); return
}
let isVideo = ["mov", "mp4", "m4v"].contains(fileURL.pathExtension.lowercased())
if isVideo {
Task {
do {
let fileSize = (try? FileManager.default.attributesOfItem(atPath: fileURL.path)[.size] as? Int64) ?? 0
let mimeType = PhotosLibraryReader.mimeType(fromExtension: fileURL.pathExtension)
let success = try await self.apiClient.uploadPhotoFromURL(
localIdentifier: localIdentifier,
fileURL: fileURL,
mimeType: mimeType
)
self.reader.cleanupVideoTemp(localIdentifier: localIdentifier)
continuation.resume(returning: (success, fileSize))
} catch {
log.warning("Upload failed for video \(localIdentifier): \(error.localizedDescription)")
self.reader.cleanupVideoTemp(localIdentifier: localIdentifier)
continuation.resume(returning: (false, 0))
}
}
} else {
self.reader.requestImageData(localIdentifier: localIdentifier) { data, mimeType in
Task {
guard let data else {
log.info("No image data for \(localIdentifier)")
continuation.resume(returning: (false, 0)); return
}
do {
let success = try await self.apiClient.uploadPhoto(
localIdentifier: localIdentifier,
data: data,
mimeType: mimeType ?? "image/jpeg"
)
continuation.resume(returning: (success, Int64(data.count)))
} catch {
log.warning("Upload failed for image \(localIdentifier): \(error.localizedDescription)")
continuation.resume(returning: (false, 0))
}
}
}
}
}
}
}
private func syncAlbums() async {
let albums = reader.fetchAlbums()
guard !albums.isEmpty else { return }
let df = ISO8601DateFormatter()
let payloads = albums.map { album in
SyncAlbumPayload(
localIdentifier: album.localIdentifier,
title: album.title,
albumType: album.albumTypeString,
photoLocalIdentifiers: album.assetLocalIdentifiers,
startDate: album.startDate.map { df.string(from: $0) },
endDate: album.endDate.map { df.string(from: $0) }
)
}
do {
let synced = try await apiClient.syncAlbums(payloads)
log.info("Synced \(synced) albums")
} catch {
log.warning("Album sync failed: \(error.localizedDescription)")
}
}
private func updateStats() async {
do {
let response = try await apiClient.getStats()
stats.photoCount = response.totalPhotos
stats.albumCount = response.totalAlbums
stats.uploadedCount = response.uploadedPhotos
stats.pendingUpload = response.pendingUpload
log.info("Stats: photos=\(response.totalPhotos) albums=\(response.totalAlbums) uploaded=\(response.uploadedPhotos) pending=\(response.pendingUpload)")
} catch {
log.warning("updateStats failed: \(error.localizedDescription)")
}
}
private func setConnectionError(_ error: Error) {
if SyncConnectionErrorHeuristic.isConnectionError(error) {
syncError = .backendUnreachable
}
}
}