general approach for insert script

This commit is contained in:
Rene Kaßeböhmer
2025-05-14 07:41:18 +02:00
parent 8af0ef8e78
commit d4f944dca4
8 changed files with 90 additions and 51 deletions

4
.gitignore vendored
View File

@ -32,3 +32,7 @@ SCContract__c.csv
AssociatedLocation_beforetransform.csv
AssociatedLocation.csv*
ServiceContract_beforetransform.csv
failed_records_*.csv
successful_records_*.csv
failed_records.csv
successful_records.csv

Binary file not shown.

View File

@ -42,6 +42,18 @@ print(merged_df_sc)
#Rename columns
merged_df_sc.columns = ['PKey__c','Status','BillingCountryCode','Term','EndDate','StartDate','AccountId','Service_Recipient__c','IoT_Registration_Status__c','Name','Pricebook2Id', 'TemplateId__c']
# Convert StartDate and EndDate to datetime
merged_df_sc['StartDate'] = pd.to_datetime(merged_df_sc['StartDate'])
merged_df_sc['EndDate'] = pd.to_datetime(merged_df_sc['EndDate'])
# Calculate Term in months
merged_df_sc['Term'] = ((merged_df_sc['EndDate'] - merged_df_sc['StartDate']) / pd.Timedelta(days=30.44)).round().astype(int)
# Convert dates back to string format (YYYY-MM-DD)
merged_df_sc['StartDate'] = merged_df_sc['StartDate'].dt.strftime('%Y-%m-%d')
merged_df_sc['EndDate'] = merged_df_sc['EndDate'].dt.strftime('%Y-%m-%d')
merged_df_sc = merged_df_sc.drop('Status', axis=1)
#safe csv
merged_df_sc.to_csv('../15_insert_servicecontract/ServiceContract.csv', index=False)

View File

@ -1,47 +0,0 @@
import os
import argparse
from sys import path
path.append('../..')
from sf_auth import get_sf_connection
def insert_service_contracts(context):
"""
Insert ServiceContract records using Bulk API 2.0
Args:
context (str): Salesforce org context (e.g., 'qa2', 'prod')
"""
try:
# Get Salesforce connection
sf = get_sf_connection(context)
csv_file = 'ServiceContract.csv'
print(f'Starting bulk insert of ServiceContract records from {csv_file}...')
# Use bulk API 2.0 to insert records directly from CSV
results = sf.bulk2.ServiceContract.insert(
csv_file,
batch_size=10000,
concurrency=5
)
# Count successes and failures
success_count = sum(1 for result in results if result['success'])
total_count = len(results)
print(f'\nInsertion complete:')
print(f'Total records: {total_count}')
print(f'Successful: {success_count}')
print(f'Failed: {total_count - success_count}')
except Exception as e:
print(f'Error: {str(e)}')
raise
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Insert ServiceContract records via Bulk API')
parser.add_argument('--context', type=str, required=True,
help='Salesforce org context (e.g., "qa2", "prod")')
args = parser.parse_args()
insert_service_contracts(args.context)

View File

@ -1 +1 @@
sf sfdmu run --sourceusername rene.kasseboehmer@vaillant.de.devrene --targetusername rene.kasseboehmer@vaillant.de.devrene
python run_insert.py --context qa2 --csv ServiceContract.csv

View File

@ -0,0 +1,14 @@
import argparse
from sys import path
path.append('../..')
from utils import bulk_insert_records
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Insert ServiceContract records via Bulk API')
parser.add_argument('--context', type=str, required=True,
help='Salesforce org context (e.g., "qa2", "prod")')
parser.add_argument('--csv', type=str, default='ServiceContract.csv',
help='CSV file to process (default: ServiceContract.csv)')
args = parser.parse_args()
bulk_insert_records(args.context, 'ServiceContract', args.csv)

View File

@ -47,8 +47,8 @@ def extract_data(object_id, query, output_path='output', context='qa2'):
t = threading.Thread(target=animate)
t.start()
results = sf.bulk2.__getattr__(object_id).query(
query, max_records=2000000
results = sf.bulk2.__getattr__(object_id).query_all(
query
)
print(f'Extracting: {object_id}')
for i, data in enumerate(results):

56
utils.py Normal file
View File

@ -0,0 +1,56 @@
import os
from sys import path
path.append('../..')
from sf_auth import get_sf_connection
def bulk_insert_records(context, object_name, csv_file):
"""
Generic bulk insert function for Salesforce records
Args:
context (str): Salesforce org context (e.g., 'qa2', 'prod')
object_name (str): Salesforce object API name
csv_file (str): Path to the CSV file containing records
"""
try:
sf = get_sf_connection(context)
print(f'Starting bulk insert of {object_name} records from {csv_file}...')
# Get the bulk API object dynamically
bulk_api = getattr(sf.bulk2, object_name)
results = bulk_api.insert(
csv_file,
batch_size=10000,
concurrency=5
)
success_count = results[0]['numberRecordsProcessed'] - results[0]['numberRecordsFailed']
total_count = results[0]['numberRecordsTotal']
failed_count = results[0]['numberRecordsFailed']
print(f'\nInsertion complete:')
print(f'Job Id: {results[0]["job_id"]}')
print(f'Total records: {total_count}')
print(f'Successful: {success_count}')
print(f'Failed: {failed_count}')
if failed_count > 0:
for result in results:
job_id = result['job_id']
failed_records_file = f'failed_records.csv'
bulk_api.get_failed_records(job_id, file=failed_records_file)
print(f'Failed records for job {job_id} saved to {failed_records_file}')
if success_count > 0:
for result in results:
job_id = result['job_id']
successful_records_file = f'successful_records.csv'
bulk_api.get_successful_records(job_id, file=successful_records_file)
print(f'Successful records for job {job_id} saved to {successful_records_file}')
return results
except Exception as e:
print(f'Error: {str(e)}')
raise