change to simple_salesforce for data load

This commit is contained in:
Rene Kaßeböhmer
2025-04-08 16:02:58 +02:00
parent 5eb314fbf7
commit 2e6d82d9cc
5 changed files with 188 additions and 11 deletions

View File

@ -3,12 +3,12 @@
"useSeparatedCSVFiles": true,
"pollingQueryTimeoutMs": 1000000,
"bulkApiVersion": "2.0",
"parallelRestJobs": 2,
"queryBulkApiThreshold": 100,
"objectSets": [
{
"objects": [
{
"query": "SELECT Id, City__c, Country__c, GeoY__c, GeoX__c, PostalCode__c, Street__c, Extension__c, HouseNo__c, FlatNo__c, Floor__c FROM SCInstalledBaseLocation__c WHERE Country__c = 'NL'",
"query": "SELECT Id, City__c, Country__c, GeoY__c, GeoX__c, PostalCode__c, Street__c, Extension__c, HouseNo__c, FlatNo__c, Floor__c FROM SCInstalledBaseLocation__c WHERE Country__c = 'NL' limit 1000",
"externalId": "Name",
"operation": "Readonly"
}
@ -19,7 +19,18 @@
"query": "SELECT Id, Name, CommissioningDate__c,InstallationDate__c,ProductEnergy__c, ProductUnitClass__c,ArticleNo__c,SerialNo__c, SerialNoException__c, ProductUnitType__c, InstalledBaseLocation__c FROM SCInstalledBase__c WHERE Country__c = 'NL'",
"externalId": "Name",
"operation": "Readonly",
"excludedFromUpdateFields": ["InstalledBaseLocation__c"]
"master":true,
"excludedFromUpdateFields": ["InstalledBaseLocation__c"],
"skipRecordsComparison": true,
"parallelRestJobs": 4,
"restApiBatchSize": 9500,
"fieldMapping": [
{
"sourceField": "InstalledBaseLocation__c",
"targetField": "Id",
"targetObject": "SCInstalledBaseLocation__c"
}
]
}
]
},{
@ -33,10 +44,10 @@
},{
"objects": [
{
"query": "SELECT Id, Country, CountryCode, Street, City, ParentId PostalCode FROM Address WHERE CountryCode = 'NL'",
"query": "SELECT Id, Country, CountryCode, Street, City, ParentId, PostalCode FROM Address WHERE CountryCode = 'NL'",
"externalId": "Name",
"operation": "Readonly",
"excludedFields": ["ParentId"]
"excludedFromUpdateFields": ["ParentId"]
}
]
@ -48,6 +59,14 @@
"operation": "Readonly"
}
]
},{
"objects": [
{
"query": "SELECT Id, Main_Product_Group__c, Family, MaterialType__c, Name, Product_Code__c, ProductCode, EAN_Product_Code__c FROM Product2",
"externalId": "Name",
"operation": "Readonly"
}
]
}
]
}

View File

@ -0,0 +1,137 @@
# python extract_via_simple_salesforce.py \
# --context qa2 \
# --object_id Account \
# --output_path extracted_data
import os
import pandas as pd
from dotenv import load_dotenv, find_dotenv
from simple_salesforce import Salesforce
def get_credentials(context):
    """
    Get credentials for a given context from the .env file.

    Environment variables whose name starts with ``<CONTEXT>_SF_`` (for
    example ``QA2_SF_USERNAME``) are collected into a dictionary keyed by
    the upper-cased remainder of the name.

    Args:
        context (str): Context name (e.g., 'qa2', 'prod'). It is
            upper-cased before matching.

    Returns:
        dict: Credentials dictionary with the keys 'USERNAME', 'PASSWORD',
            'SECURITY_TOKEN' and 'DOMAIN', plus any other
            ``<CONTEXT>_SF_*`` variable that was found. Entries that are
            not set stay ``None``; 'DOMAIN' defaults to 'test'.
    """
    prefix = f'{context.upper()}_SF_'

    # Defaults; every entry can be overridden from the environment
    credentials = {
        'USERNAME': None,
        'PASSWORD': None,
        'SECURITY_TOKEN': None,
        'DOMAIN': 'test',
    }

    # Load the .env file located by find_dotenv("../.env") into os.environ
    env_file = find_dotenv("../.env")
    load_dotenv(env_file)

    for key, value in os.environ.items():
        # Match on the prefix only: a substring test would also pick up
        # variables that belong to another context, e.g.
        # PREQA2_SF_PASSWORD when the context is 'qa2'.
        if key.startswith(prefix):
            credentials[key[len(prefix):].upper()] = value

    return credentials
