[codex] Encrypt local batch fallback files#38
Conversation
PR SummaryMedium Risk Overview Adds queued-batch replay for remote mode. Reviewed by Cursor Bugbot for commit e2a2503. Bugbot is set up for automated code reviews on this repo. Configure here. |
8076dde to
6b8acdb
Compare
There was a problem hiding this comment.
Cursor Bugbot has reviewed your changes and found 2 potential issues.
Autofix Details
Bugbot Autofix prepared fixes for both issues found in the latest run.
- ✅ Fixed: Concurrent replays can resubmit the same batches
- Added a replay-in-progress guard with rerun coalescing so overlapping retry requests no longer resubmit the same queued batch files.
- ✅ Fixed: Duplicate keychain item silently yields stale key
- Key creation now re-reads and returns the persisted key when SecItemAdd reports a duplicate instead of returning unwritten random bytes.
Preview (89e1a2ddac)
diff --git a/README.md b/README.md
--- a/README.md
+++ b/README.md
@@ -28,11 +28,14 @@
(default 4096) with `ocrTextTruncated` set when the cap applies.
- Batches every 30s or 24 frames, whichever first.
- Local-only mode persists batches under `~/.evalops/agentd/batches/` as
- `0o600` JSON and sweeps old or over-budget batches; HTTP mode `POST`s a
- Connect/proto JSON `SubmitBatchRequest` to
+ `0o600` JSON by default and sweeps old or over-budget batches; HTTP mode
+ `POST`s a Connect/proto JSON `SubmitBatchRequest` to
`chronicle.v1.ChronicleService.SubmitBatch` and falls back to local on
failure. Remote HTTP is allowed only for loopback development; non-loopback
remote endpoints must use HTTPS and configured client auth.
+- Remote and Secret Broker modes encrypt local fallback batches at rest by
+ default using a per-device Keychain-backed AES-GCM key. Local-only mode can
+ opt in with `encryptLocalBatches: true`.
- Optional Secret Broker mode wraps the frame batch into a broker artifact
(`chronicle_frame_batch_json`) first, then sends only the artifact/session
reference to Chronicle so Platform can unwrap, meter, and revoke through ASB.
@@ -97,6 +100,8 @@
- `maxOcrTextChars: 4096`
- `maxBatchAgeDays: 7`
- `maxBatchBytes: 536870912`
+- `encryptLocalBatches: false` in local-only mode, `true` in remote or Secret
+ Broker mode when omitted
- `auth: { "mode": "none" }`
Remote mode requires `localOnly: false`, an HTTPS or loopback endpoint, and an
@@ -165,6 +170,12 @@
stops capture until a later policy resumes it, while manual user pause still
wins locally.
+Encrypted local batches use the `.agentdbatch` extension. The encryption key is
+created or loaded from Keychain service `dev.evalops.agentd.local-batch-key`,
+accounted by `deviceId`, and is never written to `config.json` or the batch
+directory. Retention sweeps apply to both plaintext `.json` batches and
+encrypted `.agentdbatch` batches.
+
## What's next
- Consume generated `chronicle.v1` Swift types when the platform SDK publishes
@@ -172,7 +183,6 @@
([evalops/platform#1078](https://github.com/evalops/platform/issues/1078)).
- Calendar / Zoom auto-pause via NATS subject
`chronicle.policy.pause` (siphon-fed).
-- Encryption-at-rest option for local batches.
- Hardware-backed permission-flow smoke test for Screen Recording and
Accessibility prompts.
diff --git a/Sources/agentd/Config.swift b/Sources/agentd/Config.swift
--- a/Sources/agentd/Config.swift
+++ b/Sources/agentd/Config.swift
@@ -74,6 +74,7 @@
var idleThresholdSeconds: Double
var idlePollSeconds: Double
var localOnly: Bool
+ var encryptLocalBatches: Bool
var auth: AuthMode
var secretBroker: SecretBrokerConfig?
@@ -100,6 +101,7 @@
case idleThresholdSeconds
case idlePollSeconds
case localOnly
+ case encryptLocalBatches
case auth
case secretBroker
}
@@ -126,6 +128,7 @@
idleThresholdSeconds: Double = 60,
idlePollSeconds: Double = 5,
localOnly: Bool,
+ encryptLocalBatches: Bool? = nil,
auth: AuthMode = .none,
secretBroker: SecretBrokerConfig? = nil
) {
@@ -150,6 +153,7 @@
self.idleThresholdSeconds = idleThresholdSeconds
self.idlePollSeconds = idlePollSeconds
self.localOnly = localOnly
+ self.encryptLocalBatches = encryptLocalBatches ?? (!localOnly || secretBroker != nil)
self.auth = auth
self.secretBroker = secretBroker
}
@@ -241,6 +245,9 @@
localOnly = try container.decodeIfPresent(Bool.self, forKey: .localOnly) ?? true
auth = try container.decodeIfPresent(AuthMode.self, forKey: .auth) ?? .none
secretBroker = try container.decodeIfPresent(SecretBrokerConfig.self, forKey: .secretBroker)
+ encryptLocalBatches =
+ try container.decodeIfPresent(Bool.self, forKey: .encryptLocalBatches)
+ ?? (!localOnly || secretBroker != nil)
}
func encode(to encoder: Encoder) throws {
@@ -266,6 +273,7 @@
try container.encode(idleThresholdSeconds, forKey: .idleThresholdSeconds)
try container.encode(idlePollSeconds, forKey: .idlePollSeconds)
try container.encode(localOnly, forKey: .localOnly)
+ try container.encode(encryptLocalBatches, forKey: .encryptLocalBatches)
try container.encode(auth, forKey: .auth)
try container.encodeIfPresent(secretBroker, forKey: .secretBroker)
}
@@ -325,6 +333,7 @@
idleThresholdSeconds: 60,
idlePollSeconds: 5,
localOnly: true,
+ encryptLocalBatches: false,
auth: .none,
secretBroker: nil
)
diff --git a/Sources/agentd/LocalBatchCrypto.swift b/Sources/agentd/LocalBatchCrypto.swift
new file mode 100644
--- /dev/null
+++ b/Sources/agentd/LocalBatchCrypto.swift
@@ -1,0 +1,145 @@
+// SPDX-License-Identifier: BUSL-1.1
+
+@preconcurrency import CryptoKit
+import Foundation
+import Security
+
+protocol LocalBatchKeyProviding: Sendable {
+ func localBatchKey(deviceId: String) throws -> SymmetricKey
+}
+
+struct KeychainLocalBatchKeyProvider: LocalBatchKeyProviding {
+ private let service: String
+ private let readKeyData: @Sendable (String, String) throws -> Data?
+ private let storeKeyData: @Sendable (String, String, Data) throws -> Bool
+ private let generateRandomKeyData: @Sendable () throws -> Data
+
+ init(
+ service: String = "dev.evalops.agentd.local-batch-key",
+ readKeyData: @escaping @Sendable (String, String) throws -> Data? = Self.readKeyData,
+ storeKeyData: @escaping @Sendable (String, String, Data) throws -> Bool = Self.storeKeyData,
+ generateRandomKeyData: @escaping @Sendable () throws -> Data = Self.generateRandomKeyData
+ ) {
+ self.service = service
+ self.readKeyData = readKeyData
+ self.storeKeyData = storeKeyData
+ self.generateRandomKeyData = generateRandomKeyData
+ }
+
+ func localBatchKey(deviceId: String) throws -> SymmetricKey {
+ if let existing = try readKey(deviceId: deviceId) {
+ return SymmetricKey(data: existing)
+ }
+
+ let bytes = try generateRandomKeyData()
+ if try storeKey(bytes, deviceId: deviceId) {
+ return SymmetricKey(data: bytes)
+ }
+
+ guard let existing = try readKey(deviceId: deviceId) else {
+ throw LocalBatchCryptoError.keychainReadFailed(errSecItemNotFound)
+ }
+ return SymmetricKey(data: existing)
+ }
+
+ private func readKey(deviceId: String) throws -> Data? {
+ try readKeyData(service, deviceId)
+ }
+
+ private func storeKey(_ key: Data, deviceId: String) throws -> Bool {
+ try storeKeyData(service, deviceId, key)
+ }
+
+ private static func generateRandomKeyData() throws -> Data {
+ var bytes = Data(count: 32)
+ let status = bytes.withUnsafeMutableBytes { buffer in
+ SecRandomCopyBytes(kSecRandomDefault, buffer.count, buffer.baseAddress!)
+ }
+ guard status == errSecSuccess else {
+ throw LocalBatchCryptoError.keyGenerationFailed(status)
+ }
+ return bytes
+ }
+
+ private static func readKeyData(service: String, deviceId: String) throws -> Data? {
+ let query: [String: Any] = [
+ kSecClass as String: kSecClassGenericPassword,
+ kSecAttrService as String: service,
+ kSecAttrAccount as String: deviceId,
+ kSecReturnData as String: true,
+ kSecMatchLimit as String: kSecMatchLimitOne,
+ ]
+ var item: CFTypeRef?
+ let status = SecItemCopyMatching(query as CFDictionary, &item)
+ if status == errSecItemNotFound {
+ return nil
+ }
+ guard status == errSecSuccess, let data = item as? Data else {
+ throw LocalBatchCryptoError.keychainReadFailed(status)
+ }
+ return data
+ }
+
+ private static func storeKeyData(service: String, deviceId: String, key: Data) throws -> Bool {
+ let attributes: [String: Any] = [
+ kSecClass as String: kSecClassGenericPassword,
+ kSecAttrService as String: service,
+ kSecAttrAccount as String: deviceId,
+ kSecAttrAccessible as String: kSecAttrAccessibleAfterFirstUnlockThisDeviceOnly,
+ kSecValueData as String: key,
+ ]
+ let status = SecItemAdd(attributes as CFDictionary, nil)
+ switch status {
+ case errSecSuccess:
+ return true
+ case errSecDuplicateItem:
+ return false
+ default:
+ throw LocalBatchCryptoError.keychainWriteFailed(status)
+ }
+ }
+}
+
+struct LocalBatchCryptor: @unchecked Sendable {
+ static let encryptedExtension = "agentdbatch"
+ private static let magic = Data("AGENTD-BATCH-AESGCM-v1\n".utf8)
+
+ let key: SymmetricKey
+
+ func encrypt(_ plaintext: Data) throws -> Data {
+ let sealed = try AES.GCM.seal(plaintext, using: key)
+ guard let combined = sealed.combined else {
+ throw LocalBatchCryptoError.invalidCiphertext
+ }
+ return Self.magic + combined
+ }
+
+ func decrypt(_ ciphertext: Data) throws -> Data {
+ guard ciphertext.starts(with: Self.magic) else {
+ throw LocalBatchCryptoError.invalidCiphertext
+ }
+ let combined = ciphertext.dropFirst(Self.magic.count)
+ let box = try AES.GCM.SealedBox(combined: Data(combined))
+ return try AES.GCM.open(box, using: key)
+ }
+}
+
+enum LocalBatchCryptoError: Error, LocalizedError, Equatable {
+ case keyGenerationFailed(OSStatus)
+ case keychainReadFailed(OSStatus)
+ case keychainWriteFailed(OSStatus)
+ case invalidCiphertext
+
+ var errorDescription: String? {
+ switch self {
+ case .keyGenerationFailed(let status):
+ return "failed to generate local batch key: \(status)"
+ case .keychainReadFailed(let status):
+ return "failed to read local batch key from Keychain: \(status)"
+ case .keychainWriteFailed(let status):
+ return "failed to store local batch key in Keychain: \(status)"
+ case .invalidCiphertext:
+ return "local batch ciphertext is invalid"
+ }
+ }
+}
diff --git a/Sources/agentd/Submitter.swift b/Sources/agentd/Submitter.swift
--- a/Sources/agentd/Submitter.swift
+++ b/Sources/agentd/Submitter.swift
@@ -14,6 +14,9 @@
private let batchDirectory: URL
private let maxBatchBytes: Int64
private let maxBatchAgeDays: Double
+ private let localBatchCryptor: LocalBatchCryptor?
+ private var isRetryingLocalBatches = false
+ private var retryLocalBatchesNeedsRerun = false
init(
endpoint: URL,
@@ -25,7 +28,10 @@
client: (any HTTPClient)? = nil,
batchDirectory: URL? = nil,
maxBatchBytes: Int64 = 512 * 1024 * 1024,
- maxBatchAgeDays: Double = 7
+ maxBatchAgeDays: Double = 7,
+ deviceId: String = "default",
+ encryptLocalBatches: Bool = false,
+ localBatchKeyProvider: any LocalBatchKeyProviding = KeychainLocalBatchKeyProvider()
) throws {
guard EndpointPolicy.isAllowed(endpoint: endpoint, localOnly: localOnly) else {
throw SubmitterInitError.insecureRemoteEndpoint(endpoint.absoluteString)
@@ -56,6 +62,13 @@
.appendingPathComponent(".evalops/agentd/batches")
self.maxBatchBytes = maxBatchBytes
self.maxBatchAgeDays = maxBatchAgeDays
+ if encryptLocalBatches {
+ self.localBatchCryptor = try LocalBatchCryptor(
+ key: localBatchKeyProvider.localBatchKey(deviceId: deviceId)
+ )
+ } else {
+ self.localBatchCryptor = nil
+ }
if let client {
self.client = client
@@ -102,35 +115,53 @@
}
let submitData: Data
- if let secretBroker {
- do {
- let wrapped = try await wrapFrameBatch(batch, using: secretBroker)
- submitData = try encodeBrokerSubmitBatchRequest(
- sessionToken: secretBroker.sessionToken,
- artifactId: wrapped.artifactId,
- grantId: wrapped.grantId,
- localOnly: localOnly
+ do {
+ submitData = try await makeRemoteSubmitData(for: batch, fallbackData: fallbackData)
+ } catch {
+ Log.submit.warning(
+ "remote submit prepare failed batch=\(batch.batchId, privacy: .public) error=\(error.localizedDescription, privacy: .public) — falling back to local"
+ )
+ await persistLocal(batch.batchId, data: fallbackData)
+ return .persistedLocal
+ }
+
+ let result = await submitRemoteData(submitData, batchId: batch.batchId)
+ if case .submitted = result {
+ let replay = await retryLocalBatches()
+ if replay.submitted > 0 || replay.failed > 0 {
+ Log.submit.info(
+ "local batch replay submitted=\(replay.submitted, privacy: .public) failed=\(replay.failed, privacy: .public)"
)
- } catch {
- Log.submit.warning(
- "secret broker wrap failed batch=\(batch.batchId, privacy: .public) error=\(error.localizedDescription, privacy: .public) — falling back to local"
- )
- await persistLocal(batch.batchId, data: fallbackData)
- return .persistedLocal
}
- } else {
- submitData = fallbackData
+ return result
}
- let req = makeRequest(body: submitData)
+ await persistLocal(batch.batchId, data: fallbackData)
+ return .persistedLocal
+ }
+
+ private func makeRemoteSubmitData(for batch: Batch, fallbackData: Data) async throws -> Data {
+ guard let secretBroker else {
+ return fallbackData
+ }
+ let wrapped = try await wrapFrameBatch(batch, using: secretBroker)
+ return try encodeBrokerSubmitBatchRequest(
+ sessionToken: secretBroker.sessionToken,
+ artifactId: wrapped.artifactId,
+ grantId: wrapped.grantId,
+ localOnly: localOnly
+ )
+ }
+
+ private func submitRemoteData(_ data: Data, batchId: String) async -> SubmitResult {
+ let req = makeRequest(body: data)
do {
let (body, resp) = try await client.data(for: req)
if let http = resp as? HTTPURLResponse, !(200..<300).contains(http.statusCode) {
Log.submit.warning(
- "submit status=\(http.statusCode, privacy: .public) batch=\(batch.batchId, privacy: .public) — falling back to local"
+ "submit status=\(http.statusCode, privacy: .public) batch=\(batchId, privacy: .public)"
)
- await persistLocal(batch.batchId, data: fallbackData)
- return .persistedLocal
+ return .failed
} else {
let response: SubmitBatchResponse?
if body.isEmpty {
@@ -140,14 +171,13 @@
response = try JSONDecoder().decode(SubmitBatchResponse.self, from: body)
} catch {
Log.submit.warning(
- "submit malformed response batch=\(batch.batchId, privacy: .public) — falling back to local"
+ "submit malformed response batch=\(batchId, privacy: .public)"
)
- await persistLocal(batch.batchId, data: fallbackData)
- return .persistedLocal
+ return .failed
}
}
Log.submit.info(
- "submit ok batch=\(batch.batchId, privacy: .public) frames=\(batch.frames.count, privacy: .public)"
+ "submit ok batch=\(batchId, privacy: .public)"
)
await sweepLocalBatches()
return .submitted(response)
@@ -155,8 +185,7 @@
} catch {
Log.submit.warning(
"submit error \(error.localizedDescription, privacy: .public) — falling back to local")
- await persistLocal(batch.batchId, data: fallbackData)
- return .persistedLocal
+ return .failed
}
}
@@ -219,54 +248,124 @@
private func persistLocal(_ id: String, data: Data) async {
try? FileManager.default.createDirectory(at: batchDirectory, withIntermediateDirectories: true)
- let url = batchDirectory.appendingPathComponent("\(id).json")
- try? data.write(to: url, options: .atomic)
+ let url: URL
+ let dataToWrite: Data
+ if let localBatchCryptor {
+ do {
+ dataToWrite = try localBatchCryptor.encrypt(data)
+ url = batchDirectory.appendingPathComponent(
+ "\(id).\(LocalBatchCryptor.encryptedExtension)"
+ )
+ } catch {
+ Log.submit.error(
+ "local batch encrypt failed id=\(id, privacy: .public) error=\(error.localizedDescription, privacy: .public)"
+ )
+ return
+ }
+ } else {
+ dataToWrite = data
+ url = batchDirectory.appendingPathComponent("\(id).json")
+ }
+ try? dataToWrite.write(to: url, options: .atomic)
try? FileManager.default.setAttributes([.posixPermissions: 0o600], ofItemAtPath: url.path)
Log.submit.info("local persist \(url.path, privacy: .public)")
await sweepLocalBatches()
}
+ func retryLocalBatches() async -> LocalBatchReplayResult {
+ guard !localOnly else {
+ return LocalBatchReplayResult(submitted: 0, failed: 0)
+ }
+
+ if isRetryingLocalBatches {
+ retryLocalBatchesNeedsRerun = true
+ return LocalBatchReplayResult(submitted: 0, failed: 0)
+ }
+
+ isRetryingLocalBatches = true
+ defer { isRetryingLocalBatches = false }
+ var submitted = 0
+ var failed = 0
+ repeat {
+ retryLocalBatchesNeedsRerun = false
+ let result = await retryLocalBatchesPass()
+ submitted += result.submitted
+ failed += result.failed
+ } while retryLocalBatchesNeedsRerun
+ return LocalBatchReplayResult(submitted: submitted, failed: failed)
+ }
+
+ private func retryLocalBatchesPass() async -> LocalBatchReplayResult {
+ var submitted = 0
+ var failed = 0
+ for file in localBatchFiles().sorted(by: { $0.modified < $1.modified }) {
+ let data: Data
+ do {
+ data = try readLocalBatch(file)
+ } catch {
+ failed += 1
+ Log.submit.warning(
+ "local batch replay read failed file=\(file.url.lastPathComponent, privacy: .public) error=\(error.localizedDescription, privacy: .public)"
+ )
+ continue
+ }
+
+ let submitData: Data
+ do {
+ submitData = try await makeReplaySubmitData(from: data)
+ } catch {
+ failed += 1
+ Log.submit.warning(
+ "local batch replay prepare failed file=\(file.url.lastPathComponent, privacy: .public) error=\(error.localizedDescription, privacy: .public)"
+ )
+ continue
+ }
+
+ let result = await submitRemoteData(submitData, batchId: file.batchId)
+ switch result {
+ case .submitted:
+ try? FileManager.default.removeItem(at: file.url)
+ submitted += 1
+ case .failed, .persistedLocal:
+ failed += 1
+ }
+ }
+ await sweepLocalBatches()
+ return LocalBatchReplayResult(submitted: submitted, failed: failed)
+ }
+
+ private func makeReplaySubmitData(from persistedData: Data) async throws -> Data {
+ guard secretBroker != nil else {
+ return persistedData
+ }
+ let request = try decodeSubmitBatchRequest(persistedData)
+ guard let batch = request.batch else {
+ throw SubmitterError.missingPersistedBatch
+ }
+ return try await makeRemoteSubmitData(for: batch, fallbackData: persistedData)
+ }
+
func sweepLocalBatches() async {
- let fm = FileManager.default
- guard
- let urls = try? fm.contentsOfDirectory(
- at: batchDirectory,
- includingPropertiesForKeys: [.contentModificationDateKey, .fileSizeKey, .isRegularFileKey],
- options: [.skipsHiddenFiles]
- )
- else { return }
-
- var files: [LocalBatchFile] = []
let now = Date()
let cutoff = now.addingTimeInterval(-maxBatchAgeDays * 24 * 60 * 60)
var removedCount = 0
var removedBytes: Int64 = 0
- for url in urls where url.pathExtension == "json" {
- guard
- let values = try? url.resourceValues(forKeys: [
- .contentModificationDateKey,
- .fileSizeKey,
- .isRegularFileKey,
- ]),
- values.isRegularFile == true
- else { continue }
-
- let modified = values.contentModificationDate ?? .distantPast
- let size = Int64(values.fileSize ?? 0)
- if modified < cutoff {
- if (try? fm.removeItem(at: url)) != nil {
+ var files: [LocalBatchFile] = []
+ for file in localBatchFiles() {
+ if file.modified < cutoff {
+ if (try? FileManager.default.removeItem(at: file.url)) != nil {
removedCount += 1
- removedBytes += size
+ removedBytes += file.size
}
- continue
+ } else {
+ files.append(file)
}
- files.append(LocalBatchFile(url: url, modified: modified, size: size))
}
var totalBytes = files.reduce(Int64(0)) { $0 + $1.size }
for file in files.sorted(by: { $0.modified < $1.modified }) where totalBytes > maxBatchBytes {
- if (try? fm.removeItem(at: file.url)) != nil {
+ if (try? FileManager.default.removeItem(at: file.url)) != nil {
totalBytes -= file.size
removedCount += 1
removedBytes += file.size
@@ -289,9 +388,8 @@
}
private func localBatchFiles() -> [LocalBatchFile] {
- let fm = FileManager.default
guard
- let urls = try? fm.contentsOfDirectory(
+ let urls = try? FileManager.default.contentsOfDirectory(
at: batchDirectory,
includingPropertiesForKeys: [.contentModificationDateKey, .fileSizeKey, .isRegularFileKey],
options: [.skipsHiddenFiles]
@@ -299,7 +397,8 @@
else { return [] }
var files: [LocalBatchFile] = []
- for url in urls where url.pathExtension == "json" {
+ for url in urls
+ where url.pathExtension == "json" || url.pathExtension == LocalBatchCryptor.encryptedExtension {
guard
let values = try? url.resourceValues(forKeys: [
.contentModificationDateKey,
@@ -312,19 +411,41 @@
LocalBatchFile(
url: url,
modified: values.contentModificationDate ?? .distantPast,
- size: Int64(values.fileSize ?? 0)
+ size: Int64(values.fileSize ?? 0),
+ encrypted: url.pathExtension == LocalBatchCryptor.encryptedExtension
))
}
return files
}
+
+ private func readLocalBatch(_ file: LocalBatchFile) throws -> Data {
+ let data = try Data(contentsOf: file.url)
+ guard file.encrypted else {
+ return data
+ }
+ guard let localBatchCryptor else {
+ throw LocalBatchCryptoError.invalidCiphertext
+ }
+ return try localBatchCryptor.decrypt(data)
+ }
}
private struct LocalBatchFile {
let url: URL
let modified: Date
let size: Int64
+ let encrypted: Bool
+
+ var batchId: String {
+ url.deletingPathExtension().lastPathComponent
+ }
}
+struct LocalBatchReplayResult: Sendable, Equatable {
+ let submitted: Int
+ let failed: Int
+}
+
struct LocalBatchStats: Sendable, Equatable {
let fileCount: Int
let bytes: Int64
@@ -522,6 +643,7 @@
case invalidBatchPayload
case secretBrokerStatus(Int)
case malformedSecretBrokerResponse
+ case missingPersistedBatch
var errorDescription: String? {
switch self {
@@ -531,6 +653,8 @@
return "secret broker wrap returned HTTP \(status)"
case .malformedSecretBrokerResponse:
return "secret broker wrap response did not include artifact and grant ids"
+ case .missingPersistedBatch:
+ return "persisted local batch did not include an inline batch payload"
}
}
}
@@ -629,6 +753,12 @@
return try enc.encode(request)
}
+private func decodeSubmitBatchRequest(_ data: Data) throws -> SubmitBatchRequest {
+ let dec = JSONDecoder()
+ dec.dateDecodingStrategy = .iso8601
+ return try dec.decode(SubmitBatchRequest.self, from: data)
+}
+
func encodeFrameBatch(_ batch: Batch) throws -> Data {
let enc = JSONEncoder()
enc.dateEncodingStrategy = .iso8601
diff --git a/Sources/agentd/main.swift b/Sources/agentd/main.swift
--- a/Sources/agentd/main.swift
+++ b/Sources/agentd/main.swift
@@ -33,7 +33,9 @@
authMode: cfg.auth,
secretBroker: cfg.secretBroker,
maxBatchBytes: cfg.maxBatchBytes,
- maxBatchAgeDays: cfg.maxBatchAgeDays
+ maxBatchAgeDays: cfg.maxBatchAgeDays,
+ deviceId: cfg.deviceId,
+ encryptLocalBatches: cfg.encryptLocalBatches
)
} catch {
Log.submit.fault(
@@ -44,7 +46,9 @@
localOnly: true,
authMode: .none,
maxBatchBytes: cfg.maxBatchBytes,
- maxBatchAgeDays: cfg.maxBatchAgeDays
+ maxBatchAgeDays: cfg.maxBatchAgeDays,
+ deviceId: cfg.deviceId,
+ encryptLocalBatches: false
)
}
self.submitter = submitter
@@ -277,8 +281,12 @@
flushTimer?.invalidate()
let interval = max(1, config.batchIntervalSeconds)
let pipeline = pipeline
+ let submitter = submitter
flushTimer = Timer.scheduledTimer(withTimeInterval: interval, repeats: true) { _ in
- Task { await pipeline.flush() }
+ Task {
+ await pipeline.flush()
+ _ = await submitter.retryLocalBatches()
+ }
}
}
diff --git a/Tests/agentdTests/SubmitterTests.swift b/Tests/agentdTests/SubmitterTests.swift
--- a/Tests/agentdTests/SubmitterTests.swift
+++ b/Tests/agentdTests/SubmitterTests.swift
@@ -1,5 +1,6 @@
// SPDX-License-Identifier: BUSL-1.1
+@preconcurrency import CryptoKit
import Foundation
import XCTest
@@ -130,6 +131,33 @@
XCTAssertNotNil(root["secretBroker"])
}
+ func testAgentConfigDefaultsEncryptedBatchesForRemoteMode() throws {
+ let remote = """
+ {
+ "deviceId": "device_1",
+ "organizationId": "org_1",
+ "endpoint": "https://chronicle.example.com/chronicle.v1.ChronicleService/SubmitBatch",
+ "localOnly": false,
+ "auth": {
+ "mode": "bearer",
+ "keychainService": "agentd",
+ "keychainAccount": "chronicle"
+ }
+ }
+ """.data(using: .utf8)!
+ let local = """
+ {
+ "deviceId": "device_1",
+ "organizationId": "org_1",
+ "endpoint": "http://127.0.0.1:8787/chronicle.v1.ChronicleService/SubmitBatch",
+ "localOnly": true
+ }
+ """.data(using: .utf8)!
+
+ XCTAssertTrue(try JSONDecoder().decode(AgentConfig.self, from: remote).encryptLocalBatches)
+ XCTAssertFalse(try JSONDecoder().decode(AgentConfig.self, from: local).encryptLocalBatches)
+ }
+
func testEndpointPolicyRejectsPlainHttpRemoteAndAllowsHttpsAndLoopback() throws {
let provider = StubCredentialProvider(token: "token")
XCTAssertNoThrow(
@@ -340,11 +368,297 @@
}
}
+ func testEncryptedLocalBatchPersistenceDoesNotWritePlaintext() async throws {
+ let dir = try makeTemporaryDirectory()
+ let submitter = try Submitter(
+ endpoint: URL(string: "http://127.0.0.1:8787/submit")!,
+ localOnly: true,
+ batchDirectory: dir,
+ deviceId: "device_1",
+ encryptLocalBatches: true,
+ localBatchKeyProvider: StaticLocalBatchKeyProvider.one
+ )
+
+ let result = await submitter.submit(Self.batch())
+
+ XCTAssertEqual(result, .persistedLocal)
+ let files = try FileManager.default.contentsOfDirectory(
+ at: dir,
+ includingPropertiesForKeys: nil
+ )
+ let file = try XCTUnwrap(files.first)
+ XCTAssertEqual(file.pathExtension, LocalBatchCryptor.encryptedExtension)
+ let stored = try Data(contentsOf: file)
+ XCTAssertFalse(String(data: stored, encoding: .utf8)?.contains("ChronicleService") ?? false)
+ let plaintext = try LocalBatchCryptor(key: StaticLocalBatchKeyProvider.one.key).decrypt(stored)
+ let root = try XCTUnwrap(JSONSerialization.jsonObject(with: plaintext) as? [String: Any])
+ XCTAssertNotNil(root["batch"])
+ }
+
+ func testEncryptedLocalBatchFailsClosedWithWrongKey() async throws {
+ let dir = try makeTemporaryDirectory()
+ let submitter = try Submitter(
+ endpoint: URL(string: "http://127.0.0.1:8787/submit")!,
+ localOnly: true,
+ batchDirectory: dir,
+ deviceId: "device_1",
+ encryptLocalBatches: true,
+ localBatchKeyProvider: StaticLocalBatchKeyProvider.one
+ )
+ _ = await submitter.submit(Self.batch())
+
+ let file = try XCTUnwrap(
+ FileManager.default.contentsOfDirectory(at: dir, includingPropertiesForKeys: nil).first)
+ let stored = try Data(contentsOf: file)
+ XCTAssertThrowsError(
+ try LocalBatchCryptor(key: StaticLocalBatchKeyProvider.two.key).decrypt(stored)
+ )
+ }
+
+ func testEncryptedLocalBatchReplaySubmitsAndRemovesQueuedFile() async throws {
+ let dir = try makeTemporaryDirectory()
+ let failing = try Submitter(
+ endpoint: URL(string: "https://chronicle.example.com/submit")!,
+ localOnly: false,
+ authMode: .bearer(keychainService: "svc", keychainAccount: "acct"),
+ credentialProvider: StubCredentialProvider(token: "token"),
+ client: StubHTTPClient.status(503, body: #"{"error":"down"}"#),
+ batchDirectory: dir,
+ deviceId: "device_1",
... diff truncated: showing 800 of 1167 linesYou can send follow-ups to the cloud agent here.
|
Bugbot Autofix prepared fixes for both issues found in the latest run.
Preview (f50e899c41)diff --git a/README.md b/README.md
--- a/README.md
+++ b/README.md
@@ -28,11 +28,14 @@
(default 4096) with `ocrTextTruncated` set when the cap applies.
- Batches every 30s or 24 frames, whichever first.
- Local-only mode persists batches under `~/.evalops/agentd/batches/` as
- `0o600` JSON and sweeps old or over-budget batches; HTTP mode `POST`s a
- Connect/proto JSON `SubmitBatchRequest` to
+ `0o600` JSON by default and sweeps old or over-budget batches; HTTP mode
+ `POST`s a Connect/proto JSON `SubmitBatchRequest` to
`chronicle.v1.ChronicleService.SubmitBatch` and falls back to local on
failure. Remote HTTP is allowed only for loopback development; non-loopback
remote endpoints must use HTTPS and configured client auth.
+- Remote and Secret Broker modes encrypt local fallback batches at rest by
+ default using a per-device Keychain-backed AES-GCM key. Local-only mode can
+ opt in with `encryptLocalBatches: true`.
- Optional Secret Broker mode wraps the frame batch into a broker artifact
(`chronicle_frame_batch_json`) first, then sends only the artifact/session
reference to Chronicle so Platform can unwrap, meter, and revoke through ASB.
@@ -93,6 +96,8 @@
- `maxOcrTextChars: 4096`
- `maxBatchAgeDays: 7`
- `maxBatchBytes: 536870912`
+- `encryptLocalBatches: false` in local-only mode, `true` in remote or Secret
+ Broker mode when omitted
- `auth: { "mode": "none" }`
Remote mode requires `localOnly: false`, an HTTPS or loopback endpoint, and an
@@ -137,6 +142,12 @@
If wrapping fails, agentd persists the original inline `SubmitBatchRequest`
locally and does not write the broker session token to disk.
+Encrypted local batches use the `.agentdbatch` extension. The encryption key is
+created or loaded from Keychain service `dev.evalops.agentd.local-batch-key`,
+accounted by `deviceId`, and is never written to `config.json` or the batch
+directory. Retention sweeps apply to both plaintext `.json` batches and
+encrypted `.agentdbatch` batches.
+
## What's next
- Consume generated `chronicle.v1` Swift types when the platform SDK publishes
@@ -144,7 +155,6 @@
([evalops/platform#1078](https://github.com/evalops/platform/issues/1078)).
- Calendar / Zoom auto-pause via NATS subject
`chronicle.policy.pause` (siphon-fed).
-- Encryption-at-rest option for local batches.
- Hardware-backed permission-flow smoke test for Screen Recording and
Accessibility prompts.
diff --git a/Sources/agentd/Config.swift b/Sources/agentd/Config.swift
--- a/Sources/agentd/Config.swift
+++ b/Sources/agentd/Config.swift
@@ -74,6 +74,7 @@
var idleThresholdSeconds: Double
var idlePollSeconds: Double
var localOnly: Bool
+ var encryptLocalBatches: Bool
var auth: AuthMode
var secretBroker: SecretBrokerConfig?
@@ -100,6 +101,7 @@
case idleThresholdSeconds
case idlePollSeconds
case localOnly
+ case encryptLocalBatches
case auth
case secretBroker
}
@@ -126,6 +128,7 @@
idleThresholdSeconds: Double = 60,
idlePollSeconds: Double = 5,
localOnly: Bool,
+ encryptLocalBatches: Bool? = nil,
auth: AuthMode = .none,
secretBroker: SecretBrokerConfig? = nil
) {
@@ -150,6 +153,7 @@
self.idleThresholdSeconds = idleThresholdSeconds
self.idlePollSeconds = idlePollSeconds
self.localOnly = localOnly
+ self.encryptLocalBatches = encryptLocalBatches ?? (!localOnly || secretBroker != nil)
self.auth = auth
self.secretBroker = secretBroker
}
@@ -193,6 +197,9 @@
localOnly = try container.decodeIfPresent(Bool.self, forKey: .localOnly) ?? true
auth = try container.decodeIfPresent(AuthMode.self, forKey: .auth) ?? .none
secretBroker = try container.decodeIfPresent(SecretBrokerConfig.self, forKey: .secretBroker)
+ encryptLocalBatches =
+ try container.decodeIfPresent(Bool.self, forKey: .encryptLocalBatches)
+ ?? (!localOnly || secretBroker != nil)
}
func encode(to encoder: Encoder) throws {
@@ -218,6 +225,7 @@
try container.encode(idleThresholdSeconds, forKey: .idleThresholdSeconds)
try container.encode(idlePollSeconds, forKey: .idlePollSeconds)
try container.encode(localOnly, forKey: .localOnly)
+ try container.encode(encryptLocalBatches, forKey: .encryptLocalBatches)
try container.encode(auth, forKey: .auth)
try container.encodeIfPresent(secretBroker, forKey: .secretBroker)
}
@@ -277,6 +285,7 @@
idleThresholdSeconds: 60,
idlePollSeconds: 5,
localOnly: true,
+ encryptLocalBatches: false,
auth: .none,
secretBroker: nil
)
diff --git a/Sources/agentd/LocalBatchCrypto.swift b/Sources/agentd/LocalBatchCrypto.swift
new file mode 100644
--- /dev/null
+++ b/Sources/agentd/LocalBatchCrypto.swift
@@ -1,0 +1,106 @@
+// SPDX-License-Identifier: BUSL-1.1
+
+@preconcurrency import CryptoKit
+import Foundation
+import Security
+
+protocol LocalBatchKeyProviding: Sendable {
+ func localBatchKey(deviceId: String) throws -> SymmetricKey
+}
+
+struct KeychainLocalBatchKeyProvider: LocalBatchKeyProviding {
+ private let service = "dev.evalops.agentd.local-batch-key"
+
+ func localBatchKey(deviceId: String) throws -> SymmetricKey {
+ if let existing = try readKey(deviceId: deviceId) {
+ return SymmetricKey(data: existing)
+ }
+
+ var bytes = Data(count: 32)
+ let status = bytes.withUnsafeMutableBytes { buffer in
+ SecRandomCopyBytes(kSecRandomDefault, buffer.count, buffer.baseAddress!)
+ }
+ guard status == errSecSuccess else {
+ throw LocalBatchCryptoError.keyGenerationFailed(status)
+ }
+ try storeKey(bytes, deviceId: deviceId)
+ return SymmetricKey(data: bytes)
+ }
+
+ private func readKey(deviceId: String) throws -> Data? {
+ let query: [String: Any] = [
+ kSecClass as String: kSecClassGenericPassword,
+ kSecAttrService as String: service,
+ kSecAttrAccount as String: deviceId,
+ kSecReturnData as String: true,
+ kSecMatchLimit as String: kSecMatchLimitOne,
+ ]
+ var item: CFTypeRef?
+ let status = SecItemCopyMatching(query as CFDictionary, &item)
+ if status == errSecItemNotFound {
+ return nil
+ }
+ guard status == errSecSuccess, let data = item as? Data else {
+ throw LocalBatchCryptoError.keychainReadFailed(status)
+ }
+ return data
+ }
+
+ private func storeKey(_ key: Data, deviceId: String) throws {
+ let attributes: [String: Any] = [
+ kSecClass as String: kSecClassGenericPassword,
+ kSecAttrService as String: service,
+ kSecAttrAccount as String: deviceId,
+ kSecAttrAccessible as String: kSecAttrAccessibleAfterFirstUnlockThisDeviceOnly,
+ kSecValueData as String: key,
+ ]
+ let status = SecItemAdd(attributes as CFDictionary, nil)
+ guard status == errSecSuccess || status == errSecDuplicateItem else {
+ throw LocalBatchCryptoError.keychainWriteFailed(status)
+ }
+ }
+}
+
+struct LocalBatchCryptor: @unchecked Sendable {
+ static let encryptedExtension = "agentdbatch"
+ private static let magic = Data("AGENTD-BATCH-AESGCM-v1\n".utf8)
+
+ let key: SymmetricKey
+
+ func encrypt(_ plaintext: Data) throws -> Data {
+ let sealed = try AES.GCM.seal(plaintext, using: key)
+ guard let combined = sealed.combined else {
+ throw LocalBatchCryptoError.invalidCiphertext
+ }
+ return Self.magic + combined
+ }
+
+ func decrypt(_ ciphertext: Data) throws -> Data {
+ guard ciphertext.starts(with: Self.magic) else {
+ throw LocalBatchCryptoError.invalidCiphertext
+ }
+ let combined = ciphertext.dropFirst(Self.magic.count)
+ let box = try AES.GCM.SealedBox(combined: Data(combined))
+ return try AES.GCM.open(box, using: key)
+ }
+}
+
+enum LocalBatchCryptoError: Error, LocalizedError, Equatable {
+ case keyGenerationFailed(OSStatus)
+ case keychainReadFailed(OSStatus)
+ case keychainWriteFailed(OSStatus)
+ case invalidCiphertext
+
+ var errorDescription: String? {
+ switch self {
+ case .keyGenerationFailed(let status):
+ return "failed to generate local batch key: \(status)"
+ case .keychainReadFailed(let status):
+ return "failed to read local batch key from Keychain: \(status)"
+ case .keychainWriteFailed(let status):
+ return "failed to store local batch key in Keychain: \(status)"
+ case .invalidCiphertext:
+ return "local batch ciphertext is invalid"
+ }
+ }
+}
diff --git a/Sources/agentd/Submitter.swift b/Sources/agentd/Submitter.swift
--- a/Sources/agentd/Submitter.swift
+++ b/Sources/agentd/Submitter.swift
@@ -14,6 +14,7 @@
private let batchDirectory: URL
private let maxBatchBytes: Int64
private let maxBatchAgeDays: Double
+ private let localBatchCryptor: LocalBatchCryptor?
init(
endpoint: URL,
@@ -25,7 +26,10 @@
client: (any HTTPClient)? = nil,
batchDirectory: URL? = nil,
maxBatchBytes: Int64 = 512 * 1024 * 1024,
- maxBatchAgeDays: Double = 7
+ maxBatchAgeDays: Double = 7,
+ deviceId: String = "default",
+ encryptLocalBatches: Bool = false,
+ localBatchKeyProvider: any LocalBatchKeyProviding = KeychainLocalBatchKeyProvider()
) throws {
guard EndpointPolicy.isAllowed(endpoint: endpoint, localOnly: localOnly) else {
throw SubmitterInitError.insecureRemoteEndpoint(endpoint.absoluteString)
@@ -56,6 +60,13 @@
.appendingPathComponent(".evalops/agentd/batches")
self.maxBatchBytes = maxBatchBytes
self.maxBatchAgeDays = maxBatchAgeDays
+ if encryptLocalBatches {
+ self.localBatchCryptor = try LocalBatchCryptor(
+ key: localBatchKeyProvider.localBatchKey(deviceId: deviceId)
+ )
+ } else {
+ self.localBatchCryptor = nil
+ }
if let client {
self.client = client
@@ -102,35 +113,53 @@
}
let submitData: Data
- if let secretBroker {
- do {
- let wrapped = try await wrapFrameBatch(batch, using: secretBroker)
- submitData = try encodeBrokerSubmitBatchRequest(
- sessionToken: secretBroker.sessionToken,
- artifactId: wrapped.artifactId,
- grantId: wrapped.grantId,
- localOnly: localOnly
+ do {
+ submitData = try await makeRemoteSubmitData(for: batch, fallbackData: fallbackData)
+ } catch {
+ Log.submit.warning(
+ "remote submit prepare failed batch=\(batch.batchId, privacy: .public) error=\(error.localizedDescription, privacy: .public) — falling back to local"
+ )
+ await persistLocal(batch.batchId, data: fallbackData)
+ return .persistedLocal
+ }
+
+ let result = await submitRemoteData(submitData, batchId: batch.batchId)
+ if case .submitted = result {
+ let replay = await retryLocalBatches()
+ if replay.submitted > 0 || replay.failed > 0 {
+ Log.submit.info(
+ "local batch replay submitted=\(replay.submitted, privacy: .public) failed=\(replay.failed, privacy: .public)"
)
- } catch {
- Log.submit.warning(
- "secret broker wrap failed batch=\(batch.batchId, privacy: .public) error=\(error.localizedDescription, privacy: .public) — falling back to local"
- )
- await persistLocal(batch.batchId, data: fallbackData)
- return .persistedLocal
}
- } else {
- submitData = fallbackData
+ return result
}
- let req = makeRequest(body: submitData)
+ await persistLocal(batch.batchId, data: fallbackData)
+ return .persistedLocal
+ }
+
+ private func makeRemoteSubmitData(for batch: Batch, fallbackData: Data) async throws -> Data {
+ guard let secretBroker else {
+ return fallbackData
+ }
+ let wrapped = try await wrapFrameBatch(batch, using: secretBroker)
+ return try encodeBrokerSubmitBatchRequest(
+ sessionToken: secretBroker.sessionToken,
+ artifactId: wrapped.artifactId,
+ grantId: wrapped.grantId,
+ localOnly: localOnly
+ )
+ }
+
+ private func submitRemoteData(_ data: Data, batchId: String) async -> SubmitResult {
+ let req = makeRequest(body: data)
do {
let (body, resp) = try await client.data(for: req)
if let http = resp as? HTTPURLResponse, !(200..<300).contains(http.statusCode) {
Log.submit.warning(
- "submit status=\(http.statusCode, privacy: .public) batch=\(batch.batchId, privacy: .public) — falling back to local"
+ "submit status=\(http.statusCode, privacy: .public) batch=\(batchId, privacy: .public)"
)
- await persistLocal(batch.batchId, data: fallbackData)
- return .persistedLocal
+ return .failed
} else {
let response: SubmitBatchResponse?
if body.isEmpty {
@@ -140,23 +169,21 @@
response = try JSONDecoder().decode(SubmitBatchResponse.self, from: body)
} catch {
Log.submit.warning(
- "submit malformed response batch=\(batch.batchId, privacy: .public) — falling back to local"
+ "submit malformed response batch=\(batchId, privacy: .public)"
)
- await persistLocal(batch.batchId, data: fallbackData)
- return .persistedLocal
+ return .failed
}
}
Log.submit.info(
- "submit ok batch=\(batch.batchId, privacy: .public) frames=\(batch.frames.count, privacy: .public)"
+ "submit ok batch=\(batchId, privacy: .public)"
)
- await sweepLocalBatches()
+ _ = await retryLocalBatches()
return .submitted(response)
}
} catch {
Log.submit.warning(
"submit error \(error.localizedDescription, privacy: .public) — falling back to local")
- await persistLocal(batch.batchId, data: fallbackData)
- return .persistedLocal
+ return .failed
}
}
@@ -219,54 +246,105 @@
private func persistLocal(_ id: String, data: Data) async {
try? FileManager.default.createDirectory(at: batchDirectory, withIntermediateDirectories: true)
- let url = batchDirectory.appendingPathComponent("\(id).json")
- try? data.write(to: url, options: .atomic)
+ let url: URL
+ let dataToWrite: Data
+ if let localBatchCryptor {
+ do {
+ dataToWrite = try localBatchCryptor.encrypt(data)
+ url = batchDirectory.appendingPathComponent(
+ "\(id).\(LocalBatchCryptor.encryptedExtension)"
+ )
+ } catch {
+ Log.submit.error(
+ "local batch encrypt failed id=\(id, privacy: .public) error=\(error.localizedDescription, privacy: .public)"
+ )
+ return
+ }
+ } else {
+ dataToWrite = data
+ url = batchDirectory.appendingPathComponent("\(id).json")
+ }
+ try? dataToWrite.write(to: url, options: .atomic)
try? FileManager.default.setAttributes([.posixPermissions: 0o600], ofItemAtPath: url.path)
Log.submit.info("local persist \(url.path, privacy: .public)")
await sweepLocalBatches()
}
+ func retryLocalBatches() async -> LocalBatchReplayResult {
+ guard !localOnly else {
+ return LocalBatchReplayResult(submitted: 0, failed: 0)
+ }
+
+ var submitted = 0
+ var failed = 0
+ for file in localBatchFiles().sorted(by: { $0.modified < $1.modified }) {
+ let data: Data
+ do {
+ data = try readLocalBatch(file)
+ } catch {
+ failed += 1
+ Log.submit.warning(
+ "local batch replay read failed file=\(file.url.lastPathComponent, privacy: .public) error=\(error.localizedDescription, privacy: .public)"
+ )
+ continue
+ }
+
+ let submitData: Data
+ do {
+ submitData = try await makeReplaySubmitData(from: data)
+ } catch {
+ failed += 1
+ Log.submit.warning(
+ "local batch replay prepare failed file=\(file.url.lastPathComponent, privacy: .public) error=\(error.localizedDescription, privacy: .public)"
+ )
+ continue
+ }
+
+ let result = await submitRemoteData(submitData, batchId: file.batchId)
+ switch result {
+ case .submitted:
+ try? FileManager.default.removeItem(at: file.url)
+ submitted += 1
+ case .failed, .persistedLocal:
+ failed += 1
+ }
+ }
+ await sweepLocalBatches()
+ return LocalBatchReplayResult(submitted: submitted, failed: failed)
+ }
+
+ private func makeReplaySubmitData(from persistedData: Data) async throws -> Data {
+ guard secretBroker != nil else {
+ return persistedData
+ }
+ let request = try decodeSubmitBatchRequest(persistedData)
+ guard let batch = request.batch else {
+ throw SubmitterError.missingPersistedBatch
+ }
+ return try await makeRemoteSubmitData(for: batch, fallbackData: persistedData)
+ }
+
func sweepLocalBatches() async {
- let fm = FileManager.default
- guard
- let urls = try? fm.contentsOfDirectory(
- at: batchDirectory,
- includingPropertiesForKeys: [.contentModificationDateKey, .fileSizeKey, .isRegularFileKey],
- options: [.skipsHiddenFiles]
- )
- else { return }
-
- var files: [LocalBatchFile] = []
let now = Date()
let cutoff = now.addingTimeInterval(-maxBatchAgeDays * 24 * 60 * 60)
var removedCount = 0
var removedBytes: Int64 = 0
- for url in urls where url.pathExtension == "json" {
- guard
- let values = try? url.resourceValues(forKeys: [
- .contentModificationDateKey,
- .fileSizeKey,
- .isRegularFileKey,
- ]),
- values.isRegularFile == true
- else { continue }
-
- let modified = values.contentModificationDate ?? .distantPast
- let size = Int64(values.fileSize ?? 0)
- if modified < cutoff {
- if (try? fm.removeItem(at: url)) != nil {
+ var files: [LocalBatchFile] = []
+ for file in localBatchFiles() {
+ if file.modified < cutoff {
+ if (try? FileManager.default.removeItem(at: file.url)) != nil {
removedCount += 1
- removedBytes += size
+ removedBytes += file.size
}
- continue
+ } else {
+ files.append(file)
}
- files.append(LocalBatchFile(url: url, modified: modified, size: size))
}
var totalBytes = files.reduce(Int64(0)) { $0 + $1.size }
for file in files.sorted(by: { $0.modified < $1.modified }) where totalBytes > maxBatchBytes {
- if (try? fm.removeItem(at: file.url)) != nil {
+ if (try? FileManager.default.removeItem(at: file.url)) != nil {
totalBytes -= file.size
removedCount += 1
removedBytes += file.size
@@ -279,14 +357,66 @@
)
}
}
+
+ private func localBatchFiles() -> [LocalBatchFile] {
+ guard
+ let urls = try? FileManager.default.contentsOfDirectory(
+ at: batchDirectory,
+ includingPropertiesForKeys: [.contentModificationDateKey, .fileSizeKey, .isRegularFileKey],
+ options: [.skipsHiddenFiles]
+ )
+ else { return [] }
+
+ var files: [LocalBatchFile] = []
+ for url in urls
+ where url.pathExtension == "json" || url.pathExtension == LocalBatchCryptor.encryptedExtension {
+ guard
+ let values = try? url.resourceValues(forKeys: [
+ .contentModificationDateKey,
+ .fileSizeKey,
+ .isRegularFileKey,
+ ]),
+ values.isRegularFile == true
+ else { continue }
+ files.append(
+ LocalBatchFile(
+ url: url,
+ modified: values.contentModificationDate ?? .distantPast,
+ size: Int64(values.fileSize ?? 0),
+ encrypted: url.pathExtension == LocalBatchCryptor.encryptedExtension
+ ))
+ }
+ return files
+ }
+
+ private func readLocalBatch(_ file: LocalBatchFile) throws -> Data {
+ let data = try Data(contentsOf: file.url)
+ guard file.encrypted else {
+ return data
+ }
+ guard let localBatchCryptor else {
+ throw LocalBatchCryptoError.invalidCiphertext
+ }
+ return try localBatchCryptor.decrypt(data)
+ }
}
private struct LocalBatchFile {
let url: URL
let modified: Date
let size: Int64
+ let encrypted: Bool
+
+ var batchId: String {
+ url.deletingPathExtension().lastPathComponent
+ }
}
+struct LocalBatchReplayResult: Sendable, Equatable {
+ let submitted: Int
+ let failed: Int
+}
+
protocol HTTPClient: Sendable {
func data(for request: URLRequest) async throws -> (Data, URLResponse)
}
@@ -479,6 +609,7 @@
case invalidBatchPayload
case secretBrokerStatus(Int)
case malformedSecretBrokerResponse
+ case missingPersistedBatch
var errorDescription: String? {
switch self {
@@ -488,6 +619,8 @@
return "secret broker wrap returned HTTP \(status)"
case .malformedSecretBrokerResponse:
return "secret broker wrap response did not include artifact and grant ids"
+ case .missingPersistedBatch:
+ return "persisted local batch did not include an inline batch payload"
}
}
}
@@ -586,6 +719,12 @@
return try enc.encode(request)
}
+private func decodeSubmitBatchRequest(_ data: Data) throws -> SubmitBatchRequest {
+ let dec = JSONDecoder()
+ dec.dateDecodingStrategy = .iso8601
+ return try dec.decode(SubmitBatchRequest.self, from: data)
+}
+
func encodeFrameBatch(_ batch: Batch) throws -> Data {
let enc = JSONEncoder()
enc.dateEncodingStrategy = .iso8601
diff --git a/Sources/agentd/main.swift b/Sources/agentd/main.swift
--- a/Sources/agentd/main.swift
+++ b/Sources/agentd/main.swift
@@ -27,7 +27,9 @@
authMode: cfg.auth,
secretBroker: cfg.secretBroker,
maxBatchBytes: cfg.maxBatchBytes,
- maxBatchAgeDays: cfg.maxBatchAgeDays
+ maxBatchAgeDays: cfg.maxBatchAgeDays,
+ deviceId: cfg.deviceId,
+ encryptLocalBatches: cfg.encryptLocalBatches
)
} catch {
Log.submit.fault(
@@ -38,7 +40,9 @@
localOnly: true,
authMode: .none,
maxBatchBytes: cfg.maxBatchBytes,
- maxBatchAgeDays: cfg.maxBatchAgeDays
+ maxBatchAgeDays: cfg.maxBatchAgeDays,
+ deviceId: cfg.deviceId,
+ encryptLocalBatches: false
)
}
self.submitter = submitter
@@ -87,8 +91,12 @@
}
let interval = config.batchIntervalSeconds
+ let submitter = submitter
flushTimer = Timer.scheduledTimer(withTimeInterval: interval, repeats: true) { _ in
- Task { await pipeline.flush() }
+ Task {
+ await pipeline.flush()
+ _ = await submitter.retryLocalBatches()
+ }
}
idleTimer = Timer.scheduledTimer(
withTimeInterval: max(1, config.idlePollSeconds), repeats: true
diff --git a/Tests/agentdTests/SubmitterTests.swift b/Tests/agentdTests/SubmitterTests.swift
--- a/Tests/agentdTests/SubmitterTests.swift
+++ b/Tests/agentdTests/SubmitterTests.swift
@@ -1,5 +1,6 @@
// SPDX-License-Identifier: BUSL-1.1
+@preconcurrency import CryptoKit
import Foundation
import XCTest
@@ -130,6 +131,33 @@
XCTAssertNotNil(root["secretBroker"])
}
+ func testAgentConfigDefaultsEncryptedBatchesForRemoteMode() throws {
+ let remote = """
+ {
+ "deviceId": "device_1",
+ "organizationId": "org_1",
+ "endpoint": "https://chronicle.example.com/chronicle.v1.ChronicleService/SubmitBatch",
+ "localOnly": false,
+ "auth": {
+ "mode": "bearer",
+ "keychainService": "agentd",
+ "keychainAccount": "chronicle"
+ }
+ }
+ """.data(using: .utf8)!
+ let local = """
+ {
+ "deviceId": "device_1",
+ "organizationId": "org_1",
+ "endpoint": "http://127.0.0.1:8787/chronicle.v1.ChronicleService/SubmitBatch",
+ "localOnly": true
+ }
+ """.data(using: .utf8)!
+
+ XCTAssertTrue(try JSONDecoder().decode(AgentConfig.self, from: remote).encryptLocalBatches)
+ XCTAssertFalse(try JSONDecoder().decode(AgentConfig.self, from: local).encryptLocalBatches)
+ }
+
func testEndpointPolicyRejectsPlainHttpRemoteAndAllowsHttpsAndLoopback() throws {
let provider = StubCredentialProvider(token: "token")
XCTAssertNoThrow(
@@ -340,11 +368,252 @@
}
}
+ func testEncryptedLocalBatchPersistenceDoesNotWritePlaintext() async throws {
+ let dir = try makeTemporaryDirectory()
+ let submitter = try Submitter(
+ endpoint: URL(string: "http://127.0.0.1:8787/submit")!,
+ localOnly: true,
+ batchDirectory: dir,
+ deviceId: "device_1",
+ encryptLocalBatches: true,
+ localBatchKeyProvider: StaticLocalBatchKeyProvider.one
+ )
+
+ let result = await submitter.submit(Self.batch())
+
+ XCTAssertEqual(result, .persistedLocal)
+ let files = try FileManager.default.contentsOfDirectory(
+ at: dir,
+ includingPropertiesForKeys: nil
+ )
+ let file = try XCTUnwrap(files.first)
+ XCTAssertEqual(file.pathExtension, LocalBatchCryptor.encryptedExtension)
+ let stored = try Data(contentsOf: file)
+ XCTAssertFalse(String(data: stored, encoding: .utf8)?.contains("ChronicleService") ?? false)
+ let plaintext = try LocalBatchCryptor(key: StaticLocalBatchKeyProvider.one.key).decrypt(stored)
+ let root = try XCTUnwrap(JSONSerialization.jsonObject(with: plaintext) as? [String: Any])
+ XCTAssertNotNil(root["batch"])
+ }
+
+ func testEncryptedLocalBatchFailsClosedWithWrongKey() async throws {
+ let dir = try makeTemporaryDirectory()
+ let submitter = try Submitter(
+ endpoint: URL(string: "http://127.0.0.1:8787/submit")!,
+ localOnly: true,
+ batchDirectory: dir,
+ deviceId: "device_1",
+ encryptLocalBatches: true,
+ localBatchKeyProvider: StaticLocalBatchKeyProvider.one
+ )
+ _ = await submitter.submit(Self.batch())
+
+ let file = try XCTUnwrap(
+ FileManager.default.contentsOfDirectory(at: dir, includingPropertiesForKeys: nil).first)
+ let stored = try Data(contentsOf: file)
+ XCTAssertThrowsError(
+ try LocalBatchCryptor(key: StaticLocalBatchKeyProvider.two.key).decrypt(stored)
+ )
+ }
+
+ func testEncryptedLocalBatchReplaySubmitsAndRemovesQueuedFile() async throws {
+ let dir = try makeTemporaryDirectory()
+ let failing = try Submitter(
+ endpoint: URL(string: "https://chronicle.example.com/submit")!,
+ localOnly: false,
+ authMode: .bearer(keychainService: "svc", keychainAccount: "acct"),
+ credentialProvider: StubCredentialProvider(token: "token"),
+ client: StubHTTPClient.status(503, body: #"{"error":"down"}"#),
+ batchDirectory: dir,
+ deviceId: "device_1",
+ encryptLocalBatches: true,
+ localBatchKeyProvider: StaticLocalBatchKeyProvider.one
+ )
+
+ let persistedResult = await failing.submit(Self.batch())
+ XCTAssertEqual(persistedResult, .persistedLocal)
+ let encrypted = try XCTUnwrap(
+ FileManager.default.contentsOfDirectory(at: dir, includingPropertiesForKeys: nil).first)
+ XCTAssertEqual(encrypted.pathExtension, LocalBatchCryptor.encryptedExtension)
+
+ let recorder = RequestRecorder()
+ let replaying = try Submitter(
+ endpoint: URL(string: "https://chronicle.example.com/submit")!,
+ localOnly: false,
+ authMode: .bearer(keychainService: "svc", keychainAccount: "acct"),
+ credentialProvider: StubCredentialProvider(token: "token"),
+ client: StubHTTPClient { request in
+ await recorder.record(request)
+ let body = try XCTUnwrap(request.httpBody)
+ let root = try XCTUnwrap(JSONSerialization.jsonObject(with: body) as? [String: Any])
+ XCTAssertNotNil(root["batch"])
+ return (
+ Data(#"{"batchId":"batch_fixture"}"#.utf8),
+ Self.response(for: request.url!, statusCode: 200)
+ )
+ },
+ batchDirectory: dir,
+ deviceId: "device_1",
+ encryptLocalBatches: true,
+ localBatchKeyProvider: StaticLocalBatchKeyProvider.one
+ )
+
+ let replay = await replaying.retryLocalBatches()
+
+ XCTAssertEqual(replay, LocalBatchReplayResult(submitted: 1, failed: 0))
+ let requestCount = await recorder.count()
+ XCTAssertEqual(requestCount, 1)
+ let remaining = try FileManager.default.contentsOfDirectory(
+ at: dir,
+ includingPropertiesForKeys: nil
+ )
+ XCTAssertTrue(remaining.isEmpty)
+ }
+
+ func testSuccessfulSubmitRetriesQueuedLocalBatches() async throws {
+ let dir = try makeTemporaryDirectory()
+ let failing = try Submitter(
+ endpoint: URL(string: "https://chronicle.example.com/submit")!,
+ localOnly: false,
+ authMode: .bearer(keychainService: "svc", keychainAccount: "acct"),
+ credentialProvider: StubCredentialProvider(token: "token"),
+ client: StubHTTPClient.status(503, body: #"{"error":"down"}"#),
+ batchDirectory: dir
+ )
+
+ let persistedResult = await failing.submit(Self.batch(id: "queued_batch"))
... diff truncated: showing 800 of 984 linesYou can send follow-ups to the cloud agent here. |
f50e899 to
ab4b322
Compare
There was a problem hiding this comment.
Cursor Bugbot has reviewed your changes and found 2 potential issues.
Bugbot Autofix is ON, but it could not run because the branch was deleted or merged before autofix could start.
Reviewed by Cursor Bugbot for commit e2a2503. Configure here.

Summary
.agentdbatchIssues
Refs #30.
Validation
swift testswift build -Xswiftc -warnings-as-errorsxcrun swift-format lint --strict --recursive Sources Tests Package.swiftscripts/package_app.shgit diff --check