Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 33 additions & 8 deletions packages/opencode/src/share/share-next.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,9 @@ const ShareSchema = Schema.Struct({
export type Share = typeof ShareSchema.Type

type State = {
queue: Map<string, { data: Map<string, Data> }>
queue: Map<SessionID, Map<string, Data>>
scope: Scope.Closeable
shared: Map<SessionID, Share | null>
}

type Data =
Expand Down Expand Up @@ -118,17 +119,20 @@ export const layer = Layer.effect(
function sync(sessionID: SessionID, data: Data[]): Effect.Effect<void> {
return Effect.gen(function* () {
if (disabled) return
const share = yield* getCached(sessionID)
if (!share) return

const s = yield* InstanceState.get(state)
const existing = s.queue.get(sessionID)
if (existing) {
for (const item of data) {
existing.data.set(key(item), item)
existing.set(key(item), item)
}
return
}

const next = new Map(data.map((item) => [key(item), item]))
s.queue.set(sessionID, { data: next })
s.queue.set(sessionID, next)
yield* flush(sessionID).pipe(
Effect.delay(1000),
Effect.catchCause((cause) =>
Expand All @@ -143,13 +147,14 @@ export const layer = Layer.effect(

const state: InstanceState.InstanceState<State> = yield* InstanceState.make<State>(
Effect.fn("ShareNext.state")(function* (_ctx) {
const cache: State = { queue: new Map(), scope: yield* Scope.make() }
const cache: State = { queue: new Map(), scope: yield* Scope.make(), shared: new Map() }

yield* Effect.addFinalizer(() =>
Scope.close(cache.scope, Exit.void).pipe(
Effect.andThen(
Effect.sync(() => {
cache.queue.clear()
cache.shared.clear()
}),
),
),
Expand Down Expand Up @@ -227,6 +232,18 @@ export const layer = Layer.effect(
return { id: row.id, secret: row.secret, url: row.url } satisfies Share
})

const getCached = Effect.fnUntraced(function* (sessionID: SessionID) {
const s = yield* InstanceState.get(state)
if (s.shared.has(sessionID)) {
const cached = s.shared.get(sessionID)
return cached === null ? undefined : cached
}

const share = yield* get(sessionID)
s.shared.set(sessionID, share ?? null)
return share
})

const flush = Effect.fn("ShareNext.flush")(function* (sessionID: SessionID) {
if (disabled) return
const s = yield* InstanceState.get(state)
Expand All @@ -235,13 +252,13 @@ export const layer = Layer.effect(

s.queue.delete(sessionID)

const share = yield* get(sessionID)
const share = yield* getCached(sessionID)
if (!share) return

const req = yield* request()
const res = yield* HttpClientRequest.post(`${req.baseUrl}${req.api.sync(share.id)}`).pipe(
HttpClientRequest.setHeaders(req.headers),
HttpClientRequest.bodyJson({ secret: share.secret, data: Array.from(queued.data.values()) }),
HttpClientRequest.bodyJson({ secret: share.secret, data: Array.from(queued.values()) }),
Effect.flatMap((r) => http.execute(r)),
)

Expand Down Expand Up @@ -307,6 +324,7 @@ export const layer = Layer.effect(
.run(),
)
const s = yield* InstanceState.get(state)
s.shared.set(sessionID, result)
yield* full(sessionID).pipe(
Effect.catchCause((cause) =>
Effect.sync(() => {
Expand All @@ -321,8 +339,13 @@ export const layer = Layer.effect(
const remove = Effect.fn("ShareNext.remove")(function* (sessionID: SessionID) {
if (disabled) return
log.info("removing share", { sessionID })
const share = yield* get(sessionID)
if (!share) return
const s = yield* InstanceState.get(state)
const share = yield* getCached(sessionID)
if (!share) {
s.shared.delete(sessionID)
s.queue.delete(sessionID)
return
}

const req = yield* request()
yield* HttpClientRequest.delete(`${req.baseUrl}${req.api.remove(share.id)}`).pipe(
Expand All @@ -332,6 +355,8 @@ export const layer = Layer.effect(
)

yield* db((db) => db.delete(SessionShareTable).where(eq(SessionShareTable.session_id, sessionID)).run())
s.shared.delete(sessionID)
s.queue.delete(sessionID)
})

return Service.of({ init, url, request, create, remove })
Expand Down
Loading