get_all_results_for_query_batch extremely slow. #69

Open
peterHoburg opened this issue Oct 23, 2018 · 5 comments

peterHoburg commented Oct 23, 2018

Hi all,
I am fetching ~10m records from an SF object using sf_bulk.create_query_job(sf_object_name, contentType="CSV", concurrency='Parallel', pk_chunking=True). All of the chunks finish in a reasonable amount of time. Then, when it comes time to get all of the records from each chunk (~200,000 per chunk), I use a list comprehension with get_all_results_for_query_batch() to collect all of the results into a list.

records_comprehension = [
    record.decode("utf-8")
    for chunk in sf_bulk.get_all_results_for_query_batch(batch_id, job)
    for record in IteratorBytesIO(chunk)
]

Doing this for ~200,000 items takes 20+ minutes and uses ~15% of my 8-core CPU (I know, I know, Python and multi-core and all that; it's just an interesting number, since 12.5% is 100% usage of one core). As you can imagine, that's not the best thing when trying to get 10m items. Right now I think I am being limited by single-core performance during the creation of records_comprehension, specifically the record.decode("utf-8") call if I had to guess.

I am planning on parallelizing the entire program into 10 or so different processes to maximize the CPU resources I have available.

Other than parallelizing the consumption of chunks, is there something I am doing wrong? Is there a better way to go about this? The examples essentially do it this way, but with a double for loop instead of a comprehension (a comprehension should be faster).
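
For reference, the double-for-loop pattern from the examples streams rows instead of building one big list; a rough, untested sketch (reusing sf_bulk, batch_id, and job from the snippet above, with a hypothetical process() handler for each row) would be:

import unicodecsv
from salesforce_bulk.util import IteratorBytesIO

# Stream rows out of each result instead of materializing one large list.
# Each chunk yielded by get_all_results_for_query_batch is an iterator of
# raw bytes; IteratorBytesIO turns it into a file-like object for the reader.
for chunk in sf_bulk.get_all_results_for_query_batch(batch_id, job):
    reader = unicodecsv.DictReader(IteratorBytesIO(chunk), encoding='utf-8')
    for row in reader:
        process(row)  # hypothetical per-row handler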

PS. When I get my code working as efficiently as possible I will submit a PR with a PK chunking example and some more function docs. Thank you Christopher Lambacher for making this project!

nikhyada commented Nov 6, 2018

@peterHoburg I have around 30+ chunks (~100,000 per chunk) and it's taking too much time for the program to execute when I pass batch ids one after another into get_all_results_for_query_batch. Did you find multithreading efficient, or was there some other approach you tried? Please advise.

lambacck (Contributor) commented Nov 6, 2018

get_all_results_for_query_batch is a convenience method. For very large quantities of records you likely want to be using some of the lower level methods to control which batches you get and when. There certainly is a bunch of IO overhead that can be parallelized. See #62 (comment) for some ideas of where to start.
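
A rough, untested sketch of that lower-level, parallel approach (method names recalled from the library, with batch_ids assumed to have been collected while submitting the PK-chunked batches) might look like:

from concurrent.futures import ThreadPoolExecutor

def download_result(work_item):
    batch_id, result_id = work_item
    # get_query_batch_results yields the raw bytes of a single result file;
    # joining them here stands in for whatever per-result processing you do.
    data = b''.join(sf_bulk.get_query_batch_results(batch_id, result_id, job_id=job))
    return batch_id, result_id, len(data)

# One (batch_id, result_id) pair per result file produced by PK chunking.
work = [
    (batch_id, result_id)
    for batch_id in batch_ids
    for result_id in sf_bulk.get_query_batch_result_ids(batch_id, job_id=job)
]

# The downloads are network-bound, so threads parallelize them despite the
# GIL; keep the pool small to avoid tripping Salesforce concurrency limits.
with ThreadPoolExecutor(max_workers=8) as pool:
    for batch_id, result_id, size in pool.map(download_result, work):
        print(batch_id, result_id, size)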

@nikhyada

@lambacck Hi, I implemented multiprocessing and I always get the error below:
Traceback (most recent call last):
  File "/usr/lib64/python3.6/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib64/python3.6/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "thread.py", line 78, in write_op
    for row in reader:
  File "/usr/lib64/python3.6/csv.py", line 112, in __next__
    row = next(self.reader)
  File "/usr/lib/python3.6/dist-packages/unicodecsv/py3.py", line 55, in __next__
    return self.reader.__next__()
  File "/usr/lib/python3.6/dist-packages/unicodecsv/py3.py", line 51, in <genexpr>
    f = (bs.decode(encoding, errors=errors) for bs in f)
  File "/usr/lib/python3.6/dist-packages/salesforce_bulk/util.py", line 13, in read
    return bytes(bytearray(islice(self.iterator, None, n)))
  File "/usr/lib/python3.6/dist-packages/salesforce_bulk/salesforce_bulk.py", line 495, in <genexpr>
    iter = (x.replace(b'\0', b'') for x in resp.iter_content(chunk_size=chunk_size))
  File "/usr/lib/python3.6/dist-packages/requests/models.py", line 753, in generate
    raise ChunkedEncodingError(e)
requests.exceptions.ChunkedEncodingError: ('Connection broken: IncompleteRead(527 bytes read, 1521 more expected)', IncompleteRead(527 bytes read, 1521 more expected))

Sorry, I am new to Python; could you please advise if I am doing something wrong?

nikhyada commented Nov 12, 2018

Just to add to the above: when I keep multiprocessing controlled, i.e. around 8-10 processes at a time, it works fine, but when I try to use more than 10 (in my scenario I used 16) I continuously get the above error. I am using an octa-core processor and I believe it's not an OS-related issue; is it due to the SF API? Please advise.

Also, the result comes back as dict rows; can I directly get CSV output instead, so I don't have to convert it to CSV explicitly?
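
One way to keep the worker count bounded and survive occasional dropped connections is a fixed-size pool with a simple retry. A rough, untested sketch (assuming a module-level sf_bulk client and job id, and a work_items list of (batch_id, result_id) pairs built elsewhere):

from multiprocessing import Pool

import requests

def fetch_with_retry(work_item, attempts=3):
    batch_id, result_id = work_item
    # A ChunkedEncodingError means the connection dropped mid-stream, so the
    # whole result has to be re-read from the start.
    for attempt in range(attempts):
        try:
            return b''.join(
                sf_bulk.get_query_batch_results(batch_id, result_id, job_id=job)
            )
        except requests.exceptions.ChunkedEncodingError:
            if attempt == attempts - 1:
                raise

if __name__ == '__main__':
    # Cap the pool at the 8-10 workers that were observed to be stable; each
    # extra worker is another concurrent download competing for the API.
    with Pool(processes=8) as pool:
        results = pool.map(fetch_with_retry, work_items)

If the error persists even with a small pool, it may also be worth having each worker log in with its own SalesforceBulk instance rather than sharing one forked session.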

@aayushsf

Hi, I'm facing a similar issue. Was this issue ever resolved?

Fetching a million records from Salesforce is taking more than 4 hours. I've identified that the IteratorBytesIO() wrapper is consuming a substantial amount of time due to the unpacking of the resulting JSON data. Is there a way to directly retrieve the data in JSON or CSV format without receiving it as an iterator?
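
The library does hand results back as byte iterators, but one possible workaround (an untested sketch, assuming a JSON-content-type job and the bulk, batch_id, and job names from the README examples) is to stream each result straight to disk and parse the whole file in one pass:

import json

# Write each result's raw bytes straight to disk instead of unpacking it
# record by record in Python, then let json.load (or pandas.read_csv for
# CSV jobs) parse the whole file in one pass.
paths = []
for i, chunk in enumerate(bulk.get_all_results_for_query_batch(batch_id, job)):
    path = 'result_{}_{}.json'.format(batch_id, i)
    with open(path, 'wb') as out:
        for piece in chunk:  # chunk is an iterator of raw byte strings
            out.write(piece)
    paths.append(path)

records = []
for path in paths:
    with open(path, 'rb') as fh:
        records.extend(json.load(fh))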