def extract_data(object_id, output_path='output', context='qa2'):
    """
    Extract records of one Salesforce object via Bulk API 2.0 as CSV.

    Args:
        object_id (str): API name of the Salesforce object to query
            (e.g. 'SCInstalledBase__c'). Only letters, digits and
            underscores are accepted because the name is placed into the
            SOQL text.
        output_path (str): Directory the CSV files are written to
            (default 'output'). It is created if it does not exist;
            ``None`` means the current directory.
        context (str): Context name for credentials (e.g., 'qa2', 'prod')

    Returns:
        list: Paths of the CSV files written by the download.

    Raises:
        ValueError: If ``object_id`` is not a plain API name, if
            credentials for the context are incomplete, or if anything
            fails while connecting to or downloading from Salesforce.
    """
    import re

    # object_id ends up inside the SOQL string, so reject anything that is
    # not a plain API name before doing any other work.
    if not isinstance(object_id, str) or not re.fullmatch(
            r'[A-Za-z][A-Za-z0-9_]*', object_id):
        raise ValueError(f'Invalid Salesforce object name: {object_id!r}')

    # Treat a missing path (None) as the current directory
    target_dir = './' if output_path is None else output_path

    try:
        # Get credentials based on context. They are deliberately not
        # printed: the dictionary holds the password and security token.
        credentials = get_credentials(context)
        if not all(credentials.values()):
            raise ValueError(f"Missing credentials for context: {context}")

        # Initialize Salesforce connection
        sf = Salesforce(
            username=credentials['USERNAME'],
            password=credentials['PASSWORD'],
            security_token=credentials['SECURITY_TOKEN'],
            domain=credentials['DOMAIN']
        )

        # NOTE(review): the field list and the Country__c filter are still
        # hard-coded; objects without a Country__c field will make the
        # query fail - confirm the intended per-object queries.
        soql_query = (
            f"SELECT Id, Name FROM {object_id} "
            "WHERE Country__c = 'NL' limit 1000"
        )

        # Make sure the target directory is there before downloading into it
        os.makedirs(target_dir, exist_ok=True)

        results = getattr(sf.bulk2, object_id).download(
            soql_query, path=target_dir, max_records=200000
        )
        # Each download result describes one CSV file written to target_dir
        return [part['file'] for part in results]
    except Exception as e:
        raise ValueError(f'Error extracting data: {str(e)}') from e
if __name__ == '__main__':
    # Command-line entry point: parse the arguments and run one extraction.
    import argparse

    # Parse command-line arguments
    parser = argparse.ArgumentParser(description='Extract Salesforce data via Bulk API')
    parser.add_argument('--context', type=str, required=True,
                        help='Context name (e.g., "qa2", "prod")')
    parser.add_argument('--object_id', type=str, required=True,
                        help='Account, SCInstalledBaseLocation__c, SCInstalledBase__c, Product2')
    # './' is the default value, not the help text; without a default
    # argparse hands None to extract_data when the option is omitted.
    parser.add_argument('--output_path', type=str, required=False, default='./',
                        help='Directory for the extracted files (default: ./)')
    args = parser.parse_args()

    # Extract data using parameters
    output_file = extract_data(
        object_id=args.object_id,
        output_path=args.output_path,
        context=args.context
    )
    print(f'File saved at: {output_file}')