diff --git a/prepared_steps/1_extract_data/export.json b/prepared_steps/1_extract_data/export.json
index f5d1293..ea15dc5 100644
--- a/prepared_steps/1_extract_data/export.json
+++ b/prepared_steps/1_extract_data/export.json
@@ -3,12 +3,12 @@
     "useSeparatedCSVFiles": true,
     "pollingQueryTimeoutMs": 1000000,
     "bulkApiVersion": "2.0",
-    "parallelRestJobs": 2,
+    "queryBulkApiThreshold": 100,
     "objectSets": [
         {
             "objects": [
                 {
-                    "query": "SELECT Id, City__c, Country__c, GeoY__c, GeoX__c, PostalCode__c, Street__c, Extension__c, HouseNo__c, FlatNo__c, Floor__c FROM SCInstalledBaseLocation__c WHERE Country__c = 'NL'",
+                    "query": "SELECT Id, City__c, Country__c, GeoY__c, GeoX__c, PostalCode__c, Street__c, Extension__c, HouseNo__c, FlatNo__c, Floor__c FROM SCInstalledBaseLocation__c WHERE Country__c = 'NL' LIMIT 1000",
                     "externalId": "Name",
                     "operation": "Readonly"
                 }
@@ -19,7 +19,18 @@
                     "query": "SELECT Id, Name, CommissioningDate__c,InstallationDate__c,ProductEnergy__c, ProductUnitClass__c,ArticleNo__c,SerialNo__c, SerialNoException__c, ProductUnitType__c, InstalledBaseLocation__c FROM SCInstalledBase__c WHERE Country__c = 'NL'",
                     "externalId": "Name",
                     "operation": "Readonly",
-                    "excludedFromUpdateFields": ["InstalledBaseLocation__c"]
+                    "master": true,
+                    "excludedFromUpdateFields": ["InstalledBaseLocation__c"],
+                    "skipRecordsComparison": true,
+                    "parallelRestJobs": 4,
+                    "restApiBatchSize": 9500,
+                    "fieldMapping": [
+                        {
+                            "sourceField": "InstalledBaseLocation__c",
+                            "targetField": "Id",
+                            "targetObject": "SCInstalledBaseLocation__c"
+                        }
+                    ]
                 }
             ]
         },{
@@ -33,10 +44,10 @@
         },{
             "objects": [
                 {
-                    "query": "SELECT Id, Country, CountryCode, Street, City, ParentId PostalCode FROM Address WHERE CountryCode = 'NL'",
+                    "query": "SELECT Id, Country, CountryCode, Street, City, ParentId, PostalCode FROM Address WHERE CountryCode = 'NL'",
                     "externalId": "Name",
                     "operation": "Readonly",
-                    "excludedFields": ["ParentId"]
+                    "excludedFromUpdateFields": ["ParentId"]
                 }
             ]

@@ -48,6 +59,14 @@
                     "operation": "Readonly"
                 }
             ]
+        },{
+            "objects": [
+                {
+                    "query": "SELECT Id, Main_Product_Group__c, Family, MaterialType__c, Name, Product_Code__c, ProductCode, EAN_Product_Code__c FROM Product2",
+                    "externalId": "Name",
+                    "operation": "Readonly"
+                }
+            ]
         }
     ]
}
\ No newline at end of file
diff --git a/prepared_steps/1_extract_data/extract_via_simple_salesforce.py b/prepared_steps/1_extract_data/extract_via_simple_salesforce.py
new file mode 100644
index 0000000..5686350
--- /dev/null
+++ b/prepared_steps/1_extract_data/extract_via_simple_salesforce.py
@@ -0,0 +1,137 @@
+# python extract_via_simple_salesforce.py \
+# --context qa2 \
+# --object_id Account \
+# --output_path extracted_data
+
+import os
+import pandas as pd
+from dotenv import load_dotenv, find_dotenv
+from simple_salesforce import Salesforce
+
+def get_credentials(context):
+    """
+    Get credentials for a given context from the .env file
+
+    Args:
+        context (str): Context name (e.g., 'qa2', 'prod')
+
+    Returns:
+        dict: Credentials dictionary with username, password, and security_token
+    """
+    context = context.upper()
+
+    # Initialize credentials dictionary
+    credentials = {
+        'USERNAME': None,
+        'PASSWORD': None,
+        'SECURITY_TOKEN': None,
+        'DOMAIN': 'test'
+    }
+
+    # Load the .env file explicitly from one directory above
+    env_file = find_dotenv("../.env")
+    load_dotenv(env_file)
+
+    # Scan environment variables for keys like QA2_SF_USERNAME
+    env_vars = os.environ
+
+    for key, value in env_vars.items():
+        #print(f'{context}_SF_', key, value)
+        if f'{context}_SF_' in key:
+            credential_key = key.split(f'{context}_SF_')[-1].upper()
+            print(credential_key)
+            credentials[credential_key] = value
+
+    return credentials
+
+def extract_data(object_id, output_path='output', context='qa2'):
+    """
+    Extract data using Bulk API and save as CSV
+
+    Args:
+        object_id (str): Salesforce object API name
+        output_path (str): Path to save the output file (default 'output')
+        context (str): Context name for credentials (e.g., 'qa2', 'prod')
+    """
+    try:
+        # Get credentials based on context
+        credentials = get_credentials(context)
+
+        # print(credentials)  # avoid logging secrets
+        if not all(credentials.values()):
+            raise ValueError(f"Missing credentials for context: {context}")
+
+        # Initialize Salesforce bulk connector
+        sf = Salesforce(
+            username=credentials['USERNAME'],
+            password=credentials['PASSWORD'],
+            security_token=credentials['SECURITY_TOKEN'],
+            domain=credentials['DOMAIN']
+        )
+
+        # NOTE: the query is currently hardcoded to SCInstalledBase__c; object_id is not used here yet
+        soql_query = f"""
+            SELECT Id, Name
+            FROM SCInstalledBase__c
+            WHERE Country__c = 'NL' LIMIT 1000
+        """
+        os.makedirs(output_path, exist_ok=True)  # ensure the output directory exists
+        sf.bulk2.__getattr__("SCInstalledBase__c").download(
+            soql_query, path=output_path, max_records=200000
+        )
+        return output_path
+        """
+        # Execute the Bulk query job
+        job = sf.bulk2.__getattr__("SCInstalledBase__c").query(soql_query)
+
+        # Polling for job completion (might take a moment)
+        job_id = job['id']
+        while True:
+            status = sf.bulk.job(job_id).get()['status']
+            if status == 'Complete' or status == 'Closed':
+                break
+            if status == 'Aborted':
+                exit(1)
+            if status == 'Failed':
+                raise ValueError(f'Job failed: {job_id}')
+
+
+        # Get the results
+        result = sf.bulk.result(job_id)
+        df = pd.DataFrame(result.records)
+
+        # Create output directory if it doesn't exist
+        os.makedirs(output_path, exist_ok=True)
+
+        # Save to CSV file
+        csv_file = os.path.join(output_path, f'{object_id}_data.csv')
+        df.to_csv(csv_file, index=False)
+
+        print(f'Successfully extracted {len(df)} records from {object_id}')
+        return csv_file
+        """
+    except Exception as e:
+        raise ValueError(f'Error extracting data: {str(e)}')
+
+if __name__ == '__main__':
+    import argparse
+
+    # Parse command-line arguments
+    parser = argparse.ArgumentParser(description='Extract Salesforce data via Bulk API')
+    parser.add_argument('--context', type=str, required=True,
+                        help='Context name (e.g., "qa2", "prod")')
+    parser.add_argument('--object_id', type=str, required=True,
+                        help='Account, SCInstalledBaseLocation__c, SCInstalledBase__c, Product2')
+    parser.add_argument('--output_path', type=str, required=False, default='output',
+                        help='Directory for the downloaded CSV (default: output)')
+
+    args = parser.parse_args()
+
+    # Extract data using parameters
+    output_file = extract_data(
+        object_id=args.object_id,
+        output_path=args.output_path,
+        context=args.context
+    )
+
+    print(f'Files saved under: {output_file}')
\ No newline at end of file
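Queries in this config are easy to get wrong; the missing comma between ParentId and PostalCode in the Address query was one such slip. A minimal sketch for linting every configured query before a full export run, assuming a live simple_salesforce session created the same way as in extract_via_simple_salesforce.py below; lint_export_queries is a hypothetical helper, not part of this repo:

# lint_export_queries.py -- a sketch, not one of the migration steps.
# Assumes `sf` is authenticated exactly as in extract_via_simple_salesforce.py.
import json

from simple_salesforce import Salesforce
from simple_salesforce.exceptions import SalesforceMalformedRequest

def lint_export_queries(sf: Salesforce, export_json_path: str) -> None:
    with open(export_json_path) as fh:
        config = json.load(fh)
    for object_set in config['objectSets']:
        for obj in object_set['objects']:
            try:
                # sf.query fetches only the first page of results, so this stays cheap
                sf.query(obj['query'])
            except SalesforceMalformedRequest as exc:
                print(f"BAD QUERY: {obj['query']}\n  -> {exc}")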
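Once the script above has run, a quick sanity check of the downloaded CSVs can catch auth or query problems before the transform step. A minimal sketch, assuming the download landed in extracted_data/; the *.csv glob is an assumption about how bulk2.download names its output files, and none of this code is part of the repo:

# check_extract.py -- illustrative sketch only; extracted_data/ and the glob
# pattern are assumptions about where bulk2.download wrote its output.
import glob

import pandas as pd

frames = [
    # keep_default_na=False and dtype=str mirror how LocationScript.py reads
    # its inputs, so empty Salesforce fields stay '' instead of NaN
    pd.read_csv(path, keep_default_na=False, dtype=str)
    for path in glob.glob('extracted_data/*.csv')
]
df = pd.concat(frames, ignore_index=True)
print(f'{len(df)} rows, columns: {list(df.columns)}')
assert df['Id'].is_unique, 'duplicate Salesforce Ids in the extract'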
diff --git a/prepared_steps/2_transform_via_script/LocationScript.py b/prepared_steps/2_transform_via_script/LocationScript.py
index c090ae7..81915d5 100644
--- a/prepared_steps/2_transform_via_script/LocationScript.py
+++ b/prepared_steps/2_transform_via_script/LocationScript.py
@@ -5,8 +5,10 @@ country_mapping = {
 }

 # Read the input CSV file, assuming the second row is the header
-read_df = pd.read_csv('../1_extract_data/SCInstalledBaseLocation__c.csv', header=0, keep_default_na=False, dtype=str)
-read_df_ib = pd.read_csv('../1_extract_data/SCInstalledBase__c.csv', header=0, keep_default_na=False, dtype=str)
+read_df = pd.read_csv('../1_extract_data/target/SCInstalledBaseLocation__c_upsert_target.csv', header=0, keep_default_na=False, dtype=str)
+read_df_ib = pd.read_csv('../1_extract_data/target/object-set-2/SCInstalledBase__c_upsert_target.csv', header=0, keep_default_na=False, dtype=str)
+read_df_product2 = pd.read_csv('../1_extract_data/target/object-set-6/Product2_upsert_target.csv', header=0, keep_default_na=False, dtype=str)
+
 for row in read_df.to_dict('records'):
     try:
         # Your processing logic here
@@ -18,10 +20,13 @@ for row in read_df.to_dict('records'):
 reindex_columns = ['City__c','Country__c','Extension__c','FlatNo__c','Floor__c','GeoX__c','GeoY__c','HouseNo__c','Id','PostalCode__c','Street__c']
 # ArticleNo__c,CommissioningDate__c,Id,InstallationDate__c,InstalledBaseLocation__c,InstalledBaseLocation__r.Id,Name,ProductEnergy__c,ProductUnitClass__c,ProductUnitType__c,SerialNo__c,SerialNoException__c
 reindex_columns_ib = ['ArticleNo__c','CommissioningDate__c','Id','InstallationDate__c','InstalledBaseLocation__c','InstalledBaseLocation__r.Id','Name','ProductEnergy__c','ProductUnitClass__c','ProductUnitType__c','SerialNo__c','SerialNoException__c']
+# EAN_Product_Code__c,Family,Id,Main_Product_Group__c,MaterialType__c,Name,Product_Code__c,ProductCode
+reindex_columns_product2 = ['EAN_Product_Code__c','Family','Id','Main_Product_Group__c','MaterialType__c','Name','Product_Code__c','ProductCode']

 # Reindex the columns to match the desired format
 df = read_df.reindex(reindex_columns, axis=1)
 df_ib = read_df_ib.reindex(reindex_columns_ib, axis=1)
+df_product2 = read_df_product2.reindex(reindex_columns_product2, axis=1)

 df['Street'] = (
     df['Street__c'].astype(str) + ' ' +
@@ -151,7 +156,20 @@ merged_df_ib = merged_df_ib.drop('InstalledBaseLocation__c', axis=1)
 merged_df_ib = merged_df_ib.drop('InstalledBaseLocation__r.Id', axis=1)
 merged_df_ib = merged_df_ib.drop('Id_y', axis=1)
 print(merged_df_ib.columns)
-merged_df_ib.columns = ['Product2.EAN_Product_Code__c', 'FSL_1st_Ignition_Date__c', 'Id', 'InstallDate', 'Name', 'Kind_of_Energy__c', 'Kind_of_Installation__c', 'Main_Product_Group__c', 'SerialNumber', 'Serialnumber_Exception__c', 'Location.ExternalReference']
+merged_df_ib.columns = ['Product2.Product_Code__c', 'FSL_1st_Ignition_Date__c', 'Id', 'InstallDate', 'Name', 'Kind_of_Energy__c', 'Kind_of_Installation__c', 'Main_Product_Group__c', 'SerialNumber', 'Serialnumber_Exception__c', 'Location.ExternalReference']
+merged_df_ib = merged_df_ib.drop('Main_Product_Group__c', axis=1)
+
+# assign Main_Product_Group__c based on product2 records
+merged_df_ib = pd.merge(merged_df_ib,
+                        df_product2[['Product_Code__c', 'Main_Product_Group__c']],
+                        left_on='Product2.Product_Code__c',
+                        right_on='Product_Code__c',
+                        how='left')
+
+merged_df_ib = merged_df_ib.drop('Product_Code__c', axis=1)
+
+merged_df_ib = merged_df_ib.drop_duplicates(subset=['Id'], keep='first')
+

 # Write each DataFrame to a separate CSV file
 address_df.to_csv('../3_upsert_address_and_parent_location/Address.csv', index=False)
@@ -159,4 +177,6 @@ parent_df.to_csv('../3_upsert_address_and_parent_location/Location.csv', index=F
 child_df.to_csv('../5_upsert_child_location/Location.csv', index=False)
 merged_df_ib.to_csv('../7_upsert_assets/Asset.csv', index=False)
+## end mapping
+

 print('Data has been successfully split into Address.csv, Parent_Location.csv, and Child_Location.csv files with duplicate checks applied.')
\ No newline at end of file
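The transform above ends with drop_duplicates(subset=['Id']), which silently keeps the first Product2 match whenever a Product_Code__c occurs more than once in the extract. A short sketch that makes such collisions visible before they get resolved arbitrarily; it reuses the same extract path as LocationScript.py and is an add-on, not existing repo code:

# check_product_codes.py -- a sketch; reads the same Product2 extract as
# LocationScript.py and reports product codes that would fan out the merge.
import pandas as pd

products = pd.read_csv(
    '../1_extract_data/target/object-set-6/Product2_upsert_target.csv',
    keep_default_na=False, dtype=str)

dupes = products[products.duplicated(subset=['Product_Code__c'], keep=False)]
if dupes.empty:
    print('Product_Code__c is unique; the left merge cannot duplicate assets.')
else:
    print(f'{len(dupes)} Product2 rows share a Product_Code__c:')
    print(dupes[['Id', 'Product_Code__c']].sort_values('Product_Code__c'))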
diff --git a/prepared_steps/7_upsert_assets/ValueMapping.csv b/prepared_steps/7_upsert_assets/ValueMapping.csv
index a3be9b2..f013545 100644
--- a/prepared_steps/7_upsert_assets/ValueMapping.csv
+++ b/prepared_steps/7_upsert_assets/ValueMapping.csv
@@ -1,2 +1,3 @@
 ObjectName,FieldName,RawValue,Value
-Asset,Kind_of_Energy__c,2,
\ No newline at end of file
+Asset,Kind_of_Energy__c,4,3
+Asset,Kind_of_Energy__c,5,3
\ No newline at end of file
diff --git a/prepared_steps/7_upsert_assets/export.json b/prepared_steps/7_upsert_assets/export.json
index 39b6e30..7855644 100644
--- a/prepared_steps/7_upsert_assets/export.json
+++ b/prepared_steps/7_upsert_assets/export.json
@@ -7,9 +7,9 @@
             "operation": "Readonly",
             "externalId": "ExternalReference"
         },{
-            "query": "SELECT EAN_Product_Code__c FROM Product2 WHERE EAN_Product_Code__c != null",
+            "query": "SELECT Product_Code__c FROM Product2 WHERE Product_Code__c != null",
             "operation": "Readonly",
-            "externalId": "EAN_Product_Code__c"
+            "externalId": "Product_Code__c"
         },{
             "query": "SELECT Product2Id,Id,InstallDate,Name,Kind_of_Energy__c,Kind_of_Installation__c,Main_Product_Group__c,SerialNumber,Serialnumber_Exception__c,LocationId FROM Asset",
             "operation": "Insert"
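The ValueMapping.csv change collapses raw Kind_of_Energy__c values 4 and 5 to 3 during the asset load. A hedged pandas replica of that substitution, handy for previewing the effect on Asset.csv before the real upsert; the loader applies the mapping itself, so this sketch only mirrors its presumed semantics:

# preview_value_mapping.py -- a sketch that mirrors (by assumption) how the
# loader applies ValueMapping.csv; run it from prepared_steps/7_upsert_assets.
import pandas as pd

rules = pd.read_csv('ValueMapping.csv', dtype=str)
assets = pd.read_csv('Asset.csv', keep_default_na=False, dtype=str)

for _, rule in rules[rules['ObjectName'] == 'Asset'].iterrows():
    # each rule rewrites one raw picklist value, e.g. Kind_of_Energy__c 4 -> 3
    col = rule['FieldName']
    assets.loc[assets[col] == rule['RawValue'], col] = rule['Value']

print(assets['Kind_of_Energy__c'].value_counts())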