diff --git a/apps/sim/app/api/table/[tableId]/import/route.test.ts b/apps/sim/app/api/table/[tableId]/import/route.test.ts index b821961cb6d..3e3e7be2369 100644 --- a/apps/sim/app/api/table/[tableId]/import/route.test.ts +++ b/apps/sim/app/api/table/[tableId]/import/route.test.ts @@ -11,11 +11,13 @@ const { mockBatchInsertRowsWithTx, mockReplaceTableRowsWithTx, mockAddTableColumnsWithTx, + mockDispatchAfterBatchInsert, } = vi.hoisted(() => ({ mockCheckAccess: vi.fn(), mockBatchInsertRowsWithTx: vi.fn(), mockReplaceTableRowsWithTx: vi.fn(), mockAddTableColumnsWithTx: vi.fn(), + mockDispatchAfterBatchInsert: vi.fn(), })) vi.mock('@sim/utils/id', () => ({ @@ -44,6 +46,7 @@ vi.mock('@/lib/table/service', () => ({ batchInsertRowsWithTx: mockBatchInsertRowsWithTx, replaceTableRowsWithTx: mockReplaceTableRowsWithTx, addTableColumnsWithTx: mockAddTableColumnsWithTx, + dispatchAfterBatchInsert: mockDispatchAfterBatchInsert, })) import { POST } from '@/app/api/table/[tableId]/import/route' diff --git a/apps/sim/app/api/table/[tableId]/import/route.ts b/apps/sim/app/api/table/[tableId]/import/route.ts index c51cde1b2ab..1a4e5df1e4e 100644 --- a/apps/sim/app/api/table/[tableId]/import/route.ts +++ b/apps/sim/app/api/table/[tableId]/import/route.ts @@ -23,11 +23,13 @@ import { type CsvHeaderMapping, CsvImportValidationError, coerceRowsForTable, + dispatchAfterBatchInsert, inferColumnType, parseCsvBuffer, replaceTableRowsWithTx, sanitizeName, type TableDefinition, + type TableRow, type TableSchema, validateMapping, } from '@/lib/table' @@ -213,13 +215,13 @@ export const POST = withRouteHandler(async (request: NextRequest, { params }: Ro } try { - const inserted = await db.transaction(async (trx) => { + const txResult = await db.transaction(async (trx) => { let working = table if (additions.length > 0) { working = await addTableColumnsWithTx(trx, table, additions, requestId) } - let total = 0 + const allInserted: TableRow[] = [] for (let i = 0; i < coerced.length; i += CSV_MAX_BATCH_SIZE) { const batch = coerced.slice(i, i + CSV_MAX_BATCH_SIZE) const batchRequestId = generateId().slice(0, 8) @@ -234,10 +236,15 @@ export const POST = withRouteHandler(async (request: NextRequest, { params }: Ro working, batchRequestId ) - total += result.length + allInserted.push(...result) } - return total + return { inserted: allInserted, working } }) + const { inserted: insertedRows, working: finalTable } = txResult + const inserted = insertedRows.length + // Fire trigger + scheduler AFTER the tx commits — both read through the + // global db connection and would otherwise see no rows. + dispatchAfterBatchInsert(finalTable, insertedRows, requestId) logger.info(`[${requestId}] Append CSV imported`, { tableId: table.id, diff --git a/apps/sim/lib/table/service.ts b/apps/sim/lib/table/service.ts index cfdb544f7c5..3edb53cdda6 100644 --- a/apps/sim/lib/table/service.ts +++ b/apps/sim/lib/table/service.ts @@ -571,7 +571,12 @@ export async function addTableColumnsWithTx( ) } - const updatedSchema: TableSchema = { columns: [...table.schema.columns, ...additions] } + // Spread `table.schema` first so workflow groups (and any future top-level + // schema fields) survive a CSV import that only adds plain columns. + const updatedSchema: TableSchema = { + ...table.schema, + columns: [...table.schema.columns, ...additions], + } const now = new Date() await trx @@ -945,7 +950,9 @@ export async function batchInsertRows( table: TableDefinition, requestId: string ): Promise { - return db.transaction((trx) => batchInsertRowsWithTx(trx, data, table, requestId)) + const result = await db.transaction((trx) => batchInsertRowsWithTx(trx, data, table, requestId)) + dispatchAfterBatchInsert(table, result, requestId) + return result } /** @@ -1043,12 +1050,24 @@ export async function batchInsertRowsWithTx( updatedAt: r.updatedAt, })) - void fireTableTrigger(data.tableId, table.name, 'insert', result, null, table.schema, requestId) - void scheduleRunsForRows(table, result) - return result } +/** + * Side-effect dispatch for an insert batch. Caller fires this AFTER the + * surrounding transaction commits — `fireTableTrigger` and + * `scheduleRunsForRows` both read through the global db connection, so firing + * inside the tx can see no rows and no-op. + */ +export function dispatchAfterBatchInsert( + table: TableDefinition, + result: TableRow[], + requestId: string +): void { + void fireTableTrigger(table.id, table.name, 'insert', result, null, table.schema, requestId) + void scheduleRunsForRows(table, result) +} + /** * Replaces all rows in a table with a new set of rows. Deletes existing rows * and inserts the provided rows inside a single transaction so the table is