asset warranty saves

This commit is contained in:
Rene Kaßeböhmer
2025-05-16 12:13:54 +02:00
parent d4f944dca4
commit d5742d7d30
7 changed files with 204 additions and 104 deletions

View File

@ -5,20 +5,17 @@ from simple_salesforce import Salesforce
def get_credentials(context):
"""
Get credentials for a given context from the .env file
Args:
context (str): Context name (e.g., 'qa2', 'prod')
Returns:
dict: Credentials dictionary with username, password, and security_token
"""
context = context.upper()
# Initialize credentials dictionary
# Initialize credentials dictionary with all possible auth methods
credentials = {
'USERNAME': None,
'PASSWORD': None,
'SECURITY_TOKEN': None
'SECURITY_TOKEN': None,
'ORGANIZATIONID': None,
'CONSUMER_KEY': None,
'PRIVATEKEY_FILE': None
}
if context != 'PROD':
@ -29,9 +26,7 @@ def get_credentials(context):
load_dotenv(env_file)
# Load all environment variables
env_vars = os.environ
for key, value in env_vars.items():
for key, value in os.environ.items():
if f'{context}_SF_' in key:
credential_key = key.split(f'{context}_SF_')[-1].upper()
credentials[credential_key] = value
@ -40,31 +35,46 @@ def get_credentials(context):
def get_sf_connection(context):
"""
Create Salesforce connection based on context
Args:
context (str): Context name (e.g., 'qa2', 'prod')
Returns:
Salesforce: Authenticated Salesforce connection
Create Salesforce connection based on context, trying JWT first
"""
credentials = get_credentials(context)
if not all(credentials.values()):
raise ValueError(f"Missing credentials for context: {context}")
# Common parameters for all auth methods
sf_params = {
'username': credentials['USERNAME'],
'version': '62.0'
}
if context.lower() == 'prod':
return Salesforce(
username=credentials['USERNAME'],
password=credentials['PASSWORD'],
security_token=credentials['SECURITY_TOKEN'],
version='62.0'
)
else:
return Salesforce(
username=credentials['USERNAME'],
password=credentials['PASSWORD'],
security_token=credentials['SECURITY_TOKEN'],
domain=credentials['DOMAIN'],
version='62.0'
)
# Add domain for non-prod environments
if context.lower() != 'prod':
sf_params['domain'] = 'test'
try:
# Try JWT authentication first
if credentials['CONSUMER_KEY'] and credentials['PRIVATEKEY_FILE']:
print(f"Attempting JWT authentication for {context}")
sf_params.update({
'consumer_key': credentials['CONSUMER_KEY'],
'privatekey_file': credentials['PRIVATEKEY_FILE']
})
return Salesforce(**sf_params)
# Fall back to password + security token or org ID
elif credentials['PASSWORD']:
print(f"Falling back to password authentication for {context}")
sf_params['password'] = credentials['PASSWORD']
if credentials['ORGANIZATIONID']:
sf_params['organizationId'] = credentials['ORGANIZATIONID']
elif credentials['SECURITY_TOKEN']:
sf_params['security_token'] = credentials['SECURITY_TOKEN']
else:
raise ValueError("Neither security token nor organization ID provided")
return Salesforce(**sf_params)
else:
raise ValueError("No valid authentication credentials provided")
except Exception as e:
raise ConnectionError(f"Failed to connect to Salesforce {context}: {str(e)}")