Phase 6: Data Flow Analysis
Traces every significant data flow through the copilot-sdk across all four language SDKs (TypeScript Python Go C#). All SDKs implement the same protocol; differences in mechanics are noted where they occur.
1. End-to-End Message Flow
High-Level Overview
flowchart LR
User["User Code"] -->|"prompt"| SDK["SDK Session"]
SDK -->|"JSON-RPC request"| CLI["CLI Process"]
CLI -->|"HTTP / API"| LLM["LLM Backend"]
LLM -->|"streams response"| CLI
CLI -->|"session.event notifications"| SDK
SDK -->|"_dispatchEvent"| User
Detailed Step-by-Step Flow
Step 1: User sends a prompt. All four SDKs invoke the session.send JSON-RPC request with identical wire payloads:
// nodejs/src/session.ts:120-128
async send(options: MessageOptions): Promise<string> {
const response = await this.connection.sendRequest("session.send", {
sessionId: this.sessionId,
prompt: options.prompt,
attachments: options.attachments,
mode: options.mode,
});
return (response as { messageId: string }).messageId;
}
# python/copilot/session.py:142-151
response = await self._client.request(
"session.send",
{
"sessionId": self.session_id,
"prompt": options["prompt"],
"attachments": options.get("attachments"),
"mode": options.get("mode"),
},
)
return response["messageId"]
// go/session.go:113-131
result, err := s.client.Request("session.send", req)
// dotnet/src/Session.cs:140-153
var response = await InvokeRpcAsync<SendMessageResponse>(
"session.send", [request], cancellationToken);
return response.MessageId;
Step 2: CLI processes the prompt — communicates with LLM backend, orchestrates tool usage, streams events back.
Step 3: SDK receives events as JSON-RPC notifications — session.event carries {sessionId, event}. Client looks up session, dispatches event.
Step 4: User code receives typed events. session.idle signals the assistant's turn is complete.
The sendAndWait Convenience Flow
flowchart TD
A["sendAndWait(prompt, timeout)"] --> B["Register idle/error listener"]
B --> C["send(prompt) → session.send RPC"]
C --> D["Events stream in: assistant.message..."]
D --> E{"session.idle received?"}
E -->|"Yes"| F["Resolve with lastAssistantMessage"]
E -->|"Timeout"| G["Reject with TimeoutError"]
send() to prevent a race where session.idle could fire before the listener is attached.
2. JSON-RPC Protocol Flow
Transport Layer
flowchart LR
A["User Code"] <-->|"calls"| B["SDK\nMessageConnection / JsonRpcClient"]
B <-->|"stdio (default)\nor TCP"| C["CLI Process"]
stdio (default): SDK spawns CLI as child process, creates JSON-RPC connection over stdin/stdout pipes.
TCP: SDK spawns CLI with --port or connects to external CLI server at cliUrl.
Connection Establishment (TypeScript)
// nodejs/src/client.ts:1280-1360
connectToServer():
if isChildProcess: connectToParentProcessViaStdio()
elif useStdio: connectToChildProcessViaStdio()
else: connectViaTcp()
// Each path ends with:
this.connection = createMessageConnection(reader, writer)
this.attachConnectionHandlers()
this.connection.listen()
Complete JSON-RPC Method Catalog
Server-Scoped Requests (SDK → CLI)
| Method | Purpose |
|---|---|
ping | Health check; returns protocolVersion |
status.get | Get CLI version and protocol version |
auth.getStatus | Get authentication status |
models.list | List available LLM models |
tools.list | List available built-in tools |
account.getQuota | Get usage quota information |
Session Lifecycle Requests (SDK → CLI)
| Method | Purpose |
|---|---|
session.create | Create a new conversation session |
session.resume | Resume an existing session |
session.destroy | Disconnect session (preserve disk data) |
session.delete | Permanently delete session and all data |
session.send | Send a user message |
session.abort | Abort current processing |
session.getMessages | Retrieve full conversation history |
session.getLastId | Get most recently updated session ID |
session.list | List all sessions with metadata |
session.getForeground | Get foreground session (TUI+server mode) |
session.setForeground | Switch TUI to display a session |
Session-Scoped RPC Requests (SDK → CLI)
| Method | Purpose |
|---|---|
session.model.getCurrent | Get current model for session |
session.model.switchTo | Change session model |
session.mode.get / .set | Get/set agent mode (interactive/plan/autopilot) |
session.plan.read / .update / .delete | Plan file operations |
session.workspace.listFiles / .readFile / .createFile | Workspace operations |
session.fleet.start | Start fleet mode |
session.agent.list / .getCurrent / .select / .deselect | Custom agent operations |
session.compaction.compact | Trigger context compaction |
session.tools.handlePendingToolCall | Respond to a pending tool call |
session.permissions.handlePendingPermissionRequest | Respond to a pending permission request |
session.log | Log message to session timeline |
CLI → SDK Notifications
| Method | Purpose |
|---|---|
session.event | Deliver a session event (all event types) |
session.lifecycle | Session lifecycle change (create/delete/etc.) |
CLI → SDK Requests (v2 Protocol — backward compat)
| Method | Purpose |
|---|---|
tool.call | Request tool execution (v2 protocol) |
permission.request | Request permission decision (v2 protocol) |
userInput.request | Request user input |
hooks.invoke | Invoke a hook handler |
Protocol Versioning
// nodejs/src/client.ts:837-865
verifyProtocolVersion():
pingResult = await this.ping()
serverVersion = pingResult.protocolVersion
if serverVersion < MIN || serverVersion > max:
throw "SDK protocol version mismatch"
this.negotiatedProtocolVersion = serverVersion
Protocol v2: Tool calls and permissions sent as JSON-RPC requests from CLI to SDK (synchronous).
Protocol v3: Tool calls and permissions broadcast as session events. SDK handles internally and responds asynchronously via session.tools.handlePendingToolCall.
3. Session Lifecycle Data Flow
Complete Session Creation Flow
flowchart TD
A["createSession(config)"] --> B["Validate config"]
B --> C["Auto-start client if not connected"]
C --> D["Generate sessionId (UUID v4)"]
D --> E["Create CopilotSession object\nRegister tools, permissions, hooks\nsessions.set(sessionId, session)"]
E --> F["sendRequest 'session.create' → CLI"]
F --> G["CLI initializes context, model, tools, MCP"]
G --> H["Store workspacePath on session"]
H --> I["Return session to caller"]
style E fill:#22C55E,color:#0F172A,stroke:#22C55E
session.start events before the SDK has a handler registered. All four SDKs implement this identically.
Session Resume Flow
Nearly identical to creation except: uses session.resume RPC, session ID is provided by user, additional disableResume option available.
Session Disconnect Flow
flowchart TD
A["session.disconnect()"] --> B["sendRequest session.destroy, sessionId\nCLI releases in-memory resources\ndisk preserved"]
B --> C["Clear eventHandlers,\ntoolHandlers, permissionHandler"]
Session Delete Flow (Permanent)
flowchart TD
A["client.deleteSession(sessionId)"] --> B["sendRequest session.delete, sessionId\nCLI deletes all disk data"]
B --> C["Remove session from\nclient.sessions map"]
Client Shutdown Flow
flowchart TD
A["client.stop()"] --> B["For each session: session.disconnect()\nTS: 3 retries, exponential backoff"]
B --> C["Clear sessions map\nDispose JSON-RPC connection\nClear models cache"]
C --> D["Close TCP socket if used\nKill CLI process if spawned\nstate = disconnected"]
4. Streaming Events Flow
Event Delivery Architecture
flowchart LR
A["CLI\nLLM generates tokens"] -->|"notification:\nsession.event"| B["SDK Client\nhandleSessionEventNotification\nsessions.get sessionId"]
B --> C["Session\n_dispatchEvent\n_handleBroadcastEvent"]
C --> D["User Code\nhandler event"]
Event Type Categories
Session Lifecycle Events
| Event Type | Description |
|---|---|
session.start | Session initialized (sessionId, version, producer, startTime, context) |
session.resume | Session resumed from disk |
session.idle | Session finished processing (critical for sendAndWait) |
session.error | Session-level error occurred |
session.shutdown | Session shutting down |
session.title_changed | Session title updated |
session.model_change | Model switched |
session.mode_changed | Agent mode changed |
session.truncation | Context truncation occurred |
session.compaction_start / .complete | Context compaction lifecycle |
session.usage_info | Token usage information |
Assistant Events (Streaming)
| Event Type | Description |
|---|---|
assistant.turn_start | Assistant begins processing |
assistant.intent | Detected intent/plan |
assistant.reasoning / .reasoning_delta | Full/incremental reasoning |
assistant.streaming_delta | Incremental response token |
assistant.message / .message_delta | Complete/incremental message |
assistant.turn_end | Assistant finished processing |
assistant.usage | Token usage for this turn |
Tool Events
| Event Type | Description |
|---|---|
tool.execution_start | Tool execution started |
tool.execution_partial_result | Partial tool result |
tool.execution_progress | Tool progress update |
tool.execution_complete | Tool finished (text, terminal, image, audio, resource) |
Broadcast Request Events (Protocol v3)
| Event Type | Description |
|---|---|
external_tool.requested | Tool call broadcast (requestId, toolName, arguments, toolCallId) |
permission.requested | Permission request broadcast (requestId, permissionRequest) |
Event Chain Structure
Events form a linked list via parentId:
flowchart LR
A["Event 1: session.start\nparentId: null"] --> B["Event 2: user.message\nparentId: aaa"]
B --> C["Event 3: assistant.turn_start"]
C --> D["...\nEvent N: session.idle"]
Events can be marked ephemeral: true for transient data that should not be persisted.
5. Tool Execution Flow
Protocol v3 (Broadcast Model) — Current
sequenceDiagram
participant CLI as CLI Server
participant SDK as SDK Client
participant Handler as Tool Handler
CLI->>SDK: notification: external_tool.requested
SDK->>Handler: handler(args, invocation)
Handler-->>SDK: result
SDK->>CLI: rpc: tools.handlePendingToolCall
Note over CLI: Continues with tool result
Implementation per SDK
// nodejs/src/session.ts:355-389
private async _executeToolAndRespond(
requestId, toolName, toolCallId, args, handler
): Promise<void> {
try {
const rawResult = await handler(args, {
sessionId: this.sessionId, toolCallId, toolName, arguments: args,
});
let result: string;
if (rawResult == null) result = "";
else if (typeof rawResult === "string") result = rawResult;
else result = JSON.stringify(rawResult);
await this.rpc.tools.handlePendingToolCall({ requestId, result });
} catch (error) {
const message = error instanceof Error ? error.message : String(error);
await this.rpc.tools.handlePendingToolCall({ requestId, error: message });
}
}
# python/copilot/session.py:292-296
asyncio.ensure_future(
self._execute_tool_and_respond(
request_id, tool_name, tool_call_id, arguments, handler
)
)
// go/session.go:499-536 -- Uses goroutines
go s.executeToolAndRespond(
*requestID, *toolName, toolCallID,
event.Data.Arguments, handler)
// dotnet/src/Session.cs:358-389 -- async void
private async void HandleBroadcastEventAsync(SessionEvent sessionEvent)
{
await ExecuteToolAndRespondAsync(
data.RequestId, data.ToolName, ...);
}
Protocol v2 (Synchronous RPC) — Legacy
sequenceDiagram
participant CLI as CLI
participant SDK as SDK
participant Handler as Handler
CLI->>SDK: request: tool.call
Note over SDK: handleToolCallRequestV2<br/>session.getToolHandler(toolName)
SDK->>Handler: handler(invocation)
Handler-->>SDK: result
SDK-->>CLI: response: textResultForLLM, resultType
Tool Registration Data Flow
flowchart TD
A["User provides tools in SessionConfig:\ntools: name, description, parameters, handler"] --> B["Wire Payload"]
A --> C["Local Registration"]
B --> D["CLI receives:\ntools: name, desc, params\nno handlers"]
C --> E["Session stores:\ntoolHandlers.set name, handler\nlocal only"]
6. Hook Data Flow
Hook Invocation Flow
sequenceDiagram
participant CLI as CLI
participant SDK as SDK
participant Hook as User Hook
CLI->>SDK: request: hooks.invoke(hookType, input)
Note over SDK: _handleHooksInvoke<br/>handler = handlerMap[hookType]
SDK->>Hook: handler(input, sessionId)
Hook-->>SDK: result
SDK-->>CLI: response: output
Note over CLI: Applies hook output
Hook Types and Their Data
| Hook Type | When Invoked | Input | Output Effect |
|---|---|---|---|
preToolUse | Before a tool executes | Tool name, args | Can modify/cancel tool execution |
postToolUse | After a tool executes | Tool name, result | Can modify tool result |
userPromptSubmitted | When user sends a message | User prompt | Can modify/reject the prompt |
sessionStart | Session starts | Session context | Can configure session |
sessionEnd | Session ends | Session summary | Cleanup |
errorOccurred | Error in session | Error details | Error handling/logging |
Hook Registration
// Hooks are registered at session creation time:
config.hooks = {
onPreToolUse: async (input, { sessionId }) => { ... },
onPostToolUse: async (input, { sessionId }) => { ... },
}
// CLI is told hooks are enabled via boolean flag:
payload["hooks"] = true // (not the actual handler functions)
hooks: true flag tells CLI to invoke hooks.invoke RPC at trigger points. Actual handler logic stays in the SDK process.7. Permission Request Flow
Protocol v3 Permission Flow (Broadcast)
sequenceDiagram
participant CLI as CLI
participant SDK as SDK
participant Handler as User Handler
CLI->>SDK: notification: session.event<br/>permission.requested
Note over SDK: _handleBroadcastEvent
SDK->>Handler: permissionHandler(request, sessionId)
Handler-->>SDK: kind: approved
SDK->>CLI: rpc: permissions.handlePendingPermissionRequest<br/>requestId, result
Note over CLI: Proceeds or aborts tool
Protocol v2 Permission Flow (Synchronous)
sequenceDiagram
participant CLI as CLI
participant SDK as SDK
CLI->>SDK: request: permission.request
Note over SDK: handlePermissionRequestV2<br/>permissionHandler(request, sessionId)
SDK-->>CLI: response: result kind
Permission Handler Contract
Permission Result Types
| Kind | Description |
|---|---|
approved | Tool execution permitted |
denied-by-rules | Denied by policy rules |
denied-interactively-by-user | User explicitly denied (optional feedback) |
denied-no-approval-rule-and-could-not-request-from-user | No rule matched and user unreachable |
denied-by-content-exclusion-policy | Content exclusion (path, message) |
Error Fallback
If the permission handler throws, all SDKs catch the error and return a denial:
// nodejs/src/session.ts:404-418
catch (_error) {
await this.rpc.permissions.handlePendingPermissionRequest({
requestId,
result: { kind: "denied-no-approval-rule-and-could-not-request-from-user" },
});
}
8. Error Flow
Error Propagation Layers
flowchart TD
L4["Layer 4: Session Event Errors\nsession.error event: message, stack\nDelivered to event handlers, causes sendAndWait to reject"]
L3["Layer 3: SDK Application Errors\nClient not connected | onPermissionRequest handler required\nprotocol version mismatch | CLI process exited with code N"]
L2["Layer 2: JSON-RPC Protocol Errors\nResponseError with code/message\nTS: ResponseError, Py: JsonRpcError, Go: jsonrpc2.Error, C#: RemoteInvocationException"]
L1["Layer 1: JSON-RPC Transport Errors\nConnection refused / broken pipe / timeout\nProcess exit before response"]
L4 --- L3 --- L2 --- L1
Client.start() Error Flow
flowchart TD
A["client.start()\nstate = connecting"] --> B["startCLIServer()\nmay fail: CLI not found"]
B --> C["connectToServer()\nmay fail: TCP connect error"]
C --> D["verifyProtocolVersion()\nmay fail: version mismatch"]
D --> E["state = connected\nor error on catch"]
Protocol Version Verification Race
TypeScript races ping against process exit to detect early CLI crashes:
// nodejs/src/client.ts:841-846
if (this.processExitPromise) {
pingResult = await Promise.race([this.ping(), this.processExitPromise]);
} else {
pingResult = await this.ping();
}
Client.stop() Error Aggregation
| SDK | Error Aggregation |
|---|---|
| TypeScript | Returns Error[] array |
| Python | Raises ExceptionGroup[StopError] |
| Go | Returns errors.Join(errs...) |
| C# | Throws AggregateException if multiple, single if one |
Connection Error Recovery
// nodejs/src/client.ts:1414-1418
this.connection.onClose(() => {
if (this.state === "connected" && this.options.autoRestart) {
void this.reconnect();
}
});
9. Configuration Data Flow
Configuration Propagation Chain
flowchart TD
A["User-Provided Options\nCopilotClientOptions\ncliPath, cliUrl, useStdio, port, cwd,\nenv, githubToken, autoStart, logLevel, ..."] -->|"Validated and merged\nwith defaults"| B["client.options internal\nUsed during start to spawn CLI / connect"]
B --> C["SessionConfig\nmodel, tools, systemMessage,\nprovider, hooks, mcpServers,\ncustomAgents, ..."]
C -->|"Wire Payload\nsent to CLI"| D["Wire Payload: session.create RPC\nmodel, sessionId, clientName, reasoningEffort\ntools: name, desc, params - no handlers\nsystemMessage, provider, mcpServers\nrequestPermission: true, hooks: true/false\nworkingDirectory, streaming, customAgents"]
C -->|"Local Registration\nkept in SDK"| E["Local Registration\nsession.registerTools\nsession.registerPermissionHandler\nsession.registerUserInputHandler\nsession.registerHooks\nsession.on onEvent"]
Key Separation Principle
Data kept local in SDK: Handler functions (tool, permission, event, hooks, user input). These are callable objects that cannot be serialized over JSON-RPC.
Default Values
| Option | Default |
|---|---|
useStdio | true (unless cliUrl provided) |
port | 0 (OS-assigned when TCP) |
cwd | process.cwd() / os.getcwd() |
autoStart | true |
autoRestart | true |
logLevel | "debug" |
useLoggedInUser | true (unless githubToken provided) |
envValueMode | "direct" (always) |
requestPermission | true (always) |
10. State Management
State Distribution Map
flowchart LR
A["SDK Client Process\n\nCopilotClient: state, connection,\ncliProcess, socket, actualPort,\nmodelsCache, negotiatedProtocolVersion,\nsessions Map, lifecycleHandlers, options\n\nCopilotSession per session: sessionId,\nworkspacePath, connection ref,\neventHandlers, typedEventHandlers,\ntoolHandlers, permissionHandler,\nuserInputHandler, hooks, _rpc"]
B["CLI Process\n\nConversation history persisted\nPlanning state\nTool registry built-in + SDK schemas\nModel state\nWorking directory / Git context\nMCP server connections\nAuthentication state\nSession workspace files"]
A <-->|"JSON-RPC"| B
State Ownership Summary
| State | Owned By | Persisted? | Shared? |
|---|---|---|---|
| Connection state | Client | No (in-memory) | No |
| Session map | Client | No (in-memory) | No |
| Models cache | Client | No (in-memory) | No |
| Protocol version | Client | No (in-memory) | No |
| Event/tool/permission handlers | Session | No (in-memory) | No |
| Conversation history | CLI | Yes (disk) | Via RPC |
| Tool schemas | CLI | Yes (disk) | Via RPC |
| Session workspace | CLI | Yes (disk) | Via RPC |
| Authentication | CLI | Yes (config) | Via RPC |
Concurrency and Thread Safety
| SDK | Session Map | Handler Collections | Models Cache |
|---|---|---|---|
| TypeScript | Single-threaded (event loop) | Set (single-threaded) | Promise-based lock |
| Python | threading.Lock | threading.Lock | asyncio.Lock |
| Go | sync.Mutex | sync.RWMutex | sync.Mutex |
| C# | ConcurrentDictionary | event (CAS loop) | SemaphoreSlim |
Session-Client Relationship
flowchart LR
A["CopilotClient 1\nOwns connection, CLI process\nRoutes notifications by sessionId"] -->|"manages N"| B["CopilotSession N\nReferences same connection\nDispatches events to handlers\nExecutes tools locally"]
Sessions hold a reference to the client's connection (not a copy), so all sessions communicate through the same JSON-RPC channel.
Lifecycle of Cached Models
ListModels():
acquire lock
if cache != null: return copy of cache // defensive copy
models = RPC("models.list")
cache = copy of models // defensive copy
release lock
return copy of models // defensive copy
Client.stop():
cache = null // invalidated
The triple-copy pattern prevents external mutation of the cached data.
Appendix: Cross-SDK Implementation Consistency
All four SDKs implement the identical JSON-RPC protocol and exhibit the same observable behavior. Major implementation differences:
| Aspect | TypeScript | Python | Go | C# |
|---|---|---|---|---|
| JSON-RPC library | vscode-jsonrpc | Custom JsonRpcClient | Custom jsonrpc2 | StreamJsonRpc |
| Event dispatch | Sync iteration over Set | Sync iteration over copied set | Sync iteration over copied slice | Multicast delegate |
| Tool exec (v3) | void async function | asyncio.ensure_future() | go goroutine | async void |
| Session map | Map<string, Session> | dict[str, Session] | map[string]*Session | ConcurrentDictionary |
| Typed events | Discriminated unions | @dataclass + enum | Struct + type string | Record subclasses |
| Error cleanup | try/catch + delete | try/except + pop | if err + delete | try/catch + TryRemove |
| Generated RPC | createServerRpc / createSessionRpc | ServerRpc / SessionRpc | rpc.NewServerRpc / NewSessionRpc | ServerRpc / SessionRpc |