Commit: Use aioftp for downloads (#55)
JoshLoecker authored Oct 4, 2022
1 parent 18384e0 commit cd85feb
Showing 5 changed files with 78 additions and 79 deletions.
1 change: 1 addition & 0 deletions environment.yaml
@@ -46,6 +46,7 @@ dependencies:
- tqdm==4.64.1
- pip==22.1.2
- pip:
- aioftp==0.21.2
- python-libsbml==5.19.2
- optlang==1.5.2
- cobra==0.25.0
1 change: 0 additions & 1 deletion main/py/project.py
@@ -13,7 +13,6 @@ def __init__(self, projectdir):
self.configdir = os.path.join(projectdir, "data", "config_sheets")
self.outputdir = os.path.join(projectdir, "output")
self.pydir = os.path.join(projectdir, "py")
self.docdir = os.path.join(projectdir, "doc")


current_dir = os.getcwd()
2 changes: 1 addition & 1 deletion main/py/proteomics/Crux.py
@@ -56,7 +56,7 @@ def raw_to_mzml(self, file_information: list[FileInformation]):

self._conversion_counter.acquire()
self._conversion_counter.value += 1
clear_print(f"Starting mzML conversion: {self._conversion_counter.value} / {len(self.file_information)} - {information.raw_file_name}")
clear_print(f"Starting raw -> mzML conversion: {self._conversion_counter.value} / {len(self.file_information)} - {information.raw_file_name}")
self._conversion_counter.release()

information.mzml_base_path.mkdir(parents=True, exist_ok=True)
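The conversion counter above is a multiprocessing.Value shared between worker processes; each worker acquires its lock before incrementing. A minimal standalone sketch of the same acquire/increment/release pattern (names hypothetical):

    import multiprocessing

    def worker(counter) -> None:
        # Lock the shared value, bump it, then release so other processes can proceed
        counter.acquire()
        counter.value += 1
        counter.release()

    if __name__ == "__main__":
        counter = multiprocessing.Value("i", 0)  # "i" = C signed int, initial value 0
        procs = [multiprocessing.Process(target=worker, args=(counter,)) for _ in range(4)]
        [p.start() for p in procs]
        [p.join() for p in procs]
        print(counter.value)  # 4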
129 changes: 61 additions & 68 deletions main/py/proteomics/FTPManager.py
@@ -4,19 +4,18 @@
TODO: Find a way to mark a file as "downloaded"
- Keep a list of file names in a ".completed" hidden folder?
"""
import ftplib
from ftplib import FTP
from FileInformation import FileInformation
from FileInformation import clear_print

import aioftp
import asyncio
import multiprocessing
from multiprocessing.sharedctypes import Synchronized
import numpy as np
import time
from urllib.parse import urlparse

from .FileInformation import FileInformation
from .FileInformation import clear_print


def ftp_client(host: str, max_attempts: int = 3, port: int = 21, user: str = "anonymous", passwd: str = "guest") -> FTP:
async def aioftp_client(host: str, username: str = "anonymous", password: str = "guest", max_attempts: int = 3) -> aioftp.Client:
"""
This function is responsible for creating a "client" connection
"""
@@ -25,23 +24,24 @@ def ftp_client(host: str, max_attempts: int = 3, port: int = 21, user: str = "an
# Attempt to connect, throw error if unable to do so
while not connection_successful and attempt_num <= max_attempts:
try:
client: FTP = FTP(user=user, passwd=passwd)
client.connect(host=host, port=port)
client: aioftp.Client = aioftp.Client()
await client.connect(host, 21)
await client.login(user=username, password=password)
connection_successful = True
except ConnectionResetError:

# Make sure this print statement is on a new line on the first error
if attempt_num == 1:
print("")

# Line clean: https://stackoverflow.com/a/5419488/13885200
clear_print(f"Attempt {attempt_num} of {max_attempts} failed to connect, waiting 5 seconds before trying again")
clear_print(f"Attempt {attempt_num} of {max_attempts} failed to connect")
attempt_num += 1
time.sleep(5)
if not connection_successful:
print("")
raise ConnectionResetError("Could not connect to FTP server")

return client
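
For reference, a minimal usage sketch of the retry-wrapped aioftp_client above (the host is hypothetical; asyncio.run is the one-call equivalent of the new_event_loop / run_until_complete / close sequence used by the wrappers below):

    import asyncio

    async def main() -> None:
        client = await aioftp_client("ftp.example.org")
        print(await client.get_current_directory())  # e.g. "/"
        await client.quit()

    asyncio.run(main())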


@@ -55,26 +55,30 @@ def __init__(self, root_link: str, file_extensions: list[str]):
self._files: list[str] = []
self._file_sizes: list[int] = []

self._get_info()

def _get_info(self):
self._get_info_wrapper()

def _get_info_wrapper(self):
event_loop = asyncio.new_event_loop()
asyncio.set_event_loop(event_loop)
async_tasks = [self._get_info()]

event_loop.run_until_complete(asyncio.wait(async_tasks))
event_loop.close()

async def _get_info(self):
"""
This function is responsible for getting all files under the root_link
"""
scheme: str = urlparse(self._root_link).scheme
host = urlparse(self._root_link).hostname
folder = urlparse(self._root_link).path

client: FTP = ftp_client(host=host)

for file_path in client.nlst(folder):
if file_path.endswith(tuple(self._extensions)):
download_url: str = f"{scheme}://{host}{file_path}"

client = await aioftp_client(host)
for path, info in (await client.list(folder, recursive=True)):
if str(path).endswith(tuple(self._extensions)):
download_url: str = f"{scheme}://{host}{path}"
self._files.append(download_url)
try:
self._file_sizes.append(client.size(file_path))
except ftplib.error_perm:
self._file_sizes.append(0)
self._file_sizes.append(int(info["size"]))

@property
def files(self):
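
aioftp's list(recursive=True) yields (path, info) pairs, where info is a dict of MLSD facts such as "type" and "size" (the size arrives as a string, hence the int() cast above); awaiting the lister, as the code does, collects every entry into a list before iterating. A minimal sketch of the same filtering using an async for loop instead (host, folder, and extension hypothetical):

    import asyncio
    import aioftp

    async def list_raw_files(host: str, folder: str) -> list[str]:
        client = aioftp.Client()
        await client.connect(host, 21)
        await client.login()  # anonymous login
        urls: list[str] = []
        async for path, info in client.list(folder, recursive=True):
            # Keep only regular files with the extension we care about
            if info["type"] == "file" and str(path).endswith(".raw"):
                urls.append(f"ftp://{host}{path}")
        await client.quit()
        return urls

    print(asyncio.run(list_raw_files("ftp.example.org", "/pub/data")))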
@@ -97,65 +101,54 @@ def __init__(
self._file_information: list[FileInformation] = file_information
self._core_count: int = min(core_count, 2) # Limit to 2 downloads at a time
self._download_counter: Synchronized = multiprocessing.Value("i", 1)
self._semaphore = asyncio.Semaphore(self._core_count)

# Find files to download
self.download_data_wrapper()
self._download_data_wrapper()

def download_data_wrapper(self):
def _download_data_wrapper(self):
"""
This function is responsible for using multiprocessing to download multiple files in parallel
This function is responsible for "kicking off" asynchronous data downloading
"""
# Calculate the number of cores to use
# We are going to use half the number of cores, OR the number of files to download, whichever is less
# Otherwise, if the user specified the number of cores, use that
print("Starting file download")

# Split list into chunks to process separately
file_chunks: list[FileInformation] = np.array_split(self._file_information, self._core_count)

# Create a list of jobs
jobs: list[multiprocessing.Process] = []

for i, information in enumerate(file_chunks):

# Append a job to the list
job = multiprocessing.Process(
target=self.download_data,
args=(information,),
event_loop = asyncio.new_event_loop()
asyncio.set_event_loop(event_loop)
async_tasks = []
for file_information in self._file_information:
async_tasks.append(
self._aioftp_download_data(
file_information=file_information,
semaphore=self._semaphore,
)
)
jobs.append(job)

[job.start() for job in jobs] # Start jobs
[job.join() for job in jobs] # Wait for jobs to finish
[job.terminate() for job in jobs] # Terminate jobs

def download_data(self, file_information: list[FileInformation]):
# Await all the tasks
event_loop.run_until_complete(asyncio.wait(async_tasks))
event_loop.close()

# Start processing the files
for i, information in enumerate(file_information):

# Define FTP-related variables
parsed_url = urlparse(information.download_url)
host = parsed_url.hostname
path = parsed_url.path
async def _aioftp_download_data(self, file_information: FileInformation, semaphore: asyncio.Semaphore) -> None:

# Convert file_size from byte to MB
size_mb: int = round(information.file_size / (1024**2))
# Define FTP-related variables
parsed_url = urlparse(file_information.download_url)
host = parsed_url.hostname
path = parsed_url.path

# Connect to the host, login, and download the file
client: FTP = ftp_client(host=host)
# client: FTP = FTP(host=host, user="anonymous", passwd="guest")
# Convert file size from byte to MB
size_mb: int = round(file_information.file_size / (1024 ** 2))

# Get the lock and print file info
# Use a semaphore so that only N tasks can run at once
async with semaphore:
client = await aioftp_client(host)
self._download_counter.acquire()
print(f"Started download {self._download_counter.value:02d} / {len(self._file_information):02d} ({size_mb} MB) - {information.raw_file_name}") # fmt: skip
clear_print(f"Started download {self._download_counter.value:02d} / {len(self._file_information):02d} ({size_mb} MB) - {file_information.raw_file_name}")
self._download_counter.value += 1
self._download_counter.release()

# Download the file
information.raw_file_path.parent.mkdir(parents=True, exist_ok=True)
client.retrbinary(f"RETR {path}", open(information.raw_file_path, "wb").write)
client.quit()
# Download file, use "write_into" to write to a file, not a directory
await client.download(source=path, destination=file_information.raw_file_path, write_into=True)

await client.quit()


if __name__ == "__main__":
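The download method above bounds concurrency with an asyncio.Semaphore (capped at 2 connections in __init__) instead of a pool of worker processes. A minimal standalone sketch of the same pattern (URLs hypothetical, with a sleep standing in for the transfer):

    import asyncio

    async def download(url: str, semaphore: asyncio.Semaphore) -> None:
        async with semaphore:  # at most N tasks hold the semaphore at once
            print(f"downloading {url}")
            await asyncio.sleep(1)  # stand-in for the real FTP transfer

    async def main() -> None:
        semaphore = asyncio.Semaphore(2)  # mirror the 2-connection cap above
        urls = [f"ftp://ftp.example.org/file_{i}.raw" for i in range(5)]
        await asyncio.gather(*(download(u, semaphore) for u in urls))

    asyncio.run(main())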
24 changes: 15 additions & 9 deletions main/py/proteomics/proteomics_preprocess.py
@@ -2,17 +2,15 @@
This is the main driver file for downloading proteomics data
"""
import argparse
import Crux
import csv
from FileInformation import FileInformation
import FTPManager
import os
import sys
from pathlib import Path


from . import Crux
from .FileInformation import FileInformation
from . import FTPManager


class ArgParseFormatter(
argparse.RawTextHelpFormatter, argparse.ArgumentDefaultsHelpFormatter
):
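Subclassing both RawTextHelpFormatter and ArgumentDefaultsHelpFormatter gives help text that keeps its manual newlines while still having defaults appended. A minimal sketch of the combination (argument name hypothetical):

    import argparse

    class ArgParseFormatter(argparse.RawTextHelpFormatter, argparse.ArgumentDefaultsHelpFormatter):
        """Keep raw newlines in help text and append each argument's default."""

    parser = argparse.ArgumentParser(formatter_class=ArgParseFormatter)
    parser.add_argument("--cores", default=2, help="Number of cores.\nThis second line stays on its own line.")
    parser.print_help()  # shows "(default: 2)" appended to the help text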
@@ -212,7 +210,7 @@ def print_download_size(self):
# Convert to MB
total_size = total_size // 1024 ** 2
print(f"Total size to download: {total_size} MB")

def _set_replicate_numbers(self):
instances: dict[str, list[FileInformation]] = {}
for information in self.file_information:
@@ -225,7 +223,7 @@ def _set_replicate_numbers(self):
for i, file_information in enumerate(instances[cell_type]):
current_info: FileInformation = file_information
previous_info: FileInformation = instances[cell_type][i - 1] if i > 0 else None

# Do not modify the replicate value if we are on the very first iteration of this cell type
if i == 0:
pass
@@ -303,7 +301,8 @@ def parse_args(args: list[str]) -> argparse.Namespace:
dest="skip_download",
default=False,
type=bool,
help="If this action is passed in, FTP data will not be downloaded.\nThis assumes you have raw data under the folder specified by the option '--ftp-out-dir'",
help="If this action is passed in, FTP data will not be downloaded.\n"
"This assumes you have raw data under the folder specified by the option '--ftp-out-dir'",
)
parser.add_argument(
"--skip-mzml",
@@ -333,7 +332,11 @@ def parse_args(args: list[str]) -> argparse.Namespace:
dest="core_count",
metavar="cores",
default=os.cpu_count() // 2,
help="This is the number of threads to use for downloading files. It will default to the minimum of: half the available CPU cores available, or the number of input files found.\nIt will not use more cores than necessary\nOptions are an integer or 'all' to use all available cores",
help="This is the number of threads to use for downloading files.\n"
"It will default to the minimum of: half the available CPU cores available, or the number of input files found.\n"
"It will not use more cores than necessary\n"
"Options are an integer or 'all' to use all available cores.\n"
"Note: Downloading will use a MAX of 2 threads at once, as some FTP servers do not work well with multiple connections from the same IP address at once.",
)
# TODO: Add option to delete intermediate files (raw, mzml, sqt)
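The help text above says the flag accepts an integer or the literal "all". A hypothetical converter for that convention, usable as an argparse type= (not shown in this hunk):

    import os

    def parse_core_count(value: str) -> int:
        # "all" means every available core; anything else must parse as an integer
        if value.lower() == "all":
            return os.cpu_count() or 1
        return int(value)

    assert parse_core_count("4") == 4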

@@ -407,13 +410,15 @@ def main(args: list[str]):
file_information=file_information,
core_count=args.core_count,
)
print("") # New line to separate this output from the next

if args.skip_mzml is False:
# Convert raw to mzML and then create SQT files
Crux.RAWtoMZML(
file_information=file_information,
core_count=args.core_count,
)
print("") # New line to separate this output from the next

if args.skip_sqt is False:
# Convert mzML to SQT
@@ -428,6 +433,7 @@ def main(args: list[str]):
file_information=file_information,
core_count=args.core_count,
)
print("") # New line to separate this output from the next

# Get the root folders of the output CSV files
root_folders: set[Path] = set([i.intensity_csv.parent for i in file_information])
