diff --git a/.gitignore b/.gitignore index 2f4f2b6..a65adcc 100644 --- a/.gitignore +++ b/.gitignore @@ -31,4 +31,8 @@ ServiceContract.csv* SCContract__c.csv AssociatedLocation_beforetransform.csv AssociatedLocation.csv* -ServiceContract_beforetransform.csv \ No newline at end of file +ServiceContract_beforetransform.csv +failed_records_*.csv +successful_records_*.csv +failed_records.csv +successful_records.csv \ No newline at end of file diff --git a/__pycache__/utils.cpython-313.pyc b/__pycache__/utils.cpython-313.pyc new file mode 100644 index 0000000..07ce345 Binary files /dev/null and b/__pycache__/utils.cpython-313.pyc differ diff --git a/prepared_steps/14_fill_pricebook2id_in_servicecontract/FillServiceContractFields.py b/prepared_steps/14_fill_pricebook2id_in_servicecontract/FillServiceContractFields.py index 2f69ee4..5769170 100644 --- a/prepared_steps/14_fill_pricebook2id_in_servicecontract/FillServiceContractFields.py +++ b/prepared_steps/14_fill_pricebook2id_in_servicecontract/FillServiceContractFields.py @@ -42,6 +42,18 @@ print(merged_df_sc) #Rename columns merged_df_sc.columns = ['PKey__c','Status','BillingCountryCode','Term','EndDate','StartDate','AccountId','Service_Recipient__c','IoT_Registration_Status__c','Name','Pricebook2Id', 'TemplateId__c'] +# Convert StartDate and EndDate to datetime +merged_df_sc['StartDate'] = pd.to_datetime(merged_df_sc['StartDate']) +merged_df_sc['EndDate'] = pd.to_datetime(merged_df_sc['EndDate']) + +# Calculate Term in months +merged_df_sc['Term'] = ((merged_df_sc['EndDate'] - merged_df_sc['StartDate']) / pd.Timedelta(days=30.44)).round().astype(int) + +# Convert dates back to string format (YYYY-MM-DD) +merged_df_sc['StartDate'] = merged_df_sc['StartDate'].dt.strftime('%Y-%m-%d') +merged_df_sc['EndDate'] = merged_df_sc['EndDate'].dt.strftime('%Y-%m-%d') + +merged_df_sc = merged_df_sc.drop('Status', axis=1) #safe csv merged_df_sc.to_csv('../15_insert_servicecontract/ServiceContract.csv', index=False) diff --git 
a/prepared_steps/15_insert_servicecontract/InsertServiceContracts.py b/prepared_steps/15_insert_servicecontract/InsertServiceContracts.py deleted file mode 100644 index c95980f..0000000 --- a/prepared_steps/15_insert_servicecontract/InsertServiceContracts.py +++ /dev/null @@ -1,47 +0,0 @@ -import os -import argparse -from sys import path -path.append('../..') -from sf_auth import get_sf_connection - -def insert_service_contracts(context): - """ - Insert ServiceContract records using Bulk API 2.0 - - Args: - context (str): Salesforce org context (e.g., 'qa2', 'prod') - """ - try: - # Get Salesforce connection - sf = get_sf_connection(context) - - csv_file = 'ServiceContract.csv' - print(f'Starting bulk insert of ServiceContract records from {csv_file}...') - - # Use bulk API 2.0 to insert records directly from CSV - results = sf.bulk2.ServiceContract.insert( - csv_file, - batch_size=10000, - concurrency=5 - ) - - # Count successes and failures - success_count = sum(1 for result in results if result['success']) - total_count = len(results) - - print(f'\nInsertion complete:') - print(f'Total records: {total_count}') - print(f'Successful: {success_count}') - print(f'Failed: {total_count - success_count}') - - except Exception as e: - print(f'Error: {str(e)}') - raise - -if __name__ == '__main__': - parser = argparse.ArgumentParser(description='Insert ServiceContract records via Bulk API') - parser.add_argument('--context', type=str, required=True, - help='Salesforce org context (e.g., "qa2", "prod")') - - args = parser.parse_args() - insert_service_contracts(args.context) \ No newline at end of file diff --git a/prepared_steps/15_insert_servicecontract/command.txt b/prepared_steps/15_insert_servicecontract/command.txt index 0389686..1727c23 100644 --- a/prepared_steps/15_insert_servicecontract/command.txt +++ b/prepared_steps/15_insert_servicecontract/command.txt @@ -1 +1 @@ -sf sfdmu run --sourceusername rene.kasseboehmer@vaillant.de.devrene --targetusername 
rene.kasseboehmer@vaillant.de.devrene \ No newline at end of file +python run_insert.py --context qa2 --csv ServiceContract.csv \ No newline at end of file diff --git a/prepared_steps/15_insert_servicecontract/run_insert.py b/prepared_steps/15_insert_servicecontract/run_insert.py new file mode 100644 index 0000000..ce7392e --- /dev/null +++ b/prepared_steps/15_insert_servicecontract/run_insert.py @@ -0,0 +1,14 @@ +import argparse +from sys import path +path.append('../..') +from utils import bulk_insert_records + +if __name__ == '__main__': + parser = argparse.ArgumentParser(description='Insert ServiceContract records via Bulk API') + parser.add_argument('--context', type=str, required=True, + help='Salesforce org context (e.g., "qa2", "prod")') + parser.add_argument('--csv', type=str, default='ServiceContract.csv', + help='CSV file to process (default: ServiceContract.csv)') + + args = parser.parse_args() + bulk_insert_records(args.context, 'ServiceContract', args.csv) \ No newline at end of file diff --git a/prepared_steps/1_extract_data/extract_via_simple_salesforce.py b/prepared_steps/1_extract_data/extract_via_simple_salesforce.py index 09fe953..fba97e2 100644 --- a/prepared_steps/1_extract_data/extract_via_simple_salesforce.py +++ b/prepared_steps/1_extract_data/extract_via_simple_salesforce.py @@ -47,8 +47,8 @@ def extract_data(object_id, query, output_path='output', context='qa2'): t = threading.Thread(target=animate) t.start() - results = sf.bulk2.__getattr__(object_id).query( - query, max_records=2000000 + results = sf.bulk2.__getattr__(object_id).query_all( + query ) print(f'Extracting: {object_id}') for i, data in enumerate(results): diff --git a/utils.py b/utils.py new file mode 100644 index 0000000..7146bf5 --- /dev/null +++ b/utils.py @@ -0,0 +1,56 @@ +import os +from sys import path +path.append('../..') +from sf_auth import get_sf_connection + +def bulk_insert_records(context, object_name, csv_file): + """ + Generic bulk insert function for 
Salesforce records + + Args: + context (str): Salesforce org context (e.g., 'qa2', 'prod') + object_name (str): Salesforce object API name + csv_file (str): Path to the CSV file containing records + """ + try: + sf = get_sf_connection(context) + print(f'Starting bulk insert of {object_name} records from {csv_file}...') + + # Get the bulk API object dynamically + bulk_api = getattr(sf.bulk2, object_name) + + results = bulk_api.insert( + csv_file, + batch_size=10000, + concurrency=5 + ) + + success_count = results[0]['numberRecordsProcessed'] - results[0]['numberRecordsFailed'] + total_count = results[0]['numberRecordsTotal'] + failed_count = results[0]['numberRecordsFailed'] + + print(f'\nInsertion complete:') + print(f'Job Id: {results[0]["job_id"]}') + print(f'Total records: {total_count}') + print(f'Successful: {success_count}') + print(f'Failed: {failed_count}') + + if failed_count > 0: + for result in results: + job_id = result['job_id'] + failed_records_file = f'failed_records_{job_id}.csv' + bulk_api.get_failed_records(job_id, file=failed_records_file) + print(f'Failed records for job {job_id} saved to {failed_records_file}') + + if success_count > 0: + for result in results: + job_id = result['job_id'] + successful_records_file = f'successful_records_{job_id}.csv' + bulk_api.get_successful_records(job_id, file=successful_records_file) + print(f'Successful records for job {job_id} saved to {successful_records_file}') + + return results + + except Exception as e: + print(f'Error: {str(e)}') + raise \ No newline at end of file