From d4f944dca4598c17da5c04dcd54ed2899735b76b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rene=20Ka=C3=9Feb=C3=B6hmer?= Date: Wed, 14 May 2025 07:41:18 +0200 Subject: [PATCH] general approach for insert script --- .gitignore | 6 +- __pycache__/utils.cpython-313.pyc | Bin 0 -> 2445 bytes .../FillServiceContractFields.py | 12 ++++ .../InsertServiceContracts.py | 47 --------------- .../15_insert_servicecontract/command.txt | 2 +- .../15_insert_servicecontract/run_insert.py | 14 +++++ .../extract_via_simple_salesforce.py | 4 +- utils.py | 56 ++++++++++++++++++ 8 files changed, 90 insertions(+), 51 deletions(-) create mode 100644 __pycache__/utils.cpython-313.pyc delete mode 100644 prepared_steps/15_insert_servicecontract/InsertServiceContracts.py create mode 100644 prepared_steps/15_insert_servicecontract/run_insert.py create mode 100644 utils.py diff --git a/.gitignore b/.gitignore index 2f4f2b6..a65adcc 100644 --- a/.gitignore +++ b/.gitignore @@ -31,4 +31,8 @@ ServiceContract.csv* SCContract__c.csv AssociatedLocation_beforetransform.csv AssociatedLocation.csv* -ServiceContract_beforetransform.csv \ No newline at end of file +ServiceContract_beforetransform.csv +failed_records_*.csv +successful_records_*.csv +failed_records.csv +successful_records.csv \ No newline at end of file diff --git a/__pycache__/utils.cpython-313.pyc b/__pycache__/utils.cpython-313.pyc new file mode 100644 index 0000000000000000000000000000000000000000..07ce3457649a1e1287cb9a6aae57b397fef697b7 GIT binary patch literal 2445 zcmai0O>7fK6rQ!c_S$PNP8@6}fv^c6$4%luP)cxVs{jcQ1f|(7jTD2~*yF_D?3&qi z2rE^pxYWlWwW_05YK}qeAxC=SK&2knibbsjB&2eor&Q^oYA>Bx|HL7nj^v$r^WMDg zdvD&%c-!am0NiEqJIy#lHqXIz}Q2fD7+o-fqz? zSopF>Xp>o-85Wayqs{y%w4bu!9Ph9Xh#tHH9U_Y{96P)(4+6jf9*EBW!P+l-F_!NQ zAS}-OvNTa6VozoUh*`Pd*Ixg(84aF<6)IufNBjaA9;I@d7b zGe}wFb$cup%lab#ye5SkwV9qdNlPsx)vOLp7W+;W5Q1{*9wH~s49bPYIf$lf*}008 z6jW81HUqonCnZceZMN^4{7liLoZjKNXeA!s9yn!-`5e?>TxX_~IsRfg&YR9#%3Ly= z*7sTF4YF}w_X>p+0m~F}SYm1rc+{S;z>@V1i>1^ha&?i^=%Jcb!;7`Y*gvm(cvV`0 zX_8Als5Tw9I?=m#0QWj@O(3M8IDaMTMh=n~I*+olX1eelDQOxqX_EL6(^)Gsvz^?X zyP29n>ptBBZn`IyQ!r2NSJO^PjWb>K(lgnX8ZsFis#PAQP0g4L0ZdBytm#5fE#x%S z^wlH7H+BJYV>Pk^b-`OFQ&Tx>OOTg@64=eN1`*nurJ90*Dmks>fb29AosB0%e8AKM zx-H=fnNt$@8e>R>F@6trwkjpgr|0V@M_+j=>mBbXa`2i1r8HTn{4wMS|0xv!^m zcheUs+MY8$!{5K&z3$t%@%X~y_^*P&eOh6r|8jzk?$XE)W2<9pvzvkAMdoFD*SEtD zhO6F=YUn^U+_~iffxa!^2zZLFEidTg*QiyGarlFB`1GT@kMI1f8&k>h$2ZI2n?+wW zc(5crxMlQ?mV@sX*=ne#bmxI?^qnk+#){rogu~Ga6TxIwm{Mr1ZMAFdLb-Fu2*(U= zxWXJGihXNr=?jA!sxUDkdP~dCoU$UW6?tBx>&CZzu9o+4;8t; zU$GtAz=k&gN8$S3UK;<-xw333zwKw&=#x{Y3=+&u#F7>I!w%%(~C;Fkb@tg s0W6X~C+Z>m*|Jj<^$fUQfQ~9~e8t{pi{~q#;{`an?HHjrgkM(hAHds$vj6}9 literal 0 HcmV?d00001 diff --git a/prepared_steps/14_fill_pricebook2id_in_servicecontract/FillServiceContractFields.py b/prepared_steps/14_fill_pricebook2id_in_servicecontract/FillServiceContractFields.py index 2f69ee4..5769170 100644 --- a/prepared_steps/14_fill_pricebook2id_in_servicecontract/FillServiceContractFields.py +++ b/prepared_steps/14_fill_pricebook2id_in_servicecontract/FillServiceContractFields.py @@ -42,6 +42,18 @@ print(merged_df_sc) #Rename columns merged_df_sc.columns = ['PKey__c','Status','BillingCountryCode','Term','EndDate','StartDate','AccountId','Service_Recipient__c','IoT_Registration_Status__c','Name','Pricebook2Id', 'TemplateId__c'] +# Convert StartDate and EndDate to datetime +merged_df_sc['StartDate'] = pd.to_datetime(merged_df_sc['StartDate']) +merged_df_sc['EndDate'] = pd.to_datetime(merged_df_sc['EndDate']) + +# Calculate Term in months +merged_df_sc['Term'] = ((merged_df_sc['EndDate'] - merged_df_sc['StartDate']) / pd.Timedelta(days=30.44)).round().astype(int) + +# Convert dates back to string format (YYYY-MM-DD) +merged_df_sc['StartDate'] = merged_df_sc['StartDate'].dt.strftime('%Y-%m-%d') +merged_df_sc['EndDate'] = merged_df_sc['EndDate'].dt.strftime('%Y-%m-%d') + +merged_df_sc = merged_df_sc.drop('Status', axis=1) #safe csv merged_df_sc.to_csv('../15_insert_servicecontract/ServiceContract.csv', index=False) diff --git a/prepared_steps/15_insert_servicecontract/InsertServiceContracts.py b/prepared_steps/15_insert_servicecontract/InsertServiceContracts.py deleted file mode 100644 index c95980f..0000000 --- a/prepared_steps/15_insert_servicecontract/InsertServiceContracts.py +++ /dev/null @@ -1,47 +0,0 @@ -import os -import argparse -from sys import path -path.append('../..') -from sf_auth import get_sf_connection - -def insert_service_contracts(context): - """ - Insert ServiceContract records using Bulk API 2.0 - - Args: - context (str): Salesforce org context (e.g., 'qa2', 'prod') - """ - try: - # Get Salesforce connection - sf = get_sf_connection(context) - - csv_file = 'ServiceContract.csv' - print(f'Starting bulk insert of ServiceContract records from {csv_file}...') - - # Use bulk API 2.0 to insert records directly from CSV - results = sf.bulk2.ServiceContract.insert( - csv_file, - batch_size=10000, - concurrency=5 - ) - - # Count successes and failures - success_count = sum(1 for result in results if result['success']) - total_count = len(results) - - print(f'\nInsertion complete:') - print(f'Total records: {total_count}') - print(f'Successful: {success_count}') - print(f'Failed: {total_count - success_count}') - - except Exception as e: - print(f'Error: {str(e)}') - raise - -if __name__ == '__main__': - parser = argparse.ArgumentParser(description='Insert ServiceContract records via Bulk API') - parser.add_argument('--context', type=str, required=True, - help='Salesforce org context (e.g., "qa2", "prod")') - - args = parser.parse_args() - insert_service_contracts(args.context) \ No newline at end of file diff --git a/prepared_steps/15_insert_servicecontract/command.txt b/prepared_steps/15_insert_servicecontract/command.txt index 0389686..1727c23 100644 --- a/prepared_steps/15_insert_servicecontract/command.txt +++ b/prepared_steps/15_insert_servicecontract/command.txt @@ -1 +1 @@ -sf sfdmu run --sourceusername rene.kasseboehmer@vaillant.de.devrene --targetusername rene.kasseboehmer@vaillant.de.devrene \ No newline at end of file +python run_insert.py --context qa2 --csv ServiceContract.csv \ No newline at end of file diff --git a/prepared_steps/15_insert_servicecontract/run_insert.py b/prepared_steps/15_insert_servicecontract/run_insert.py new file mode 100644 index 0000000..ce7392e --- /dev/null +++ b/prepared_steps/15_insert_servicecontract/run_insert.py @@ -0,0 +1,14 @@ +import argparse +from sys import path +path.append('../..') +from utils import bulk_insert_records + +if __name__ == '__main__': + parser = argparse.ArgumentParser(description='Insert ServiceContract records via Bulk API') + parser.add_argument('--context', type=str, required=True, + help='Salesforce org context (e.g., "qa2", "prod")') + parser.add_argument('--csv', type=str, default='ServiceContract.csv', + help='CSV file to process (default: ServiceContract.csv)') + + args = parser.parse_args() + bulk_insert_records(args.context, 'ServiceContract', args.csv) \ No newline at end of file diff --git a/prepared_steps/1_extract_data/extract_via_simple_salesforce.py b/prepared_steps/1_extract_data/extract_via_simple_salesforce.py index 09fe953..fba97e2 100644 --- a/prepared_steps/1_extract_data/extract_via_simple_salesforce.py +++ b/prepared_steps/1_extract_data/extract_via_simple_salesforce.py @@ -47,8 +47,8 @@ def extract_data(object_id, query, output_path='output', context='qa2'): t = threading.Thread(target=animate) t.start() - results = sf.bulk2.__getattr__(object_id).query( - query, max_records=2000000 + results = sf.bulk2.__getattr__(object_id).query_all( + query ) print(f'Extracting: {object_id}') for i, data in enumerate(results): diff --git a/utils.py b/utils.py new file mode 100644 index 0000000..7146bf5 --- /dev/null +++ b/utils.py @@ -0,0 +1,56 @@ +import os +from sys import path +path.append('../..') +from sf_auth import get_sf_connection + +def bulk_insert_records(context, object_name, csv_file): + """ + Generic bulk insert function for Salesforce records + + Args: + context (str): Salesforce org context (e.g., 'qa2', 'prod') + object_name (str): Salesforce object API name + csv_file (str): Path to the CSV file containing records + """ + try: + sf = get_sf_connection(context) + print(f'Starting bulk insert of {object_name} records from {csv_file}...') + + # Get the bulk API object dynamically + bulk_api = getattr(sf.bulk2, object_name) + + results = bulk_api.insert( + csv_file, + batch_size=10000, + concurrency=5 + ) + + success_count = results[0]['numberRecordsProcessed'] - results[0]['numberRecordsFailed'] + total_count = results[0]['numberRecordsTotal'] + failed_count = results[0]['numberRecordsFailed'] + + print(f'\nInsertion complete:') + print(f'Job Id: {results[0]["job_id"]}') + print(f'Total records: {total_count}') + print(f'Successful: {success_count}') + print(f'Failed: {failed_count}') + + if failed_count > 0: + for result in results: + job_id = result['job_id'] + failed_records_file = f'failed_records.csv' + bulk_api.get_failed_records(job_id, file=failed_records_file) + print(f'Failed records for job {job_id} saved to {failed_records_file}') + + if success_count > 0: + for result in results: + job_id = result['job_id'] + successful_records_file = f'successful_records.csv' + bulk_api.get_successful_records(job_id, file=successful_records_file) + print(f'Successful records for job {job_id} saved to {successful_records_file}') + + return results + + except Exception as e: + print(f'Error: {str(e)}') + raise \ No newline at end of file