Splash, a flexible Spark shuffle manager that supports user-defined storage backends for shuffle data storage and exchange
APACHE-2.0 License
A shuffle manager for Spark that supports different storage plugins.
The goal of this project is to provide a fast, flexible and reliable shuffle manager that lets users plug in their preferred storage backends and network frameworks for holding and exchanging shuffle data.
The current shuffle manager in Spark has some shortcomings, which this shuffle manager aims to address.
By default, Splash is built for Spark 2.3.2 (Scala 2.11) with Hadoop 2.7.
If you want to build against a different Spark version, modify the following version properties in pom.xml:
spark.version
hadoop.version
scala.version
Check the Build section for how to generate your customized jar.
Add the following options to spark-defaults.conf:
spark.driver.extraClassPath /path/to/splash.jar
spark.executor.extraClassPath /path/to/splash.jar
spark.shuffle.manager org.apache.spark.shuffle.SplashShuffleManager
Although the basic functionality of the project has been verified, the public API may still change as more storage plugins are developed. Therefore:
In line with Semantic Versioning 2.0.0, we do not promise backward compatibility when the first (major) digit of the version changes.
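As a minimal sketch of the rule above, the hypothetical helper below flags an upgrade that crosses a major version, which is exactly the case where no backward-compatibility promise is made. The class and method names are illustrative assumptions, not part of Splash. Staying within one major version is deliberately not reported as "compatible", because the text only states what is not promised.

```java
/**
 * Hypothetical helper illustrating the versioning rule: when the first
 * (major) digit changes, backward compatibility is not promised.
 */
public class SplashVersionCheck {

    /** Extracts the major component, e.g. 2 from "2.0.0-rc.1". */
    static int major(String version) {
        // Drop any pre-release ("-...") or build metadata ("+...") suffix first.
        String core = version.split("[-+]", 2)[0];
        return Integer.parseInt(core.split("\\.")[0].trim());
    }

    /** True when moving from 'from' to 'to' crosses a major version boundary. */
    static boolean majorVersionChanged(String from, String to) {
        return major(from) != major(to);
    }

    public static void main(String[] args) {
        String[][] upgrades = { {"1.2.0", "1.4.1"}, {"1.4.1", "2.0.0"} };
        for (String[] u : upgrades) {
            String note = majorVersionChanged(u[0], u[1])
                    ? "no compatibility promise"
                    : "same major version";
            System.out.println(u[0] + " -> " + u[1] + ": " + note);
        }
    }
}
```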
If you have any questions, please check the documents in the .doc folder. You can communicate with us in the following ways:
Please check the Contributing document for details.
Use mvn install to build the project. Optionally, add -DskipTests=true to skip the unit tests.
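When the build process completes:
The Splash jar is generated at ./target/splash-<version>.jar.
A shaded jar is generated at ./target/splash-<version>-shaded.jar.
Unit test reports are written to ./target/surefire-reports.
Code coverage reports are written to ./target/site/jacoco.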
Use mvn clean to remove the build output.
Use integration-test or mvn failsafe:integration-test -DskipIT=false to run the integration tests. These tests need to connect to an actual file system. You can also modify the test source code to test your own storage plugin.
Integration test reports are written to ./target/failsafe-reports.
Use mvn pmd:pmd to run static code analysis. The analysis report is generated at ./target/site/pmd.html.
spark.shuffle.splash.storageFactory specifies the class name of your StorageFactory.
spark.shuffle.splash.clearShuffleOutput is a boolean value telling the …
Splash uses plugins to support different types of storage systems. Users can develop their own storage plugins for the shuffle manager, and can use different types of storage systems depending on how each file is used. For details, please check our design document.
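Because spark.shuffle.splash.storageFactory holds a class name, a shuffle manager can instantiate the configured plugin reflectively. The sketch below illustrates that pattern under stated assumptions: HypotheticalStorageFactory and InMemoryFactory are invented for illustration, and a public no-argument constructor is assumed. The real Splash StorageFactory interface and its loading logic may differ.

```java
import java.util.Map;

public class PluginLoader {
    static final String FACTORY_KEY = "spark.shuffle.splash.storageFactory";

    /** Hypothetical plugin contract; not the actual Splash StorageFactory API. */
    public interface HypotheticalStorageFactory {
        String describe();
    }

    /** Hypothetical stand-in for a user-defined plugin. */
    public static class InMemoryFactory implements HypotheticalStorageFactory {
        public InMemoryFactory() { }

        @Override
        public String describe() {
            return "in-memory test storage";
        }
    }

    /** Instantiates the class named by FACTORY_KEY through its no-arg constructor. */
    static HypotheticalStorageFactory load(Map<String, String> conf) {
        String className = conf.get(FACTORY_KEY);
        if (className == null || className.trim().isEmpty()) {
            throw new IllegalArgumentException(FACTORY_KEY + " is not set");
        }
        try {
            return Class.forName(className.trim())
                    // Throws ClassCastException if the class does not implement the contract.
                    .asSubclass(HypotheticalStorageFactory.class)
                    .getDeclaredConstructor()
                    .newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("cannot load storage factory " + className, e);
        }
    }

    public static void main(String[] args) {
        HypotheticalStorageFactory factory =
                load(Map.of(FACTORY_KEY, InMemoryFactory.class.getName()));
        System.out.println(factory.describe());
    }
}
```

Loading by class name keeps the shuffle manager decoupled from any particular backend: swapping storage only requires putting another implementation on the classpath and changing one configuration value.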
The Splash project is currently released with a default plugin:
com.memverge.splash.shared.SharedFSFactory
This plugin serves as an example for developers to develop their own storage plugins.
Taking NFS as an example, here are the steps to configure Splash with the shared folder plugin.
Add the following to spark-defaults.conf:
# add the Splash jar to the classpath
spark.driver.extraClassPath /path/to/splash.jar
spark.executor.extraClassPath /path/to/splash.jar
# set shuffle manager and storage plugin
spark.shuffle.manager org.apache.spark.shuffle.SplashShuffleManager
spark.shuffle.splash.storageFactory com.memverge.splash.shared.SharedFSFactory
# set the location of your shared folder
spark.shuffle.splash.folder /your/share/folder
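The configuration above uses whitespace-separated key/value pairs with # comments, a layout that java.util.Properties.load also accepts. As a minimal sketch, the hypothetical checker below parses such a file and reports which of the keys used in this example are missing or empty. The required-key list is an assumption drawn from this example, and the class is a local sanity check, not something Splash or Spark provides.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;

public class SplashConfCheck {
    /** Keys this example expects for the shared-folder plugin (assumption). */
    static final List<String> REQUIRED = Arrays.asList(
            "spark.shuffle.manager",
            "spark.shuffle.splash.storageFactory",
            "spark.shuffle.splash.folder");

    /** Parses "key value" lines; lines starting with '#' are comments. */
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    /** Returns the required keys that are absent or have an empty value. */
    static List<String> missingKeys(Properties props) {
        return REQUIRED.stream()
                .filter(key -> props.getProperty(key, "").trim().isEmpty())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        String conf = String.join("\n",
                "# set shuffle manager and storage plugin",
                "spark.shuffle.manager org.apache.spark.shuffle.SplashShuffleManager",
                "spark.shuffle.splash.storageFactory com.memverge.splash.shared.SharedFSFactory",
                "# set the location of your shared folder",
                "spark.shuffle.splash.folder /your/share/folder");
        Properties props = parse(conf);
        System.out.println("folder = " + props.getProperty("spark.shuffle.splash.folder"));
        System.out.println("missing = " + missingKeys(props));
    }
}
```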
Use the ShufflePerfTool to verify the performance of a storage plugin. You can also use it to compare different storage plugin implementations or to detect performance regressions in a plugin.
Note that this tool is based on the storage interface and does not require a Spark environment.
It writes shuffle output and then reads it back using the configured arguments. The options are:
-h or --help: display the usage
-f or --factory: the name of the storage factory
-i or --shuffleId: the test shuffle ID, defaults to 1
-t or --tasks: the number of concurrent tasks, defaults to 5
-m or --mappers: the number of mappers, defaults to 10
-r or --reducers: the number of reducers, defaults to 10
-d or --data: the number of data blocks, defaults to 1K
-b or --blockSize: the block/buffer size of each data block, …
-o or --overwrite: overwrite existing outputs
Sample command:
java -cp target/splash-shaded.jar com.memverge.splash.ShufflePerfTool -d 64 -m 200 -r 200 -t 8 -o
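To make the mapping between these flags and the documented defaults concrete, here is a hypothetical parser for the options listed above; it is not ShufflePerfTool's actual implementation. It omits -b/--blockSize because that option's default and value format are not stated here, and it interprets the documented -d default of 1K as 1024 blocks, which is an assumption.

```java
/** Hypothetical model of the ShufflePerfTool options; not the tool's real parser. */
public class PerfToolArgs {
    String factory = null;      // -f/--factory: no default documented here
    int shuffleId = 1;          // -i/--shuffleId
    int tasks = 5;              // -t/--tasks
    int mappers = 10;           // -m/--mappers
    int reducers = 10;          // -r/--reducers
    int dataBlocks = 1024;      // -d/--data: documented as "1K"; 1024 is assumed
    boolean overwrite = false;  // -o/--overwrite
    boolean help = false;       // -h/--help

    static PerfToolArgs parse(String... args) {
        PerfToolArgs a = new PerfToolArgs();
        for (int i = 0; i < args.length; i++) {
            String flag = args[i];
            switch (flag) {
                case "-h": case "--help":      a.help = true; break;
                case "-o": case "--overwrite": a.overwrite = true; break;
                case "-f": case "--factory":   a.factory = valueAt(args, ++i, flag); break;
                case "-i": case "--shuffleId": a.shuffleId = intAt(args, ++i, flag); break;
                case "-t": case "--tasks":     a.tasks = intAt(args, ++i, flag); break;
                case "-m": case "--mappers":   a.mappers = intAt(args, ++i, flag); break;
                case "-r": case "--reducers":  a.reducers = intAt(args, ++i, flag); break;
                case "-d": case "--data":      a.dataBlocks = intAt(args, ++i, flag); break;
                default: throw new IllegalArgumentException("unsupported option: " + flag);
            }
        }
        return a;
    }

    /** Returns the value following a flag, failing if it is missing. */
    private static String valueAt(String[] args, int i, String flag) {
        if (i >= args.length) {
            throw new IllegalArgumentException(flag + " requires a value");
        }
        return args[i];
    }

    private static int intAt(String[] args, int i, String flag) {
        return Integer.parseInt(valueAt(args, i, flag));
    }

    public static void main(String[] args) {
        // The flags from the sample command; unspecified options keep their defaults.
        PerfToolArgs a = parse("-d", "64", "-m", "200", "-r", "200", "-t", "8", "-o");
        System.out.printf("blocks=%d mappers=%d reducers=%d tasks=%d overwrite=%b shuffleId=%d%n",
                a.dataBlocks, a.mappers, a.reducers, a.tasks, a.overwrite, a.shuffleId);
    }
}
```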
Sample output
overwrite, removing existing shuffle for shuffleTest-1
==========================================
Writing 200 shuffle with 8 threads: 100% (200/200)
Write shuffle data completed in 7440 milliseconds
Reading index file: 0 ms
storage factory: com.memverge.splash.shared.SharedFSFactory
shuffle folder: \tmp\splash\shuffleTest-1\shuffle
number of mappers: 200
number of reducers: 200
total shuffle size: 3GB
bytes written: 3GB
bytes read: 0B
number of blocks: 64
blocks size: 256KB
partition size: 81KB
concurrent tasks: 8
bandwidth: 430MB/s
==========================================
Reading 40000 partitions with 8 threads 100% (40000/40000)
Read shuffle data completed in 35525 milliseconds
Reading index file: 15907 ms
storage factory: com.memverge.splash.shared.SharedFSFactory
shuffle folder: \tmp\splash\shuffleTest-1\shuffle
number of mappers: 200
number of reducers: 200
total shuffle size: 3GB
bytes written: 3GB
bytes read: 3GB
number of blocks: 64
blocks size: 256KB
partition size: 81KB
concurrent tasks: 8
bandwidth: 90MB/s
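The figures in the sample output are mutually consistent if each mapper writes blocks × blockSize of data and every reducer reads one partition from every mapper: 64 × 256 KB = 16 MB per mapper, 16 MB × 200 mappers = 3200 MB (shown as 3GB), 16384 KB / 200 reducers ≈ 81.92 KB per partition, and 200 × 200 = 40000 partitions read. The sketch below reproduces that arithmetic; treating 1 MB as 1024 KB is an assumption about the units the tool prints, and the class is illustrative rather than part of the tool.

```java
/** Reproduces the derived numbers in the sample ShufflePerfTool output (illustrative). */
public class PerfOutputCheck {

    /** Data written by one mapper, in KB: number of blocks times block size. */
    static long perMapperKb(long blocks, long blockKb) {
        return blocks * blockKb;
    }

    /** Total shuffle size in MB across all mappers (1 MB = 1024 KB assumed). */
    static long totalMb(long blocks, long blockKb, long mappers) {
        return perMapperKb(blocks, blockKb) * mappers / 1024;
    }

    /** Average partition size in KB: one mapper's output split across all reducers. */
    static double partitionKb(long blocks, long blockKb, long reducers) {
        return (double) perMapperKb(blocks, blockKb) / reducers;
    }

    /** Throughput in MB/s for a given amount of data and elapsed milliseconds. */
    static double bandwidthMbPerSec(long mb, long millis) {
        return mb * 1000.0 / millis;
    }

    public static void main(String[] args) {
        long total = totalMb(64, 256, 200);
        System.out.println("total shuffle size (MB): " + total);
        System.out.println("partition size (KB): " + partitionKb(64, 256, 200));
        System.out.println("partitions read: " + 200 * 200);
        // Elapsed times are taken from the "completed in ... milliseconds" lines.
        System.out.printf("write bandwidth (MB/s): %.0f%n", bandwidthMbPerSec(total, 7440));
        System.out.printf("read bandwidth (MB/s): %.0f%n", bandwidthMbPerSec(total, 35525));
    }
}
```

The write and read bandwidths computed this way round to the reported 430MB/s and 90MB/s, which suggests the reported bandwidth is total shuffle size divided by the wall-clock time of each phase.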