migration_via_sfdmu/utils.py

import os
from sys import path
# Extend the import path two levels up (relative to the working directory)
# so the shared sf_auth module can be resolved
path.append('../..')
from sf_auth import get_sf_connection
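
# Assumption (not stated in this file): get_sf_connection(context) is expected to
# return an authenticated simple_salesforce.Salesforce client whose `.bulk2`
# attribute exposes the Bulk API 2.0 interface used below.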

def bulk_insert_records(context, object_name, csv_file):
    """
    Generic bulk insert of Salesforce records via the Bulk API 2.0.

    Args:
        context (str): Salesforce org context (e.g., 'qa2', 'prod')
        object_name (str): Salesforce object API name
        csv_file (str): Path to the CSV file containing the records

    Returns:
        list: Per-job result dicts returned by the bulk insert call.
    """
    try:
        sf = get_sf_connection(context)
        print(f'Starting bulk insert of {object_name} records from {csv_file}...')

        # Resolve the Bulk API 2.0 handler for the target object dynamically
        bulk_api = getattr(sf.bulk2, object_name)
        results = bulk_api.insert(
            csv_file,
            batch_size=10000,
            concurrency=5
        )

        success_count = results[0]['numberRecordsProcessed'] - results[0]['numberRecordsFailed']
        total_count = results[0]['numberRecordsTotal']
        failed_count = results[0]['numberRecordsFailed']

        print('\nInsertion complete:')
        print(f'Job Id: {results[0]["job_id"]}')
        print(f'Total records: {total_count}')
        print(f'Successful: {success_count}')
        print(f'Failed: {failed_count}')

        if failed_count > 0:
            for result in results:
                job_id = result['job_id']
                # Include the job id in the file name so results from
                # multiple jobs do not overwrite each other
                failed_records_file = f'failed_records_{job_id}.csv'
                bulk_api.get_failed_records(job_id, file=failed_records_file)
                print(f'Failed records for job {job_id} saved to {failed_records_file}')

        if success_count > 0:
            for result in results:
                job_id = result['job_id']
                successful_records_file = f'successful_records_{job_id}.csv'
                bulk_api.get_successful_records(job_id, file=successful_records_file)
                print(f'Successful records for job {job_id} saved to {successful_records_file}')

        return results
    except Exception as e:
        print(f'Error: {str(e)}')
        raise
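

# --- Usage sketch (illustrative, not part of the original module) ---
# Shows how bulk_insert_records might be invoked. The object name and CSV path
# below are hypothetical placeholders; the CSV is assumed to have a header row
# of field API names, as required by Bulk API 2.0 ingest jobs.
if __name__ == '__main__':
    bulk_insert_records(
        context='qa2',            # org alias understood by get_sf_connection
        object_name='Account',    # Salesforce object API name
        csv_file='accounts.csv',  # placeholder path to the records to load
    )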