# Ingest

inseam's typed, plugin-facing **write path** into the corpus. One call — `ctx.ingest({ source, envelope, parties })` — find-or-creates a `Source`, writes its `Envelope` + party rows, stamps plugin provenance, and emits a single `core.ingested` event in one Store batch. Gated by the `ingest` capability; replaces the prior `ctx.source.write` kludge.

**See also:** [arch/ingest/spec.md](../../../../arch/ingest/spec.md) · [arch/ingest/design.md](../../../../arch/ingest/design.md) · Starlight reference at `/reference/ingest/`

## What it does

One composed write owned by core. Plugins make one call; the framework guarantees:

- **Find-or-create the Source** by `(locationKind, normalize(pathId))`. New row → `source.indexed` event; existing row → no source write.
- **Append the Envelope** with a content-derived id over `(sourceId, transport, receivedAt, package, version)`. Same tuple twice → idempotent no-op (no rows, no events). The `rawPayload` is deliberately **not** in the hash — byte-noisy variants of one logical assertion collapse.
- **Stamp provenance** (`assertedByPluginPackage` / `assertedByPluginVersion`) from the `RuntimeContext` closure. Plugins cannot forge or override it.
- **Emit `core.ingested`** in the same batch as the writes, but only when at least one of source/envelope was newly created. Idempotent collapse is silent.
- **Commit one `Store.batch`** per attempt. A `UNIQUE(location_kind, path_id)` violation caused by a find-or-create race (TOCTOU) is retried transparently, up to `tuning.findOrCreateMaxRetries` times (default 1).
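The content-derived envelope id described above can be sketched as follows. This is a minimal illustration, not the code in `access/ids.ts`: the SHA-256 choice, the JSON-array encoding, the `env_` prefix, and the names `EnvelopeIdInput` and `deriveEnvelopeId` are all assumptions. The point it shows is that `rawPayload` never enters the hash, so the same five-tuple always yields the same id.

```typescript
import { createHash } from "node:crypto";

// Hypothetical shape: the five fields the text lists as envelope-id inputs.
interface EnvelopeIdInput {
  sourceId: string;
  transport: string;
  receivedAt: Date;
  pluginPackage: string; // asserting plugin package (stamped by core)
  pluginVersion: string; // asserting plugin version (stamped by core)
}

// Assumption: SHA-256 over a JSON array of the five fields, "env_" prefix.
// The real encoding may differ; rawPayload is deliberately not an input.
function deriveEnvelopeId(input: EnvelopeIdInput): string {
  const canonical = JSON.stringify([
    input.sourceId,
    input.transport,
    input.receivedAt.toISOString(),
    input.pluginPackage,
    input.pluginVersion,
  ]);
  return "env_" + createHash("sha256").update(canonical).digest("hex");
}

const base: EnvelopeIdInput = {
  sourceId: "src_1",
  transport: "gmail.poll",
  receivedAt: new Date(1_700_000_000_000),
  pluginPackage: "@example/gmail",
  pluginVersion: "1.2.0",
};

const first = deriveEnvelopeId(base);
const again = deriveEnvelopeId({ ...base }); // same tuple: same id, so the write collapses
const later = deriveEnvelopeId({ ...base, receivedAt: new Date(1_700_000_000_001) }); // shifted time: new id
```

Because `receivedAt` is part of the tuple, a one-millisecond shift is enough to produce a different envelope.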

`ctx.source.write` is gone. The only path that writes envelope rows is `AccessRegistry.appendEnvelope`, which is core-internal — plugins cannot bypass ingest to write partial state.

## Public surface at a glance

From `@inseam/plugin-contract`:

- **`PluginIngest`** — `(input: IngestInput) => Promise<IngestResult>`. Reachable as `ctx.ingest`.
- **`IngestInput`** — `{ source: IngestSourceInput; envelope: IngestEnvelopeInput; parties: ReadonlyArray<IngestPartyInput> }`. Provenance fields are absent by design.
- **`IngestResult`** — `{ source: { id, wasCreated }; envelope: { id, wasCreated } }`. The two `wasCreated` booleans let callers distinguish a fresh write from idempotent collapse without a re-read.
- **`Capability`** — gains `"ingest"`. `"source_write"` and `"emit_envelopes"` remain as deprecated entries; a loader-side bridge adds `ingest` to any config that declares `[source_write, emit_envelopes]`, so existing host configs keep working.
- **`IngestError` hierarchy** — `EnvelopePayloadTooLargeError`, `IngestProvenanceError`, `IngestInputInvalidError`. `CapabilityDeniedError`, `LocationKindNotRegisteredError`, `InvalidPathIdError`, `UnknownIdentifierKindError`, `InvalidRoleError`, `EmitGuardError`, `BatchError` surface unwrapped from the underlying layers.
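A rough sketch of the loader-side capability bridge mentioned above. The function name `bridgeCapabilities` is hypothetical, and the rule that both deprecated capabilities must be present before `ingest` is added is an assumption read from the `[source_write, emit_envelopes]` pair; the actual bridge may apply a different rule.

```typescript
// Hypothetical helper: returns the declared capabilities, plus "ingest"
// when the deprecated pair is present.
// Assumption: BOTH "source_write" and "emit_envelopes" are required.
function bridgeCapabilities(declared: ReadonlyArray<string>): string[] {
  const granted = new Set<string>(declared);
  if (granted.has("source_write") && granted.has("emit_envelopes")) {
    granted.add("ingest"); // deprecated entries stay; ingest is added alongside
  }
  return Array.from(granted);
}
```

Under that assumption, a config declaring only one of the deprecated capabilities is left unchanged, and a config that already lists `ingest` passes through as-is.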

Core-internal (not on the plugin contract): `SourceRegistry.findOrAppendCreate(builder, input)` and `AccessRegistry.appendEnvelope(builder, input)` — the builder-shaped helpers ingest composes into one batch.

Host knobs threaded through `createCore`'s `tuning`:

- `maxEnvelopePayloadBytes` (default `65_536` / 64 KiB) — `rawPayload` size cap, checked before any I/O.
- `findOrCreateMaxRetries` (default `1`) — TOCTOU retry budget.
- `parseIngestInput?: StandardSchemaV1<IngestInput>` — optional runtime input validator; runs before the capability check. Hosts may supply a Zod / Valibot / ArkType / Effect Schema validator interchangeably. When omitted, TypeScript types are the only gate.
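To make the first two knobs concrete, here is a minimal sketch of a size check that runs before any I/O and a retry loop bounded by a retry budget. The error classes and function names are local stand-ins, not the contract's types, and detecting the uniqueness violation by error name is an assumption made to keep the example self-contained.

```typescript
// Local stand-ins for illustration only; not the real contract errors.
class PayloadTooLargeError extends Error {
  constructor(actual: number, max: number) {
    super(`rawPayload is ${actual} bytes; cap is ${max}`);
    this.name = "PayloadTooLargeError";
  }
}
class UniqueViolationError extends Error {
  constructor() {
    super("UNIQUE(location_kind, path_id) violated");
    this.name = "UniqueViolationError";
  }
}

// Checked before any I/O, mirroring maxEnvelopePayloadBytes (default 64 KiB).
function assertPayloadWithinCap(rawPayload: Uint8Array, maxBytes = 65_536): void {
  if (rawPayload.byteLength > maxBytes) {
    throw new PayloadTooLargeError(rawPayload.byteLength, maxBytes);
  }
}

// Runs `attempt` once, then retries only on a uniqueness violation,
// at most `maxRetries` more times (mirroring findOrCreateMaxRetries).
async function withFindOrCreateRetry<T>(
  attempt: () => Promise<T>,
  maxRetries = 1,
): Promise<T> {
  for (let retriesUsed = 0; ; retriesUsed++) {
    try {
      return await attempt(); // one batch per attempt
    } catch (err) {
      const isRace = err instanceof Error && err.name === "UniqueViolationError";
      if (!isRace || retriesUsed >= maxRetries) throw err;
      // Another writer created the Source first; the next attempt finds it.
    }
  }
}
```

With the default budget of 1, a call makes at most two attempts; any other error, or a second uniqueness violation, propagates to the caller.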

Full signatures: [arch/ingest/spec.md](../../../../arch/ingest/spec.md) · generated TSDoc at `/api/plugin-contract/`.

## How to call it

```ts
const { source, envelope } = await ctx.ingest({
  source: {
    locationKind: "gmail.message",
    pathId: msg.id,
    ownerHostId: ctx.self.hostId,
    retrieval: { contentType: "message/rfc822", displayName: msg.subject },
    metadata: { threadId: msg.threadId },
  },
  envelope: {
    transport: "gmail.poll",
    rawPayload: new TextEncoder().encode(JSON.stringify({ snippet: msg.snippet })),
    receivedAt: new Date(Number(msg.internalDate)),
  },
  parties: [
    { identifierKind: "email", identifierScope: "global", identifierValue: msg.from, role: "sender", trust: "provider-asserted" },
  ],
});

if (envelope.wasCreated) ctx.log.info("ingested", { sourceId: source.id, fresh: source.wasCreated });
```

Footguns to surface in plugin docs:

- **Always prefer upstream-observed time** for `receivedAt` (Gmail `internalDate`, webhook `Date` header, filesystem `mtime`). Fall back to `ctx.clock.now()` only when the upstream supplies none. `receivedAt` participates in envelope-id derivation — jittering it on every poll fragments idempotency.
- **Plugins must handle fetch failure at consumption time**, regardless of ingest succeeding. The corpus is an observation log; sources can 404 / 401 / expire between ingest and the next `Connection.fetch`. Ingest does not verify reachability — that is owned by a future `connection-health` concept.
- **Parties may be empty.** Webhooks and system events have no parties; the envelope still writes and `core.envelope_indexed` + `core.ingested` still fire.
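The `receivedAt` guidance above can be captured in a small helper. This is a sketch under assumptions: `resolveReceivedAt` is a hypothetical name, the upstream value is taken to be epoch milliseconds (as Gmail's `internalDate` is), and the fallback is passed in as a function so that `ctx.clock.now` can be supplied, assuming it returns a `Date`.

```typescript
// Hypothetical helper: prefer the upstream-observed timestamp, and fall back
// to the injected clock only when the upstream value is missing or unparsable.
function resolveReceivedAt(
  upstreamEpochMs: string | number | undefined,
  now: () => Date,
): Date {
  if (upstreamEpochMs !== undefined && upstreamEpochMs !== "") {
    const ms = Number(upstreamEpochMs);
    if (isFinite(ms)) return new Date(ms); // stable across polls, so ids stay stable
  }
  return now(); // last resort: differs on every call, so idempotency is not preserved
}
```

A plugin would call something like `resolveReceivedAt(msg.internalDate, () => ctx.clock.now())`, so that re-polling the same message yields the same `receivedAt` and therefore the same envelope id.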

## How to extend it

There is no plugin-facing extension surface for ingest itself — the seam is deliberately narrow. Hosts extend by injecting `tuning.parseIngestInput` (any `StandardSchemaV1<IngestInput>`); plugins extend transitively by contributing to the underlying registries:

- New `LocationKindDefinition` → see [source](./source.md).
- New `IdentifierKindDefinition` or `CeremonyAdapter` → see [access](./access.md).
- New listener handlers that consume `core.ingested` → see [events](./events.md).
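For hosts that inject `tuning.parseIngestInput`, a hand-rolled validator is enough as long as it exposes the Standard Schema `~standard` property. The sketch below declares a trimmed local copy of that interface instead of importing it, and validates a cut-down stand-in (`MinimalIngestInput`) rather than the real `IngestInput`; both are assumptions made to keep the example self-contained. In practice a Zod, Valibot, ArkType, or Effect Schema validator would fill the same slot.

```typescript
// Trimmed local copy of the Standard Schema v1 shapes (illustration only).
interface StandardIssue {
  readonly message: string;
  readonly path?: ReadonlyArray<PropertyKey>;
}
type StandardResult<T> =
  | { readonly value: T; readonly issues?: undefined }
  | { readonly issues: ReadonlyArray<StandardIssue> };
interface StandardSchemaLike<T> {
  readonly "~standard": {
    readonly version: 1;
    readonly vendor: string;
    readonly validate: (value: unknown) => StandardResult<T> | Promise<StandardResult<T>>;
  };
}

// Hypothetical, cut-down stand-in for the real IngestInput.
interface MinimalIngestInput {
  source: { locationKind: string; pathId: string };
  parties: ReadonlyArray<unknown>;
}

const parseIngestInput: StandardSchemaLike<MinimalIngestInput> = {
  "~standard": {
    version: 1,
    vendor: "example-host", // hypothetical vendor name
    validate(value) {
      const issues: StandardIssue[] = [];
      const input = (value ?? {}) as {
        source?: { locationKind?: unknown; pathId?: unknown };
        parties?: unknown;
      };
      const locationKind = input.source?.locationKind;
      const pathId = input.source?.pathId;
      if (typeof locationKind !== "string") {
        issues.push({ message: "source.locationKind must be a string", path: ["source", "locationKind"] });
      }
      if (typeof pathId !== "string" || pathId.length === 0) {
        issues.push({ message: "source.pathId must be a non-empty string", path: ["source", "pathId"] });
      }
      if (!Array.isArray(input.parties)) {
        issues.push({ message: "parties must be an array (it may be empty)", path: ["parties"] });
      }
      return issues.length > 0 ? { issues } : { value: value as MinimalIngestInput };
    },
  },
};
```

A failing result carries an `issues` array and a passing one carries `value`; how core maps issues onto `IngestInputInvalidError` is not shown here.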

The single new event type — `core.ingested`, payload `{ sourceId, envelopeId, sourceCreated }`, `schemaVersion: 1` — is the canonical "a source is now visible end-to-end" signal. Plugin-side `ctx.emit({ type: "core.ingested", ... })` is rejected with `EmitGuardError(reason: "namespace")`; only the core-internal ingest path may produce it.
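A listener that consumes `core.ingested` may want to narrow the payload before using it. The sketch below assumes a generic event shape (`EventLike`) with `type`, `schemaVersion`, and `payload` fields; that wrapper and the helper name `asCoreIngested` are hypothetical, while the payload fields come from the description above.

```typescript
// Payload fields as described in the text for schemaVersion 1.
interface CoreIngestedPayload {
  sourceId: string;
  envelopeId: string;
  sourceCreated: boolean;
}

// Hypothetical event wrapper; the real event type lives in the events concept.
interface EventLike {
  type: string;
  schemaVersion: number;
  payload: unknown;
}

// Returns the typed payload, or undefined when the event is not a
// well-formed core.ingested v1 event.
function asCoreIngested(event: EventLike): CoreIngestedPayload | undefined {
  if (event.type !== "core.ingested" || event.schemaVersion !== 1) return undefined;
  const p = event.payload as Partial<CoreIngestedPayload> | null;
  if (
    !p ||
    typeof p.sourceId !== "string" ||
    typeof p.envelopeId !== "string" ||
    typeof p.sourceCreated !== "boolean"
  ) {
    return undefined;
  }
  return { sourceId: p.sourceId, envelopeId: p.envelopeId, sourceCreated: p.sourceCreated };
}
```

Checking `schemaVersion` explicitly means a handler written against version 1 skips later payload shapes instead of misreading them.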

## Where the code lives

- Contract additions: `pkgs/plugin-contract/src/index.ts` (`PluginIngest`, `IngestInput`/`IngestResult`, `IngestError` hierarchy, `"ingest"` capability literal).
- Runtime composition: `pkgs/core/src/plugin-system/registry.ts` (`ctx.ingest` wiring, capability bridge, namespace guard, TOCTOU retry).
- Builder helpers: `pkgs/core/src/source/registry.ts` (`findOrAppendCreate`), `pkgs/core/src/access/registry.ts` (`appendEnvelope`), `pkgs/core/src/access/ids.ts` (content-derived envelope id over the five-tuple).
- Tuning shape: `pkgs/core/src/plugin-system/types.ts` (`IngestTuning`).
- Behavior tests: `pkgs/plugin-contract/*ingest*.test.ts`, `pkgs/core/src/source/find-or-append-create.test.ts`, `pkgs/core/src/access/append-envelope.test.ts`, `pkgs/core/src/plugin-system/runtime-ingest*.test.ts`, `pkgs/core/src/plugin-system/capability-migration.test.ts`.

## Related

- [Source](./source.md) — owns the rows ingest find-or-creates; `findOrAppendCreate` is the builder-shaped sibling of `createSource`.
- [Access](./access.md) — owns Envelope, party rows, `core.envelope_indexed`; `appendEnvelope` is the only path that writes them.
- [Events](./events.md) — the outbox `core.ingested` rides; namespace guard rejects plugin-side emits of `core.ingested`.
- [Store](./store.md) — `BatchBuilder` + `store.batch` is the atomicity primitive every ingest call commits through.
- [Plugin system](./plugin-system.md) — the `RuntimeContext` ingest hangs off; the `ingest` capability; the loader-side migration bridge.
