How to create meaningful log files for simple-salesforce bulk methods (Salesforce x Python)
This article discusses how to create meaningful error logs from simple-salesforce bulk methods.
In this example, the Python script is
- connecting to Salesforce
- querying all opportunities
- setting the ‘Description’ field on all opps to ‘Testing’
- bulk upserting the opportunities
import pandas as pd
from simple_salesforce import Salesforce
import json
from datetime import datetime


def getSFConnection():
    username = "SFDC_USERNAME"
    password = "SFDC_PASSWORD"
    token = "SFDC_SECURITY_TOKEN"
    sf = Salesforce(instance_url="https://YOUR_INSTANCE.my.salesforce.com/", username=username, password=password, security_token=token)
    # when connecting to a sandbox, use this to connect
    # sf = Salesforce(instance_url="https://YOUR_INSTANCE.my.salesforce.com/", username=username, password=password, security_token=token, domain='test')
    print("Connected successfully to {0}".format(sf.sf_instance))
    return sf


def getAllSFOpps(sf):
    soql_query = "SELECT Id, AccountId, Description FROM Opportunity"
    opps_df = pd.DataFrame(sf.query_all(soql_query)['records'])
    # Delete the attributes column
    del opps_df['attributes']
    return opps_df


def main():
    print("===== Connecting to Salesforce =====")
    sf = getSFConnection()

    print("===== Querying Salesforce for Opportunities =====")
    opps_to_upsert_df = getAllSFOpps(sf)

    print("===== Modify Opps, Parsing =====")
    opps_to_upsert_df['Description'] = 'Testing'
    opps_parsed = json.loads(opps_to_upsert_df.to_json(orient="records"))

    # Upsert the records in bulk
    print("===== Upserting ... =====")
    results = sf.bulk.Opportunity.upsert(opps_parsed, 'Id', batch_size=10, use_serial=True)
    print("===== Finished Upsert =====")


if __name__ == '__main__':
    main()
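Side note, not part of the script above: instead of hard-coding the credentials, I'd recommend reading them from environment variables. A minimal sketch, assuming you've set variables named SFDC_USERNAME, SFDC_PASSWORD and SFDC_SECURITY_TOKEN yourself:

import os
from simple_salesforce import Salesforce

def getSFConnection():
    # read credentials from environment variables (the variable names are my own choice)
    username = os.environ["SFDC_USERNAME"]
    password = os.environ["SFDC_PASSWORD"]
    token = os.environ["SFDC_SECURITY_TOKEN"]
    sf = Salesforce(instance_url="https://YOUR_INSTANCE.my.salesforce.com/", username=username, password=password, security_token=token)
    print("Connected successfully to {0}".format(sf.sf_instance))
    return sf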
Bulk data load job result logs
This creates a bulk data load job in Salesforce. You can view it, including its result logs, in Setup > Environments > Jobs > Bulk Data Load Jobs. However, there are a few downsides:
- Logs are stored per batch; there isn't a unified file to view all results or all errors
- Error logs are sparse: they list the encountered error but no information about the affected record; in particular, the record id is missing
- Depending on how many bulk data load jobs run in a day, it can be difficult to find which job in Salesforce belongs to which script execution
Here’s an example of a bulk data load job result log:
[{
"success" : true,
"created" : false,
"id" : "0063XXXXXXXXXXXXXX",
"errors" : [ ]
}, {
"success" : false,
"created" : false,
"id" : null,
"errors" : [ {
"statusCode" : "FIELD_CUSTOM_VALIDATION_EXCEPTION",
"message" : "Test Error Message.",
"fields" : [ "Test_Field__c" ]
} ]
}]
Writing better logs
There's already one advantage to using the simple-salesforce package for bulk methods: it returns one unified result list instead of one log per batch.
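For reference, the results value returned by sf.bulk.Opportunity.upsert(...) is a plain Python list with one dictionary per record, in the same shape as the batch log shown above. Roughly (values mirror the earlier example):

# illustrative shape of the unified result list returned by the bulk upsert
results = [
    {"success": True, "created": False, "id": "0063XXXXXXXXXXXXXX", "errors": []},
    {"success": False, "created": False, "id": None,
     "errors": [{"statusCode": "FIELD_CUSTOM_VALIDATION_EXCEPTION",
                 "message": "Test Error Message.",
                 "fields": ["Test_Field__c"]}]},
]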
The goal here is to create two log files from it: one for successes and one for errors. For the error log file, I want to include the error message, the affected record id, and all other fields passed into the bulk DML.
Saving success and error logs to a datetime-stamped file
The method reads the results from the bulk DML and puts them into a pandas dataframe. It filters this batch_results_df by the 'success' column, creating a success and an error dataframe. Then it writes each dataframe to a local CSV file.
def writeLogs(opps_to_upsert_df, results):
    print("===== Reading Results, Writing Log Files ... =====")
    ct = datetime.now().strftime("%m-%d-%Y-%H-%M-%S")

    # put batch results in dataframe
    batch_results_df = pd.DataFrame.from_dict(results)

    # create success & error dataframe & write to logs
    success_df = batch_results_df.loc[batch_results_df['success'] == True]
    print("===== Success =====")
    print(success_df.shape[0])
    # MacOS
    success_df.to_csv('/Users/YOUR_USERNAME/Documents/success' + ct + '.csv', index=False)
    # WINDOWS
    # success_df.to_csv('C:\\Users\\YOUR_USERNAME\\Documents\\success' + ct + '.csv', index=False)

    error_df = batch_results_df.loc[batch_results_df['success'] == False]
    print("===== Error =====")
    print(error_df.shape[0])
    # MacOS
    error_df.to_csv('/Users/YOUR_USERNAME/Documents/error' + ct + '.csv', index=False)
    # WINDOWS
    # error_df.to_csv('C:\\Users\\YOUR_USERNAME\\Documents\\error' + ct + '.csv', index=False)
The error logs are still pretty useless, though: they don't contain the id of the record that failed to upsert, or any other information from the original request.
Enriching error logs
To enrich the error logs, you need to merge them with data from the original request. The returned results are missing the record id, but since they come back in the same order as the request, they can be matched by position (index).
Applying df.reset_index() to both the request and the result dataframes adds an index column to each. I use this index column to merge the two dataframes.
def writeLogs(opps_to_upsert_df, results):
    print("===== Reading Results, Writing Log Files ... =====")
    # making time stamp
    ct = datetime.now().strftime("%m-%d-%Y-%H-%M-%S")

    # put batch results in dataframe
    batch_results_df = pd.DataFrame.from_dict(results)
    # adding index column to bulk action results
    batch_results_df = batch_results_df.reset_index()

    # create success & error dataframe & write to logs
    success_df = batch_results_df.loc[batch_results_df['success'] == True]
    print("===== Success =====")
    print(success_df.shape[0])
    # MacOS
    success_df.to_csv('/Users/YOUR_USERNAME/Documents/success' + ct + '.csv', index=False)
    # WINDOWS
    # success_df.to_csv('C:\\Users\\YOUR_USERNAME\\Documents\\success' + ct + '.csv', index=False)

    error_df = batch_results_df.loc[batch_results_df['success'] == False]
    print("===== Error =====")
    print(error_df.shape[0])
    # renaming id column to avoid confusion when merging
    error_df = error_df.rename(columns={'id': 'fail_id'})

    if not error_df.empty:
        # adding an index column to the original bulk request
        opps_to_upsert_df = opps_to_upsert_df.reset_index()
        # merging request & result dataframe on index column
        error_records_df = error_df.merge(opps_to_upsert_df, left_on="index", right_on="index", how="left")
        # writing to file
        # MacOS
        error_records_df.to_csv('/Users/YOUR_USERNAME/Documents/error' + ct + '.csv', index=False)
        # WINDOWS
        # error_records_df.to_csv('C:\\Users\\YOUR_USERNAME\\Documents\\error' + ct + '.csv', index=False)
Request:
[
...,
{
"index" : 12
"Id": "0063XXXXXXXXXXXXXX",
"AccountID": "0013XXXXXXXXXXXXXX",
"Description": "Testing"
},
...
]
Result:
[
...,
{
"index" : 12,
"success" : false,
"created" : false,
"id" : null,
"errors" : [ {
"statusCode" : "FIELD_CUSTOM_VALIDATION_EXCEPTION",
"message" : "Test Error Message.",
"fields" : [ "Test_Field__c" ]
} ]
},
...
]
Merged:
[
...,
{
"index" : 12,
"success" : false,
"created" : false,
"id" : null,
"errors" : [ {
"statusCode" : "FIELD_CUSTOM_VALIDATION_EXCEPTION",
"message" : "Test Error Message.",
"fields" : [ "Test_Field__c" ]
} ],
"Id": "0063XXXXXXXXXXXXXX",
"AccountID": "0013XXXXXXXXXXXXXX",
"Description": "Testing"
...
}]
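One optional extra, not part of the original script: the errors column still contains a nested list of dictionaries, which reads poorly in a CSV. You could flatten it into a plain text column before writing the error file, for example:

def flattenErrors(errors):
    # turn [{"statusCode": ..., "message": ..., "fields": [...]}, ...] into one readable string
    return "; ".join("{0}: {1}".format(e["statusCode"], e["message"]) for e in errors)

# hypothetical extra column added to error_records_df before the to_csv call
error_records_df["error_messages"] = error_records_df["errors"].apply(flattenErrors)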
Full Code
import pandas as pd
from simple_salesforce import Salesforce
import json
from datetime import datetime


def getSFConnection():
    username = "SFDC_USERNAME"
    password = "SFDC_PASSWORD"
    token = "SFDC_SECURITY_TOKEN"
    sf = Salesforce(instance_url="https://YOUR_INSTANCE.my.salesforce.com/", username=username, password=password, security_token=token)
    # when connecting to a sandbox, use this to connect
    # sf = Salesforce(instance_url="https://YOUR_INSTANCE.my.salesforce.com/", username=username, password=password, security_token=token, domain='test')
    print("Connected successfully to {0}".format(sf.sf_instance))
    return sf


def getAllSFOpps(sf):
    soql_query = "SELECT Id, AccountId, Description FROM Opportunity"
    opps_df = pd.DataFrame(sf.query_all(soql_query)['records'])
    # Delete the attributes column
    del opps_df['attributes']
    return opps_df


def writeLogs(opps_to_upsert_df, results):
    print("===== Reading Results, Writing Log Files ... =====")
    # making time stamp
    ct = datetime.now().strftime("%m-%d-%Y-%H-%M-%S")

    # put batch results in dataframe
    batch_results_df = pd.DataFrame.from_dict(results)
    # adding index column to bulk action results
    batch_results_df = batch_results_df.reset_index()

    # create success & error dataframe & write to logs
    success_df = batch_results_df.loc[batch_results_df['success'] == True]
    print("===== Success =====")
    print(success_df.shape[0])
    # MacOS
    success_df.to_csv('/Users/YOUR_USERNAME/Documents/success' + ct + '.csv', index=False)
    # WINDOWS
    # success_df.to_csv('C:\\Users\\YOUR_USERNAME\\Documents\\success' + ct + '.csv', index=False)

    error_df = batch_results_df.loc[batch_results_df['success'] == False]
    print("===== Error =====")
    print(error_df.shape[0])
    # renaming id column to avoid confusion when merging
    error_df = error_df.rename(columns={'id': 'fail_id'})

    if not error_df.empty:
        # adding an index column to the original bulk request
        opps_to_upsert_df = opps_to_upsert_df.reset_index()
        # merging request & result dataframe on index column
        error_records_df = error_df.merge(opps_to_upsert_df, left_on="index", right_on="index", how="left")
        # writing to file
        # MacOS
        error_records_df.to_csv('/Users/YOUR_USERNAME/Documents/error' + ct + '.csv', index=False)
        # WINDOWS
        # error_records_df.to_csv('C:\\Users\\YOUR_USERNAME\\Documents\\error' + ct + '.csv', index=False)


def main():
    print("===== Connecting to Salesforce =====")
    sf = getSFConnection()

    print("===== Querying Salesforce for Opportunities =====")
    opps_to_upsert_df = getAllSFOpps(sf)

    print("===== Modify Opps, Parsing =====")
    opps_to_upsert_df['Description'] = 'Testing'
    opps_parsed = json.loads(opps_to_upsert_df.to_json(orient="records"))

    # Upsert the records in bulk
    print("===== Upserting ... =====")
    results = sf.bulk.Opportunity.upsert(opps_parsed, 'Id', batch_size=10, use_serial=True)
    print("===== Finished Upsert =====")

    # Write Log Files
    writeLogs(opps_to_upsert_df, results)
    print("===== DONE =====")


if __name__ == '__main__':
    main()
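One last refinement you might consider, my own suggestion rather than part of the original script: building the file paths with pathlib removes the duplicated macOS/Windows lines, since the same code then runs on both systems. A sketch, where output_dir is an assumption you'd point wherever you want the logs to live:

from pathlib import Path

# output_dir is a placeholder of my own choosing — adjust as needed
output_dir = Path.home() / "Documents"
success_df.to_csv(output_dir / ("success" + ct + ".csv"), index=False)
error_records_df.to_csv(output_dir / ("error" + ct + ".csv"), index=False)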
Want to read more blog entries about Salesforce development? Follow me.