Tuesday, September 20, 2011

Shutdown MiniDFSCluster and MiniMRCluster Takes Forever

Writing unit tests for Hadoop applications does not need to be more complicated than writing tests for any other Java application.
Usually, I use the following procedure for testing my Hadoop code:
  1. Testing the classes as real units.
  2. Each Mapper and each Reducer deserves to be tested in isolation. Spock, with its awesome support for mocking and stubbing, is a great tool for testing units in isolation. With a few 3rd-party dependencies it is even possible to mock final classes, bypass the existing constructors of these classes, and inject them into private fields of the units under test => isolation at its best (more on this in another post).
    Furthermore, there might be other units, like custom Writables, InputFormat and OutputFormat implementations, or classes containing the business logic to convert data. I try to write a Specification (the Spock counterpart of a TestCase) for each non-trivial class.
    This should reveal most of the nasty little bugs contained in the business logic of the application. Small side note: I know there is the MRUnit test framework provided by Cloudera. Personally, I found Spock to be more powerful and flexible, but as always, this is merely a matter of taste.
  3. The next step is to execute complete job roundtrips using Hadoop's local mode, again with Spock. If the application features a command line interface, I set up a simple Specification which executes the main() method of the driver class and compares expected output files against the actual output files.
  4. Being able to execute jobs in local mode is great, because it is a fast way to run black-box tests against the map/reduce framework. However, since the local mode is somewhat limited, it might be necessary to use a real cluster. For example, in local mode it is not possible to run more than one reducer, which limits the testing of custom partitioning and grouping code.
  5. To execute m/r code on a real cluster, I often use the hadoop-test project (include it with testCompile 'org.apache.hadoop:hadoop-test:0.20.2' in the build.gradle file). This project features implementations of both a distributed filesystem (org.apache.hadoop.hdfs.MiniDFSCluster) and an m/r cluster (org.apache.hadoop.mapred.MiniMRCluster). The most annoying thing about these clusters is the very long startup time, but hey, they're distributed. The remainder of this post is about the usage of these clusters.
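As an aside, the recording-fake idea behind step 2 can be sketched without Spock or Hadoop at all. In the snippet below, Collector and TokenCountMapper are hypothetical stand-ins for Hadoop's OutputCollector and a real Mapper (names and behavior are my own, just for illustration); the hand-rolled lambda plays the role a Spock mock would play:

```java
import java.util.ArrayList;
import java.util.List;

public class MapperIsolationSketch {
    // Hypothetical stand-in for Hadoop's OutputCollector, so the sketch
    // runs without any Hadoop or Spock dependency.
    interface Collector {
        void collect(String key, int value);
    }

    // Hypothetical mapper under test: emits (token, 1) for every
    // whitespace-separated token of the input line.
    static class TokenCountMapper {
        void map(String line, Collector out) {
            for (String token : line.trim().split("\\s+")) {
                if (!token.isEmpty()) {
                    out.collect(token, 1);
                }
            }
        }
    }

    // Drives the mapper against a recording fake and returns what it emitted,
    // so assertions can be made without any framework plumbing.
    static List<String> mapLine(String line) {
        List<String> emitted = new ArrayList<>();
        new TokenCountMapper().map(line, (key, value) -> emitted.add(key + "=" + value));
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(mapLine("foo bar foo")); // prints [foo=1, bar=1, foo=1]
    }
}
```

With Spock, the recording fake would simply be a Mock(Collector) plus interaction-based assertions, but the structure of the test is the same.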
Because of the long startup time, if there are several different Hadoop jobs to test, or a single job with different configurations, one should consider putting the cluster management code into the static setup methods of the test framework.
static MiniMRCluster mrCluster
static MiniDFSCluster dfsCluster

def setupSpec() {
	def conf = new JobConf()
	if (System.getProperty("hadoop.log.dir") == null) {
		System.setProperty("hadoop.log.dir", "/tmp")
	}
	dfsCluster = new MiniDFSCluster(conf, 2, true, null)
	mrCluster = new MiniMRCluster(2, dfsCluster.getFileSystem().getUri().toString(), 1)
	def hdfs = dfsCluster.getFileSystem()
	hdfs.delete(new Path('/main-testdata'), true)
	hdfs.delete(new Path('/user'), true)
	FileUtil.copy(inputData, hdfs, new Path('main-testdata'), false, conf)
}
  • setupSpec() is the Spock equivalent of JUnit 4's @BeforeClass
  • If the hadoop.log.dir system property is not set, the m/r cluster will not start up
  • The clusters are configured to use 2 slave nodes each
  • After successful filesystem startup, it is possible to perform the usual filesystem operations with it
Teardown and cleanup is similarly performed in the static context:
def cleanupSpec() {
	dfsCluster?.getFileSystem().delete(new Path('/main-testdata'), true)
	dfsCluster?.getFileSystem().delete(new Path('/user'), true)
	mrCluster?.shutdown()
	dfsCluster?.shutdown()
}
However, there is a really annoying problem with this code: it takes forever. I don't know why (presumably, I'm doing something wrong), but the shutdown procedure is blocked by several data integrity checks which themselves take a long time. Since the generated data is garbage and of zero relevance once the tests have completed, I'd like to get rid of these checks, but I really cannot figure out how. The logs get s-l-o-w-l-y filled with lines like
11/09/19 23:47:06 INFO datanode.DataBlockScanner: Verification succeeded for blk_3329401068442722923_1001
11/09/19 23:47:12 INFO datanode.DataBlockScanner: Verification succeeded for blk_654270692326292497_1008
11/09/19 23:47:39 INFO datanode.DataBlockScanner: Verification succeeded for blk_-3673127094860948561_1006
and a single test, and thus the whole test suite, takes several minutes to complete, which is fatal for any continuous integration system. However, there is a workaround which is neither obvious nor very nice, but it works: wrapping the shutdown procedure in another thread. Simply by modifying the code from above into
def cleanupSpec() {
	Thread.start {
		dfsCluster?.getFileSystem().delete(new Path('/main-testdata'), true)
		dfsCluster?.getFileSystem().delete(new Path('/user'), true)
		mrCluster?.shutdown()
		dfsCluster?.shutdown()
	}
}
I could no longer spot any blocking behavior. If someone has a better idea for avoiding the blocking on shutdown, please feel free to comment.
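The mechanism behind the workaround can be sketched in plain Java. Note one assumption I've added: the daemon flag below is not part of the Groovy trick (Thread.start {} returns a regular thread), but it makes the fire-and-forget intent explicit, since a daemon thread never blocks JVM exit:

```java
public class NonBlockingCleanupSketch {
    // Stand-in for the blocking HDFS deletes and cluster shutdown.
    static void slowCleanup() {
        try {
            Thread.sleep(5_000);
        } catch (InterruptedException ignored) {
            Thread.currentThread().interrupt();
        }
    }

    // Fires the cleanup on a background thread and returns immediately;
    // the returned value is how long the caller was actually blocked, in ms.
    static long dispatchCleanup() {
        long start = System.nanoTime();
        Thread cleaner = new Thread(NonBlockingCleanupSketch::slowCleanup);
        // Daemon threads do not keep the JVM alive, so a still-running
        // cleanup cannot delay process exit.
        cleaner.setDaemon(true);
        cleaner.start();
        return (System.nanoTime() - start) / 1_000_000;
    }

    public static void main(String[] args) {
        System.out.println("cleanup dispatched after ~" + dispatchCleanup() + " ms");
    }
}
```

The trade-off is obvious: the cleanup may never actually finish before the JVM exits. For throwaway test data on a throwaway mini cluster that is exactly what we want.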
