From 6ec8a8d3d553546625baa630a377589a3c25e7cd Mon Sep 17 00:00:00 2001 From: Junjie zhang Date: Sun, 30 Jun 2024 18:41:08 -0700 Subject: [PATCH 01/11] Remove cudf::nvtx3 dependency and use chars_begin of string_view --- tools/dlrm_script/dlrm_raw.cu | 4 ++-- tools/dlrm_script/dlrm_raw_utils.hpp | 2 +- tools/dlrm_script/hash/concurrent_unordered_map.cuh | 6 ++---- 3 files changed, 5 insertions(+), 7 deletions(-) diff --git a/tools/dlrm_script/dlrm_raw.cu b/tools/dlrm_script/dlrm_raw.cu index 1a457098fe..54d666ed7c 100644 --- a/tools/dlrm_script/dlrm_raw.cu +++ b/tools/dlrm_script/dlrm_raw.cu @@ -156,7 +156,7 @@ void process_kaggle_dataset(const std::string &input_dir_path, const std::string if (col.type().id() == cudf::type_id::STRING) { auto str_col = cudf::strings_column_view(col.view()); int64_t num_strings = str_col.size(); - char *char_array = const_cast(str_col.chars().data()); + char *char_array = const_cast(str_col.chars_begin(cudf::get_default_stream())); int32_t *offsets = const_cast(str_col.offsets().data()); build_categorical_index<<>>( @@ -517,7 +517,7 @@ void process_terabyte_dataset(const std::string &input_dir_path, const std::stri if (col.type().id() == cudf::type_id::STRING) { auto str_col = cudf::strings_column_view(col.view()); int64_t num_strings = str_col.size(); - char *char_array = const_cast(str_col.chars().data()); + char *char_array = const_cast(str_col.chars_begin(cudf::get_default_stream())); int32_t *offsets = const_cast(str_col.offsets().data()); build_categorical_index<<>>( diff --git a/tools/dlrm_script/dlrm_raw_utils.hpp b/tools/dlrm_script/dlrm_raw_utils.hpp index 5f21102c90..821f644ffe 100644 --- a/tools/dlrm_script/dlrm_raw_utils.hpp +++ b/tools/dlrm_script/dlrm_raw_utils.hpp @@ -574,7 +574,7 @@ size_t convert_input_binaries(rmm::mr::device_memory_resource *mr, std::string i for (int k = 0; k < num_categoricals; k++) { auto str_col_view = cudf::strings_column_view((col_logs[k + num_numericals]->view())); - char_ptrs.push_back(const_cast(str_col_view.chars().data())); + char_ptrs.push_back(const_cast(str_col_view.chars_begin(cudf::get_default_stream()))); offset_ptrs.push_back(const_cast(str_col_view.offsets().data())); } diff --git a/tools/dlrm_script/hash/concurrent_unordered_map.cuh b/tools/dlrm_script/hash/concurrent_unordered_map.cuh index 2952b76008..c9febba3bb 100644 --- a/tools/dlrm_script/hash/concurrent_unordered_map.cuh +++ b/tools/dlrm_script/hash/concurrent_unordered_map.cuh @@ -18,7 +18,6 @@ #include #include -#include #include #ifndef CUDF_GE_2306 #include @@ -56,8 +55,8 @@ struct packed { using type = void; }; template <> -struct packed { - using type = uint64_t; +struct packed { + using type = unsigned long long; }; template <> struct packed { @@ -170,7 +169,6 @@ class concurrent_unordered_map { const key_type unused_key = std::numeric_limits::max(), const Hasher& hash_function = hasher(), const Equality& equal = key_equal(), const allocator_type& allocator = allocator_type(), cudaStream_t stream = 0) { - CUDF_FUNC_RANGE(); using Self = concurrent_unordered_map; // Note: need `(*p).destroy` instead of `p->destroy` here From fb47eaaf0ba1ef5eb097e57f3a4e759c8e54a0e9 Mon Sep 17 00:00:00 2001 From: Minseok Lee Date: Tue, 9 Jul 2024 22:11:01 -0700 Subject: [PATCH 02/11] Update .nspect-vuln-allowlist.toml --- .nspect-vuln-allowlist.toml | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/.nspect-vuln-allowlist.toml b/.nspect-vuln-allowlist.toml index dede93276b..17ba0d188f 100644 --- a/.nspect-vuln-allowlist.toml +++ b/.nspect-vuln-allowlist.toml @@ -9,6 +9,26 @@ paths = ['third_party/hadoop/*'] comment = 'We do not use and are not planning on using the Hadoop Yarn Web UI' nspect_ids = ['NSPECT-OZP9-WUQA'] +[[oss.excluded.directories]] +paths = ['third_party/hadoop/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-*'] +comment = 'No Use' +nspect_ids = ['NSPECT-OZP9-WUQA'] + +[[oss.excluded.directories]] +paths = ['third_party/hadoop/hadoop-tools/*'] +comment = 'No Use' +nspect_ids = ['NSPECT-OZP9-WUQA'] + +[[oss.excluded.directories]] +paths = ['third_party/hadoop/hadoop-common/*'] +comment = 'No Use' +nspect_ids = ['NSPECT-OZP9-WUQA'] + +[[oss.excluded.directories]] +paths = ['third_party/hadoop/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/*'] +comment = 'No Use' +nspect_ids = ['NSPECT-OZP9-WUQA'] + [[oss.excluded.directories]] paths = ['third_party/rocksdb/docs'] comment = 'The package is for the RocksDB documentation and its hosting on the GitHub pages. We never use it.' From 4ad51c7227feea1a62aa42ba71f557a6a782a4c3 Mon Sep 17 00:00:00 2001 From: Minseok Lee Date: Wed, 10 Jul 2024 22:32:36 -0700 Subject: [PATCH 03/11] Update .nspect-vuln-allowlist.toml --- .nspect-vuln-allowlist.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.nspect-vuln-allowlist.toml b/.nspect-vuln-allowlist.toml index 17ba0d188f..586e754de5 100644 --- a/.nspect-vuln-allowlist.toml +++ b/.nspect-vuln-allowlist.toml @@ -1,4 +1,4 @@ -version = "4.3.0" +version = "24.06" [oss] From 65989a59d86c613124a7a672e162b41b8af5b7a4 Mon Sep 17 00:00:00 2001 From: Minseok Lee Date: Thu, 11 Jul 2024 19:08:34 -0700 Subject: [PATCH 04/11] Update .nspect-vuln-allowlist.toml --- .nspect-vuln-allowlist.toml | 21 +++++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/.nspect-vuln-allowlist.toml b/.nspect-vuln-allowlist.toml index 586e754de5..83966fb88c 100644 --- a/.nspect-vuln-allowlist.toml +++ b/.nspect-vuln-allowlist.toml @@ -4,23 +4,36 @@ version = "24.06" [oss.excluded] +[[oss.excluded.directories]] +paths = ['third_party/hadoop'] +comment = 'No Use' +nspect_ids = ['NSPECT-OZP9-WUQA'] + [[oss.excluded.directories]] paths = ['third_party/hadoop/*'] -comment = 'We do not use and are not planning on using the Hadoop Yarn Web UI' +comment = 'No Use' +nspect_ids = ['NSPECT-OZP9-WUQA'] + +[[oss.excluded.directories]] +paths = ['third_party/hadoop/hadoop-yarn-project/hadoop-yarn', 'third_party/hadoop/hadoop-yarn-project/hadoop-yarn +/hadoop-yarn-api', 'third_party/hadoop/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core', 'third_party/hadoop/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server +/hadoop-yarn-server-timelineservice'] +comment = 'No Use' nspect_ids = ['NSPECT-OZP9-WUQA'] [[oss.excluded.directories]] -paths = ['third_party/hadoop/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-*'] +paths = ['hadoop/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase +/hadoop-yarn-server-timelineservice-hbase-server'] comment = 'No Use' nspect_ids = ['NSPECT-OZP9-WUQA'] [[oss.excluded.directories]] -paths = ['third_party/hadoop/hadoop-tools/*'] +paths = ['third_party/hadoop/hadoop-tools'] comment = 'No Use' nspect_ids = ['NSPECT-OZP9-WUQA'] [[oss.excluded.directories]] -paths = ['third_party/hadoop/hadoop-common/*'] +paths = ['third_party/hadoop/hadoop-common'] comment = 'No Use' nspect_ids = ['NSPECT-OZP9-WUQA'] From 24abe8497caf30be8633748b227ed40044445d05 Mon Sep 17 00:00:00 2001 From: Minseok Lee Date: Thu, 11 Jul 2024 19:11:06 -0700 Subject: [PATCH 05/11] Update .nspect-vuln-allowlist.toml --- .nspect-vuln-allowlist.toml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.nspect-vuln-allowlist.toml b/.nspect-vuln-allowlist.toml index 83966fb88c..31de24302b 100644 --- a/.nspect-vuln-allowlist.toml +++ b/.nspect-vuln-allowlist.toml @@ -5,12 +5,12 @@ version = "24.06" [oss.excluded] [[oss.excluded.directories]] -paths = ['third_party/hadoop'] +paths = ['third_party/hadoop', 'third_party/hadoop/*'] comment = 'No Use' nspect_ids = ['NSPECT-OZP9-WUQA'] [[oss.excluded.directories]] -paths = ['third_party/hadoop/*'] +paths = ['third_party/hadoop/hadoop-mapreduce-project'] comment = 'No Use' nspect_ids = ['NSPECT-OZP9-WUQA'] From 676c5a87cf183b1e182c49c4362f470e4c336c4e Mon Sep 17 00:00:00 2001 From: Minseok Lee Date: Sun, 14 Jul 2024 18:05:02 -0700 Subject: [PATCH 06/11] Update .nspect-vuln-allowlist.toml --- .nspect-vuln-allowlist.toml | 30 +++++++++++++++++++++++++----- 1 file changed, 25 insertions(+), 5 deletions(-) diff --git a/.nspect-vuln-allowlist.toml b/.nspect-vuln-allowlist.toml index 31de24302b..cadb49f73a 100644 --- a/.nspect-vuln-allowlist.toml +++ b/.nspect-vuln-allowlist.toml @@ -3,20 +3,35 @@ version = "24.06" [oss] [oss.excluded] - + [[oss.excluded.directories]] -paths = ['third_party/hadoop', 'third_party/hadoop/*'] +paths = ['third_party/hadoop/hadoop-mapreduce-project'] comment = 'No Use' nspect_ids = ['NSPECT-OZP9-WUQA'] [[oss.excluded.directories]] -paths = ['third_party/hadoop/hadoop-mapreduce-project'] +paths = ['third_party/hadoop/hadoop-tools/hadoop-azure'] +comment = 'No Use' +nspect_ids = ['NSPECT-OZP9-WUQA'] + +[[oss.excluded.directories]] +paths = ['third_party/hadoop/hadoop-yarn-project/hadoop-yarn'] +comment = 'No Use' +nspect_ids = ['NSPECT-OZP9-WUQA'] + +[[oss.excluded.directories]] +paths = ['third_party/hadoop/hadoop-yarn-project/hadoop-yarn +/hadoop-yarn-api] +comment = 'No Use' +nspect_ids = ['NSPECT-OZP9-WUQA'] + +[[oss.excluded.directories]] +paths = ['third_party/hadoop/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core'] comment = 'No Use' nspect_ids = ['NSPECT-OZP9-WUQA'] [[oss.excluded.directories]] -paths = ['third_party/hadoop/hadoop-yarn-project/hadoop-yarn', 'third_party/hadoop/hadoop-yarn-project/hadoop-yarn -/hadoop-yarn-api', 'third_party/hadoop/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core', 'third_party/hadoop/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server +paths = ['third_party/hadoop/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server /hadoop-yarn-server-timelineservice'] comment = 'No Use' nspect_ids = ['NSPECT-OZP9-WUQA'] @@ -51,3 +66,8 @@ nspect_ids = ['NSPECT-OZP9-WUQA'] paths = ['third_party/protobuf/*'] comment = 'We never use csharp, java, php, the thir party googletest, etc., inside ptotobuf' nspect_ids = ['NSPECT-OZP9-WUQA'] + +[[oss.excluded.directories]] +paths = ['third_party/hadoop', 'third_party/hadoop/*'] +comment = 'No Use' +nspect_ids = ['NSPECT-OZP9-WUQA'] From 856475654921770ba13fdf703b80a518b7f2c781 Mon Sep 17 00:00:00 2001 From: Matthias Langer Date: Tue, 23 Jul 2024 01:38:15 -0700 Subject: [PATCH 07/11] Remove generic dependency on Hadoop. Just download and compile it as needed. --- .gitlab-ci.yml | 2 +- .gitmodules | 6 -- CMakeLists.txt | 57 ++++++++++++++++--- HugeCTR/src/CMakeLists.txt | 2 +- HugeCTR/src/hps/CMakeLists.txt | 6 +- .../src/inference_benchmark/CMakeLists.txt | 6 +- docs/source/hugectr_contributor_guide.md | 6 +- sbin/install-hadoop.sh | 2 +- third_party/hadoop | 1 - third_party/protobuf | 1 - tools/dockerfiles/Dockerfile.optimized | 5 ++ 11 files changed, 62 insertions(+), 32 deletions(-) delete mode 160000 third_party/hadoop delete mode 160000 third_party/protobuf diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 7960ca8305..b010d230b3 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -115,7 +115,7 @@ build_train_single_node_with_hdfs_minimal: variables: FROM_IMAGE: ${IMAGE_ALL} DST_IMAGE: $TRAIN_IMAGE_VERSIONED_WITH_HDFS_MINI - CMAKE_OPTION: "-DCMAKE_BUILD_TYPE=Release -DKEY_HIT_RATIO=ON -DSM=\"60;61;70;75;80;90\" -DCLANGFORMAT=OFF -DENABLE_HDFS=MINIMAL" + CMAKE_OPTION: "-DCMAKE_BUILD_TYPE=Release -DKEY_HIT_RATIO=ON -DSM=\"60;61;70;75;80;90\" -DCLANGFORMAT=OFF -DENABLE_HDFS=ON" BUILD_HUGECTR: 1 BUILD_HUGECTR2ONNX: 1 diff --git a/.gitmodules b/.gitmodules index a778699774..f45803cf28 100644 --- a/.gitmodules +++ b/.gitmodules @@ -32,12 +32,6 @@ [submodule "third_party/librdkafka"] path = third_party/librdkafka url = https://github.com/edenhill/librdkafka.git -[submodule "third_party/protobuf"] - path = third_party/protobuf - url = https://github.com/protocolbuffers/protobuf.git -[submodule "third_party/hadoop"] - path = third_party/hadoop - url = https://github.com/apache/hadoop.git [submodule "third_party/HierarchicalKV"] path = third_party/HierarchicalKV url = https://github.com/NVIDIA-Merlin/HierarchicalKV.git diff --git a/CMakeLists.txt b/CMakeLists.txt index 63b5ab334a..6e247b964b 100755 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -14,6 +14,7 @@ # cmake_minimum_required(VERSION 3.17) + project(HugeCTR LANGUAGES CXX CUDA) list(APPEND CMAKE_MODULE_PATH ${PROJECT_SOURCE_DIR}/cmake/Modules) @@ -351,17 +352,57 @@ add_subdirectory(gpu_cache/src) option(ENABLE_HDFS "Enable HDFS" OFF) if(ENABLE_HDFS) - if(ENABLE_HDFS STREQUAL "MINIMAL") - message("HDFS build mode: Client only") - else() - message("HDFS build mode: Full") - endif() + message(STATUS "HDFS build mode: Client only") set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -DENABLE_HDFS") set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -DENABLE_HDFS") + + set(FETCHCONTENT_QUIET OFF) + + # Java. + if (NOT EXISTS /usr/bin/mvn) + execute_process(WORKING_DIRECTORY "${CMAKE_BINARY_DIR}" + COMMAND /bin/bash ${PROJECT_SOURCE_DIR}/sbin/install-jdk-and-maven.sh + COMMAND_ERROR_IS_FATAL ANY + ) + endif() - # Build and Install Hadoop - include(SetupHadoop) - hadoop_setup(${ENABLE_HDFS}) + # Hadoop. + # sudo apt install libboost-date-time-dev + # sudo apt install libboost-program-options-dev + # sudo apt install libprotobuf-dev + # sudo apt install libfuse-dev + # sudo apt install libprotoc-dev + FetchContent_Declare(hadoop + DOWNLOAD_COMMAND git clone + --branch rel/release-3.4.0 + --depth 1 + --progress https://github.com/apache/hadoop.git + "${CMAKE_BINARY_DIR}/_deps/hadoop-src" + ) + FetchContent_Populate(hadoop) + set(hadoop_SOURCE_DIR "${hadoop_SOURCE_DIR}/hadoop-hdfs-project/hadoop-hdfs-native-client") + set(hadoop_BINARY_DIR "${hadoop_SOURCE_DIR}/target/hadoop-hdfs-native-client-3.4.0") + if(EXISTS ${hadoop_BINARY_DIR}/include/hdfs.h AND EXISTS ${hadoop_BINARY_DIR}/lib/native/libhdfs.a) + message(STATUS "Found hdfs library in ${hadoop_BINARY_DIR}") + else() + execute_process(WORKING_DIRECTORY "${hadoop_SOURCE_DIR}" + COMMAND mvn clean package + -Pdist,native + -DskipTests + -Dtar + -Dmaven.javadoc.skip=true + -Drequire.snappy + -Drequire.zstd + -Drequire.openssl + -Drequire.pmdk + COMMAND_ERROR_IS_FATAL ANY + ) + endif() + set(FETCHCONTENT_QUIET ON) + + include_directories("${hadoop_BINARY_DIR}/include") + link_directories("${hadoop_BINARY_DIR}/lib/native") + set(ENABLE_HDFS ON) endif() diff --git a/HugeCTR/src/CMakeLists.txt b/HugeCTR/src/CMakeLists.txt index 66be2d3b9f..6a6158ea9f 100755 --- a/HugeCTR/src/CMakeLists.txt +++ b/HugeCTR/src/CMakeLists.txt @@ -67,7 +67,7 @@ target_link_libraries(huge_ctr_shared PRIVATE nlohmann_json::nlohmann_json) target_link_libraries(huge_ctr_shared PUBLIC gpu_cache) if(ENABLE_HDFS) - target_link_libraries(huge_ctr_shared PUBLIC ${DB_LIB_PATHS}/libhdfs.so) + target_link_libraries(huge_ctr_shared PUBLIC hdfs) endif() if(ENABLE_S3) diff --git a/HugeCTR/src/hps/CMakeLists.txt b/HugeCTR/src/hps/CMakeLists.txt index db9a9b28a9..a481e9ca86 100644 --- a/HugeCTR/src/hps/CMakeLists.txt +++ b/HugeCTR/src/hps/CMakeLists.txt @@ -36,11 +36,7 @@ add_compile_definitions(LIBCUDACXX_ENABLE_EXPERIMENTAL_MEMORY_RESOURCE) add_library(huge_ctr_hps SHARED ${huge_ctr_hps_src}) if(ENABLE_HDFS) - target_link_libraries( - huge_ctr_hps - PUBLIC - ${DB_LIB_PATHS}/libhdfs.so # from Hugectr - ) + target_link_libraries(huge_ctr_hps PUBLIC hdfs) endif() if(ENABLE_S3) diff --git a/HugeCTR/src/inference_benchmark/CMakeLists.txt b/HugeCTR/src/inference_benchmark/CMakeLists.txt index bd7add40da..5873701c10 100644 --- a/HugeCTR/src/inference_benchmark/CMakeLists.txt +++ b/HugeCTR/src/inference_benchmark/CMakeLists.txt @@ -20,11 +20,7 @@ file(GLOB hps_benchmark_src ) if(ENABLE_HDFS) - target_link_libraries( - huge_ctr_inference - PUBLIC - ${DB_LIB_PATHS}/libhdfs.so # from Hugectr - ) + target_link_libraries(huge_ctr_inference PUBLIC hdfs) endif() if(ENABLE_S3) diff --git a/docs/source/hugectr_contributor_guide.md b/docs/source/hugectr_contributor_guide.md index 50b8fe2a90..431a5341f1 100755 --- a/docs/source/hugectr_contributor_guide.md +++ b/docs/source/hugectr_contributor_guide.md @@ -104,10 +104,10 @@ To build HugeCTR Training Container from source, do the following: - **ENABLE_INFERENCE**: You can use this option to build HugeCTR in inference mode, which was designed for the inference framework. In this mode, an inference shared library will be built for the HugeCTR Backend. Only interfaces that support the HugeCTR Backend can be used. Therefore, you can’t train models in this mode. This option is set to OFF by default. For building inference container, please refer to [Build HugeCTR Inference Container from Source](#build-hugectr-inference-container-from-source) - - **ENABLE_HDFS**: You can use this option to build HugeCTR together with HDFS to enable HDFS related functions. Permissible values are `ON`, `MINIMAL` and `OFF` *(default)*. Setting this option to `ON` leads to building all necessary Hadoop modules that are required for building AND running both HugeCTR and HDFS. In contrast, `MINIMAL` restricts building only the minimum necessary set of components for building HugeCTR. + - **ENABLE_HDFS**: You can use this option to build HugeCTR together with HDFS to enable HDFS related functions. Permissible values are `ON` and `OFF` *(default)*. Setting this option to `ON` leads to building all necessary Hadoop modules that are required for building so that it can connect to HDFS deployments. - **ENABLE_S3**: You can use this option to build HugeCTR together with Amazon AWS S3 SDK to enable S3 related functions. Permissible values are `ON` and `OFF` *(default)*. Setting this option to `ON` leads to building all necessary AWS SKKs and dependencies that are required for building AND running both HugeCTR and S3. - **Please note that setting DENABLE_HDFS=ON/MINIMAL or DENABLE_S3=ON requires root permission. So before using these two options to do the customized building, make sure you use `-u root` when you run the docker container.** + **Please note that setting DENABLE_HDFS=ON or DENABLE_S3=ON requires root permission. So before using these two options to do the customized building, make sure you use `-u root` when you run the docker container.** Here are some examples of how you can build HugeCTR using these build options: ```shell @@ -124,7 +124,7 @@ To build HugeCTR Training Container from source, do the following: ```shell $ mkdir -p build && cd build - $ cmake -DCMAKE_BUILD_TYPE=Release -DSM="70;80" -DENABLE_HDFS=MINIMAL .. # Target is NVIDIA V100 / A100 with only minimum HDFS components mode on. + $ cmake -DCMAKE_BUILD_TYPE=Release -DSM="70;80" -DENABLE_HDFS=ON .. # Target is NVIDIA V100 / A100 with HDFS components mode on. $ make -j && make install ``` diff --git a/sbin/install-hadoop.sh b/sbin/install-hadoop.sh index a16905d479..d7c4660894 100755 --- a/sbin/install-hadoop.sh +++ b/sbin/install-hadoop.sh @@ -40,7 +40,7 @@ if [[ ! -f "${HADOOP_HOME}/include/hdfs.h" ]]; then cp hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs/include/hdfs/hdfs.h ${HADOOP_HOME}/include fi -# Cleanup reundant files. +# Cleanup redundant files. for f in $(find ${HADOOP_HOME} -name *.cmd); do rm -rf $f done diff --git a/third_party/hadoop b/third_party/hadoop deleted file mode 160000 index a585a73c3e..0000000000 --- a/third_party/hadoop +++ /dev/null @@ -1 +0,0 @@ -Subproject commit a585a73c3e02ac62350c136643a5e7f6095a3dbb diff --git a/third_party/protobuf b/third_party/protobuf deleted file mode 160000 index 22d0e265de..0000000000 --- a/third_party/protobuf +++ /dev/null @@ -1 +0,0 @@ -Subproject commit 22d0e265de7d2b3d2e9a00d071313502e7d4cccf diff --git a/tools/dockerfiles/Dockerfile.optimized b/tools/dockerfiles/Dockerfile.optimized index 415b797e1e..7522db9eba 100644 --- a/tools/dockerfiles/Dockerfile.optimized +++ b/tools/dockerfiles/Dockerfile.optimized @@ -26,6 +26,11 @@ ARG RELEASE=true RUN apt-get update -y && \ DEBIAN_FRONTEND=noninteractive apt-get install -y --no-install-recommends \ + libboost-date-time-dev \ + libboost-program-options-dev \ + libprotobuf-dev \ + libprotoc-dev \ + libfuse-dev \ clang-format \ libtbb-dev \ libaio-dev && \ From 4b989ecf2e2ec4a057bff6383507ebb0b73dbb7a Mon Sep 17 00:00:00 2001 From: Xavier Simmons Date: Wed, 12 Jun 2024 18:12:55 -0700 Subject: [PATCH 08/11] Query system page size for direct I/O --- .../data_readers/multi_hot/detail/aio_context.hpp | 3 ++- .../data_readers/multi_hot/detail/aio_context.cpp | 12 ++++++++---- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/HugeCTR/include/data_readers/multi_hot/detail/aio_context.hpp b/HugeCTR/include/data_readers/multi_hot/detail/aio_context.hpp index ab05b31fa0..408636ac7a 100644 --- a/HugeCTR/include/data_readers/multi_hot/detail/aio_context.hpp +++ b/HugeCTR/include/data_readers/multi_hot/detail/aio_context.hpp @@ -36,10 +36,11 @@ class AIOContext : public IOContext { size_t io_depth_ = 0; size_t num_inflight_ = 0; + size_t alignment_ = 0; io_context_t ctx_ = 0; std::vector tmp_events_; // prevent dynamic memory allocation std::vector iocb_buffer_; std::queue free_cbs_; }; -} // namespace HugeCTR \ No newline at end of file +} // namespace HugeCTR diff --git a/HugeCTR/src/data_readers/multi_hot/detail/aio_context.cpp b/HugeCTR/src/data_readers/multi_hot/detail/aio_context.cpp index d9a8c6ee94..29cb99b73d 100644 --- a/HugeCTR/src/data_readers/multi_hot/detail/aio_context.cpp +++ b/HugeCTR/src/data_readers/multi_hot/detail/aio_context.cpp @@ -35,6 +35,12 @@ AIOContext::AIOContext(size_t io_depth) : io_depth_(io_depth), iocb_buffer_(io_d if (io_queue_init(io_depth, &ctx_) < 0) { throw std::runtime_error("io_queue_init failed"); } + + long page_size = sysconf(_SC_PAGESIZE); + if (page_size == -1) { + throw std::runtime_error("sysconf failed to return page size."); + } + alignment_ = static_cast(page_size); } AIOContext::~AIOContext() { @@ -118,8 +124,6 @@ IOError AIOContext::errno_to_enum(int err) { } } -size_t AIOContext::get_alignment() const { - return 4096; // O_DIRECT requirement -} +size_t AIOContext::get_alignment() const { return alignment_; } -} // namespace HugeCTR \ No newline at end of file +} // namespace HugeCTR From 743e2c748400038743a52ff1b67e42ce83b9471c Mon Sep 17 00:00:00 2001 From: Hui Kang Date: Mon, 5 Aug 2024 19:30:05 -0700 Subject: [PATCH 09/11] Modify the SOK code to fit tensorflow2.16 --- sparse_operation_kit/CMakeLists.txt | 7 +++++++ sparse_operation_kit/ReadMe.md | 2 +- .../impl/core_impl/gpu_resource_impl.hpp | 18 +++++++++++++++++- 3 files changed, 25 insertions(+), 2 deletions(-) diff --git a/sparse_operation_kit/CMakeLists.txt b/sparse_operation_kit/CMakeLists.txt index 9c6de27b0e..91f458d0c8 100644 --- a/sparse_operation_kit/CMakeLists.txt +++ b/sparse_operation_kit/CMakeLists.txt @@ -32,6 +32,8 @@ if (NOT TF_RESULT) list(GET TF_VERSION_LIST 0 TF_VERSION_MAJOR) list(GET TF_VERSION_LIST 1 TF_VERSION_MINOR) list(GET TF_VERSION_LIST 2 TF_VERSION_PATCH) + message(STATUS "TF_VERSION_MAJOR = ${TF_VERSION_MAJOR}") + message(STATUS "TF_VERSION_MINOR = ${TF_VERSION_MINOR}") if(${TF_VERSION_MAJOR} GREATER 1 AND ${TF_VERSION_MINOR} GREATER 9) add_definitions(-DTF_GE_210) set_property(GLOBAL PROPERTY SOK_CXX_STANDARD_PROPERTY cxx_std_17) @@ -51,6 +53,11 @@ if (NOT TF_RESULT) if(${TF_VERSION_MAJOR} GREATER 1 AND ${TF_VERSION_MINOR} GREATER 11) add_definitions(-DTF_GE_212) endif() + + + if(${TF_VERSION_MAJOR} GREATER 1 AND ${TF_VERSION_MINOR} GREATER 15) + add_definitions(-DTF_GE_216) + endif() else() message(FATAL_ERROR "Can not detect tensorflow in your environment,please install tensorflow(tf1 support version 1.15, for tf2 support version 2.60~latest) ") endif() diff --git a/sparse_operation_kit/ReadMe.md b/sparse_operation_kit/ReadMe.md index 99e710ec04..08d3adbbc4 100644 --- a/sparse_operation_kit/ReadMe.md +++ b/sparse_operation_kit/ReadMe.md @@ -87,7 +87,7 @@ You can also build the SOK module from source code. Here are the steps to follow ### Pre-requisites ### CUDA Version:>= 11.2 -TF2 Version:2.6.0~2.14.0 +TF2 Version:2.6.0~2.16.0 TF1 Version:1.15 diff --git a/sparse_operation_kit/kit_src/lookup/impl/core_impl/gpu_resource_impl.hpp b/sparse_operation_kit/kit_src/lookup/impl/core_impl/gpu_resource_impl.hpp index e16b7fac8a..b8d3e90844 100644 --- a/sparse_operation_kit/kit_src/lookup/impl/core_impl/gpu_resource_impl.hpp +++ b/sparse_operation_kit/kit_src/lookup/impl/core_impl/gpu_resource_impl.hpp @@ -27,6 +27,10 @@ #include "tensorflow/core/common_runtime/gpu_device_context.h" #endif +#ifdef TF_GE_216 +#include "tensorflow/c/experimental/stream_executor/stream_executor_internal.h" +#endif + #include "tensorflow/core/framework/device_base.h" #include "tensorflow/core/framework/op_kernel.h" #include "tensorflow/core/platform/stream_executor.h" @@ -49,6 +53,16 @@ class GPUResource final : public GPUResourceBase { LOG(FATAL) << "Get DeviceContext fail! please check OpKernel running on GPU."; } const GPUDeviceContext *gpu_dc = static_cast(dc); + +#ifdef TF_GE_216 + cudaStream_t stream = + reinterpret_cast(gpu_dc->stream()->platform_specific_handle().stream); + + if (!stream) { + LOG(FATAL) << "Get default CUDA stream fail!"; + } + stream_map_[current_stream_name_] = stream; +#else cudaStream_t *stream = reinterpret_cast(gpu_dc->stream()->implementation()->GpuStreamMemberHack()); @@ -62,6 +76,8 @@ class GPUResource final : public GPUResourceBase { LOG(FATAL) << "Get default CUDA stream fail!"; } stream_map_[current_stream_name_] = *stream; + +#endif } void set_stream(const std::string &name) override { current_stream_name_ = name; } @@ -84,4 +100,4 @@ class GPUResource final : public GPUResourceBase { std::string current_stream_name_; std::unordered_map stream_map_; }; -} // namespace tf_internal \ No newline at end of file +} // namespace tf_internal From 579518843ab0169cfaa649377eae4b477beba717 Mon Sep 17 00:00:00 2001 From: Emma Qiao Date: Wed, 7 Aug 2024 00:17:27 -0700 Subject: [PATCH 10/11] Fix path in nspect config --- .nspect-vuln-allowlist.toml | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/.nspect-vuln-allowlist.toml b/.nspect-vuln-allowlist.toml index cadb49f73a..6efea76379 100644 --- a/.nspect-vuln-allowlist.toml +++ b/.nspect-vuln-allowlist.toml @@ -20,25 +20,22 @@ comment = 'No Use' nspect_ids = ['NSPECT-OZP9-WUQA'] [[oss.excluded.directories]] -paths = ['third_party/hadoop/hadoop-yarn-project/hadoop-yarn -/hadoop-yarn-api] +paths = ['third_party/hadoop/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api] comment = 'No Use' nspect_ids = ['NSPECT-OZP9-WUQA'] [[oss.excluded.directories]] -paths = ['third_party/hadoop/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core'] +paths = ['third_party/hadoop/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core'] comment = 'No Use' nspect_ids = ['NSPECT-OZP9-WUQA'] [[oss.excluded.directories]] -paths = ['third_party/hadoop/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server -/hadoop-yarn-server-timelineservice'] +paths = ['third_party/hadoop/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice'] comment = 'No Use' nspect_ids = ['NSPECT-OZP9-WUQA'] [[oss.excluded.directories]] -paths = ['hadoop/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase -/hadoop-yarn-server-timelineservice-hbase-server'] +paths = ['hadoop/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server'] comment = 'No Use' nspect_ids = ['NSPECT-OZP9-WUQA'] From c3b186ac057dfcbd0bc9e7fe83d0added39d3613 Mon Sep 17 00:00:00 2001 From: Emma Qiao Date: Tue, 17 Sep 2024 21:47:02 -0700 Subject: [PATCH 11/11] [github]low frequency filter --- .../embedding/all2all_embedding_collection.cu | 126 +++++++++++++- .../all2all_embedding_collection.hpp | 5 +- HugeCTR/embedding/embedding_table.hpp | 10 +- .../impl/embedding_collection_adapter.cu | 50 ++++++ .../impl/embedding_collection_adapter.h | 4 + .../lookup/kernels/embedding_collection.cc | 6 +- .../lookup/ops/embedding_collection.cc | 9 + .../kit_src/variable/impl/det_variable.cu | 6 + .../kit_src/variable/impl/det_variable.h | 3 + .../kit_src/variable/impl/hkv_variable.cu | 34 +++- .../kit_src/variable/impl/hkv_variable.h | 5 +- .../kit_src/variable/impl/variable_base.cu | 9 +- .../kit_src/variable/impl/variable_base.h | 2 + .../sparse_operation_kit/lookup.py | 20 ++- .../lookup_sparse_hkv_low_frequency_test.py | 163 ++++++++++++++++++ 15 files changed, 435 insertions(+), 17 deletions(-) create mode 100644 sparse_operation_kit/sparse_operation_kit/test/function_test/tf2/lookup/lookup_sparse_hkv_low_frequency_test.py diff --git a/HugeCTR/embedding/all2all_embedding_collection.cu b/HugeCTR/embedding/all2all_embedding_collection.cu index b585221852..63acc5ee29 100644 --- a/HugeCTR/embedding/all2all_embedding_collection.cu +++ b/HugeCTR/embedding/all2all_embedding_collection.cu @@ -206,7 +206,7 @@ void weighted_sparse_forward_per_gpu( const core23::Tensor &sp_weights_all_gather_recv_buffer, ILookup *emb_storage, std::vector &emb_vec_model_buffer, int64_t *num_model_key, int64_t *num_model_offsets, core23::Tensor &ret_model_key, core23::Tensor &ret_model_offset, - core23::Tensor &ret_sp_weight) { + core23::Tensor &ret_sp_weight, bool use_filter) { HugeCTR::CudaDeviceContext context(core->get_device_id()); int tensor_device_id = core->get_device_id(); @@ -369,6 +369,79 @@ void weighted_sparse_forward_per_gpu( *num_model_offsets = model_offsets.num_elements(); } +template +__global__ void cal_lookup_idx(size_t lookup_num, offset_t *bucket_after_filter, size_t batch_size, + offset_t *lookup_offset, size_t bucket_num) { + int32_t i = blockIdx.x * blockDim.x + threadIdx.x; + int32_t step = blockDim.x * gridDim.x; + for (; i < (lookup_num); i += step) { + lookup_offset[i] = bucket_after_filter[i * batch_size]; + } +} + +template +__global__ void count_ratio_filter(size_t bucket_num, char *filtered, const offset_t *bucket_range, + offset_t *bucket_after_filter) { + int32_t i = blockIdx.x * blockDim.x + threadIdx.x; + int32_t step = blockDim.x * gridDim.x; + for (; i < (bucket_num); i += step) { + offset_t start = bucket_range[i]; + offset_t end = bucket_range[i + 1]; + bucket_after_filter[i + 1] = 0; + for (offset_t idx = start; idx < end; idx++) { + if (filtered[idx] == 1) { + bucket_after_filter[i + 1]++; + } + } + if (i == 0) { + bucket_after_filter[i] = 0; + } + } +} + +void filter(std::shared_ptr core, + const UniformModelParallelEmbeddingMeta &meta, const core23::Tensor &filtered, + core23::Tensor &bucket_range, core23::Tensor &bucket_after_filter, + core23::TensorParams ¶ms, EmbeddingInput &emb_input, core23::Tensor &lookup_offset, + core23::Tensor &temp_scan_storage, core23::Tensor &temp_select_storage, + size_t temp_scan_bytes, size_t temp_select_bytes, core23::Tensor &keys_after_filter) { + auto stream = core->get_local_gpu()->get_stream(); + // bucket_range length = bucket_num+1 , so here we minus 1. + int bucket_num = bucket_range.num_elements() - 1; + const int block_size = 256; + const int grid_size = + core->get_kernel_param().num_sms * core->get_kernel_param().max_thread_per_block / block_size; + + DISPATCH_INTEGRAL_FUNCTION_CORE23(bucket_range.data_type().type(), offset_t, [&] { + DISPATCH_INTEGRAL_FUNCTION_CORE23(keys_after_filter.data_type().type(), key_t, [&] { + offset_t *bucket_after_filter_ptr = bucket_after_filter.data(); + const offset_t *bucket_range_ptr = bucket_range.data(); + char *filterd_ptr = filtered.data(); + count_ratio_filter<<>>( + bucket_num, filterd_ptr, bucket_range_ptr, bucket_after_filter_ptr); + cub::DeviceScan::InclusiveSum( + temp_scan_storage.data(), temp_scan_bytes, bucket_after_filter.data(), + bucket_after_filter.data(), bucket_after_filter.num_elements(), stream); + + key_t *keys_ptr = emb_input.keys.data(); + + cub::DeviceSelect::Flagged(temp_select_storage.data(), temp_select_bytes, keys_ptr, + filterd_ptr, keys_after_filter.data(), + emb_input.num_keys.data(), emb_input.h_num_keys, stream); + + size_t batch_size = (bucket_num) / meta.num_lookup_; + + cal_lookup_idx<<<1, block_size, 0, stream>>>(meta.num_lookup_ + 1, + bucket_after_filter.data(), batch_size, + lookup_offset.data(), bucket_num); + HCTR_LIB_THROW(cudaStreamSynchronize(stream)); + emb_input.h_num_keys = static_cast(emb_input.num_keys.data()[0]); + emb_input.keys = keys_after_filter; + emb_input.bucket_range = bucket_after_filter; + }); + }); +} + void sparse_forward_per_gpu(std::shared_ptr core, const EmbeddingCollectionParam &ebc_param, const UniformModelParallelEmbeddingMeta &meta, @@ -376,7 +449,8 @@ void sparse_forward_per_gpu(std::shared_ptr core, const core23::Tensor &row_lengths_all_gather_recv_buffer, ILookup *emb_storage, std::vector &emb_vec_model_buffer, int64_t *num_model_key, int64_t *num_model_offsets, - core23::Tensor *ret_model_key, core23::Tensor *ret_model_offset) { + core23::Tensor *ret_model_key, core23::Tensor *ret_model_offset, + bool use_filter) { /* There are some steps in this function: 1.reorder key to feature major @@ -500,8 +574,56 @@ void sparse_forward_per_gpu(std::shared_ptr core, compress_offset_.compute(embedding_input.bucket_range, batch_size, &num_key_per_lookup_offset); HCTR_LIB_THROW(cudaStreamSynchronize(stream)); + if (use_filter) { + core23::Tensor bucket_range_after_filter; + core23::Tensor keys_after_filter; + core23::Tensor filtered; + + filtered = core23::Tensor( + params.shape({(int64_t)embedding_input.h_num_keys}).data_type(core23::ScalarType::Char)); + bucket_range_after_filter = + core23::Tensor(params.shape({embedding_input.bucket_range.num_elements()}) + .data_type(embedding_input.bucket_range.data_type().type())); + keys_after_filter = core23::Tensor(params.shape({(int64_t)embedding_input.h_num_keys + 1}) + .data_type(embedding_input.keys.data_type().type())); + + core23::Tensor temp_scan_storage; + core23::Tensor temp_select_storage; + + size_t temp_scan_bytes = 0; + size_t temp_select_bytes = 0; + + DISPATCH_INTEGRAL_FUNCTION_CORE23( + embedding_input.bucket_range.data_type().type(), offset_t, [&] { + DISPATCH_INTEGRAL_FUNCTION_CORE23(embedding_input.keys.data_type().type(), key_t, [&] { + cub::DeviceScan::InclusiveSum(nullptr, temp_scan_bytes, (offset_t *)nullptr, + (offset_t *)nullptr, + bucket_range_after_filter.num_elements()); + + temp_scan_storage = core23::Tensor(params.shape({static_cast(temp_scan_bytes)}) + .data_type(core23::ScalarType::Char)); + + cub::DeviceSelect::Flagged(nullptr, temp_select_bytes, (key_t *)nullptr, + (char *)nullptr, (key_t *)nullptr, (uint64_t *)nullptr, + embedding_input.h_num_keys); + + temp_select_storage = + core23::Tensor(params.shape({static_cast(temp_select_bytes)}) + .data_type(core23::ScalarType::Char)); + }); + }); + + emb_storage->ratio_filter(embedding_input.keys, embedding_input.h_num_keys, + num_key_per_lookup_offset, meta.num_local_lookup_ + 1, + meta.d_local_table_id_list_, filtered); + + filter(core, meta, filtered, embedding_input.bucket_range, bucket_range_after_filter, params, + embedding_input, num_key_per_lookup_offset, temp_scan_storage, temp_select_storage, + temp_scan_bytes, temp_select_bytes, keys_after_filter); + } core23::Tensor embedding_vec = core23::init_tensor_list( key_all_gather_recv_buffer.num_elements(), params.device().index()); + emb_storage->lookup(embedding_input.keys, embedding_input.h_num_keys, num_key_per_lookup_offset, meta.num_local_lookup_ + 1, meta.d_local_table_id_list_, embedding_vec); diff --git a/HugeCTR/embedding/all2all_embedding_collection.hpp b/HugeCTR/embedding/all2all_embedding_collection.hpp index 98021c7606..6c97567bc1 100644 --- a/HugeCTR/embedding/all2all_embedding_collection.hpp +++ b/HugeCTR/embedding/all2all_embedding_collection.hpp @@ -57,7 +57,7 @@ void weighted_sparse_forward_per_gpu( const core23::Tensor &sp_weights_all_gather_recv_buffer, ILookup *emb_storage, std::vector &emb_vec_model_buffer, int64_t *num_model_key, int64_t *num_model_offsets, core23::Tensor &ret_model_key, core23::Tensor &ret_model_offset, - core23::Tensor &ret_sp_weight); + core23::Tensor &ret_sp_weight, bool use_filter); void weighted_copy_model_keys_and_offsets( std::shared_ptr core, const core23::Tensor &model_key, @@ -71,7 +71,8 @@ void sparse_forward_per_gpu(std::shared_ptr core, const core23::Tensor &row_lengths_all_gather_recv_buffer, ILookup *emb_storage, std::vector &emb_vec_model_buffer, int64_t *num_model_key, int64_t *num_model_offsets, - core23::Tensor *ret_model_key, core23::Tensor *ret_model_offset); + core23::Tensor *ret_model_key, core23::Tensor *ret_model_offset, + bool use_filter); void copy_model_keys_and_offsets(std::shared_ptr core, const core23::Tensor &model_key, diff --git a/HugeCTR/embedding/embedding_table.hpp b/HugeCTR/embedding/embedding_table.hpp index 3cae66986c..9a28186ed1 100644 --- a/HugeCTR/embedding/embedding_table.hpp +++ b/HugeCTR/embedding/embedding_table.hpp @@ -23,9 +23,13 @@ class ILookup { public: virtual ~ILookup() = default; - virtual void lookup(const core23::Tensor &keys, size_t num_keys, - const core23::Tensor &num_keys_per_table_offset, size_t num_table_offset, - const core23::Tensor &table_id_list, core23::Tensor &embedding_vec) = 0; + virtual void lookup(const core23::Tensor& keys, size_t num_keys, + const core23::Tensor& num_keys_per_table_offset, size_t num_table_offset, + const core23::Tensor& table_id_list, core23::Tensor& embedding_vec) = 0; + + virtual void ratio_filter(const core23::Tensor& keys, size_t num_keys, + const core23::Tensor& id_space_offset, size_t num_id_space_offset, + const core23::Tensor& id_space, core23::Tensor& filtered){}; }; } // namespace embedding diff --git a/sparse_operation_kit/kit_src/lookup/impl/embedding_collection_adapter.cu b/sparse_operation_kit/kit_src/lookup/impl/embedding_collection_adapter.cu index 4886e85b83..95a02307db 100644 --- a/sparse_operation_kit/kit_src/lookup/impl/embedding_collection_adapter.cu +++ b/sparse_operation_kit/kit_src/lookup/impl/embedding_collection_adapter.cu @@ -380,6 +380,56 @@ void DummyVarAdapter::lookup( } } +template +void DummyVarAdapter::ratio_filter( + const core23::Tensor& keys, size_t num_keys, const core23::Tensor& id_space_offset, + size_t num_id_space_offset, const core23::Tensor& id_space, core23::Tensor& filtered) { + // clang-format off + id_space_offset_.clear(); + id_space_.clear(); + id_space_offset_.resize(num_id_space_offset); + CUDACHECK(cudaMemcpyAsync(id_space_offset_.data(), + id_space_offset.data(), + sizeof(OffsetType) * (num_id_space_offset), + cudaMemcpyDeviceToHost, stream_)); + id_space_.resize(num_id_space_offset - 1); + CUDACHECK(cudaMemcpyAsync(id_space_.data(), + id_space.data(), + sizeof(int) * (num_id_space_offset - 1), + cudaMemcpyDeviceToHost, stream_)); + // clang-format on + CUDACHECK(cudaStreamSynchronize(stream_)); + const KeyType* input = keys.data(); + bool* output_filtered = filtered.data(); + int start_index = 0; + size_t num = 0; + bool is_lookup = false; + + for (int i = 0; i < num_id_space_offset - 1; ++i) { + if (i == num_id_space_offset - 2) { + num += id_space_offset_[i + 1] - id_space_offset_[i]; + is_lookup = true; + } else { + if (same_table_[i + 1] != same_table_[i]) { + num += id_space_offset_[i + 1] - id_space_offset_[i]; + is_lookup = true; + } else { + num += id_space_offset_[i + 1] - id_space_offset_[i]; + } + } + if (num != 0 && is_lookup) { + auto var = vars_[id_space_[start_index]]; + var->ratio_filter(input, output_filtered, num, stream_); + CUDACHECK(cudaStreamSynchronize(stream_)); + input += num; + output_filtered += num; + num = 0; + is_lookup = false; + start_index = i + 1; + } + } +} + template class DummyVarAdapter; template class DummyVarAdapter; // template class DummyVarAdapter; diff --git a/sparse_operation_kit/kit_src/lookup/impl/embedding_collection_adapter.h b/sparse_operation_kit/kit_src/lookup/impl/embedding_collection_adapter.h index cf0d69e3a0..1acd7cfcab 100644 --- a/sparse_operation_kit/kit_src/lookup/impl/embedding_collection_adapter.h +++ b/sparse_operation_kit/kit_src/lookup/impl/embedding_collection_adapter.h @@ -88,6 +88,10 @@ class DummyVarAdapter : public ::embedding::ILookup { size_t num_id_space_offset, const core23::Tensor& id_space, core23::Tensor& embedding_vec) override; + void ratio_filter(const core23::Tensor& keys, size_t num_keys, + const core23::Tensor& id_space_offset, size_t num_id_space_offset, + const core23::Tensor& id_space, core23::Tensor& filtered) override; + private: std::shared_ptr tf_backend_; int sm_count_; diff --git a/sparse_operation_kit/kit_src/lookup/kernels/embedding_collection.cc b/sparse_operation_kit/kit_src/lookup/kernels/embedding_collection.cc index 63a4cad2c9..83dbef22df 100644 --- a/sparse_operation_kit/kit_src/lookup/kernels/embedding_collection.cc +++ b/sparse_operation_kit/kit_src/lookup/kernels/embedding_collection.cc @@ -72,6 +72,7 @@ class EmbeddingCollectionBase : public OpKernel { int global_gpu_id_; int num_local_lookups_; bool use_sp_weight_; + bool use_filter_; HugeCTR::core23::KernelParams kernel_params_; std::unique_ptr ebc_param_; @@ -143,6 +144,7 @@ class EmbeddingCollectionBase : public OpKernel { OP_REQUIRES_OK(ctx, ctx->GetAttr("id_in_local_rank", &id_in_local_rank_)); OP_REQUIRES_OK(ctx, ctx->GetAttr("num_gpus", &num_gpus_)); OP_REQUIRES_OK(ctx, ctx->GetAttr("use_sp_weight", &use_sp_weight_)); + OP_REQUIRES_OK(ctx, ctx->GetAttr("use_filter", &use_filter_)); // check rank/num_ranks/id_in_local_rank/num_gpus OP_REQUIRES(ctx, rank_ >= 0 && rank_ < num_ranks_, errors::InvalidArgument("Invalid rank.")); @@ -477,13 +479,13 @@ class LookupFowardBase : public EmbeddingCollectionBasemeta_, this->global_gpu_id_, key_recv_buffer_tensor, row_length_recv_buffer_tensor, sp_weight_recv_buffer_tensor, &adapter_, emb_vec_model_buffer, &num_model_key, &num_model_offsets, ret_model_key, ret_model_offset, - ret_sp_weight); + ret_sp_weight,this->use_filter_); } else { ::embedding::tf::model_forward::sparse_forward_per_gpu( tf_backend, *this->ebc_param_, *this->meta_, key_recv_buffer_tensor, row_length_recv_buffer_tensor, &adapter_, emb_vec_model_buffer, &num_model_key, - &num_model_offsets, &ret_model_key, &ret_model_offset); + &num_model_offsets, &ret_model_key, &ret_model_offset,this->use_filter_); } // Prepare model_key & model_offsets diff --git a/sparse_operation_kit/kit_src/lookup/ops/embedding_collection.cc b/sparse_operation_kit/kit_src/lookup/ops/embedding_collection.cc index c3344c1f4f..6c3cd8d2c1 100644 --- a/sparse_operation_kit/kit_src/lookup/ops/embedding_collection.cc +++ b/sparse_operation_kit/kit_src/lookup/ops/embedding_collection.cc @@ -52,6 +52,7 @@ REGISTER_OP("PreprocessingForward") .Attr("id_in_local_rank: int") .Attr("num_gpus: int") .Attr("use_sp_weight: bool") + .Attr("use_filter: bool") .Attr("Tindices: {int32, int64} = DT_INT64") .Attr("Toffsets: {int32, int64} = DT_INT64") .Attr("dtype: {float32, float16} = DT_FLOAT") @@ -80,6 +81,7 @@ REGISTER_OP("PreprocessingForwardWithWeight") .Attr("id_in_local_rank: int") .Attr("num_gpus: int") .Attr("use_sp_weight: bool") + .Attr("use_filter: bool") .Attr("Tindices: {int32, int64} = DT_INT64") .Attr("Toffsets: {int32, int64} = DT_INT64") .Attr("dtype: {float32, float16} = DT_FLOAT") @@ -112,6 +114,7 @@ REGISTER_OP("LookupForward") .Attr("id_in_local_rank: int") .Attr("num_gpus: int") .Attr("use_sp_weight: bool") + .Attr("use_filter: bool") .Attr("Tindices: {int32, int64} = DT_INT64") .Attr("Toffsets: {int32, int64} = DT_INT64") .Attr("dtype: {float32, float16} = DT_FLOAT") @@ -165,6 +168,7 @@ REGISTER_OP("LookupForwardVariable") .Attr("id_in_local_rank: int") .Attr("num_gpus: int") .Attr("use_sp_weight: bool") + .Attr("use_filter: bool") .Attr("Tindices: {int32, int64} = DT_INT64") .Attr("Toffsets: {int32, int64} = DT_INT64") .Attr("dtype: {float32, float16} = DT_FLOAT") @@ -218,6 +222,7 @@ REGISTER_OP("LookupForwardDynamic") .Attr("id_in_local_rank: int") .Attr("num_gpus: int") .Attr("use_sp_weight: bool") + .Attr("use_filter: bool") .Attr("Tindices: {int32, int64} = DT_INT64") .Attr("Toffsets: {int32, int64} = DT_INT64") .Attr("dtype: {float32, float16} = DT_FLOAT") @@ -273,6 +278,7 @@ REGISTER_OP("LookupForwardEmbeddingVarGPU") .Attr("id_in_local_rank: int") .Attr("num_gpus: int") .Attr("use_sp_weight: bool") + .Attr("use_filter: bool") .Attr("Tindices: {int32, int64} = DT_INT64") .Attr("Toffsets: {int32, int64} = DT_INT64") .Attr("dtype: {float32, float16} = DT_FLOAT") @@ -324,6 +330,7 @@ REGISTER_OP("LookupBackward") .Attr("id_in_local_rank: int") .Attr("num_gpus: int") .Attr("use_sp_weight: bool") + .Attr("use_filter: bool") .Attr("Tindices: {int32, int64} = DT_INT64") .Attr("Toffsets: {int32, int64} = DT_INT64") .Attr("dtype: {float32, float16} = DT_FLOAT") @@ -362,6 +369,7 @@ REGISTER_OP("PostprocessingForward") .Attr("id_in_local_rank: int") .Attr("num_gpus: int") .Attr("use_sp_weight: bool") + .Attr("use_filter: bool") .Attr("Tindices: {int32, int64} = DT_INT64") .Attr("Toffsets: {int32, int64} = DT_INT64") .Attr("dtype: {float32, float16} = DT_FLOAT") @@ -403,6 +411,7 @@ REGISTER_OP("PostprocessingBackward") .Attr("id_in_local_rank: int") .Attr("num_gpus: int") .Attr("use_sp_weight: bool") + .Attr("use_filter: bool") .Attr("Tindices: {int32, int64} = DT_INT64") .Attr("Toffsets: {int32, int64} = DT_INT64") .Attr("dtype: {float32, float16} = DT_FLOAT") diff --git a/sparse_operation_kit/kit_src/variable/impl/det_variable.cu b/sparse_operation_kit/kit_src/variable/impl/det_variable.cu index a9c1a84def..cb324aed0f 100644 --- a/sparse_operation_kit/kit_src/variable/impl/det_variable.cu +++ b/sparse_operation_kit/kit_src/variable/impl/det_variable.cu @@ -248,6 +248,12 @@ void DETVariable::scatter_update(const KeyType* keys, const map_->scatter_update(keys, values, num_keys, stream); } +template +void DETVariable::ratio_filter(const KeyType* keys, bool* filtered, + size_t num_keys, cudaStream_t stream) { + throw std::runtime_error("SOK dynamic variable with DET backend don't support ratio_filter!"); +} + template class DETVariable; template class DETVariable; diff --git a/sparse_operation_kit/kit_src/variable/impl/det_variable.h b/sparse_operation_kit/kit_src/variable/impl/det_variable.h index 243308bd7b..065fcaaf13 100644 --- a/sparse_operation_kit/kit_src/variable/impl/det_variable.h +++ b/sparse_operation_kit/kit_src/variable/impl/det_variable.h @@ -56,10 +56,13 @@ class DETVariable : public VariableBase { cudaStream_t stream = 0) override; void scatter_update(const KeyType *keys, const ValueType *values, size_t num_keys, cudaStream_t stream = 0) override; + void ratio_filter(const KeyType *keys, bool *filtered, size_t num_keys, + cudaStream_t stream = 0) override; private: std::unique_ptr> map_; + float filter_ratio_; size_t dimension_; size_t initial_capacity_; std::string initializer_; diff --git a/sparse_operation_kit/kit_src/variable/impl/hkv_variable.cu b/sparse_operation_kit/kit_src/variable/impl/hkv_variable.cu index 9a9fa2ab22..919ea561fd 100644 --- a/sparse_operation_kit/kit_src/variable/impl/hkv_variable.cu +++ b/sparse_operation_kit/kit_src/variable/impl/hkv_variable.cu @@ -276,12 +276,13 @@ HKVVariable::HKVVariable(int64_t dimension, int64_t initial_ size_t max_hbm_for_vectors, size_t max_bucket_size, float max_load_factor, int block_size, int device_id, bool io_by_cpu, const std::string& evict_strategy, - cudaStream_t stream) + cudaStream_t stream, float filter_ratio) : dimension_(dimension), initial_capacity_(initial_capacity), initializer_(initializer), stream_(stream), - curand_states_(nullptr) { + curand_states_(nullptr), + filter_ratio_(filter_ratio) { if (dimension_ <= 0) { throw std::invalid_argument("dimension must > 0 but got " + std::to_string(dimension)); } @@ -559,5 +560,34 @@ void HKVVariable::scatter_update(const KeyType* keys, const CUDACHECK(cudaStreamSynchronize(stream)); } +template +__global__ void ratio_filter_flag(curandState* state, const KeyType *keys, bool *filtered, size_t num_keys, float filter_ratio) { + int idx = blockIdx.x * blockDim.x + threadIdx.x; + curandState localState; + localState = state[GlobalThreadId()]; + for (int i = idx; i < num_keys; i += blockDim.x * gridDim.x) { + if (!filtered[i]) { + auto ratio = curand_uniform(&localState); + if (ratio < filter_ratio) { + filtered[i] = true; + } + } + } + state[GlobalThreadId()] = localState; +} +template +void HKVVariable::ratio_filter(const KeyType *keys, bool *filtered, + size_t num_keys, cudaStream_t stream) { + // TODO: update hkv, use exist; + ValueType** p_values; + CUDACHECK(cudaMallocAsync(&p_values, num_keys * sizeof(ValueType*),stream)); + hkv_table_->find(num_keys, keys, p_values, filtered, nullptr, stream); + uint32_t grid_dim = SM_NUM * (NTHREAD_PER_SM/256); + // filter + ratio_filter_flag<<>>(curand_states_, keys, filtered, num_keys, filter_ratio_); + CUDACHECK(cudaFreeAsync(p_values,stream)); + //CUDACHECK(cudaStreamSynchronize(stream)); +} + template class HKVVariable; } // namespace sok diff --git a/sparse_operation_kit/kit_src/variable/impl/hkv_variable.h b/sparse_operation_kit/kit_src/variable/impl/hkv_variable.h index 631d57b3be..a831a04950 100644 --- a/sparse_operation_kit/kit_src/variable/impl/hkv_variable.h +++ b/sparse_operation_kit/kit_src/variable/impl/hkv_variable.h @@ -33,7 +33,7 @@ class HKVVariable : public VariableBase { size_t max_capacity = 0, size_t max_hbm_for_vectors = 0, size_t max_bucket_size = 128, float max_load_factor = 0.5f, int block_size = 128, int device_id = 0, bool io_by_cpu = false, const std::string &evict_strategy = "kLru", - cudaStream_t stream = 0); + cudaStream_t stream = 0, float filter_ratio = 1.0); ~HKVVariable() override; int64_t rows() override; @@ -61,12 +61,15 @@ class HKVVariable : public VariableBase { cudaStream_t stream = 0) override; void scatter_update(const KeyType *keys, const ValueType *values, size_t num_keys, cudaStream_t stream = 0) override; + void ratio_filter(const KeyType *keys, bool *filtered, size_t num_keys, + cudaStream_t stream = 0) override; private: using HKVTable = nv::merlin::HashTable; std::unique_ptr hkv_table_ = std::make_unique(); nv::merlin::HashTableOptions hkv_table_option_; + float filter_ratio_; size_t dimension_; size_t initial_capacity_; std::string initializer_; diff --git a/sparse_operation_kit/kit_src/variable/impl/variable_base.cu b/sparse_operation_kit/kit_src/variable/impl/variable_base.cu index 7c89ea13e3..1b2eb5574c 100644 --- a/sparse_operation_kit/kit_src/variable/impl/variable_base.cu +++ b/sparse_operation_kit/kit_src/variable/impl/variable_base.cu @@ -87,9 +87,16 @@ std::shared_ptr> VariableFactory::create( if (evict_strategy_it != config_json.end()) { evict_strategy = io_by_cpu_it->get(); } + // When we encounter a feature that is not already in our model, we only add it to + // the model with probability p. + float filter_ratio = 1.0f; ///< low_frequency_filter probability p. default 100% + auto filter_ratio_it = config_json.find("filter_ratio"); + if (filter_ratio_it != config_json.end()) { + filter_ratio = filter_ratio_it->get(); + } return std::make_shared>( cols, init_capacity, initializer, max_capacity, max_hbm_for_vectors, max_bucket_size, - max_load_factor, block_size, device_id, io_by_cpu, evict_strategy, stream); + max_load_factor, block_size, device_id, io_by_cpu, evict_strategy, stream, filter_ratio); } } template <> diff --git a/sparse_operation_kit/kit_src/variable/impl/variable_base.h b/sparse_operation_kit/kit_src/variable/impl/variable_base.h index 544ecd0571..63994105f7 100644 --- a/sparse_operation_kit/kit_src/variable/impl/variable_base.h +++ b/sparse_operation_kit/kit_src/variable/impl/variable_base.h @@ -57,6 +57,8 @@ class VariableBase { cudaStream_t stream = 0) = 0; virtual void scatter_update(const KeyType *keys, const ValueType *values, size_t num_keys, cudaStream_t stream = 0) = 0; + virtual void ratio_filter(const KeyType *keys, bool *filtered, size_t num_keys, + cudaStream_t stream = 0) = 0; }; class VariableFactory { diff --git a/sparse_operation_kit/sparse_operation_kit/lookup.py b/sparse_operation_kit/sparse_operation_kit/lookup.py index 6831fd919e..965eccc049 100644 --- a/sparse_operation_kit/sparse_operation_kit/lookup.py +++ b/sparse_operation_kit/sparse_operation_kit/lookup.py @@ -232,6 +232,7 @@ def _LookupBackward(op, *top_grads): "id_in_local_rank", # "Toffsets", "use_sp_weight", + "use_filter", ] kwargs = {} for attr in attr_list: @@ -268,6 +269,7 @@ def _LookupBackward(op, *top_grads): "id_in_local_rank", # "Toffsets", "use_sp_weight", + "use_filter", ] kwargs = {} for attr in attr_list: @@ -304,6 +306,7 @@ def _LookupDynamicBackward(op, *top_grads): "id_in_local_rank", # "Toffsets", "use_sp_weight", + "use_filter", ] kwargs = {} for attr in attr_list: @@ -342,6 +345,7 @@ def _LookupBackwardEmbeddingVarGPU(op, *top_grads): "id_in_local_rank", # "Toffsets", "use_sp_weight", + "use_filter", ] kwargs = {} for attr in attr_list: @@ -385,7 +389,8 @@ def _PostprocessingBackward(op, *top_grads): "id_in_local_rank", "num_gpus", "Tindices", - "use_sp_weight" + "use_sp_weight", + "use_filter", # "Toffsets", ] kwargs = {} @@ -412,7 +417,7 @@ def to_list(any_obj): return any_obj -def lookup_sparse_impl(params, sp_ids, sp_weights=None, combiners=None): +def lookup_sparse_impl(params, sp_ids, sp_weights=None, combiners=None, use_filter=False): shard, dimensions = [], [] for param in params: shard.append(param.target_gpu) @@ -467,6 +472,7 @@ def lookup_sparse_impl(params, sp_ids, sp_weights=None, combiners=None): "num_ranks": num_ranks(), "id_in_local_rank": id_in_rank(), "use_sp_weight": use_sp_weight, + "use_filter": use_filter, } # Step1 @@ -529,7 +535,7 @@ def lookup_sparse_impl(params, sp_ids, sp_weights=None, combiners=None): return emb_vec -def lookup_sparse(params, sp_ids, sp_weights=None, combiners=None): +def lookup_sparse(params, sp_ids, sp_weights=None, combiners=None, use_low_frequency_filter=False): """ Abbreviated as ``sok.lookup_sparse``. @@ -552,6 +558,8 @@ def lookup_sparse(params, sp_ids, sp_weights=None, combiners=None): combiners: list, tuple,optional a list or tuple of string to specify the combiner of each lookup,for now only suupport "mean" "sum". if don't specify , indicate all elements(numbero of elements is same with number of sok.Variables) in combiners will should be set to be mean. + use_low_frequency_filter: bool,optional + For new indices that are not in the embedding table, should low-frequency filtering be performed to enter the embedding table Returns ------- @@ -655,7 +663,11 @@ def lookup_sparse(params, sp_ids, sp_weights=None, combiners=None): selected_combiners = [combiners[i] for i in selected_idx] selected_emb_vec = lookup_sparse_impl( - selected_params, selected_sp_ids, selected_sp_weights, selected_combiners + selected_params, + selected_sp_ids, + selected_sp_weights, + selected_combiners, + use_low_frequency_filter, ) for ii, i in enumerate(selected_idx): emb_vec[i] = selected_emb_vec[ii] diff --git a/sparse_operation_kit/sparse_operation_kit/test/function_test/tf2/lookup/lookup_sparse_hkv_low_frequency_test.py b/sparse_operation_kit/sparse_operation_kit/test/function_test/tf2/lookup/lookup_sparse_hkv_low_frequency_test.py new file mode 100644 index 0000000000..7715576bed --- /dev/null +++ b/sparse_operation_kit/sparse_operation_kit/test/function_test/tf2/lookup/lookup_sparse_hkv_low_frequency_test.py @@ -0,0 +1,163 @@ +""" + Copyright (c) 2022, NVIDIA CORPORATION. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +""" + +import time +import numpy as np +import tensorflow as tf +import horovod.tensorflow as hvd +import sparse_operation_kit as sok + +np.set_printoptions(threshold=np.inf) + +if __name__ == "__main__": + hvd.init() + gpus = tf.config.experimental.list_physical_devices("GPU") + for gpu in gpus: + tf.config.experimental.set_memory_growth(gpu, True) + if gpus: + tf.config.experimental.set_visible_devices(gpus[hvd.local_rank()], "GPU") + sok.init() + + rows = [8192 * 2048, 8192 * 8192] + cols = [128, 4] + hotness = [1, 1] + combiners = ["sum", "mean"] + batch_size = 8192 + iters = 1 + filter_iters = 5 + initial_vals = [13, 17] + + # sok variables + sok_vars = [ + sok.DynamicVariable( + dimension=cols[i], + var_type="hybrid", + initializer=str(initial_vals[i]), + init_capacity=1024 * 1024, + max_capacity=1024 * 1024, + ) + for i in range(len(cols)) + ] + print("HKV var created") + + # indices + total_indices = [] + total_indices_np = [] + for i in range(len(rows)): + offsets = np.random.randint(1, hotness[i] + 1, iters * batch_size) + offsets = tf.convert_to_tensor(offsets, dtype=tf.int64) + offsets = hvd.broadcast(offsets, root_rank=0) + values = np.random.randint(0, rows[i], tf.reduce_sum(offsets)) + values = tf.convert_to_tensor(values, dtype=tf.int64) + values = hvd.broadcast(values, root_rank=0) + total_indices_np.append(values) + total_indices.append(tf.RaggedTensor.from_row_lengths(values, offsets)) + left = batch_size // hvd.size() * hvd.rank() + right = batch_size // hvd.size() * (hvd.rank() + 1) + + unique_indices = [] + for i in range(len(total_indices_np)): + unique_indices.append(np.unique(total_indices_np[i])) + + # initialize optimizer + optimizer = tf.optimizers.SGD(learning_rate=1.0, momentum=0.9) + sok_optimizer = sok.OptimizerWrapper(optimizer) + + def step(params, indices, use_filter=False): + with tf.GradientTape() as tape: + embeddings = sok.lookup_sparse( + params, indices, combiners=combiners, use_low_frequency_filter=use_filter + ) + loss = 0 + for i in range(len(embeddings)): + loss = loss + tf.reduce_sum(embeddings[i]) + grads = tape.gradient(loss, params) + sok_optimizer.apply_gradients(zip(grads, params)) + loss = hvd.allreduce(loss, op=hvd.Sum) + return loss, embeddings + + indices_records = [] + for i in range(iters): + loss, embeddings = step(sok_vars, total_indices) + print("____________pre lookup is done!________________".format(str(i))) + + # indices + total_indices_filter = [] + total_indices_filter_np = [] + for i in range(len(rows)): + offsets = np.random.randint(1, hotness[i] + 1, filter_iters * batch_size) + offsets = tf.convert_to_tensor(offsets, dtype=tf.int64) + offsets = hvd.broadcast(offsets, root_rank=0) + values = np.random.randint(0, rows[i], tf.reduce_sum(offsets)) + values = tf.convert_to_tensor(values, dtype=tf.int64) + values = hvd.broadcast(values, root_rank=0) + total_indices_filter_np.append(values) + total_indices_filter.append(tf.RaggedTensor.from_row_lengths(values, offsets)) + + left = batch_size // hvd.size() * hvd.rank() + right = batch_size // hvd.size() * (hvd.rank() + 1) + + def check_zero_line(arr): + zero_rows = [idx for idx, row in enumerate(arr) if np.all(row == 0)] + rows = arr.shape[0] + if rows == 0: + return 0, 0, 0 + zero_count = 0 + for i in range(rows): + tmp_line = arr[i, :] + if np.all(tmp_line == 0): + zero_count += 1 + return zero_count, rows, zero_count / rows + + for i in range(iters): + indices = [] + indices_np = [] + indices_new_np = [] + masks = [] + for j in range(len(total_indices_filter)): + tmp_indices_tensor = total_indices_filter[j][ + i * batch_size + left : i * batch_size + right + ] + indices.append(tmp_indices_tensor) + indices_np.append(np.squeeze(tmp_indices_tensor.numpy())) + mask = np.isin(indices_np[j], unique_indices[j]) + masks.append(mask) + loss, embeddings = step(sok_vars, indices, use_filter=True) + for k, embedding in enumerate(embeddings): + embedding_np = embedding.numpy() + mask_no_filter_index = np.where( + masks[k] == True, + )[0] + mask_filter_index = np.where( + masks[k] == False, + )[0] + + print("mask_no_filter_index = ", mask_no_filter_index) + print("mask_filter_index = ", mask_filter_index) + embedding_no_filter_np = embedding_np[mask_no_filter_index, :] + embedding_filter_np = embedding_np[mask_filter_index, :] + + print("embedding_no_filter_np = ", embedding_no_filter_np) + print("embedding_filter_np = ", embedding_filter_np) + print("embedding_np = ", embedding_np.shape) + print("mask = ", mask.shape) + zero_count_no_filter, _, zero_rate_no_filter = check_zero_line(embedding_no_filter_np) + zero_count_filter, _, zero_rate_filter = check_zero_line(embedding_filter_np) + print("zero_rate_no_filter = ", zero_rate_no_filter) + print("zero_rate_filter = ", zero_rate_filter) + assert zero_count_filter >= 0 + assert zero_count_no_filter == 0 + print("low frequency filter is pass")