80 lines
2.6 KiB
Python
80 lines
2.6 KiB
Python
import os
|
|
from dotenv import load_dotenv, find_dotenv
|
|
from simple_salesforce import Salesforce
|
|
|
|
def get_credentials(context):
|
|
"""
|
|
Get credentials for a given context from the .env file
|
|
"""
|
|
context = context.upper()
|
|
|
|
# Initialize credentials dictionary with all possible auth methods
|
|
credentials = {
|
|
'USERNAME': None,
|
|
'PASSWORD': None,
|
|
'SECURITY_TOKEN': None,
|
|
'ORGANIZATIONID': None,
|
|
'CONSUMER_KEY': None,
|
|
'PRIVATEKEY_FILE': None
|
|
}
|
|
|
|
if context != 'PROD':
|
|
credentials['DOMAIN'] = 'test'
|
|
|
|
# Load the .env file
|
|
env_file = find_dotenv(".env")
|
|
load_dotenv(env_file)
|
|
|
|
# Load all environment variables
|
|
for key, value in os.environ.items():
|
|
if f'{context}_SF_' in key:
|
|
credential_key = key.split(f'{context}_SF_')[-1].upper()
|
|
credentials[credential_key] = value
|
|
|
|
return credentials
|
|
|
|
def get_sf_connection(context):
|
|
"""
|
|
Create Salesforce connection based on context, trying JWT first
|
|
"""
|
|
credentials = get_credentials(context)
|
|
|
|
# Common parameters for all auth methods
|
|
sf_params = {
|
|
'username': credentials['USERNAME'],
|
|
'version': '62.0'
|
|
}
|
|
|
|
# Add domain for non-prod environments
|
|
if context.lower() != 'prod':
|
|
sf_params['domain'] = 'test'
|
|
|
|
try:
|
|
# Try JWT authentication first
|
|
if credentials['CONSUMER_KEY'] and credentials['PRIVATEKEY_FILE']:
|
|
print(f"Attempting JWT authentication for {context}")
|
|
sf_params.update({
|
|
'consumer_key': credentials['CONSUMER_KEY'],
|
|
'privatekey_file': credentials['PRIVATEKEY_FILE']
|
|
})
|
|
return Salesforce(**sf_params)
|
|
|
|
# Fall back to password + security token or org ID
|
|
elif credentials['PASSWORD']:
|
|
print(f"Falling back to password authentication for {context}")
|
|
sf_params['password'] = credentials['PASSWORD']
|
|
|
|
if credentials['ORGANIZATIONID']:
|
|
sf_params['organizationId'] = credentials['ORGANIZATIONID']
|
|
elif credentials['SECURITY_TOKEN']:
|
|
sf_params['security_token'] = credentials['SECURITY_TOKEN']
|
|
else:
|
|
raise ValueError("Neither security token nor organization ID provided")
|
|
|
|
return Salesforce(**sf_params)
|
|
|
|
else:
|
|
raise ValueError("No valid authentication credentials provided")
|
|
|
|
except Exception as e:
|
|
raise ConnectionError(f"Failed to connect to Salesforce {context}: {str(e)}") |