diff --git a/.gitignore b/.gitignore index a65adcc..804eb5d 100644 --- a/.gitignore +++ b/.gitignore @@ -35,4 +35,7 @@ ServiceContract_beforetransform.csv failed_records_*.csv successful_records_*.csv failed_records.csv -successful_records.csv \ No newline at end of file +successful_records.csv +server.key +server.crt +AssetWarranty.csv \ No newline at end of file diff --git a/__pycache__/sf_auth.cpython-313.pyc b/__pycache__/sf_auth.cpython-313.pyc index b01a630..4a0041a 100644 Binary files a/__pycache__/sf_auth.cpython-313.pyc and b/__pycache__/sf_auth.cpython-313.pyc differ diff --git a/prepared_steps/14_fill_pricebook2id_in_servicecontract/FillServiceContractFields.py b/prepared_steps/14_fill_pricebook2id_in_servicecontract/FillServiceContractFields.py index 5769170..88317f5 100644 --- a/prepared_steps/14_fill_pricebook2id_in_servicecontract/FillServiceContractFields.py +++ b/prepared_steps/14_fill_pricebook2id_in_servicecontract/FillServiceContractFields.py @@ -4,7 +4,7 @@ read_df_sc = pd.read_csv('../15_insert_servicecontract/ServiceCOntract_beforetra read_df_pb2 = pd.read_csv('../12_insert_pricebook2_and_pricebookentries/target/Pricebook2_insert_target.csv', header=0, keep_default_na=False, dtype=str) read_df_sct = pd.read_csv('../13_insert_servicecontracttemplates_dummies/target/ServiceContract_insert_target.csv', header=0, keep_default_na=False, dtype=str) -reindex_columns_sc = ['PKey__c','TemplateId__r.PKey__c','Status','BillingCountryCode','Term','EndDate','StartDate','AccountId','Service_Recipient__c','IoT_Registration_Status__c','Pricebook2.Name','Name'] +reindex_columns_sc = ['PKey__c', 'Name', 'TemplateId__r.PKey__c','Status','BillingCountryCode','Term','EndDate','StartDate','AccountId','Service_Recipient__c','IoT_Registration_Status__c','Pricebook2.Name'] reindex_columns_pb2 = ['Brand__c','Business_Type__c','Country__c','Errors','Id','IsActive','Name'] reindex_columns_sct = 
['BillingCountryCode','Errors','Id','IsTemplate__c','Name','PKey__c','Pricebook2Id','Term'] diff --git a/prepared_steps/1_extract_data/extract_via_simple_salesforce.py b/prepared_steps/1_extract_data/extract_via_simple_salesforce.py index fba97e2..c0a4a13 100644 --- a/prepared_steps/1_extract_data/extract_via_simple_salesforce.py +++ b/prepared_steps/1_extract_data/extract_via_simple_salesforce.py @@ -26,43 +26,73 @@ def animate(): time.sleep(0.1) sys.stdout.write('\rDone! ') -# ...existing code for animate() function... - -def extract_data(object_id, query, output_path='output', context='qa2'): +def extract_data(object_id, query, output_path='output', context='qa2', use_rest=False): """ - Extract data using Bulk API and save as CSV + Extract data using either REST or Bulk API and save as CSV Args: object_id (str): Salesforce object ID - output_path (str): Path to save the output file (default 'output') - context (str): Context name for credentials (e.g., 'qa2', 'prod') + query (str): SOQL query + output_path (str): Path to save the output file + context (str): Context name for credentials + use_rest (bool): Whether to use REST API instead of Bulk API """ + global done + done = False + t = None + try: - global done - done = False - - # Get Salesforce connection using the new module + # Get Salesforce connection sf = get_sf_connection(context) + # Start animation thread t = threading.Thread(target=animate) t.start() - results = sf.bulk2.__getattr__(object_id).query_all( - query - ) print(f'Extracting: {object_id}') - for i, data in enumerate(results): - with open(f"results/{object_id}.csv", "w", encoding="utf-8") as bos: - bos.write(data) + + # Choose API method based on use_rest flag + if use_rest: + # Use REST API query + results = sf.query(query) + records = results['records'] + + # Handle pagination if needed + while not results['done']: + results = sf.query_more(results['nextRecordsUrl'], identifier_is_url=True) + records.extend(results['records']) + + # 
Convert records to CSV format + output_file = f"results/{object_id}.csv" + os.makedirs('results', exist_ok=True) + if records: + df = pd.DataFrame(records) + df = df.drop('attributes', axis=1) # Remove Salesforce metadata + df.to_csv(output_file, index=False) + + else: + # Use Bulk API query + results = sf.bulk2.__getattr__(object_id).query( + query, max_records=2000000 + ) + + # Write results to file + output_file = f"results/{object_id}.csv" + os.makedirs('results', exist_ok=True) + with open(output_file, "w", encoding="utf-8") as bos: + for data in results: + bos.write(data) - time.sleep(10) - done = True - t.do_run = False + return output_file except Exception as e: - done = True - t.do_run = False raise ValueError(f'Error extracting data: {str(e)}') + + finally: + # Clean up thread and animation state + done = True + if t and t.is_alive(): + t.join(timeout=1) if __name__ == '__main__': import argparse @@ -92,23 +122,19 @@ if __name__ == '__main__': for query_def in queries: sobject = query_def['sobject'] query_str = query_def['query'].replace('{country}', args.country) + use_rest = query_def.get('useREST', False) # Default to False if not specified - # Skip if --sobjects is provided and current sobject isn't in the list - if selected_sobjects and sobject not in selected_sobjects: - print(f'Skipping {sobject} as it is not specified in --sobjects') - continue - - try: - print(f'\nRunning query for {sobject}:') - output_file = extract_data( - object_id=sobject, - query=query_str, - output_path=args.output_path, - context=args.context - ) - print(f'File saved at: {output_file}') - except Exception as e: - print(f'Error running query for {sobject}: {str(e)}') - # Optionally, you can choose to skip failed queries or exit here + if not selected_sobjects or sobject in selected_sobjects: + try: + output_file = extract_data( + object_id=sobject, + query=query_str, + output_path=args.output_path, + context=args.context, + use_rest=use_rest + ) + print(f'Successfully 
extracted {sobject} to {output_file}') + except Exception as e: + print(f'Error extracting {sobject}: {str(e)}') print('\nExtraction complete.') \ No newline at end of file diff --git a/prepared_steps/1_extract_data/queries.json b/prepared_steps/1_extract_data/queries.json index fb3efa9..be6ba64 100644 --- a/prepared_steps/1_extract_data/queries.json +++ b/prepared_steps/1_extract_data/queries.json @@ -5,7 +5,7 @@ "query": "SELECT Id, City__c, Country__c, GeoY__c, GeoX__c, PostalCode__c, Street__c, Extension__c, HouseNo__c, FlatNo__c, Floor__c FROM SCInstalledBaseLocation__c WHERE Country__c = '{country}' limit 3" },{ "sobject": "SCInstalledBase__c", - "query": "SELECT Id, Name, CommissioningDate__c,InstallationDate__c,ProductEnergy__c, ProductUnitClass__c,ArticleNo__c,SerialNo__c, SerialNoException__c, ProductUnitType__c, InstalledBaseLocation__c FROM SCInstalledBase__c WHERE Country__c = '{country}' limit 3" + "query": "SELECT Id, Name, CommissioningDate__c,InstallationDate__c,ProductEnergy__c, ProductUnitClass__c,ArticleNo__c,SerialNo__c, SerialNoException__c, ProductUnitType__c, InstalledBaseLocation__c, WarrantyDuration__c, GuaranteeStandard__c, GuaranteeExtended__c FROM SCInstalledBase__c WHERE Country__c = '{country}' limit 3" },{ "sobject": "Asset", "query": "SELECT Id, Serialnumber FROM Asset WHERE Location.ParentLocation.Name LIKE '%{country}'" @@ -39,6 +39,10 @@ },{ "sobject": "SCContract__c", "query": "SELECT id, name, Template__c, status__c, Brand__r.Name, Country__c, Runtime__c, EndDate__c, StartDate__c, Account__c, AccountOwner__c, IoT_Registration_Status__c FROM SCContract__c WHERE Template__c != null AND EndDate__c >= TODAY AND Country__c = '{country}' limit 3" + },{ + "sobject": "WarrantyTerm", + "useREST": true, + "query": "SELECT Id, WarrantyTermName, WarrantyDuration, WarrantyType, pricebook2.country__c FROM WarrantyTerm WHERE Pricebook2Id = null OR pricebook2.Country__c = '{country}'" } ] } \ No newline at end of file diff --git 
a/prepared_steps/2_transform_via_script/TransformScript.py b/prepared_steps/2_transform_via_script/TransformScript.py index 4ae200c..95017f3 100644 --- a/prepared_steps/2_transform_via_script/TransformScript.py +++ b/prepared_steps/2_transform_via_script/TransformScript.py @@ -40,10 +40,11 @@ read_df_address_iot = pd.read_csv('../1_extract_data/results/Address.csv', heade read_df_location_iot = pd.read_csv('../1_extract_data/results/ParentLocation.csv', header=0, keep_default_na=False, dtype=str) read_df_servicecontracttemplates = pd.read_csv('../1_extract_data/results/ContractTemplates.csv', header=0, keep_default_na=False, dtype=str) read_df_servicecontracts = pd.read_csv('../1_extract_data/results/SCContract__c.csv', header=0, keep_default_na=False, dtype=str) +read_df_warrantyterm = pd.read_csv('../1_extract_data/results/WarrantyTerm.csv', header=0, keep_default_na=False, dtype=str) # Columns for reindexing reindex_columns = ['Id','City__c','Country__c','GeoY__c','GeoX__c','PostalCode__c','Street__c','Extension__c','HouseNo__c','FlatNo__c','Floor__c'] -reindex_columns_ib = ['Id','Name','CommissioningDate__c','InstallationDate__c','ProductEnergy__c','ProductUnitClass__c','ArticleNo__c','SerialNo__c','SerialNoException__c','ProductUnitType__c','InstalledBaseLocation__c'] +reindex_columns_ib = ['Id', 'Name', 'CommissioningDate__c', 'InstallationDate__c', 'ProductEnergy__c', 'ProductUnitClass__c', 'ArticleNo__c', 'SerialNo__c', 'SerialNoException__c', 'ProductUnitType__c', 'InstalledBaseLocation__c', 'WarrantyDuration__c', 'GuaranteeStandard__c', 'GuaranteeExtended__c'] reindex_columns_product2 = ['Id','Main_Product_Group__c','Family','MaterialType__c','Name','Product_Code__c','ProductCode','EAN_Product_Code__c'] reindex_columns_ibr = ['Id', 'InstalledBaseLocation__c', 'Role__c', 'ValidFrom__c', 'ValidTo__c', 'Account__c'] reindex_columns_pricelist = ['Id', 'Name', 'Brand__r.Name', 'Country__c'] @@ -53,6 +54,7 @@ reindex_columns_address_iot = ['Id', 'Country', 
'CountryCode', 'Street', 'City', reindex_columns_location_iot = ['Id', 'Name'] reindex_columns_servicecontracttemplates = ['Id', 'Name', 'TemplateName__c', 'Status__c', 'Brand__r.Name', 'Country__c', 'Runtime__c'] reindex_columns_servicecontracts = ['Id', 'Name', 'Template__c', 'Status__c', 'Brand__r.Name', 'Country__c', 'Runtime__c', 'EndDate__c', 'StartDate__c', 'Account__c', 'AccountOwner__c', 'IoT_Registration_Status__c'] +reindex_columns_warrantyterm = ['Id', 'WarrantyTermName', 'WarrantyDuration', 'WarrantyType', 'Pricebook2'] # Reindex the columns to match the desired format df = read_df.reindex(reindex_columns, axis=1) @@ -66,6 +68,7 @@ df_address_iot = read_df_address_iot.reindex(reindex_columns_address_iot, axis=1 df_location_iot = read_df_location_iot.reindex(reindex_columns_location_iot, axis=1) df_servicecontracttemplates = read_df_servicecontracttemplates.reindex(reindex_columns_servicecontracttemplates, axis=1) df_servicecontract = read_df_servicecontracts.reindex(reindex_columns_servicecontracts, axis=1) +df_warrantyterm = read_df_warrantyterm.reindex(reindex_columns_warrantyterm, axis=1) ##--------------------------------------------------------------------------## ## Update for IoT Addresses and Locations @@ -112,30 +115,6 @@ df['PKey__c'] = ( df['Country__c'].astype(str) ) -# Merge df_ib with df including additional columns -merged_df_ib = pd.merge(df_ib, - df[['Id', 'PKey__c', 'Extension__c', 'FlatNo__c', 'Floor__c']], - left_on='InstalledBaseLocation__c', - right_on='Id', - how='left') - -print(merged_df_ib.columns) - -# If there are missing values (no match found), you can fill them with a placeholder -#merged_df_ib['PKey__c'].fillna('Not Found', inplace=True) -#merged_df_ib = merged_df_ib['PKey__c'].fillna('Not Found') - -merged_df_ib['PKey__c'] = ( - merged_df_ib['PKey__c'].astype(str) + ';' + - merged_df_ib['Extension__c'].astype(str) + ';' + - merged_df_ib['FlatNo__c'].astype(str) + ';' + - merged_df_ib['Floor__c'].astype(str) -) - 
-merged_df_ib = merged_df_ib.drop('Extension__c', axis=1) -merged_df_ib = merged_df_ib.drop('FlatNo__c', axis=1) -merged_df_ib = merged_df_ib.drop('Floor__c', axis=1) - ## 1. Address.csv # Columns needed for Address table based on the input CSV structure address_columns = ['City__c', 'Country__c', @@ -224,6 +203,84 @@ child_df['IsInventoryLocation'] = 'false' child_df['IsMobile'] = 'false' child_df['LocationType'] = 'Site' +##--------------------------------------------------------------------------## +## Asset, AssociatedLocation, Asset Warranty +##--------------------------------------------------------------------------## + +# Merge df_ib with df including additional columns +merged_df_ib = pd.merge(df_ib, + df[['Id', 'PKey__c', 'Extension__c', 'FlatNo__c', 'Floor__c']], + left_on='InstalledBaseLocation__c', + right_on='Id', + how='left') + +print(merged_df_ib.columns) + +#Fetching data for standard warranty +df_assetwarranty_standard = df_ib[['Id', 'InstallationDate__c', 'GuaranteeStandard__c']].copy() + +# Filter df_warrantyterm to get only standard warranty type rows +standard_warranty = df_warrantyterm[df_warrantyterm['WarrantyType'] == 'Standard'] + +# Add the warranty term ID to df_assetwarranty_standard +df_assetwarranty_standard['WarrantyTermId'] = standard_warranty['Id'].iloc[0] +print(df_assetwarranty_standard) + +# Rename columns for asset warranty +df_assetwarranty_standard.columns = ['Asset.PKey__c', 'StartDate', 'EndDate', 'WarrantyTerm.Id'] + +#Fetching data for extended warranty where GuaranteeExtended__c is filled and different from GuaranteeStandard__c +df_assetwarranty_extended = df_ib[ + (df_ib['GuaranteeExtended__c'] != '') & + (df_ib['GuaranteeExtended__c'] != df_ib['GuaranteeStandard__c']) +][['Id', 'GuaranteeExtended__c', 'WarrantyDuration__c']].copy() + +if(not df_assetwarranty_extended.empty): + # Calculate start date for extended warranty based on warranty duration + df_assetwarranty_extended['StartDate'] = 
pd.to_datetime(df_assetwarranty_extended['GuaranteeExtended__c']) - pd.to_timedelta(df_assetwarranty_extended['WarrantyDuration__c'].astype(float) * 30, unit='D') + + # Filter df_warrantyterm to get only extended warranty type rows + # Filter for extended warranty and matching warranty duration + extended_warranty = df_warrantyterm[(df_warrantyterm['WarrantyType'] == 'Extended') & + (df_warrantyterm['WarrantyDuration'] == df_assetwarranty_extended['WarrantyDuration__c'].iloc[0])] + + # If multiple or no warranty terms found, set WarrantyTermId to empty + if len(extended_warranty) != 1: + df_assetwarranty_extended['WarrantyTermId'] = '' + else: + df_assetwarranty_extended['WarrantyTermId'] = extended_warranty['Id'].iloc[0] + + df_assetwarranty_extended = df_assetwarranty_extended.drop('WarrantyDuration__c', axis=1) + + print(df_assetwarranty_extended) + + # Rename columns for asset warranty + df_assetwarranty_extended.columns = ['Asset.PKey__c', 'EndDate', 'StartDate', 'WarrantyTerm.Id'] + + # Add them to a merged df for saving purposes + df_assetwarranty_save = pd.concat([df_assetwarranty_standard, df_assetwarranty_extended]) +else: + df_assetwarranty_save = df_assetwarranty_standard + +merged_df_ib = merged_df_ib.drop('GuaranteeStandard__c', axis=1) +merged_df_ib = merged_df_ib.drop('GuaranteeExtended__c', axis=1) +merged_df_ib = merged_df_ib.drop('WarrantyDuration__c', axis=1) + +# If there are missing values (no match found), you can fill them with a placeholder +#merged_df_ib['PKey__c'].fillna('Not Found', inplace=True) +#merged_df_ib = merged_df_ib['PKey__c'].fillna('Not Found') + +merged_df_ib['PKey__c'] = ( + merged_df_ib['PKey__c'].astype(str) + ';' + + merged_df_ib['Extension__c'].astype(str) + ';' + + merged_df_ib['FlatNo__c'].astype(str) + ';' + + merged_df_ib['Floor__c'].astype(str) +) + +merged_df_ib = merged_df_ib.drop('Extension__c', axis=1) +merged_df_ib = merged_df_ib.drop('FlatNo__c', axis=1) +merged_df_ib = merged_df_ib.drop('Floor__c', axis=1) + 
## 4. Assets.csv #ArticleNo__c,CommissioningDate__c,Id,InstallationDate__c,InstalledBaseLocation__c,InstalledBaseLocation__r.Extension__c,InstalledBaseLocation__r.FlatNo__c,InstalledBaseLocation__r.Floor__c,InstalledBaseLocation__r.Id,Name,ProductEnergy__c,ProductUnitClass__c,ProductUnitType__c,SerialNo__c,SerialNoException__c @@ -468,13 +525,12 @@ df_servicecontract['Pricebook2.Name'] = ( "SERVICE" ) -df_servicecontract = df_servicecontract.drop('Name', axis=1) df_servicecontract = df_servicecontract.drop('Brand__r.Name', axis=1) -df_servicecontract.columns = ['PKey__c', 'TemplateId__r.PKey__c', 'Status', 'BillingCountryCode', 'Term', 'EndDate', 'StartDate', 'AccountId', 'Service_Recipient__c', 'IoT_Registration_Status__c', 'Pricebook2.Name'] +df_servicecontract.columns = ['PKey__c', 'Name', 'TemplateId__r.PKey__c', 'Status', 'BillingCountryCode', 'Term', 'EndDate', 'StartDate', 'AccountId', 'Service_Recipient__c', 'IoT_Registration_Status__c', 'Pricebook2.Name'] df_servicecontract['IoT_Registration_Status__c'] = df_servicecontract['IoT_Registration_Status__c'].replace('', 'Open') -df_servicecontract['Name'] = df_servicecontract['PKey__c'] +#df_servicecontract['Name'] = df_servicecontract['PKey__c'] df_servicecontract['TemplateCountry__c'] = df_servicecontract['BillingCountryCode'] #df_servicecontract = df_servicecontract.drop('TemplateId__r.PKey__c', axis=1) @@ -498,6 +554,7 @@ df_pricelistitem.to_csv('../12_insert_pricebook2_and_pricebookentries/PricebookE merged_df_location_iot.to_csv('../3_update_address_and_location_data_for_migration/Location.csv', index=False) df_servicecontracttemplates.to_csv('../13_insert_servicecontracttemplates_dummies/ServiceContract.csv', index=False) df_servicecontract.to_csv('../15_insert_servicecontract/ServiceContract_beforetransform.csv', index=False) +df_assetwarranty_save.to_csv('../8_upsert_assets/AssetWarranty.csv', index=False) ## end mapping print('Data has been successfully transformed and saved to CSV files.') \ No 
newline at end of file diff --git a/sf_auth.py b/sf_auth.py index 36536ed..b0ee50f 100644 --- a/sf_auth.py +++ b/sf_auth.py @@ -5,20 +5,17 @@ from simple_salesforce import Salesforce def get_credentials(context): """ Get credentials for a given context from the .env file - - Args: - context (str): Context name (e.g., 'qa2', 'prod') - - Returns: - dict: Credentials dictionary with username, password, and security_token """ context = context.upper() - # Initialize credentials dictionary + # Initialize credentials dictionary with all possible auth methods credentials = { 'USERNAME': None, 'PASSWORD': None, - 'SECURITY_TOKEN': None + 'SECURITY_TOKEN': None, + 'ORGANIZATIONID': None, + 'CONSUMER_KEY': None, + 'PRIVATEKEY_FILE': None } if context != 'PROD': @@ -29,9 +26,7 @@ def get_credentials(context): load_dotenv(env_file) # Load all environment variables - env_vars = os.environ - - for key, value in env_vars.items(): + for key, value in os.environ.items(): if f'{context}_SF_' in key: credential_key = key.split(f'{context}_SF_')[-1].upper() credentials[credential_key] = value @@ -40,31 +35,46 @@ def get_credentials(context): def get_sf_connection(context): """ - Create Salesforce connection based on context - - Args: - context (str): Context name (e.g., 'qa2', 'prod') - - Returns: - Salesforce: Authenticated Salesforce connection + Create Salesforce connection based on context, trying JWT first """ credentials = get_credentials(context) - if not all(credentials.values()): - raise ValueError(f"Missing credentials for context: {context}") + # Common parameters for all auth methods + sf_params = { + 'username': credentials['USERNAME'], + 'version': '62.0' + } - if context.lower() == 'prod': - return Salesforce( - username=credentials['USERNAME'], - password=credentials['PASSWORD'], - security_token=credentials['SECURITY_TOKEN'], - version='62.0' - ) - else: - return Salesforce( - username=credentials['USERNAME'], - password=credentials['PASSWORD'], - 
security_token=credentials['SECURITY_TOKEN'], - domain=credentials['DOMAIN'], - version='62.0' - ) \ No newline at end of file + # Add domain for non-prod environments + if context.lower() != 'prod': + sf_params['domain'] = credentials.get('DOMAIN', 'test') + + try: + # Try JWT authentication first + if credentials['CONSUMER_KEY'] and credentials['PRIVATEKEY_FILE']: + print(f"Attempting JWT authentication for {context}") + sf_params.update({ + 'consumer_key': credentials['CONSUMER_KEY'], + 'privatekey_file': credentials['PRIVATEKEY_FILE'] + }) + return Salesforce(**sf_params) + + # Fall back to password + security token or org ID + elif credentials['PASSWORD']: + print(f"Falling back to password authentication for {context}") + sf_params['password'] = credentials['PASSWORD'] + + if credentials['ORGANIZATIONID']: + sf_params['organizationId'] = credentials['ORGANIZATIONID'] + elif credentials['SECURITY_TOKEN']: + sf_params['security_token'] = credentials['SECURITY_TOKEN'] + else: + raise ValueError("Neither security token nor organization ID provided") + + return Salesforce(**sf_params) + + else: + raise ValueError("No valid authentication credentials provided") + + except Exception as e: + raise ConnectionError(f"Failed to connect to Salesforce {context}: {str(e)}") \ No newline at end of file