From 50817dc4a3342a5f45bbd6783f974b3b3c557ea0 Mon Sep 17 00:00:00 2001
From: Theodore Li
Date: Thu, 7 May 2026 15:40:42 -0700
Subject: [PATCH] fix(table): preserve workflow groups on CSV column-add and
 dispatch after tx commit

Two bugs in the CSV-import path:

- addTableColumnsWithTx rebuilt the schema with only `columns`, dropping
  `workflowGroups` (and any other top-level schema fields). Importing CSV
  into a table that has workflow groups erased the group config. Spread
  `table.schema` first so siblings survive.

- batchInsertRowsWithTx fired fireTableTrigger and scheduleRunsForRows from
  inside the caller's transaction. Both read through the global db
  connection, so they could run before the inserts committed and see no
  rows. Extracted the dispatch into dispatchAfterBatchInsert; non-tx wrapper
  fires it after `db.transaction(...)` resolves, and the CSV import route
  does the same after its tx.
---
 .../api/table/[tableId]/import/route.test.ts |  3 ++
 .../app/api/table/[tableId]/import/route.ts  | 15 +++++++---
 apps/sim/lib/table/service.ts                | 29 +++++++++++++++----
 3 files changed, 38 insertions(+), 9 deletions(-)

diff --git a/apps/sim/app/api/table/[tableId]/import/route.test.ts b/apps/sim/app/api/table/[tableId]/import/route.test.ts
index b821961cb6d..3e3e7be2369 100644
--- a/apps/sim/app/api/table/[tableId]/import/route.test.ts
+++ b/apps/sim/app/api/table/[tableId]/import/route.test.ts
@@ -11,11 +11,13 @@ const {
   mockBatchInsertRowsWithTx,
   mockReplaceTableRowsWithTx,
   mockAddTableColumnsWithTx,
+  mockDispatchAfterBatchInsert,
 } = vi.hoisted(() => ({
   mockCheckAccess: vi.fn(),
   mockBatchInsertRowsWithTx: vi.fn(),
   mockReplaceTableRowsWithTx: vi.fn(),
   mockAddTableColumnsWithTx: vi.fn(),
+  mockDispatchAfterBatchInsert: vi.fn(),
 }))
 
 vi.mock('@sim/utils/id', () => ({
@@ -44,6 +46,7 @@ vi.mock('@/lib/table/service', () => ({
   batchInsertRowsWithTx: mockBatchInsertRowsWithTx,
   replaceTableRowsWithTx: mockReplaceTableRowsWithTx,
   addTableColumnsWithTx: mockAddTableColumnsWithTx,
+  dispatchAfterBatchInsert: mockDispatchAfterBatchInsert,
 }))
 
 import { POST } from '@/app/api/table/[tableId]/import/route'
diff --git a/apps/sim/app/api/table/[tableId]/import/route.ts b/apps/sim/app/api/table/[tableId]/import/route.ts
index c51cde1b2ab..1a4e5df1e4e 100644
--- a/apps/sim/app/api/table/[tableId]/import/route.ts
+++ b/apps/sim/app/api/table/[tableId]/import/route.ts
@@ -23,11 +23,13 @@ import {
   type CsvHeaderMapping,
   CsvImportValidationError,
   coerceRowsForTable,
+  dispatchAfterBatchInsert,
   inferColumnType,
   parseCsvBuffer,
   replaceTableRowsWithTx,
   sanitizeName,
   type TableDefinition,
+  type TableRow,
   type TableSchema,
   validateMapping,
 } from '@/lib/table'
@@ -213,13 +215,13 @@ export const POST = withRouteHandler(async (request: NextRequest, { params }: Ro
   }
 
   try {
-    const inserted = await db.transaction(async (trx) => {
+    const txResult = await db.transaction(async (trx) => {
       let working = table
       if (additions.length > 0) {
         working = await addTableColumnsWithTx(trx, table, additions, requestId)
       }
 
-      let total = 0
+      const allInserted: TableRow[] = []
       for (let i = 0; i < coerced.length; i += CSV_MAX_BATCH_SIZE) {
        const batch = coerced.slice(i, i + CSV_MAX_BATCH_SIZE)
        const batchRequestId = generateId().slice(0, 8)
@@ -234,10 +236,15 @@ export const POST = withRouteHandler(async (request: NextRequest, { params }: Ro
           working,
           batchRequestId
         )
-        total += result.length
+        allInserted.push(...result)
       }
-      return total
+      return { inserted: allInserted, working }
     })
+    const { inserted: insertedRows, working: finalTable } = txResult
+    const inserted = insertedRows.length
+    // Fire trigger + scheduler AFTER the tx commits — both read through the
+    // global db connection and would otherwise see no rows.
+    dispatchAfterBatchInsert(finalTable, insertedRows, requestId)
 
     logger.info(`[${requestId}] Append CSV imported`, {
       tableId: table.id,
diff --git a/apps/sim/lib/table/service.ts b/apps/sim/lib/table/service.ts
index cfdb544f7c5..3edb53cdda6 100644
--- a/apps/sim/lib/table/service.ts
+++ b/apps/sim/lib/table/service.ts
@@ -571,7 +571,12 @@ export async function addTableColumnsWithTx(
     )
   }
 
-  const updatedSchema: TableSchema = { columns: [...table.schema.columns, ...additions] }
+  // Spread `table.schema` first so workflow groups (and any future top-level
+  // schema fields) survive a CSV import that only adds plain columns.
+  const updatedSchema: TableSchema = {
+    ...table.schema,
+    columns: [...table.schema.columns, ...additions],
+  }
   const now = new Date()
 
   await trx
@@ -945,7 +950,9 @@ export async function batchInsertRows(
   table: TableDefinition,
   requestId: string
 ): Promise<TableRow[]> {
-  return db.transaction((trx) => batchInsertRowsWithTx(trx, data, table, requestId))
+  const result = await db.transaction((trx) => batchInsertRowsWithTx(trx, data, table, requestId))
+  dispatchAfterBatchInsert(table, result, requestId)
+  return result
 }
 
 /**
@@ -1043,12 +1050,24 @@ export async function batchInsertRowsWithTx(
     updatedAt: r.updatedAt,
   }))
 
-  void fireTableTrigger(data.tableId, table.name, 'insert', result, null, table.schema, requestId)
-  void scheduleRunsForRows(table, result)
-
   return result
 }
 
+/**
+ * Side-effect dispatch for an insert batch. Caller fires this AFTER the
+ * surrounding transaction commits — `fireTableTrigger` and
+ * `scheduleRunsForRows` both read through the global db connection, so firing
+ * inside the tx can see no rows and no-op.
+ */
+export function dispatchAfterBatchInsert(
+  table: TableDefinition,
+  result: TableRow[],
+  requestId: string
+): void {
+  void fireTableTrigger(table.id, table.name, 'insert', result, null, table.schema, requestId)
+  void scheduleRunsForRows(table, result)
+}
+
 /**
  * Replaces all rows in a table with a new set of rows. Deletes existing rows
  * and inserts the provided rows inside a single transaction so the table is
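
Note (not part of the commit, illustrative only): the after-commit dispatch
shape the second fix adopts, as a minimal TypeScript sketch. The helper name
insertThenDispatch and its parameters are invented for illustration; the
patch's real entry points are batchInsertRows and dispatchAfterBatchInsert
above.

    // Run the inserts inside the transaction, collect what was written, and
    // only fire side effects that read through another connection once the
    // transaction promise has resolved (i.e. after COMMIT).
    async function insertThenDispatch<Row>(
      runTx: () => Promise<Row[]>,      // wraps db.transaction(...), returns the inserted rows
      dispatch: (rows: Row[]) => void   // trigger/scheduler fan-out on its own connection
    ): Promise<Row[]> {
      const rows = await runTx()        // resolves only after the commit
      dispatch(rows)                    // other connections can now see the rows
      return rows
    }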