# Source code for simple_salesforce.bulk

""" Classes for interacting with Salesforce Bulk API """

import concurrent.futures
import json
from collections import OrderedDict
from functools import partial
from time import sleep
from typing import Any, Dict, Iterable, List, Optional, Union, cast

import requests

from .exceptions import SalesforceGeneralError
from .util import BulkDataAny, BulkDataStr, Headers, Proxies, \
    call_salesforce, \
    list_from_generator


class SFBulkHandler:
    """ Bulk API request handler

    Intermediate class which allows us to use commands,
    such as 'sf.bulk.Contacts.create(...)'.
    This is really just a middle layer, whose sole purpose is
    to allow the above syntax.
    """

    def __init__(
            self,
            session_id: str,
            bulk_url: str,
            proxies: Optional[Proxies] = None,
            session: Optional[requests.Session] = None
            ):
        """Initialize the instance with the given parameters.

        Arguments:

        * session_id -- the session ID for authenticating to Salesforce
        * bulk_url -- API endpoint set in Salesforce instance
        * proxies -- the optional map of scheme to proxy server
        * session -- Custom requests session, created in calling code. This
                     enables the use of requests Session features not
                     otherwise exposed by simple_salesforce.
        """
        self.session_id = session_id
        self.session = session or requests.Session()
        self.bulk_url = bulk_url
        # don't wipe out original proxies with None; proxies are only applied
        # to a session created here, never to a caller-supplied one
        if not session and proxies is not None:
            self.session.proxies = proxies

        # Define these headers separate from Salesforce class,
        # as bulk uses a slightly different format
        self.headers = {
            'Content-Type': 'application/json',
            'X-SFDC-Session': self.session_id,
            'X-PrettyPrint': '1'
            }

    def __getattr__(self, name: str) -> "SFBulkType":
        """Return an SFBulkType bound to the SObject called ``name``.

        Only invoked for attributes not found normally, so
        ``sf.bulk.Contact`` resolves here while ``sf.bulk.headers`` does not.

        Raises:
            AttributeError -- for dunder names (see below).
        """
        # Python probes optional protocol hooks (``__deepcopy__``,
        # ``__setstate__``, ...) through ``getattr``. Answering them with an
        # SFBulkType broke ``copy.deepcopy`` (the result is not callable) and
        # made ``copy.copy``/unpickling recurse through the not-yet-set
        # ``self.bulk_url``. Such names are never SObject names.
        if name.startswith('__') and name.endswith('__'):
            raise AttributeError(name)
        return SFBulkType(object_name=name,
                          bulk_url=self.bulk_url,
                          headers=self.headers,
                          session=self.session)

    def submit_dml(
            self,
            object_name: str,
            dml: str,
            data: BulkDataAny,
            external_id_field: Optional[str] = None,
            batch_size: Union[int, str] = 10000,
            use_serial: bool = False,
            bypass_results: bool = False,
            include_detailed_results: bool = False
            ):
        """ Perform any DML operation on any custom or standard object in
        Salesforce, i.e. insert/upsert/update/delete.

        Required to put this function in this class due to error:
        TypeError: 'SFBulkType' object is not callable
        - this makes SFBulkType callable for this specific function.

        The main purpose of this function is to build customizable reporting
        functions and reduce code duplication in individual execution scripts,
        mainly with pandas.

        Arguments:

        * object_name -- SF object
        * dml -- insert, upsert, update, delete
        * data -- JSON formatted salesforce records. Data is batched by
                  10,000 records by default. To pick a lower size pass a
                  smaller integer to `batch_size`. To let simple-salesforce
                  pick the appropriate limit dynamically, enter
                  `batch_size='auto'`
        * external_id_field -- unique identifier field for upsert operations.
        * batch_size -- default to 10,000
        * use_serial -- default: bool = False
        * bypass_results -- default: bool = False
        * include_detailed_results -- default: bool = False
        """
        # Construct directly rather than via getattr so an object_name that
        # collides with a real attribute (e.g. 'headers') still works.
        bulk_type = SFBulkType(object_name=object_name,
                               bulk_url=self.bulk_url,
                               headers=self.headers,
                               session=self.session)
        return bulk_type.submit_dml(dml, data, external_id_field, batch_size,
                                    use_serial, bypass_results,
                                    include_detailed_results)
