diff --git a/src/main/java/com/conveyal/analysis/controllers/BundleController.java b/src/main/java/com/conveyal/analysis/controllers/BundleController.java
index b7fc71cc5..ef9111285 100644
--- a/src/main/java/com/conveyal/analysis/controllers/BundleController.java
+++ b/src/main/java/com/conveyal/analysis/controllers/BundleController.java
@@ -13,20 +13,18 @@
 import com.conveyal.file.FileUtils;
 import com.conveyal.gtfs.GTFSCache;
 import com.conveyal.gtfs.GTFSFeed;
-import com.conveyal.gtfs.error.GTFSError;
 import com.conveyal.gtfs.error.GeneralError;
 import com.conveyal.gtfs.model.Stop;
 import com.conveyal.gtfs.validator.PostLoadValidator;
 import com.conveyal.osmlib.Node;
 import com.conveyal.osmlib.OSM;
-import com.conveyal.r5.analyst.progress.ProgressInputStream;
 import com.conveyal.r5.analyst.cluster.TransportNetworkConfig;
+import com.conveyal.r5.analyst.progress.ProgressInputStream;
 import com.conveyal.r5.analyst.progress.Task;
 import com.conveyal.r5.streets.OSMCache;
 import com.conveyal.r5.util.ExceptionUtils;
 import com.mongodb.QueryBuilder;
 import org.apache.commons.fileupload.FileItem;
-import org.apache.commons.fileupload.disk.DiskFileItem;
 import org.bson.types.ObjectId;
 import org.locationtech.jts.geom.Envelope;
 import org.slf4j.Logger;
@@ -107,19 +105,25 @@ private Bundle create (Request req, Response res) {
         // Do some initial synchronous work setting up the bundle to fail fast if the request is bad.
         final Map<String, List<FileItem>> files = HttpUtils.getRequestFiles(req.raw());
         final Bundle bundle = new Bundle();
+        final File osmPbfFile;
+        final List<File> gtfsZipFiles;
         try {
             bundle.name = files.get("bundleName").get(0).getString("UTF-8");
             bundle.regionId = files.get("regionId").get(0).getString("UTF-8");
             if (files.get("osmId") != null) {
+                osmPbfFile = null;
                 bundle.osmId = files.get("osmId").get(0).getString("UTF-8");
                 Bundle bundleWithOsm = Persistence.bundles.find(QueryBuilder.start("osmId").is(bundle.osmId).get()).next();
                 if (bundleWithOsm == null) {
                     throw AnalysisServerException.badRequest("Selected OSM does not exist.");
                 }
+            } else {
+                osmPbfFile = HttpUtils.saveFileItemLocally(files.get("osm").get(0));
             }
             if (files.get("feedGroupId") != null) {
+                gtfsZipFiles = null;
                 bundle.feedGroupId = files.get("feedGroupId").get(0).getString("UTF-8");
                 Bundle bundleWithFeed = Persistence.bundles.find(QueryBuilder.start("feedGroupId").is(bundle.feedGroupId).get()).next();
                 if (bundleWithFeed == null) {
@@ -134,6 +138,8 @@ private Bundle create (Request req, Response res) {
                 bundle.feeds = bundleWithFeed.feeds;
                 bundle.feedsComplete = bundleWithFeed.feedsComplete;
                 bundle.totalFeeds = bundleWithFeed.totalFeeds;
+            } else {
+                gtfsZipFiles = HttpUtils.saveFileItemsLocally(files.get("feedGroup"));
             }
             UserPermissions userPermissions = UserPermissions.from(req);
             bundle.accessGroup = userPermissions.accessGroup;
@@ -155,16 +161,15 @@ private Bundle create (Request req, Response res) {
                 .withWorkProduct(BUNDLE, bundle._id, bundle.regionId)
                 .withAction(progressListener -> {
                     try {
-                        if (bundle.osmId == null) {
+                        if (osmPbfFile != null) {
                             // Process uploaded OSM.
                             bundle.osmId = new ObjectId().toString();
-                            DiskFileItem fi = (DiskFileItem) files.get("osm").get(0);
                             // Here we perform minimal validation by loading the OSM, but don't retain the resulting MapDB.
                             OSM osm = new OSM(null);
                             osm.intersectionDetection = true;
                             // Number of entities in an OSM file is unknown, so derive progress from the number of bytes read.
                             // Wrapping in buffered input stream should reduce number of progress updates.
-                            osm.readPbf(ProgressInputStream.forFileItem(fi, progressListener));
+                            osm.readPbf(ProgressInputStream.forFile(osmPbfFile, progressListener));
                             // osm.readPbf(new BufferedInputStream(fi.getInputStream()));
                             Envelope osmBounds = new Envelope();
                             for (Node n : osm.nodes.values()) {
@@ -173,10 +178,10 @@ private Bundle create (Request req, Response res) {
                             osm.close();
                             checkWgsEnvelopeSize(osmBounds, "OSM data");
                             // Store the source OSM file. Note that we're not storing the derived MapDB file here.
-                            fileStorage.moveIntoStorage(OSMCache.getKey(bundle.osmId), fi.getStoreLocation());
+                            fileStorage.moveIntoStorage(OSMCache.getKey(bundle.osmId), osmPbfFile);
                         }
-                        if (bundle.feedGroupId == null) {
+                        if (gtfsZipFiles != null) {
                             // Process uploaded GTFS files
                             bundle.feedGroupId = new ObjectId().toString();
@@ -186,8 +191,7 @@ private Bundle create (Request req, Response res) {
                             bundle.feeds = new ArrayList<>();
                             bundle.totalFeeds = files.get("feedGroup").size();
-                            for (FileItem fileItem : files.get("feedGroup")) {
-                                File feedFile = ((DiskFileItem) fileItem).getStoreLocation();
+                            for (File feedFile : gtfsZipFiles) {
                                 ZipFile zipFile = new ZipFile(feedFile);
                                 File tempDbFile = FileUtils.createScratchFile("db");
                                 File tempDbpFile = new File(tempDbFile.getAbsolutePath() + ".p");
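Note on the refactor above: uploads are now persisted during the synchronous, fail-fast part of the handler, so the asynchronous action closes over plain `java.io.File` handles (`osmPbfFile`, `gtfsZipFiles`) instead of commons-fileupload objects, and the null checks double as "was anything uploaded" flags. A condensed sketch of the resulting flow — the branches for reusing an existing bundle are elided, `req`, `progressListener`, `fileStorage`, and `bundle` come from the surrounding controller code, and only methods that appear in this diff are used:

```java
// Synchronous, fail-fast part: parse the multipart request and save uploads to scratch files.
Map<String, List<FileItem>> files = HttpUtils.getRequestFiles(req.raw());
File osmPbfFile = HttpUtils.saveFileItemLocally(files.get("osm").get(0));
List<File> gtfsZipFiles = HttpUtils.saveFileItemsLocally(files.get("feedGroup"));

// Asynchronous part: only File handles cross into the background task.
OSM osm = new OSM(null); // null MapDB path: load purely to validate, retain nothing
osm.intersectionDetection = true;
osm.readPbf(ProgressInputStream.forFile(osmPbfFile, progressListener));
osm.close();
fileStorage.moveIntoStorage(OSMCache.getKey(bundle.osmId), osmPbfFile);
```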
diff --git a/src/main/java/com/conveyal/analysis/controllers/OpportunityDatasetController.java b/src/main/java/com/conveyal/analysis/controllers/OpportunityDatasetController.java
index 66509bcec..3e320d43b 100644
--- a/src/main/java/com/conveyal/analysis/controllers/OpportunityDatasetController.java
+++ b/src/main/java/com/conveyal/analysis/controllers/OpportunityDatasetController.java
@@ -3,6 +3,7 @@
 import com.conveyal.analysis.AnalysisServerException;
 import com.conveyal.analysis.UserPermissions;
 import com.conveyal.analysis.components.TaskScheduler;
+import com.conveyal.analysis.datasource.DataSourceUtil;
 import com.conveyal.analysis.grids.SeamlessCensusGridExtractor;
 import com.conveyal.analysis.models.DataGroup;
 import com.conveyal.analysis.models.OpportunityDataset;
@@ -29,7 +30,6 @@
 import com.google.common.io.Files;
 import com.mongodb.QueryBuilder;
 import org.apache.commons.fileupload.FileItem;
-import org.apache.commons.fileupload.disk.DiskFileItem;
 import org.apache.commons.io.FilenameUtils;
 import org.bson.types.ObjectId;
 import org.slf4j.Logger;
@@ -55,7 +55,6 @@
 import java.util.zip.GZIPInputStream;
 import java.util.zip.GZIPOutputStream;
 
-import static com.conveyal.analysis.datasource.DataSourceUtil.detectUploadFormatAndValidate;
 import static com.conveyal.analysis.util.JsonUtil.toJson;
 import static com.conveyal.file.FileCategory.GRIDS;
 import static com.conveyal.r5.analyst.WebMercatorExtents.parseZoom;
@@ -327,24 +326,15 @@ private OpportunityDatasetUploadStatus createOpportunityDataset(Request req, Response res) {
         // are recorded in a persistent purpose-built way rather than falling back on the UI's catch-all error window.
         // TODO more standardized mechanism for tracking asynchronous tasks and catching exceptions on them
         OpportunityDatasetUploadStatus status = new OpportunityDatasetUploadStatus(regionId, sourceName);
-        addStatusAndRemoveOldStatuses(status);
-        // TODO should we delete this temporary directory at the end?
-        final File tmpDirectory = FileUtils.createScratchDirectory();
-        final List<File> files = new ArrayList<>();
-        final List<FileItem> fileItems;
+        final List<File> files;
         final FileStorageFormat uploadFormat;
         final Map<String, String> parameters;
         try {
             // Validate inputs and parameters, which will throw an exception if there's anything wrong with them.
             // Call remove() rather than get() so that subsequent code will see only string parameters, not the files.
-            fileItems = formFields.remove("files");
-            for (var fi : fileItems) {
-                var tmpFile = new File(tmpDirectory, fi.getName());
-                Files.move(((DiskFileItem) fi).getStoreLocation(), tmpFile);
-                files.add(tmpFile);
-            }
-            uploadFormat = detectUploadFormatAndValidate(files);
+            files = HttpUtils.extractFilesFromFileItemsAndUnzip(formFields.remove("files"));
+            uploadFormat = DataSourceUtil.detectUploadFormatAndValidate(files);
             parameters = extractStringParameters(formFields);
         } catch (Exception e) {
             status.completeWithError(e);
@@ -352,6 +342,9 @@ private OpportunityDatasetUploadStatus createOpportunityDataset(Request req, Response res) {
             return status;
         }
 
+        // Add the status to the region-wide tracker before we begin the heavy tasks.
+        addStatusAndRemoveOldStatuses(status);
+
         // We are going to call several potentially slow blocking methods to create and persist new pointsets.
         // This whole series of actions will be run sequentially but within an asynchronous Executor task.
         // After enqueueing, the status is returned so the UI can track progress.
@@ -631,6 +624,7 @@ public static class OpportunityDatasetUploadStatus implements ProgressListener {
         public Status status = Status.PROCESSING;
         public String name;
         public String message;
+        public String stackTrace;
         public Date createdAt;
         public Date completedAt;
@@ -647,7 +641,8 @@ private void completed (Status status) {
         }
 
         public void completeWithError (Exception e) {
-            message = "Unable to create opportunity dataset. " + ExceptionUtils.stackTraceString(e);
+            stackTrace = ExceptionUtils.stackTraceString(e);
+            message = "Unable to create opportunity dataset. " + e.getMessage();
             completed(Status.ERROR);
         }
 
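With `completeWithError` split as above, the UI's catch-all error window gets a short `message` while the full trace is preserved in the new `stackTrace` field. A minimal sketch of what a client now sees (`regionId` and `sourceName` as in the controller above; the exception is hypothetical):

```java
OpportunityDatasetUploadStatus status = new OpportunityDatasetUploadStatus(regionId, sourceName);
status.completeWithError(new IllegalArgumentException("CSV header row is missing"));
// status.status     == Status.ERROR
// status.message    == "Unable to create opportunity dataset. CSV header row is missing"
// status.stackTrace == the full ExceptionUtils.stackTraceString(e), kept out of the message
```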
diff --git a/src/main/java/com/conveyal/analysis/datasource/DataSourceUploadAction.java b/src/main/java/com/conveyal/analysis/datasource/DataSourceUploadAction.java
index 5f621635a..28028f861 100644
--- a/src/main/java/com/conveyal/analysis/datasource/DataSourceUploadAction.java
+++ b/src/main/java/com/conveyal/analysis/datasource/DataSourceUploadAction.java
@@ -3,25 +3,23 @@
 import com.conveyal.analysis.UserPermissions;
 import com.conveyal.analysis.models.DataSource;
 import com.conveyal.analysis.persistence.AnalysisCollection;
+import com.conveyal.analysis.util.HttpUtils;
 import com.conveyal.file.FileStorage;
 import com.conveyal.file.FileStorageFormat;
 import com.conveyal.file.FileStorageKey;
 import com.conveyal.r5.analyst.progress.ProgressListener;
 import com.conveyal.r5.analyst.progress.TaskAction;
 import org.apache.commons.fileupload.FileItem;
-import org.apache.commons.fileupload.disk.DiskFileItem;
 import org.apache.commons.io.FilenameUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.stream.Collectors;
 
-import static com.conveyal.analysis.datasource.DataSourceUtil.detectUploadFormatAndValidate;
 import static com.conveyal.analysis.util.HttpUtils.getFormField;
 import static com.conveyal.file.FileCategory.DATASOURCES;
 import static com.conveyal.file.FileStorageFormat.SHP;
@@ -123,15 +121,9 @@ public static DataSourceUploadAction forFormFields (
         // Extract required parameters. Throws AnalysisServerException on failure, e.g. if a field is missing.
         final String sourceName = getFormField(formFields, "sourceName", true);
         final String regionId = getFormField(formFields, "regionId", true);
-        final List<FileItem> fileItems = formFields.get("sourceFiles");
-        final List<File> files = new ArrayList<>();
+        final List<File> files = HttpUtils.extractFilesFromFileItemsAndUnzip(formFields.get("sourceFiles"));
 
-        for (var fi : fileItems) {
-            var dfi = (DiskFileItem) fi;
-            files.add(dfi.getStoreLocation());
-        }
-
-        FileStorageFormat format = detectUploadFormatAndValidate(files);
+        FileStorageFormat format = DataSourceUtil.detectUploadFormatAndValidate(files);
         DataSourceIngester ingester = DataSourceIngester.forFormat(format);
 
         String originalFileNames = files.stream().map(File::getName).collect(Collectors.joining(", "));
diff --git a/src/main/java/com/conveyal/analysis/datasource/DataSourceUtil.java b/src/main/java/com/conveyal/analysis/datasource/DataSourceUtil.java
index 13b158fe1..01222c210 100644
--- a/src/main/java/com/conveyal/analysis/datasource/DataSourceUtil.java
+++ b/src/main/java/com/conveyal/analysis/datasource/DataSourceUtil.java
@@ -27,15 +27,15 @@ public abstract class DataSourceUtil {
      * @throws DataSourceException if the type of the upload can't be detected or preconditions are violated.
      * @return the expected type of the uploaded file or files, never null.
      */
-    public static FileStorageFormat detectUploadFormatAndValidate (List<FileItem> fileItems) {
-        if (isNullOrEmpty(fileItems)) {
+    public static FileStorageFormat detectUploadFormatAndValidate (List<File> files) {
+        if (isNullOrEmpty(files)) {
             throw new DataSourceException("You must select some files to upload.");
         }
-        Set<String> fileExtensions = extractFileExtensions(fileItems);
+        Set<String> fileExtensions = extractFileExtensions(files);
         if (fileExtensions.isEmpty()) {
             throw new DataSourceException("No file extensions seen, cannot detect upload type.");
         }
-        checkFileCharacteristics(fileItems);
+        checkFileCharacteristics(files);
         if (fileExtensions.contains("zip")) {
             throw new DataSourceException("Upload of spatial .zip files not yet supported");
             // TODO unzip and process unzipped files - will need to peek inside to detect GTFS uploads first.
@@ -45,7 +45,7 @@ public static FileStorageFormat detectUploadFormatAndValidate (List<FileItem> fileItems) {
         final Set<String> shapefileExtensions = Sets.newHashSet("shp", "dbf", "prj");
         if ( ! Sets.intersection(fileExtensions, shapefileExtensions).isEmpty()) {
             if (fileExtensions.containsAll(shapefileExtensions)) {
-                verifyBaseNamesSame(fileItems);
+                verifyBaseNamesSame(files);
                 // TODO check that any additional file is .shx, and that there are no more than 4 files.
             } else {
                 throw new DataSourceException("You must multi-select at least SHP, DBF, and PRJ files for shapefile upload.");
@@ -113,5 +113,4 @@ private static void verifyBaseNamesSame (List<File> files) {
             }
         }
     }
-
 }
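For reference, here is how the reworked `File`-based validation behaves for a typical shapefile upload. The paths are hypothetical and the files are assumed to already exist on disk (the `checkFileCharacteristics` call inspects them):

```java
List<File> files = List.of(
        new File("/tmp/upload/roads.shp"),
        new File("/tmp/upload/roads.dbf"),
        new File("/tmp/upload/roads.prj"));
// All three required extensions are present with matching base names, so this returns SHP.
FileStorageFormat format = DataSourceUtil.detectUploadFormatAndValidate(files);

// Each of these would throw DataSourceException instead:
//  - an empty or null list ("You must select some files to upload.")
//  - a .zip among the files (spatial .zip upload not yet supported at this layer)
//  - .shp present without both .dbf and .prj
//  - matching extensions but differing base names (verifyBaseNamesSame)
```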
diff --git a/src/main/java/com/conveyal/analysis/util/HttpUtils.java b/src/main/java/com/conveyal/analysis/util/HttpUtils.java
index 53cb2a04a..74fef654e 100644
--- a/src/main/java/com/conveyal/analysis/util/HttpUtils.java
+++ b/src/main/java/com/conveyal/analysis/util/HttpUtils.java
@@ -1,14 +1,16 @@
 package com.conveyal.analysis.util;
 
 import com.conveyal.analysis.AnalysisServerException;
+import com.conveyal.file.FileUtils;
 import com.conveyal.r5.util.ExceptionUtils;
 import org.apache.commons.fileupload.FileItem;
-import org.apache.commons.fileupload.FileItemFactory;
 import org.apache.commons.fileupload.disk.DiskFileItemFactory;
 import org.apache.commons.fileupload.servlet.ServletFileUpload;
 
 import javax.servlet.http.HttpServletRequest;
 
+import java.io.File;
 import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -36,8 +38,8 @@ public static Map<String, List<FileItem>> getRequestFiles (HttpServletRequest req) {
     // If we always saved the FileItems via write() or read them with getInputStream() they would not all need to
     // be on disk.
     try {
-        FileItemFactory fileItemFactory = new DiskFileItemFactory(0, null);
-        ServletFileUpload sfu = new ServletFileUpload(fileItemFactory);
+        DiskFileItemFactory diskFileItemFactory = new DiskFileItemFactory(0, null);
+        ServletFileUpload sfu = new ServletFileUpload(diskFileItemFactory);
         return sfu.parseParameterMap(req);
     } catch (Exception e) {
         throw AnalysisServerException.fileUpload(ExceptionUtils.stackTraceString(e));
     }
@@ -66,4 +68,56 @@ public static String getFormField(Map<String, List<FileItem>> formFields, String fieldName) {
                     fieldName));
         }
     }
+
+    /**
+     * Extract the contents of the given `FileItem`s into a scratch directory, unzipping any .zip files along
+     * the way, and return handles to the resulting local `File`s.
+     */
+    public static List<File> extractFilesFromFileItemsAndUnzip(List<FileItem> fileItems) {
+        File directory = FileUtils.createScratchDirectory();
+        List<File> files = new ArrayList<>();
+        for (FileItem fi : fileItems) {
+            File file = moveFileItemIntoDirectory(fi, directory);
+            String name = file.getName();
+            if (name.toLowerCase().endsWith(".zip")) {
+                files.addAll(FileUtils.unzipFileIntoDirectory(file, directory));
+            } else {
+                files.add(file);
+            }
+        }
+        return files;
+    }
+
+    /**
+     * Move the contents of each `FileItem` into a single temporary directory and return the new `File` handles.
+     */
+    public static List<File> saveFileItemsLocally(List<FileItem> fileItems) {
+        File directory = FileUtils.createScratchDirectory();
+        List<File> files = new ArrayList<>();
+        for (FileItem fileItem : fileItems) {
+            files.add(moveFileItemIntoDirectory(fileItem, directory));
+        }
+        return files;
+    }
+
+    /**
+     * Save the contents of a single `FileItem` in a temporary directory and return the `File`.
+     */
+    public static File saveFileItemLocally(FileItem fileItem) {
+        return moveFileItemIntoDirectory(fileItem, FileUtils.createScratchDirectory());
+    }
+
+    /**
+     * Move the contents of a `FileItem` into the given directory by calling `write`. A `DiskFileItem` whose
+     * contents are already on disk is moved with an efficient `renameTo` rather than a byte copy.
+     */
+    public static File moveFileItemIntoDirectory(FileItem fileItem, File directory) {
+        File file = new File(directory, fileItem.getName());
+        try {
+            fileItem.write(file);
+        } catch (Exception e) {
+            throw new AnalysisServerException(e, "Error storing file in directory on disk. FileItem.write() failed.");
+        }
+        return file;
+    }
 }
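The three helpers added above differ only in cardinality and unzipping; all of them funnel through `moveFileItemIntoDirectory`. Roughly, with `files` and `formFields` being the parsed multipart fields returned by `getRequestFiles`:

```java
// One FileItem -> one scratch File (the "osm" upload in BundleController):
File osmPbf = HttpUtils.saveFileItemLocally(files.get("osm").get(0));

// Many FileItems -> one scratch directory, contents moved as-is (GTFS zips stay zipped):
List<File> gtfsZips = HttpUtils.saveFileItemsLocally(files.get("feedGroup"));

// Many FileItems, with any .zip among them expanded in place (spatial data sources):
List<File> sources = HttpUtils.extractFilesFromFileItemsAndUnzip(formFields.get("sourceFiles"));
```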
diff --git a/src/main/java/com/conveyal/file/FileUtils.java b/src/main/java/com/conveyal/file/FileUtils.java
index f2eb94842..5890d18bd 100644
--- a/src/main/java/com/conveyal/file/FileUtils.java
+++ b/src/main/java/com/conveyal/file/FileUtils.java
@@ -1,17 +1,13 @@
 package com.conveyal.file;
 
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.RandomAccessFile;
+import java.io.*;
 import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.zip.GZIPInputStream;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipInputStream;
 
 public abstract class FileUtils {
     /**
@@ -123,4 +119,51 @@ public static boolean isGzip (File file) {
             throw new RuntimeException(e);
         }
     }
+
+    /**
+     * Extract the entries of a zip file into the given directory and return handles to the newly created files.
+     */
+    public static List<File> unzipFileIntoDirectory(File zipFile, File directory) {
+        List<File> files = new ArrayList<>();
+        ZipInputStream zis = new ZipInputStream(FileUtils.getInputStream(zipFile));
+        ZipEntry zipEntry;
+        byte[] buffer = new byte[1024];
+        try {
+            while ((zipEntry = zis.getNextEntry()) != null) {
+                String entryName = zipEntry.getName();
+                File newFile = new File(directory, entryName);
+                if (zipEntry.isDirectory()) {
+                    // Skip special `__MACOSX` directories.
+                    if (entryName.toUpperCase().contains("__MACOSX")) continue;
+                    if (!newFile.isDirectory() && !newFile.mkdirs()) {
+                        throw new IOException("Failed to create directory " + newFile);
+                    }
+                } else {
+                    // Skip file names beginning with a "."
+                    if (Path.of(entryName).getFileName().toString().startsWith(".")) continue;
+
+                    // Fix for Windows-created archives whose entries lack directory records.
+                    File parent = newFile.getParentFile();
+                    if (!parent.isDirectory() && !parent.mkdirs()) {
+                        throw new IOException("Failed to create directory " + parent);
+                    }
+
+                    // Write the entry's content.
+                    FileOutputStream fos = new FileOutputStream(newFile);
+                    int len;
+                    while ((len = zis.read(buffer)) > 0) {
+                        fos.write(buffer, 0, len);
+                    }
+                    fos.close();
+                    files.add(newFile);
+                }
+            }
+            zis.closeEntry();
+            zis.close();
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+
+        return files;
+    }
 }
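A self-contained sketch of the new unzip helper in use (the zip path is hypothetical):

```java
import com.conveyal.file.FileUtils;
import java.io.File;
import java.util.List;

public class UnzipDemo {
    public static void main(String[] args) {
        File scratch = FileUtils.createScratchDirectory();
        // Entries under __MACOSX and dot-files are skipped; nested directories are recreated.
        List<File> extracted = FileUtils.unzipFileIntoDirectory(new File("upload.zip"), scratch);
        for (File f : extracted) {
            System.out.println(f.getAbsolutePath());
        }
    }
}
```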
diff --git a/src/main/java/com/conveyal/r5/analyst/progress/ProgressInputStream.java b/src/main/java/com/conveyal/r5/analyst/progress/ProgressInputStream.java
index bc02502d2..417ae6de9 100644
--- a/src/main/java/com/conveyal/r5/analyst/progress/ProgressInputStream.java
+++ b/src/main/java/com/conveyal/r5/analyst/progress/ProgressInputStream.java
@@ -1,8 +1,9 @@
 package com.conveyal.r5.analyst.progress;
 
-import org.apache.commons.fileupload.FileItem;
+import com.conveyal.file.FileUtils;
 import org.apache.commons.io.input.ProxyInputStream;
 
+import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 
@@ -13,7 +14,7 @@
  * This will report progress as the total number of bytes that have passed through the stream, like CountingInputStream.
  * This can exceed 100% of the file size if the caller uses mark and reset. The progressListener should be
  * pre-configured with the total number of bytes expected and a detail message using ProgressListener::beginTask.
- * The static method forFileItem() demonstrates usage when reading from a file of known length.
+ * The static method forFile() demonstrates usage when reading from a file of known length.
  */
 public class ProgressInputStream extends ProxyInputStream {
@@ -40,18 +41,10 @@ public synchronized long skip (final long length) throws IOException {
     }
     /**
      * Given an uploaded file, report progress on reading it.
-     * Incrementing the progress seems to introduce some inefficiency when performing unbuffered small reads, such as
-     * calls to InputStream.read() which are used by DataInputStream to read numbers.
-     * TODO wrap in buffered input stream to reduce small read calls, or tune to only report once per percentage?
      */
-    public static ProgressInputStream forFileItem (FileItem fileItem, ProgressListener progressListener) {
-        try {
-            checkArgument(fileItem.getSize() < Integer.MAX_VALUE);
-            progressListener.beginTask("Reading file " + fileItem.getName(), (int)(fileItem.getSize()));
-            return new ProgressInputStream(progressListener, fileItem.getInputStream());
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
+    public static ProgressInputStream forFile (File file, ProgressListener progressListener) {
+        checkArgument(file.length() < Integer.MAX_VALUE);
+        progressListener.beginTask("Reading file " + file.getName(), (int)(file.length()));
+        return new ProgressInputStream(progressListener, FileUtils.getInputStream(file));
     }
-
 }
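Callers now hand `forFile` a plain `File`, and `beginTask` is invoked by the factory itself, so the listener is already configured when reading starts. Since progress is incremented on every read that passes through the stream, wrapping it in a `BufferedInputStream` (as the comment in BundleController suggests) keeps tiny reads from flooding the listener. A sketch, with `osmPbfFile`, `progressListener`, and `osm` taken from the BundleController context above:

```java
// forFile() calls progressListener.beginTask("Reading file ...", (int) file.length()) itself.
try (InputStream in = new BufferedInputStream(
        ProgressInputStream.forFile(osmPbfFile, progressListener))) {
    osm.readPbf(in);
}
```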