Usually, I use the following procedure for testing my Hadoop code:
- Testing the classes as real units. Each Mapper and each Reducer deserves to be tested in isolation, and Spock with its awesome support for mocking and stubbing is a great tool for that (see the sketch after this list). With a few 3rd-party dependencies it is even possible to mock final classes, bypass the existing constructors of these classes, and inject them into private fields of the units under test: isolation at its best (more on this in another post).
- Next step is to execute complete job roundtrips using the local Hadoop mode, again with Spock. If the application features a command line interface, I set up a simple Specification which executes the main() method of the main driver class and compares some expected output files against the actual output files (a sketch follows after this list). Being able to execute jobs in local mode is great, because it is a fast way to run blackbox tests against the map/reduce framework. However, since the local mode is somewhat limited, it might be necessary to use a real cluster. For example, in local mode it is not possible to run more than one reducer, which limits the ability to test custom partitioning and grouping code.
- To execute m/r code on a real cluster, I often use the hadoop-test project (include it with testCompile 'org.apache.hadoop:hadoop-test:0.20.2' in the build.gradle file). This project features implementations of both a distributed filesystem (org.apache.hadoop.hdfs.MiniDFSCluster) and an m/r cluster (org.apache.hadoop.mapred.MiniMRCluster). The most annoying thing about these clusters is their very long startup time, but hey, they're distributed.
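To make the first point more concrete, here is a minimal sketch of such an isolated unit Specification. It assumes a classic word-count Mapper written against the old org.apache.hadoop.mapred API; WordCountMapper and the expected interactions are made up for illustration, while OutputCollector and Reporter are the real framework interfaces that Spock can mock directly:

```groovy
import org.apache.hadoop.io.IntWritable
import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapred.OutputCollector
import org.apache.hadoop.mapred.Reporter
import spock.lang.Specification

class WordCountMapperSpec extends Specification {

    def "emits one (token, 1) pair per token of the input line"() {
        given: "the mapper under test and mocked framework collaborators"
        def mapper = new WordCountMapper()   // hypothetical Mapper under test
        def output = Mock(OutputCollector)
        def reporter = Mock(Reporter)

        when: "a single input line is mapped"
        mapper.map(new LongWritable(0), new Text("foo bar foo"), output, reporter)

        then: "every token is emitted with a count of one, and nothing else"
        2 * output.collect(new Text("foo"), new IntWritable(1))
        1 * output.collect(new Text("bar"), new IntWritable(1))
        0 * output._
    }
}
```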
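And a sketch of the second point, a complete roundtrip in local mode driven through the command line interface. The driver class WordCountDriver, its arguments, and the file locations are assumptions; the pattern of calling main() and diffing expected against actual output files is the point:

```groovy
import spock.lang.Specification

class WordCountJobRoundtripSpec extends Specification {

    def "complete roundtrip in local mode produces the expected output"() {
        given: "input data and a scratch output directory on the local filesystem"
        def inputDir  = "src/test/resources/roundtrip/input"    // assumed location of the test input
        def outputDir = "build/tmp/roundtrip-output"
        new File(outputDir).deleteDir()

        when: "the driver's main() is executed just like a command line user would"
        WordCountDriver.main([inputDir, outputDir] as String[])  // hypothetical driver class

        then: "the generated part file matches the expected one"
        new File("$outputDir/part-00000").text ==
                new File("src/test/resources/roundtrip/expected/part-00000").text
    }
}
```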
Furthermore, there might be other units like custom Writables, InputFormat and OutputFormat implementations, classes containing the business logic to convert data, and so on. I try to write a Specification (the Spock counterpart of a TestCase) for each non-trivial class. This should reveal most of the nasty little bugs contained in the business logic of the application. Small side note: I know there is the mr-unit test framework provided by Cloudera. Personally, I found Spock to be more powerful and flexible, but as always, this is merely a matter of taste.
The remainder of this post is about the usage of these clusters.
```groovy
static MiniMRCluster mrCluster
static MiniDFSCluster dfsCluster

def setupSpec() {
    def conf = new JobConf()
    // without hadoop.log.dir the m/r cluster refuses to start
    if (System.getProperty("hadoop.log.dir") == null) {
        System.setProperty("hadoop.log.dir", "/tmp")
    }
    dfsCluster = new MiniDFSCluster(conf, 2, true, null)
    mrCluster = new MiniMRCluster(2, dfsCluster.getFileSystem().getUri().toString(), 1)

    // wipe leftovers from previous runs and copy the local test data into HDFS
    // (inputData points at the local test data and is defined elsewhere in the Specification)
    def hdfs = dfsCluster.getFileSystem()
    hdfs.delete(new Path('/main-testdata'), true)
    hdfs.delete(new Path('/user'), true)
    FileUtil.copy(inputData, hdfs, new Path('main-testdata'), false, conf)
}
```

Notes:
- setupSpec() is the Spock equivalent of JUnit 4's @BeforeClass
- If the hadoop.log.dir system property is not set (see the check at the beginning of setupSpec()), the m/r cluster will not start up
- The clusters are configured to use 2 slave nodes each
- After successful filesystem startup, it is possible to perform the usual filesystem operations with it
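Assuming the setup above, a feature method could then run a job against the mini cluster roughly as sketched below. The identity job, the output path and the assertions are made up for illustration; the important part is that the JobConf is obtained via mrCluster.createJobConf(), so the job really runs on the mini m/r cluster and its HDFS. All classes used here come from the old org.apache.hadoop.mapred API (IdentityMapper and IdentityReducer from org.apache.hadoop.mapred.lib):

```groovy
def "identity job runs on the mini cluster with two reducers"() {
    given: "a job configured against the mini cluster"
    def conf = mrCluster.createJobConf()   // points the job at the MiniMRCluster / MiniDFSCluster
    conf.jobName = 'identity-roundtrip'
    conf.setInputFormat(TextInputFormat)
    conf.setOutputFormat(TextOutputFormat)
    conf.setOutputKeyClass(LongWritable)
    conf.setOutputValueClass(Text)
    conf.setMapperClass(IdentityMapper)
    conf.setReducerClass(IdentityReducer)
    conf.setNumReduceTasks(2)              // two reducers (not possible in local mode)
    FileInputFormat.setInputPaths(conf, new Path('main-testdata'))
    FileOutputFormat.setOutputPath(conf, new Path('main-testoutput'))

    when: "the job is submitted and runs to completion"
    def job = JobClient.runJob(conf)

    then: "it succeeded and produced one part file per reducer"
    job.successful
    dfsCluster.getFileSystem().exists(new Path('main-testoutput/part-00000'))
    dfsCluster.getFileSystem().exists(new Path('main-testoutput/part-00001'))
}
```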
The corresponding cleanupSpec() shuts everything down again:

```groovy
def cleanupSpec() {
    mrCluster?.shutdown()
    dfsCluster?.getFileSystem().delete(new Path('/main-testdata'), true)
    dfsCluster?.getFileSystem().delete(new Path('/user'), true)
    dfsCluster?.shutdown()
}
```

However, there is a really annoying problem with this code: it takes forever. I don't know why (presumably I'm doing something wrong), but the shutdown procedure is blocked by several data integrity checks which themselves take a long time. Since the generated data is garbage and of zero relevance once the tests have completed, I would like to get rid of these checks, but I really cannot figure out how. The logs get s-l-o-w-l-y filled with lines like
```
11/09/19 23:47:06 INFO datanode.DataBlockScanner: Verification succeeded for blk_3329401068442722923_1001
11/09/19 23:47:12 INFO datanode.DataBlockScanner: Verification succeeded for blk_654270692326292497_1008
11/09/19 23:47:39 INFO datanode.DataBlockScanner: Verification succeeded for blk_-3673127094860948561_1006
```

and the single test, and thus the whole test suite, takes several minutes to complete, which is fatal for any continuous integration setup. However, there is a workaround which is neither obvious nor very nice, but it works: wrapping the shutdown procedure in another thread. Simply by modifying the code from above into
```groovy
def cleanupSpec() {
    Thread.start {
        mrCluster?.shutdown()
        dfsCluster?.getFileSystem().delete(new Path('/main-testdata'), true)
        dfsCluster?.getFileSystem().delete(new Path('/user'), true)
        dfsCluster?.shutdown()
    }.join(5000)
}
```

I could not spot any blocking behavior any longer: the shutdown runs in a background thread, and join(5000) waits for it for at most five seconds before the Specification finishes. If someone has a better idea to avoid blocking on shutdown, please feel free to comment.