How to create meaningful log files for simple-salesforce bulk methods (Salesforce x Python)

This article discusses how to create meaningful success and error logs from the results of simple-salesforce bulk methods.

In this example, the Python script is

  • connecting to Salesforce
  • querying all opportunities
  • setting the ‘Description’ field on all opps to ‘Testing’
  • bulk upserting the opportunities

import pandas as pd
from simple_salesforce import Salesforce
import json 
from datetime import datetime

def getSFConnection():
    username = "SFDC_USERNAME"
    password = "SFDC_PASSWORD"
    token = "SFDC_SECURITY_TOKEN"

    sf = Salesforce(instance_url="https://YOUR_INSTANCE.my.salesforce.com/", username=username, password=password, security_token=token)
    # when connecting to a sandbox, use this to connect
    # sf = Salesforce(instance_url="https://YOUR_INSTANCE.my.salesforce.com/", username=username, password=password, security_token=token, domain='test')
    print("Connected successfully to {0}".format(sf.sf_instance))
    return sf

def getAllSFOpps(sf):
    soql_query = "SELECT Id, AccountId, Description FROM Opportunity"
    opps_df = pd.DataFrame(sf.query_all(soql_query)['records'])
    # Delete the attributes column
    del opps_df['attributes']
    return opps_df

def main():
    print("===== Connecting to Salesforce =====")
    sf = getSFConnection()

    print("===== Querying Salesforce for Opportunities =====")
    opps_to_upsert_df = getAllSFOpps(sf)

    print("===== Modify Opps, Parsing =====")
    opps_to_upsert_df['Description'] = 'Testing'
    opps_parsed = json.loads(opps_to_upsert_df.to_json(orient="records"))

    # Upsert the records in bulk
    print("===== Upserting ... =====")
    results = sf.bulk.Opportunity.upsert(opps_parsed, 'Id', batch_size=10, use_serial=True)
    print("===== Finished Upsert =====")

if __name__ == '__main__':
    main()

Bulk data load job result logs

Running this script creates a bulk data load job in Salesforce. You can view it, including its result logs, in Setup > Environments > Jobs > Bulk Data Load Jobs. However, there are a few downsides:

  • Logs are stored per batch; there isn’t a unified file to view all results or all errors
  • Error logs are sparse: they list the error that occurred but no information about the affected record; in particular, the record id is missing
  • Depending on how many bulk data load jobs run in a day, it can be difficult to tell which job in Salesforce belongs to which script execution

Here’s an example of a bulk data load job result log:

[{
  "success" : true,
  "created" : false,
  "id" : "0063XXXXXXXXXXXXXX",
  "errors" : [ ]
}, {
  "success" : false,
  "created" : false,
  "id" : null,
  "errors" : [ {
    "statusCode" : "FIELD_CUSTOM_VALIDATION_EXCEPTION",
    "message" : "Test Error Message.",
    "fields" : [ "Test_Field__c" ]
  } ]
}]

Writing better logs

Using the simple-salesforce package for bulk methods already gives you one advantage: it returns one unified result log, instead of one per batch.

The goal here is to create two log files from it: one for successes and one for errors. For the error log file, I want to include the error message, the affected record id, and all other fields passed into the bulk DML.
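
Conceptually, the split is simple. Here is a minimal plain-Python sketch (assuming results is the list returned by the bulk call, with each element carrying a ‘success’ flag as in the log above); the pandas version the script actually uses follows below.

# Plain-Python sketch: split the unified result list by its 'success' flag.
# (Assumes 'results' is the list returned by sf.bulk.Opportunity.upsert above.)
successes = [r for r in results if r['success']]
errors = [r for r in results if not r['success']]
print("{0} succeeded, {1} failed".format(len(successes), len(errors)))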

Saving success and error logs to a datetime-stamped file

The method reads the results from the bulk DML and puts them into a pandas dataframe. It filters this batch_results_df by the ‘success’ column, creating a success and an error dataframe. Then it writes each dataframe to a local CSV file.

def writeLogs(opps_to_upsert_df, results):
    print("===== Reading Results, Writing Log Files ... =====")
    ct = datetime.now().strftime("%m-%d-%Y-%H-%M-%S")
    # put batch results in dataframe
    batch_results_df = pd.DataFrame.from_dict(results)
    
    # create success & error dataframe & write to logs
    success_df = batch_results_df.loc[batch_results_df['success'] == True]
    print("===== Success =====")
    print(success_df.shape[0])
    # MacOS
    success_df.to_csv('/Users/YOUR_USERNAME/Documents/success'+ct+'.csv', index = False)
    # WINDOWS
    #success_df.to_csv('C:\\Users\\YOUR_USERNAME\\Documents\\success'+ct+'.csv', index = False)
   
    error_df = batch_results_df.loc[batch_results_df['success'] == False]
    print("===== Error. =====")
    print(error_df.shape[0])
    # MacOS
    error_df.to_csv('/Users/YOUR_USERNAME/Documents/error'+ct+'.csv', index = False)
    # WINDOWS
    #error_df.to_csv('C:\\Users\\YOUR_USERNAME\\Documents\\error'+ct+'.csv', index = False)

The error logs are still pretty useless, since they don’t contain the id of the record that failed to upsert, or any other fields from the original request.

Enriching error logs

To enrich the error logs, you need to merge them with data from the original request. The returned error logs are missing the record id, but because the results come back in the same order as the submitted records, they can be matched to the request by row position.

Applying df.reset_index() to both the request and the result dataframes adds an index column to each. I use this column to merge the two dataframes.
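
To illustrate the mechanic with a toy example (the Ids below are hypothetical, not taken from the script): reset_index() turns the positional row number into a regular ‘index’ column that both dataframes share, so a merge on that column lines up each result row with the request row it came from.

import pandas as pd

# Toy request and result dataframes (hypothetical data, two rows each)
request_df = pd.DataFrame({'Id': ['006A', '006B'], 'Description': ['Testing', 'Testing']})
result_df = pd.DataFrame({'success': [True, False], 'id': ['006A', None]})

request_df = request_df.reset_index()   # adds an 'index' column with values 0, 1
result_df = result_df.reset_index()     # adds the same 'index' column with values 0, 1

merged = result_df.merge(request_df, on='index', how='left')
print(merged.columns.tolist())
# ['index', 'success', 'id', 'Id', 'Description']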

def writeLogs(opps_to_upsert_df, results):
    print("===== Reading Results, Writing Log Files ... =====")
    # create a timestamp for the log file names
    ct = datetime.now().strftime("%m-%d-%Y-%H-%M-%S")
    
    # put batch results in dataframe
    batch_results_df = pd.DataFrame.from_dict(results)
    # adding index column to bulk action results
    batch_results_df = batch_results_df.reset_index() 
    
    # create success & error dataframe & write to logs
    success_df = batch_results_df.loc[batch_results_df['success'] == True]
    print("===== Success =====")
    print(success_df.shape[0])
    # MacOS
    success_df.to_csv('/Users/YOUR_USERNAME/Documents/success'+ct+'.csv', index = False)
    # WINDOWS
    #success_df.to_csv('C:\\Users\\YOUR_USERNAME\\Documents\\success'+ct+'.csv', index = False)
   
    error_df = batch_results_df.loc[batch_results_df['success'] == False]
    print("===== Error. =====")
    print(error_df.shape[0])
    
    
    # renaming the result id column to avoid confusion when merging
    error_df = error_df.rename(columns={'id': 'fail_id'})
   
    if error_df.empty == False:
        # adding an index column to the original bulk request
        opps_to_upsert_df = opps_to_upsert_df.reset_index()
        
        # merging request & result dataframe on index column
        error_records_df = error_df.merge(opps_to_upsert_df, left_on="index", right_on="index", how="left")
        
        # writing to file
        # MacOS
        error_records_df.to_csv('/Users/YOUR_USERNAME/Documents/error'+ct+'.csv', index = False)
        # WINDOWS
        #error_records_df.to_csv('C:\\Users\\YOUR_USERNAME\\Documents\\error'+ct+'.csv', index = False)

Request:

[
  ...,
{
  "index" : 12
  "Id": "0063XXXXXXXXXXXXXX", 
  "AccountID": "0013XXXXXXXXXXXXXX", 
  "Description": "Testing"
},
...
]

Result:

[
  ...,
{
  "index" : 12,
  "success" : false,
  "created" : false,
  "id" : null,
  "errors" : [ {
    "statusCode" : "FIELD_CUSTOM_VALIDATION_EXCEPTION",
    "message" : "Test Error Message.",
    "fields" : [ "Test_Field__c" ]
  } ]
},
...
]

Merged:

[
  ...,
{
  "index" : 12,
  "success" : false,
  "created" : false,
  "id" : null,
  "errors" : [ {
    "statusCode" : "FIELD_CUSTOM_VALIDATION_EXCEPTION",
    "message" : "Test Error Message.",
    "fields" : [ "Test_Field__c" ]
  } ],
  "Id": "0063XXXXXXXXXXXXXX", 
  "AccountID": "0013XXXXXXXXXXXXXX", 
  "Description": "Testing"
 ...
}]
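
If you also want the statusCode and message as their own CSV columns, you could flatten the ‘errors’ list before writing the file. This is an optional refinement, not part of the script below; it assumes each entry in error_records_df['errors'] is a list of dicts shaped like the example above.

# Sketch: pull the first error's statusCode and message into dedicated columns
error_records_df['statusCode'] = error_records_df['errors'].apply(
    lambda errs: errs[0]['statusCode'] if errs else None)
error_records_df['message'] = error_records_df['errors'].apply(
    lambda errs: errs[0]['message'] if errs else None)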

Full Code

import pandas as pd
from simple_salesforce import Salesforce
import json 
from datetime import datetime

def getSFConnection():
    username = "SFDC_USERNAME"
    password = "SFDC_PASSWORD"
    token = "SFDC_SECURITY_TOKEN"

    sf = Salesforce(instance_url="https://YOUR_INSTANCE.my.salesforce.com/", username=username, password=password, security_token=token)
    # when connecting to a sandbox, use this to connect
    # sf = Salesforce(instance_url="https://YOUR_INSTANCE.my.salesforce.com/", username=username, password=password, security_token=token, domain='test')
    print("Connected successfully to {0}".format(sf.sf_instance))
    return sf

def getAllSFOpps(sf):
    soql_query = "SELECT Id, AccountId, Description FROM Opportunity"
    opps_df = pd.DataFrame(sf.query_all(soql_query)['records'])
    # Delete the attributes column
    del opps_df['attributes']
    return opps_df
    
def writeLogs(opps_to_upsert_df, results):
    print("===== Reading Results, Writing Log Files ... =====")
    # create a timestamp for the log file names
    ct = datetime.now().strftime("%m-%d-%Y-%H-%M-%S")
    
    # put batch results in dataframe
    batch_results_df = pd.DataFrame.from_dict(results)
    # adding index column to bulk action results
    batch_results_df = batch_results_df.reset_index() 
    
    # create success & error dataframe & write to logs
    success_df = batch_results_df.loc[batch_results_df['success'] == True]
    print("===== Success =====")
    print(success_df.shape[0])
    # MacOS
    success_df.to_csv('/Users/YOUR_USERNAME/Documents/success'+ct+'.csv', index = False)
    # WINDOWS
    #success_df.to_csv('C:\\Users\\YOUR_USERNAME\\Documents\\success'+ct+'.csv', index = False)
   
    error_df = batch_results_df.loc[batch_results_df['success'] == False]
    print("===== Error =====")
    print(error_df.shape[0])
    
    
    # renaming the result id column to avoid confusion when merging
    error_df = error_df.rename(columns={'id': 'fail_id'})
   
    if error_df.empty == False:
        # adding an index column to the original bulk request
        opps_to_upsert_df = opps_to_upsert_df.reset_index()
        
        # merging request & result dataframe on index column
        error_records_df = error_df.merge(opps_to_upsert_df, left_on="index", right_on="index", how="left")
        
        # writing to file
        # MacOS
        error_records_df.to_csv('/Users/YOUR_USERNAME/Documents/error'+ct+'.csv', index = False)
        # WINDOWS
        #error_records_df.to_csv('C:\\Users\\YOUR_USERNAME\\Documents\\error'+ct+'.csv', index = False)

def main():
    print("===== Connecting to Salesforce =====")
    sf = getSFConnection()

    print("===== Querying Salesforce for Opportunities =====")
    opps_to_upsert_df = getAllSFOpps(sf)

    print("===== Modify Opps, Parsing =====")
    opps_to_upsert_df['Description'] = 'Testing'
    opps_parsed = json.loads(opps_to_upsert_df.to_json(orient="records"))

    # Upsert the records in bulk
    print("===== Upserting ... =====")
    results = sf.bulk.Opportunity.upsert(opps_parsed, 'Id', batch_size=10, use_serial=True)
    print("===== Finished Upsert =====")
    
    # Write Log Files
    writeLogs(opps_to_upsert_df, results)
    print("===== DONE =====")

if __name__ == '__main__':
    main()

Want to read more blog entries about Salesforce development? Follow me.