From 19eacdf2f52ed38bf54573177b051373dab561e5 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Rene=20Ka=C3=9Feb=C3=B6hmer?=
Date: Wed, 9 Apr 2025 11:21:08 +0200
Subject: [PATCH] data extract: switch to simple_salesforce bulkv2

---
 prepared_steps/1_extract_data/command.txt    |  2 +-
 .../extract_via_simple_salesforce.py         | 96 +++++++++++++++----
 prepared_steps/1_extract_data/queries.json   | 26 +++++
 .../2_transform_via_script/LocationScript.py | 19 ++--
 4 files changed, 115 insertions(+), 28 deletions(-)
 create mode 100644 prepared_steps/1_extract_data/queries.json

diff --git a/prepared_steps/1_extract_data/command.txt b/prepared_steps/1_extract_data/command.txt
index a28faa4..c8840c6 100644
--- a/prepared_steps/1_extract_data/command.txt
+++ b/prepared_steps/1_extract_data/command.txt
@@ -1 +1 @@
-sf sfdmu run --sourceusername rene.kasseboehmer@vaillant.de --targetusername csvfile
\ No newline at end of file
+python .\extract_via_simple_salesforce.py --context prod
\ No newline at end of file
diff --git a/prepared_steps/1_extract_data/extract_via_simple_salesforce.py b/prepared_steps/1_extract_data/extract_via_simple_salesforce.py
index 5686350..b58fb44 100644
--- a/prepared_steps/1_extract_data/extract_via_simple_salesforce.py
+++ b/prepared_steps/1_extract_data/extract_via_simple_salesforce.py
@@ -4,10 +4,27 @@
 #   --output_path extracted_data
 
 import os
+import itertools
+import threading
+import time
+import sys
+import json
 import pandas as pd
 from dotenv import load_dotenv, find_dotenv
 from simple_salesforce import Salesforce
 
+done = False
+
+# Spinner animation shown on stdout while the Bulk API job runs
+def animate():
+    for c in itertools.cycle(['|', '/', '-', '\\']):
+        if done:
+            break
+        sys.stdout.write('\rloading ' + c)
+        sys.stdout.flush()
+        time.sleep(0.1)
+    sys.stdout.write('\rDone!      ')
+
 def get_credentials(context):
     """
     Get credentials for a given context from the .env file
@@ -39,12 +56,11 @@ def get_credentials(context):
         #print(f'{context}_SF_', key, value)
         if f'{context}_SF_' in key:
             credential_key = key.split(f'{context}_SF_')[-1].upper()
-            print(credential_key)
             credentials[credential_key] = value
 
     return credentials
 
-def extract_data(object_id, output_path='output', context='qa2'):
+def extract_data(object_id, query, output_path='output', context='qa2'):
     """
     Extract data using Bulk API and save as CSV
 
@@ -54,10 +70,12 @@
         context (str): Context name for credentials (e.g., 'qa2', 'prod')
     """
     try:
+
+        global done
+        done = False
         # Get credentials based on context
         credentials = get_credentials(context)
-        print(credentials)
 
         if not all(credentials.values()):
             raise ValueError(f"Missing credentials for context: {context}")
 
@@ -66,19 +84,32 @@
             username=credentials['USERNAME'],
             password=credentials['PASSWORD'],
             security_token=credentials['SECURITY_TOKEN'],
-            domain=credentials['DOMAIN']
+            domain=credentials['DOMAIN'],
+            version='62.0'
         )
 
         # Create a simple query for the desired object
-        soql_query = f"""
-        SELECT Id, Name
-        FROM SCInstalledBase__c
-        WHERE Country__c = 'NL' limit 1000
-        """
+        #soql_query = f"""
+        #    SELECT Id, City__c, Country__c, GeoY__c, GeoX__c, PostalCode__c, Street__c, Extension__c, HouseNo__c, FlatNo__c, Floor__c FROM SCInstalledBaseLocation__c WHERE Country__c = 'NL'
+        #"""
 
-        sf.bulk2.__getattr__("SCInstalledBase__c").download(
-            soql_query, path="./", max_records=200000
+        t = threading.Thread(target=animate)
+        t.start()
+
+        #sf.bulk2.__getattr__("SCInstalledBaseLocation__c").download(
+        #    soql_query, path="./", max_records=2000000
+        #)
+
+        results = sf.bulk2.__getattr__(object_id).query(
+            query, max_records=2000000
         )
+        print(f'Extracting: {object_id}')
+        os.makedirs('results', exist_ok=True)
+        # Open the output file once: reopening it with mode "w" per chunk
+        # would keep only the last chunk
+        with open(f"results/{object_id}.csv", "w", encoding="utf-8") as bos:
+            for i, data in enumerate(results):
+                # each result page repeats the CSV header row; keep only the first
+                bos.write(data if i == 0 else data.split('\n', 1)[-1])
+
+        done = True
+        t.join()
+        return f"results/{object_id}.csv"
 
         """
         # Execute the Bulk query job
@@ -111,6 +142,7 @@
         return csv_file
         """
     except Exception as e:
+        done = True
         raise ValueError(f'Error extracting data: {str(e)}')
 
 if __name__ == '__main__':
@@ -120,18 +152,42 @@ if __name__ == '__main__':
 
     parser = argparse.ArgumentParser(description='Extract Salesforce data via Bulk API')
     parser.add_argument('--context', type=str, required=True,
                         help='Context name (e.g., "qa2", "prod")')
-    parser.add_argument('--object_id', type=str, required=True,
-                        help='Account, SCInstalledBaseLocation__c, SCInstalledBase__c, Product2')
     parser.add_argument('--output_path', type=str, required=False,
                         help='./')
+    parser.add_argument('--sobjects', type=str, required=False,
+                        help='Comma-separated sobjects from queries.json to extract, e.g. --sobjects Account,Contact,CustomObject__c; if omitted, all entries in queries.json are extracted')
 
     args = parser.parse_args()
+
+    # Load queries from JSON file
+    with open('queries.json') as f:
+        queries = json.load(f)['queries']
 
-    # Extract data using parameters
-    output_file = extract_data(
-        object_id=args.object_id,
-        output_path=args.output_path,
-        context=args.context
-    )
+    # Process each query in sequence
+    selected_sobjects = []
+    if args.sobjects:
+        selected_sobjects = [sobject.strip() for sobject in args.sobjects.split(',')]
 
-    print(f'File saved at: {output_file}')
\ No newline at end of file
+    for query_def in queries:
+        sobject = query_def['sobject']
+        query_str = query_def['query']
+
+        # Skip if --sobjects is provided and the current sobject isn't in the list
+        if selected_sobjects and sobject not in selected_sobjects:
+            print(f'Skipping {sobject} as it is not specified in --sobjects')
+            continue
+
+        try:
+            print(f'\nRunning query for {sobject}:')
+            output_file = extract_data(
+                object_id=sobject,
+                query=query_str,
+                output_path=args.output_path,
+                context=args.context
+            )
+            print(f'File saved at: {output_file}')
+        except Exception as e:
+            print(f'Error running query for {sobject}: {str(e)}')
+            # Failed queries are logged and skipped; re-raise here to abort the whole run instead
+
+    print('\nExtraction complete.')
\ No newline at end of file
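Note on the chunked download above: as used in this patch, simple_salesforce's Bulk API 2.0 handler returns the query result as an iterator of CSV strings, one per result page of up to max_records rows, which is why the chunks must be appended to a single file. A minimal standalone sketch of the same pattern (the credentials and the Account query below are placeholders, not values from this repo):

    from simple_salesforce import Salesforce

    # placeholder credentials; the script above reads real ones from .env
    sf = Salesforce(username='user@example.com', password='...',
                    security_token='...', domain='test', version='62.0')

    results = sf.bulk2.Account.query(
        "SELECT Id, Name FROM Account", max_records=50000
    )
    with open('Account.csv', 'w', encoding='utf-8') as out:
        for i, page in enumerate(results):
            # each page carries its own header row; keep only the first
            out.write(page if i == 0 else page.split('\n', 1)[-1])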
diff --git a/prepared_steps/1_extract_data/queries.json b/prepared_steps/1_extract_data/queries.json
new file mode 100644
index 0000000..1d16a71
--- /dev/null
+++ b/prepared_steps/1_extract_data/queries.json
@@ -0,0 +1,26 @@
+{ "queries":
+    [
+        {
+            "sobject": "SCInstalledBaseLocation__c",
+            "query": "SELECT Id, City__c, Country__c, GeoY__c, GeoX__c, PostalCode__c, Street__c, Extension__c, HouseNo__c, FlatNo__c, Floor__c FROM SCInstalledBaseLocation__c WHERE Country__c = 'NL'"
+        },{
+            "sobject": "SCInstalledBase__c",
+            "query": "SELECT Id, Name, CommissioningDate__c, InstallationDate__c, ProductEnergy__c, ProductUnitClass__c, ArticleNo__c, SerialNo__c, SerialNoException__c, ProductUnitType__c, InstalledBaseLocation__c FROM SCInstalledBase__c WHERE Country__c = 'NL'"
+        },{
+            "sobject": "Asset",
+            "query": "SELECT Id, Serialnumber FROM Asset WHERE Location.ParentLocation.Name LIKE '%NL'"
+        },{
+            "sobject": "Address",
+            "query": "SELECT Id, Country, CountryCode, Street, City, ParentId, PostalCode FROM Address WHERE CountryCode = 'NL'"
+        },{
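A note on queries.json: Bulk API 2.0 query jobs are defined by the SOQL string itself, so the "sobject" values here appear to serve mainly as output-file labels rather than strict sObject API names. That is how ParentLocation and ChildLocation can both query the Location object yet land in separate CSVs under results/. To extract only a subset, pass those labels to the new flag, e.g.:

    python .\extract_via_simple_salesforce.py --context prod --sobjects ParentLocation,ChildLocation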
+ "sobject": "ParentLocation", + "query": "SELECT Id, Name FROM Location WHERE Name LIKE '%NL'" + },{ + "sobject": "ChildLocation", + "query": "SELECT Id, ParentLocationId, Name FROM Location WHERE ParentLocation.Name LIKE '%NL'" + },{ + "sobject": "Product2", + "query": "SELECT Id, Main_Product_Group__c, Family, MaterialType__c, Name, Product_Code__c, ProductCode, EAN_Product_Code__c FROM Product2" + } + ] +} \ No newline at end of file diff --git a/prepared_steps/2_transform_via_script/LocationScript.py b/prepared_steps/2_transform_via_script/LocationScript.py index 81915d5..ff9e14b 100644 --- a/prepared_steps/2_transform_via_script/LocationScript.py +++ b/prepared_steps/2_transform_via_script/LocationScript.py @@ -5,9 +5,9 @@ country_mapping = { } # Read the input CSV file, assuming the second row is the header -read_df = pd.read_csv('../1_extract_data/target/SCInstalledBaseLocation__c_upsert_target.csv', header=0, keep_default_na=False, dtype=str) -read_df_ib = pd.read_csv('../1_extract_data/target/object-set-2/SCInstalledBase__c_upsert_target.csv', header=0, keep_default_na=False, dtype=str) -read_df_product2 = pd.read_csv('../1_extract_data/target/object-set-6/Product2_upsert_target.csv', header=0, keep_default_na=False, dtype=str) +read_df = pd.read_csv('../1_extract_data/results/SCInstalledBaseLocation__c.csv', header=0, keep_default_na=False, dtype=str) +read_df_ib = pd.read_csv('../1_extract_data/results/SCInstalledBase__c.csv', header=0, keep_default_na=False, dtype=str) +read_df_product2 = pd.read_csv('../1_extract_data/results/Product2.csv', header=0, keep_default_na=False, dtype=str) for row in read_df.to_dict('records'): try: @@ -17,11 +17,16 @@ for row in read_df.to_dict('records'): print(f'KeyError: {e}') # Columns for reindexing -reindex_columns = ['City__c','Country__c','Extension__c','FlatNo__c','Floor__c','GeoX__c','GeoY__c','HouseNo__c','Id','PostalCode__c','Street__c'] +#"Id","City__c","Country__c","GeoY__c","GeoX__c","PostalCode__c","Street__c","Extension__c","HouseNo__c","FlatNo__c","Floor__c" +reindex_columns = ['Id','City__c','Country__c','GeoY__c','GeoX__c','PostalCode__c','Street__c','Extension__c','HouseNo__c','FlatNo__c','Floor__c'] # ArticleNo__c,CommissioningDate__c,Id,InstallationDate__c,InstalledBaseLocation__c,InstalledBaseLocation__r.Id,Name,ProductEnergy__c,ProductUnitClass__c,ProductUnitType__c,SerialNo__c,SerialNoException__c -reindex_columns_ib = ['ArticleNo__c','CommissioningDate__c','Id','InstallationDate__c','InstalledBaseLocation__c','InstalledBaseLocation__r.Id','Name','ProductEnergy__c','ProductUnitClass__c','ProductUnitType__c','SerialNo__c','SerialNoException__c'] +#"Id","Name","CommissioningDate__c","InstallationDate__c","ProductEnergy__c","ProductUnitClass__c","ArticleNo__c","SerialNo__c","SerialNoException__c","ProductUnitType__c","InstalledBaseLocation__c" +reindex_columns_ib = ['Id','Name','CommissioningDate__c','InstallationDate__c','ProductEnergy__c','ProductUnitClass__c','ArticleNo__c','SerialNo__c','SerialNoException__c','ProductUnitType__c','InstalledBaseLocation__c'] +#reindex_columns_ib = ['ArticleNo__c','CommissioningDate__c','Id','InstallationDate__c','InstalledBaseLocation__c','InstalledBaseLocation__r.Id','Name','ProductEnergy__c','ProductUnitClass__c','ProductUnitType__c','SerialNo__c','SerialNoException__c'] # EAN_Product_Code__c,Family,Id,Main_Product_Group__c,MaterialType__c,Name,Product_Code__c,ProductCode -reindex_columns_product2 = 
+# CSV header: "Id","Main_Product_Group__c","Family","MaterialType__c","Name","Product_Code__c","ProductCode","EAN_Product_Code__c"
+reindex_columns_product2 = ['Id','Main_Product_Group__c','Family','MaterialType__c','Name','Product_Code__c','ProductCode','EAN_Product_Code__c']
+#reindex_columns_product2 = ['EAN_Product_Code__c','Family','Id','Main_Product_Group__c','MaterialType__c','Name','Product_Code__c','ProductCode']
 
 # Reindex the columns to match the desired format
 df = read_df.reindex(reindex_columns, axis=1)
@@ -153,7 +158,7 @@ child_df['LocationType'] = 'Site'
 
 #ArticleNo__c,CommissioningDate__c,Id,InstallationDate__c,InstalledBaseLocation__c,InstalledBaseLocation__r.Extension__c,InstalledBaseLocation__r.FlatNo__c,InstalledBaseLocation__r.Floor__c,InstalledBaseLocation__r.Id,Name,ProductEnergy__c,ProductUnitClass__c,ProductUnitType__c,SerialNo__c,SerialNoException__c
 merged_df_ib = merged_df_ib.drop('InstalledBaseLocation__c', axis=1)
-merged_df_ib = merged_df_ib.drop('InstalledBaseLocation__r.Id', axis=1)
+# InstalledBaseLocation__r.Id is no longer queried, so there is nothing to drop
+#merged_df_ib = merged_df_ib.drop('InstalledBaseLocation__r.Id', axis=1)
 merged_df_ib = merged_df_ib.drop('Id_y', axis=1)
 print(merged_df_ib.columns)
 merged_df_ib.columns = ['Product2.Product_Code__c', 'FSL_1st_Ignition_Date__c', 'Id', 'InstallDate', 'Name', 'Kind_of_Energy__c', 'Kind_of_Installation__c', 'Main_Product_Group__c', 'SerialNumber', 'Serialnumber_Exception__c', 'Location.ExternalReference']
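One caveat on the positional rename at the end: merged_df_ib.columns = [...] assumes the surviving columns arrive in exactly that order, so changing the SCInstalledBase__c query (or the reindex order, as this patch does) can silently mislabel fields; the print(merged_df_ib.columns) above it is the only guard. A name-based rename is more robust. A sketch using three of the target names from the diff (the full mapping would follow the same pattern):

    merged_df_ib = merged_df_ib.rename(columns={
        'ArticleNo__c': 'Product2.Product_Code__c',
        'CommissioningDate__c': 'FSL_1st_Ignition_Date__c',
        'SerialNo__c': 'SerialNumber',
        # ...map the remaining columns the same way
    })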