class SFBulkType:
    """ Interface to Bulk/Async API functions"""

    def __init__(
            self,
            object_name: str,
            bulk_url: str,
            headers: Headers,
            session: requests.Session
            ):
        """Initialize the instance with the given parameters.

        Arguments:

        * object_name -- the name of the type of SObject this represents,
                         e.g. `Lead` or `Contact`
        * bulk_url -- API endpoint set in Salesforce instance
        * headers -- bulk API headers
        * session -- Custom requests session, created in calling code. This
                     enables the use of requests Session features not
                     otherwise exposed by simple_salesforce.
        """
        self.object_name = object_name
        self.bulk_url = bulk_url
        self.session = session
        self.headers = headers

    def _create_job(self, operation: str, use_serial: bool,
                    external_id_field: Optional[str] = None) -> Any:
        """ Create a bulk job and return the parsed job description.

        Arguments:

        * operation -- Bulk operation to be performed by job
        * use_serial -- Process batches in order
        * external_id_field -- unique identifier field for upsert operations
                               (only sent when operation is 'upsert')
        """
        payload = {
            'operation': operation,
            'object': self.object_name,
            # NOTE(review): sends 1/0 rather than 'Serial'/'Parallel';
            # presumably the API maps these ordinals - confirm before changing.
            'concurrencyMode': 1 if use_serial else 0,
            'contentType': 'JSON'
            }
        if operation == 'upsert':
            payload['externalIdFieldName'] = external_id_field
        url = f'{self.bulk_url}job'
        result = call_salesforce(url=url, method='POST',
                                 session=self.session,
                                 headers=self.headers,
                                 data=json.dumps(payload, allow_nan=False))
        return result.json(object_pairs_hook=OrderedDict)

    def _close_job(self, job_id: str) -> Any:
        """ Close a bulk job by POSTing state 'Closed'; returns the parsed
        response. """
        payload = {
            'state': 'Closed'
            }
        url = f'{self.bulk_url}job/{job_id}'
        result = call_salesforce(url=url, method='POST',
                                 session=self.session,
                                 headers=self.headers,
                                 data=json.dumps(payload, allow_nan=False))
        return result.json(object_pairs_hook=OrderedDict)

    def _get_job(self, job_id: str) -> Any:
        """ Get an existing job to check the status """
        url = f'{self.bulk_url}job/{job_id}'
        result = call_salesforce(url=url, method='GET',
                                 session=self.session,
                                 headers=self.headers)
        return result.json(object_pairs_hook=OrderedDict)

    def _add_batch(self, job_id: str, data: BulkDataAny,
                   operation: str) -> Any:
        """ Add a set of data as a batch to an existing job.

        DML data is JSON-encoded; for 'query'/'queryAll' the data (the query
        text) is sent as-is. Separating this out in case of later
        implementations involving multiple batches.
        """
        url = f'{self.bulk_url}job/{job_id}/batch'
        data_: Union[BulkDataAny, str]
        if operation not in ('query', 'queryAll'):
            data_ = json.dumps(data, allow_nan=False)
        else:
            data_ = data
        result = call_salesforce(url=url, method='POST',
                                 session=self.session,
                                 headers=self.headers,
                                 data=data_)
        return result.json(object_pairs_hook=OrderedDict)

    def _get_batch(self, job_id: str, batch_id: str) -> Any:
        """ Get an existing batch to check the status """
        url = f'{self.bulk_url}job/{job_id}/batch/{batch_id}'
        result = call_salesforce(url=url, method='GET',
                                 session=self.session,
                                 headers=self.headers)
        return result.json(object_pairs_hook=OrderedDict)

    def _get_batch_results(self, job_id: str, batch_id: str,
                           operation: str) -> Iterable[Any]:
        """ Retrieve the results of a completed batch (generator).

        For 'query'/'queryAll' the result endpoint returns a list of result
        ids; each one is fetched and yielded separately. For any other
        operation the single result payload is yielded once.
        """
        url = f'{self.bulk_url}job/{job_id}/batch/{batch_id}/result'
        result = call_salesforce(url=url, method='GET',
                                 session=self.session,
                                 headers=self.headers)

        if operation in ('query', 'queryAll'):
            for batch_result in result.json():
                url_query_results = f'{url}/{batch_result}'
                batch_query_result = call_salesforce(
                    url=url_query_results, method='GET',
                    session=self.session, headers=self.headers).json()
                yield batch_query_result
        else:
            yield result.json()

    def _get_batch_request_with_batch_results(
            self,
            job_id: str,
            batch_id: str,
            ) -> Iterable[Any]:
        """ Retrieve the results of a completed batch, each merged with the
        fields of the submitted record at the same position (generator that
        yields one list).

        Request fields whose value is a non-empty dict are flattened to
        'field.firstNestedKey' (presumably relationship references via an
        external id).
        """
        url = f'{self.bulk_url}job/{job_id}/batch/{batch_id}/request'
        batch_request = call_salesforce(url=url, method='GET',
                                        session=self.session,
                                        headers=self.headers)
        # Parse the submitted records once; previously the whole request body
        # was re-parsed for every result row (quadratic in batch size).
        request_records = batch_request.json()
        batch_results = self._get_batch_results(job_id, batch_id,
                                                operation='batch_results')
        results = []
        for batch_result in batch_results:
            for idx, record_result in enumerate(batch_result):
                for field, value in request_records[idx].items():
                    if isinstance(value, dict) and value:
                        nested_key = next(iter(value))
                        record_result[f'{field}.{nested_key}'] = \
                            value.get(nested_key)
                    else:
                        # Plain values (and empty dicts, which previously
                        # raised IndexError) are copied unchanged.
                        record_result[field] = value
                results.append(record_result)
        yield results

    def worker(
            self,
            batch: Dict[str, Any],
            operation: str,
            wait: int = 5,
            bypass_results: bool = False,
            include_detailed_results: bool = False
            ) -> Iterable[Any]:
        """ Gets batches from concurrent worker threads.

        self._bulk_operation passes batch jobs. Unless bypass_results is set,
        the worker polls the batch every `wait` seconds until it is
        'Completed', 'Failed' or 'NotProcessed', then returns its results.
        With bypass_results it returns a one-element list naming the job.
        """
        result: Iterable[Any]
        if not bypass_results:
            batch_status = self._get_batch(job_id=batch['jobId'],
                                           batch_id=batch['id'])['state']
            while batch_status not in ['Completed', 'Failed', 'NotProcessed']:
                sleep(wait)
                batch_status = self._get_batch(job_id=batch['jobId'],
                                               batch_id=batch['id'])['state']
            if include_detailed_results:
                result = self._get_batch_request_with_batch_results(
                    job_id=batch['jobId'], batch_id=batch['id'])
            else:
                result = self._get_batch_results(job_id=batch['jobId'],
                                                 batch_id=batch['id'],
                                                 operation=operation)
        else:
            result = [{
                'bypass_results': bypass_results,
                'job_id': batch['jobId']
                }]
        return result

    def _add_autosized_batches(self, data: BulkDataAny, operation: str,
                               job: str) -> List[Any]:
        """ Auto-create batches that respect bulk api V1 limits.

        bulk v1 api has following limits
        number of records <= 10000
        AND
        file_size_limit <= 10MB
        AND
        number_of_character_limit <= 10000000

        Documentation on limits can be found at:
        https://developer.salesforce.com/docs/atlas.en-us
        .salesforce_app_limits_cheatsheet.meta
        /salesforce_app_limits_cheatsheet
        /salesforce_app_limits_platform_bulkapi.htm#ingest_jobs

        Our JSON serialization uses the default `ensure_ascii=True`, so the
        character and byte lengths will be the same. Therefore we only need
        to adhere to a single length limit of 10,000,000 characters.

        TODO: In future when simple-salesforce supports bulk api V2
        we should detect api version and set max file size accordingly.
        V2 increases file size limit to 150MB

        TODO: support for the following limits have not been added since
        these are record / field level limits and not chunk level limits:
        * Maximum number of fields in a record: 5,000
        * Maximum number of characters in a record: 400,000
        * Maximum number of characters in a field: 131,072
        """
        record_limit = 10_000
        char_limit = 10_000_000

        batches = []
        last_break = 0
        record_count, char_count = 0, 0
        for i, record in enumerate(data):
            # 2 is added to account for the enclosing `[]` for the first record
            # and the separator `, ` between records for subsequent records.
            additional_chars = len(json.dumps(record, default=str)) + 2
            # Only cut when the current batch already holds records; otherwise
            # an oversized first record would produce an empty batch.
            if record_count and (char_count + additional_chars > char_limit
                                 or record_count == record_limit):
                batches.append(data[last_break:i])
                last_break = i
                record_count, char_count = 0, 0
            char_count += additional_chars
            record_count += 1
        # Flush the remainder. The previous `< len(data) - 1` test silently
        # dropped a trailing one-record batch (and all of a 1-record input).
        if last_break < len(data):
            batches.append(data[last_break:])

        return [self._add_batch(job_id=job, data=batch, operation=operation)
                for batch in batches]

    # pylint: disable=R0913,line-too-long
    def _bulk_operation(
            self,
            operation: str,
            data: BulkDataAny,
            use_serial: bool = False,
            external_id_field: Optional[str] = None,
            batch_size: Union[int, str] = 10000,
            wait: int = 5,
            bypass_results: bool = False,
            include_detailed_results: bool = False
            ) -> Iterable[Iterable[Any]]:
        """ String together helper functions to create a complete
        end-to-end bulk API request.

        Arguments:

        * operation -- Bulk operation to be performed by job
        * data -- list of dict to be passed as a batch
        * use_serial -- Process batches in serial mode
        * external_id_field -- unique identifier field for upsert operations
        * wait -- seconds to sleep between checking batch status
        * batch_size -- number of records to assign for each batch in the job
                        or `auto`

        Raises:

        * ValueError -- invalid batch_size, or empty data for a DML operation
        * SalesforceGeneralError -- a query batch ends in state 'Failed'
        """
        # check for batch size type since now it accepts both integers
        # & the string `auto`
        if not (isinstance(batch_size, int) or batch_size == 'auto'):
            raise ValueError('batch size should be auto or an integer')
        # Zero used to raise ZeroDivisionError and a negative size silently
        # created a job with no batches.
        if isinstance(batch_size, int) and batch_size < 1:
            raise ValueError('batch size should be a positive integer')

        results: Iterable[Iterable[Any]]
        if operation not in ('query', 'queryAll'):
            # Checks if data is present
            if not data:
                raise ValueError(f'data should not be empty for {operation}')

            # Checks to prevent batch limit
            if batch_size != 'auto':
                batch_size = min(batch_size, len(data), 10000)

            with concurrent.futures.ThreadPoolExecutor() as pool:
                job = self._create_job(operation=operation,
                                       use_serial=use_serial,
                                       external_id_field=external_id_field)
                if batch_size == 'auto':
                    batches = self._add_autosized_batches(
                        job=job['id'], data=data, operation=operation)
                else:
                    size = cast(int, batch_size)
                    batches = [
                        self._add_batch(job_id=job['id'],
                                        data=data[start:start + size],
                                        operation=operation)
                        for start in range(0, len(data), size)
                        ]

                multi_thread_worker = partial(
                    self.worker,
                    operation=operation,
                    wait=wait,
                    bypass_results=bypass_results,
                    include_detailed_results=include_detailed_results)
                list_of_results = pool.map(multi_thread_worker, batches)

                if not bypass_results:
                    results = [x for sublist in list_of_results
                               for i in sublist for x in i]
                else:
                    results = [{k: v} for sublist in list_of_results
                               for i in sublist for k, v in i.items()]

                self._close_job(job_id=job['id'])

        else:
            job = self._create_job(operation=operation,
                                   use_serial=use_serial,
                                   external_id_field=external_id_field)
            batch = self._add_batch(job_id=job['id'], data=data,
                                    operation=operation)
            self._close_job(job_id=job['id'])
            batch_status = self._get_batch(job_id=batch['jobId'],
                                           batch_id=batch['id'])
            while batch_status['state'] not in [
                    'Completed', 'Failed', 'NotProcessed'
                    ]:
                sleep(wait)
                batch_status = self._get_batch(job_id=batch['jobId'],
                                               batch_id=batch['id'])

            if batch_status['state'] == 'Failed':
                raise SalesforceGeneralError('',
                                             batch_status['state'],
                                             batch_status['jobId'],
                                             batch_status['stateMessage'])
            results = self._get_batch_results(job_id=batch['jobId'],
                                              batch_id=batch['id'],
                                              operation=operation)
        return results

    # _bulk_operation wrappers to expose supported Salesforce bulk operations
    def delete(self, data: BulkDataStr, batch_size: Union[int, str] = 10000,
               use_serial: bool = False, bypass_results: bool = False,
               include_detailed_results: bool = False) -> Iterable[Any]:
        """ soft delete records

        Data is batched by 10,000 records by default. To pick a lower size
        pass a smaller integer to `batch_size`. To let simple-salesforce pick
        the appropriate limit dynamically, enter `batch_size='auto'`
        """
        results = self._bulk_operation(
            use_serial=use_serial, operation='delete', data=data,
            batch_size=batch_size, bypass_results=bypass_results,
            include_detailed_results=include_detailed_results)
        return results

    def insert(self, data: BulkDataAny, batch_size: Union[int, str] = 10000,
               use_serial: bool = False, bypass_results: bool = False,
               include_detailed_results: bool = False) -> Iterable[Any]:
        """ insert records

        Data is batched by 10,000 records by default. To pick a lower size
        pass a smaller integer to `batch_size`. To let simple-salesforce pick
        the appropriate limit dynamically, enter `batch_size='auto'`
        """
        results = self._bulk_operation(
            use_serial=use_serial, operation='insert', data=data,
            batch_size=batch_size, bypass_results=bypass_results,
            include_detailed_results=include_detailed_results)
        return results

    def upsert(self, data: BulkDataAny, external_id_field: str,
               batch_size: Union[int, str] = 10000, use_serial: bool = False,
               bypass_results: bool = False,
               include_detailed_results: bool = False) -> Iterable[Any]:
        """ upsert records based on a unique identifier

        Data is batched by 10,000 records by default. To pick a lower size
        pass a smaller integer to `batch_size`. To let simple-salesforce pick
        the appropriate limit dynamically, enter `batch_size='auto'`
        """
        results = self._bulk_operation(
            use_serial=use_serial, operation='upsert',
            external_id_field=external_id_field, data=data,
            batch_size=batch_size, bypass_results=bypass_results,
            include_detailed_results=include_detailed_results)
        return results

    def update(self, data: BulkDataAny, batch_size: Union[int, str] = 10000,
               use_serial: bool = False, bypass_results: bool = False,
               include_detailed_results: bool = False) -> Iterable[Any]:
        """ update records

        Data is batched by 10,000 records by default. To pick a lower size
        pass a smaller integer to `batch_size`. To let simple-salesforce pick
        the appropriate limit dynamically, enter `batch_size='auto'`
        """
        results = self._bulk_operation(
            use_serial=use_serial, operation='update', data=data,
            batch_size=batch_size, bypass_results=bypass_results,
            include_detailed_results=include_detailed_results)
        return results

    def hard_delete(self, data: BulkDataStr,
                    batch_size: Union[int, str] = 10000,
                    use_serial: bool = False, bypass_results: bool = False,
                    include_detailed_results: bool = False) -> Iterable[Any]:
        """ hard delete records

        Data is batched by 10,000 records by default. To pick a lower size
        pass a smaller integer to `batch_size`. To let simple-salesforce pick
        the appropriate limit dynamically, enter `batch_size='auto'`
        """
        results = self._bulk_operation(
            use_serial=use_serial, operation='hardDelete', data=data,
            batch_size=batch_size, bypass_results=bypass_results,
            include_detailed_results=include_detailed_results)
        return results

    def query(self, data: BulkDataStr, lazy_operation: bool = False,
              wait: int = 5) -> Iterable[Any]:
        """ bulk query

        Returns a generator of result sets when lazy_operation is True,
        otherwise the results collected by list_from_generator.
        """
        results = self._bulk_operation(operation='query', data=data,
                                       wait=wait)

        if lazy_operation:
            return results

        return list_from_generator(results)

    def query_all(self, data: BulkDataStr, lazy_operation: bool = False,
                  wait: int = 5) -> Iterable[Any]:
        """ bulk queryAll

        Returns a generator of result sets when lazy_operation is True,
        otherwise the results collected by list_from_generator.
        """
        results = self._bulk_operation(operation='queryAll', data=data,
                                       wait=wait)

        if lazy_operation:
            return results
        return list_from_generator(results)

    def submit_dml(
            self,
            function_name: str,
            data: BulkDataAny,
            external_id_field: Optional[str] = None,
            batch_size: Union[int, str] = 10000,
            use_serial: bool = False,
            bypass_results: bool = False,
            include_detailed_results: bool = False
            ):
        """ modular bulk dml operations - perform insert/upsert/update/delete
        on any standard and custom objects in Salesforce.

        `function_name` is the name of the method to call on this object
        (e.g. 'insert', 'upsert', 'update', 'delete', 'hard_delete').

        Raises:

        * ValueError -- 'upsert' requested without external_id_field
        """
        # Keyword arguments: the old positional call shifted batch_size into
        # upsert's external_id_field when no external id was given.
        dml_options = {
            'batch_size': batch_size,
            'use_serial': use_serial,
            'bypass_results': bypass_results,
            'include_detailed_results': include_detailed_results,
            }
        if function_name == 'upsert':
            if external_id_field is None:
                raise ValueError('external_id_field is required for upsert')
            return self.upsert(data, external_id_field, **dml_options)
        return getattr(self, function_name)(data, **dml_options)