data extract: switch to simple_salesforce bulk2

Rene Kaßeböhmer
2025-04-09 11:21:08 +02:00
parent 2e6d82d9cc
commit 19eacdf2f5
4 changed files with 115 additions and 28 deletions

View File

@@ -1 +1 @@
sf sfdmu run --sourceusername rene.kasseboehmer@vaillant.de --targetusername csvfile
python .\extract_via_simple_salesforce.py --context prod
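
The new entry point also accepts an optional --sobjects filter (added below in extract_via_simple_salesforce.py), so individual objects from queries.json can be re-extracted; a hypothetical invocation:

python .\extract_via_simple_salesforce.py --context prod --sobjects SCInstalledBase__c,Product2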

View File

@@ -4,10 +4,27 @@
# --output_path extracted_data
import os
import itertools
import threading
import time
import sys
import json
import pandas as pd
from dotenv import load_dotenv, find_dotenv
from simple_salesforce import Salesforce
done = False

# Spinner animation shown on stdout while the export runs; the main
# thread sets the global `done` flag to stop it
def animate():
    for c in itertools.cycle(['|', '/', '-', '\\']):
        if done:
            break
        sys.stdout.write('\rloading ' + c)
        sys.stdout.flush()
        time.sleep(0.1)
    sys.stdout.write('\rDone! ')
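
# Usage sketch for the spinner (assumption; it mirrors extract_data below):
#   t = threading.Thread(target=animate)
#   t.start()
#   ... long-running bulk2 call ...
#   done = True  # stops the loop; animate() then prints 'Done!'
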
def get_credentials(context):
    """
    Get credentials for a given context from the .env file
@@ -39,12 +56,11 @@ def get_credentials(context):
        #print(f'{context}_SF_', key, value)
        if f'{context}_SF_' in key:
            credential_key = key.split(f'{context}_SF_')[-1].upper()
            print(credential_key)
            credentials[credential_key] = value
    return credentials
def extract_data(object_id, output_path='output', context='qa2'):
def extract_data(object_id, query, output_path='output', context='qa2'):
    """
    Extract data using Bulk API and save as CSV
@@ -54,10 +70,12 @@ def extract_data(object_id, output_path='output', context='qa2'):
        context (str): Context name for credentials (e.g., 'qa2', 'prod')
    """
    try:
        global done
        done = False

        # Get credentials based on context
        credentials = get_credentials(context)
        print(credentials)

        if not all(credentials.values()):
            raise ValueError(f"Missing credentials for context: {context}")
@@ -66,19 +84,32 @@ def extract_data(object_id, output_path='output', context='qa2'):
            username=credentials['USERNAME'],
            password=credentials['PASSWORD'],
            security_token=credentials['SECURITY_TOKEN'],
            domain=credentials['DOMAIN']
            domain=credentials['DOMAIN'],
            version='62.0'
        )

        # Create a simple query for the desired object
        soql_query = f"""
            SELECT Id, Name
            FROM SCInstalledBase__c
            WHERE Country__c = 'NL' limit 1000
        """
        #soql_query = f"""
        #    SELECT Id, City__c, Country__c, GeoY__c, GeoX__c, PostalCode__c, Street__c, Extension__c, HouseNo__c, FlatNo__c, Floor__c FROM SCInstalledBaseLocation__c WHERE Country__c = 'NL'
        #"""
        sf.bulk2.__getattr__("SCInstalledBase__c").download(
            soql_query, path="./", max_records=200000
        )

        # Run the spinner in a background thread while the query executes
        t = threading.Thread(target=animate)
        t.start()
        #sf.bulk2.__getattr__("SCInstalledBaseLocation__c").download(
        #    soql_query, path="./", max_records=2000000
        #)
        results = sf.bulk2.__getattr__(object_id).query(
            query, max_records=2000000
        )
        print(f'Extracting: {object_id}')
        for i, data in enumerate(results):
            # Each bulk2 result page is a CSV string with its own header row:
            # write the first page, append later pages without their headers
            # (the previous "w" mode overwrote the file on every page)
            with open(f"results/{object_id}.csv", "a" if i else "w", encoding="utf-8") as bos:
                bos.write(data if i == 0 else data.split('\n', 1)[-1])
        time.sleep(10)
        done = True
        return f"results/{object_id}.csv"  # lets the caller log the file path
"""
# Execute the Bulk query job
@@ -111,6 +142,7 @@ def extract_data(object_id, output_path='output', context='qa2'):
        return csv_file
        """
    except Exception as e:
        done = True
        raise ValueError(f'Error extracting data: {str(e)}')
if __name__ == '__main__':
@@ -120,18 +152,42 @@ if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Extract Salesforce data via Bulk API')
    parser.add_argument('--context', type=str, required=True,
                        help='Context name (e.g., "qa2", "prod")')
    parser.add_argument('--object_id', type=str, required=True,
                        help='Account, SCInstalledBaseLocation__c, SCInstalledBase__c, Product2')
    parser.add_argument('--output_path', type=str, required=False,
                        help='Output directory, e.g. ./')
    parser.add_argument('--sobjects', type=str, required=False,
                        help='Comma-separated sobjects from queries.json to extract (e.g. --sobjects Account,Contact,CustomObject__c); if omitted, all entries in queries.json are extracted')
    args = parser.parse_args()

    # Extract data using parameters
    # Load queries from JSON file
    with open('queries.json') as f:
        queries = json.load(f)['queries']

    # Process each query in sequence
    selected_sobjects = []
    if args.sobjects:
        selected_sobjects = [sobject.strip() for sobject in args.sobjects.split(',')]

    for query_def in queries:
        sobject = query_def['sobject']
        query_str = query_def['query']

        # Skip if --sobjects is provided and the current sobject isn't in the list
        if selected_sobjects and sobject not in selected_sobjects:
            print(f'Skipping {sobject} as it is not specified in --sobjects')
            continue

        try:
            print(f'\nRunning query for {sobject}:')
            output_file = extract_data(
                object_id=args.object_id,
                object_id=sobject,
                query=query_str,
                output_path=args.output_path,
                context=args.context
            )
            print(f'File saved at: {output_file}')
        except Exception as e:
            print(f'Error running query for {sobject}: {str(e)}')
            # Failed queries are logged and the loop moves on to the next sobject

    print('\nExtraction complete.')
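
For reference, the bulk2 interface used above streams query results back as CSV text instead of requiring manual job creation and polling. A minimal standalone sketch of the same pattern, with placeholder credentials rather than the project's .env values:

from simple_salesforce import Salesforce

# Placeholder login; the script resolves real values from .env via get_credentials()
sf = Salesforce(username='user@example.com', password='***',
                security_token='***', domain='test', version='62.0')

# bulk2 .query() yields CSV pages as strings until the job's result set is exhausted
pages = sf.bulk2.Account.query("SELECT Id, Name FROM Account", max_records=50000)
for i, page in enumerate(pages):
    with open('Account.csv', 'a' if i else 'w', encoding='utf-8') as f:
        f.write(page if i == 0 else page.split('\n', 1)[-1])  # drop repeated header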

View File

@@ -0,0 +1,26 @@
{ "queries":
[
{
"sobject": "SCInstalledBaseLocation__c",
"query": "SELECT Id, City__c, Country__c, GeoY__c, GeoX__c, PostalCode__c, Street__c, Extension__c, HouseNo__c, FlatNo__c, Floor__c FROM SCInstalledBaseLocation__c WHERE Country__c = 'NL'"
},{
"sobject": "SCInstalledBase__c",
"query": "SELECT Id, Name, CommissioningDate__c,InstallationDate__c,ProductEnergy__c, ProductUnitClass__c,ArticleNo__c,SerialNo__c, SerialNoException__c, ProductUnitType__c, InstalledBaseLocation__c FROM SCInstalledBase__c WHERE Country__c = 'NL'"
},{
"sobject": "Asset",
"query": "SELECT Id, Serialnumber FROM Asset WHERE Location.ParentLocation.Name LIKE '%NL'"
},{
"sobject": "Address",
"query": "SELECT Id, Country, CountryCode, Street, City, ParentId, PostalCode FROM Address WHERE CountryCode = 'NL'"
},{
"sobject": "ParentLocation",
"query": "SELECT Id, Name FROM Location WHERE Name LIKE '%NL'"
},{
"sobject": "ChildLocation",
"query": "SELECT Id, ParentLocationId, Name FROM Location WHERE ParentLocation.Name LIKE '%NL'"
},{
"sobject": "Product2",
"query": "SELECT Id, Main_Product_Group__c, Family, MaterialType__c, Name, Product_Code__c, ProductCode, EAN_Product_Code__c FROM Product2"
}
]
}
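
Each entry pairs a label (sobject, which also names the output CSV under results/) with the SOQL to run; ParentLocation and ChildLocation are labels for two different Location queries rather than real sobject API names, and the query text, not the label, determines what is extracted. A quick, hypothetical sanity check of the catalogue:

import json

# Print the labels the extract script will iterate over
with open('queries.json') as f:
    print([q['sobject'] for q in json.load(f)['queries']])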

View File

@@ -5,9 +5,9 @@ country_mapping = {
}
# Read the input CSV file, assuming the second row is the header
read_df = pd.read_csv('../1_extract_data/target/SCInstalledBaseLocation__c_upsert_target.csv', header=0, keep_default_na=False, dtype=str)
read_df_ib = pd.read_csv('../1_extract_data/target/object-set-2/SCInstalledBase__c_upsert_target.csv', header=0, keep_default_na=False, dtype=str)
read_df_product2 = pd.read_csv('../1_extract_data/target/object-set-6/Product2_upsert_target.csv', header=0, keep_default_na=False, dtype=str)
read_df = pd.read_csv('../1_extract_data/results/SCInstalledBaseLocation__c.csv', header=0, keep_default_na=False, dtype=str)
read_df_ib = pd.read_csv('../1_extract_data/results/SCInstalledBase__c.csv', header=0, keep_default_na=False, dtype=str)
read_df_product2 = pd.read_csv('../1_extract_data/results/Product2.csv', header=0, keep_default_na=False, dtype=str)
for row in read_df.to_dict('records'):
    try:
@@ -17,11 +17,16 @@ for row in read_df.to_dict('records'):
        print(f'KeyError: {e}')
# Columns for reindexing
reindex_columns = ['City__c','Country__c','Extension__c','FlatNo__c','Floor__c','GeoX__c','GeoY__c','HouseNo__c','Id','PostalCode__c','Street__c']
#"Id","City__c","Country__c","GeoY__c","GeoX__c","PostalCode__c","Street__c","Extension__c","HouseNo__c","FlatNo__c","Floor__c"
reindex_columns = ['Id','City__c','Country__c','GeoY__c','GeoX__c','PostalCode__c','Street__c','Extension__c','HouseNo__c','FlatNo__c','Floor__c']
# ArticleNo__c,CommissioningDate__c,Id,InstallationDate__c,InstalledBaseLocation__c,InstalledBaseLocation__r.Id,Name,ProductEnergy__c,ProductUnitClass__c,ProductUnitType__c,SerialNo__c,SerialNoException__c
reindex_columns_ib = ['ArticleNo__c','CommissioningDate__c','Id','InstallationDate__c','InstalledBaseLocation__c','InstalledBaseLocation__r.Id','Name','ProductEnergy__c','ProductUnitClass__c','ProductUnitType__c','SerialNo__c','SerialNoException__c']
#"Id","Name","CommissioningDate__c","InstallationDate__c","ProductEnergy__c","ProductUnitClass__c","ArticleNo__c","SerialNo__c","SerialNoException__c","ProductUnitType__c","InstalledBaseLocation__c"
reindex_columns_ib = ['Id','Name','CommissioningDate__c','InstallationDate__c','ProductEnergy__c','ProductUnitClass__c','ArticleNo__c','SerialNo__c','SerialNoException__c','ProductUnitType__c','InstalledBaseLocation__c']
#reindex_columns_ib = ['ArticleNo__c','CommissioningDate__c','Id','InstallationDate__c','InstalledBaseLocation__c','InstalledBaseLocation__r.Id','Name','ProductEnergy__c','ProductUnitClass__c','ProductUnitType__c','SerialNo__c','SerialNoException__c']
# EAN_Product_Code__c,Family,Id,Main_Product_Group__c,MaterialType__c,Name,Product_Code__c,ProductCode
reindex_columns_product2 = ['EAN_Product_Code__c','Family','Id','Main_Product_Group__c','MaterialType__c','Name','Product_Code__c','ProductCode']
#"Id","Main_Product_Group__c","Family","MaterialType__c","Name","Product_Code__c","ProductCode","EAN_Product_Code__c"
reindex_columns_product2 = ['Id','Main_Product_Group__c','Family','MaterialType__c','Name','Product_Code__c','ProductCode','EAN_Product_Code__c']
#reindex_columns_product2 = ['EAN_Product_Code__c','Family','Id','Main_Product_Group__c','MaterialType__c','Name','Product_Code__c','ProductCode']
# Reindex the columns to match the desired format
df = read_df.reindex(reindex_columns, axis=1)
@@ -153,7 +158,7 @@ child_df['LocationType'] = 'Site'
#ArticleNo__c,CommissioningDate__c,Id,InstallationDate__c,InstalledBaseLocation__c,InstalledBaseLocation__r.Extension__c,InstalledBaseLocation__r.FlatNo__c,InstalledBaseLocation__r.Floor__c,InstalledBaseLocation__r.Id,Name,ProductEnergy__c,ProductUnitClass__c,ProductUnitType__c,SerialNo__c,SerialNoException__c
merged_df_ib = merged_df_ib.drop('InstalledBaseLocation__c', axis=1)
merged_df_ib = merged_df_ib.drop('InstalledBaseLocation__r.Id', axis=1)
#merged_df_ib = merged_df_ib.drop('InstalledBaseLocation__r.Id', axis=1)
merged_df_ib = merged_df_ib.drop('Id_y', axis=1)
print(merged_df_ib.columns)
merged_df_ib.columns = ['Product2.Product_Code__c', 'FSL_1st_Ignition_Date__c', 'Id', 'InstallDate', 'Name', 'Kind_of_Energy__c', 'Kind_of_Installation__c', 'Main_Product_Group__c', 'SerialNumber', 'Serialnumber_Exception__c', 'Location.ExternalReference']
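
The reindex lists above were updated to match the SELECT order in queries.json, so the positional assignment to merged_df_ib.columns is sensitive to any future query change. A name-based rename would be order-independent; a sketch with two illustrative pairs (these pairings are assumptions, and the full mapping would mirror the lists above):

# Hypothetical alternative: rename by column name instead of by position
merged_df_ib = merged_df_ib.rename(columns={
    'SerialNo__c': 'SerialNumber',                        # assumed pairing
    'CommissioningDate__c': 'FSL_1st_Ignition_Date__c',   # assumed pairing
})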