We use both Docker and Apache Spark quite often in our projects. Recently we had to use the newest version of Spark (2.1.0) in one of them, in a dockerized environment. Since, at the time, we could not find an image satisfying all our needs, we decided to create our own and publish it as an open-source project. The result is available on both GitHub and Docker Hub. We have also prepared a sample Scala/SBT application that uses Docker for deployment, likewise available on GitHub.

The dockerized Spark image on GitHub also contains a sample docker-compose file which can be used to create a standalone Spark cluster (a Spark master plus two workers). Since we use it for development purposes, we have not integrated it with a Mesos or YARN cluster manager; Spark runs as a standalone cluster. The same Docker image is used both for running the cluster elements (master, workers) and as a base image for deploying Spark jobs.

Currently we maintain images for three Spark versions:

  • 2.1.0 (with Hadoop 2.7.3)
  • 2.0.2 (with Hadoop 2.7.3)
  • 1.6.3 (with Hadoop 2.6.5)

Example integration

Let’s take a look at the example Spark job published on GitHub. The first part of the integration is preparing the Scala application to run on a Spark cluster, which is mainly done in the build.sbt file. Using sbt-assembly to create a fat JAR for deployment and sbt-docker to build a Docker image from it reduces the process to a single sbt docker command. However, some prior configuration in the SBT build is required.
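Both sbt-assembly and sbt-docker have to be registered as SBT plugins. A minimal sketch of project/plugins.sbt, with illustrative version numbers (the exact versions used in the sample repository may differ):

// project/plugins.sbt – plugin versions are illustrative
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.3")
addSbtPlugin("se.marcuslonnberg" % "sbt-docker" % "1.4.1")

With the plugins in place, the non-plugin part of build.sbt looks like this: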

name := "sbt-application"

organization := "Semantive"

version := "1.0"

scalaVersion := "2.11.8"

scalacOptions := Seq("-unchecked", "-deprecation", "-encoding", "utf8")

libraryDependencies ++= {
  val sparkV = "2.1.0"

  Seq(
    "org.apache.spark" %% "spark-core" % sparkV % "provided",
    "org.apache.spark" %% "spark-sql" % sparkV % "provided"
  )
}

The above code does not contain much Docker-related or Spark-related content. The only Spark-specific detail is the dependency scope: the Spark artifacts must be marked as “provided”, because the Spark libraries are supplied by the cluster when the job is submitted. Adding Spark as a regular (compile-scope) dependency will cause errors during application deployment.
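For context, the main class referenced below (SparkApplication) could look something like this minimal sketch; the actual class in the sample repository may differ, and the job shown here is just a placeholder:

import org.apache.spark.sql.SparkSession

// Minimal Spark 2.x entry point. The master URL and deploy mode are
// supplied by spark-submit, so nothing here is cluster-specific.
object SparkApplication {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("sbt-application")
      .getOrCreate()

    // Placeholder job: count the even numbers in a generated range.
    import spark.implicits._
    val evenCount = spark.range(0, 1000).filter($"id" % 2 === 0).count()
    println(s"Even numbers: $evenCount")

    spark.stop()
  }
}

Because the Spark dependencies are “provided”, such a class compiles against Spark but relies on the cluster (or spark-submit) to supply the Spark runtime.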

//--------------------------------
//---- sbt-assembly settings -----
//--------------------------------

val mainClassString = "SparkApplication"

mainClass in assembly := Some(mainClassString)

assemblyJarName := "spark-app.jar"

assemblyMergeStrategy in assembly := {
  case m if m.toLowerCase.endsWith("manifest.mf")          => MergeStrategy.discard
  case m if m.toLowerCase.matches("meta-inf.*\\.sf$")      => MergeStrategy.discard
  case "log4j.properties"                                  => MergeStrategy.discard
  case m if m.toLowerCase.startsWith("meta-inf/services/") => MergeStrategy.filterDistinctLines
  case "reference.conf"                                    => MergeStrategy.concat
  case _                                                   => MergeStrategy.first
}

assemblyOption in assembly ~= { _.copy(cacheOutput = false) }

assemblyExcludedJars in assembly := {
  val cp = (fullClasspath in assembly).value
  cp filter { c =>
    c.data.getName.startsWith("log4j") ||
    c.data.getName.startsWith("slf4j-") ||
    c.data.getName.startsWith("scala-library")
  }
}

// Disable tests (they require Spark)
test in assembly := {}

// publish to artifacts directory
publishArtifact in (Compile, packageDoc) := false

publishTo := Some(Resolver.file("file", new File("artifacts")))

cleanFiles <+= baseDirectory { base => base / "artifacts" }

The sbt-assembly settings deal with problems that may occur while building the fat JAR. The assemblyMergeStrategy is important for handling files that are present in multiple dependencies. In this case (an application with no dependencies other than Spark) it may look like overkill, but the issue can arise with almost any dependency (Akka, Avro, Spark connectors, etc.).

The second part that may need a bit of clarification is assemblyExcludedJars. In typical Spark usage this part may not be necessary at all. However, when using options like “spark.driver.userClassPathFirst=true” (as we sometimes do), overriding the logger or the Scala library causes the job to crash. That is why we prevent sbt-assembly from adding them to the JAR file.
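Because those jars are excluded, the application relies on the logging and Scala libraries that the Spark distribution already ships. A minimal sketch, assuming the default Spark logging setup and no custom log4j configuration:

import org.slf4j.{Logger, LoggerFactory}

object AppLogging {
  // slf4j-api (and its log4j binding) come from the Spark runtime,
  // so they are deliberately left out of the fat JAR.
  private val log: Logger = LoggerFactory.getLogger(getClass)

  def logStart(): Unit = log.info("Spark job starting")
}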

The third and last part of the build.sbt file is the sbt-docker configuration:

//--------------------------------
//----- sbt-docker settings ------
//--------------------------------
enablePlugins(sbtdocker.DockerPlugin)

dockerfile in docker := {
  val baseDir = baseDirectory.value
  val artifact: File = assembly.value

  val sparkHome = "/home/spark"
  val imageAppBaseDir = "/app"
  val artifactTargetPath = s"$imageAppBaseDir/${artifact.name}"

  val dockerResourcesDir = baseDir / "docker-resources"
  val dockerResourcesTargetPath = s"$imageAppBaseDir/"

  new Dockerfile {
    // Base Docker image of the target one
    from("semantive/spark")
    // Maintainer of the target image
    maintainer("Semantive")
    // Environment variables (system-wide, on the target image)
    env("APP_BASE", s"$imageAppBaseDir")
    env("APP_CLASS", mainClassString)
    env("SPARK_HOME", sparkHome)
    // File copying (local file path, path inside the image) 
    copy(artifact, artifactTargetPath)
    copy(dockerResourcesDir, dockerResourcesTargetPath)
    // Entry point (command run on start of the image)
    entryPoint(s"${dockerResourcesTargetPath}docker-entrypoint.sh")
  }
}
buildOptions in docker := BuildOptions(cache = false)

imageNames in docker := Seq(
  ImageName(
    namespace = Some(organization.value.toLowerCase),
    repository = name.value,
    // The IMAGE_TAG system property (e.g. sbt -DIMAGE_TAG=latest docker) lets us override the tag at build time
    tag = Some(sys.props.getOrElse("IMAGE_TAG", default = version.value))
  )
)

Apart from setting variables, two important things take place here:

  1. A Dockerfile for the application is defined (new Dockerfile { … }).
  2. The image is named (here, the organization is used as the namespace and the image takes the SBT project’s name).

Once the application Docker image has been built, the job can be submitted to the cluster. The submit can be performed either from the local OS or from a Docker container. Since the second option is more deployment- and CI-friendly, we decided to add our Spark jobs to the docker-compose file used for submitting.

The sample application is shipped with a sample docker-compose file:

master:
  image: semantive/spark
  command: bin/spark-class org.apache.spark.deploy.master.Master -h master
  hostname: master
  environment:
    MASTER: spark://master:7077
    SPARK_CONF_DIR: /conf
    SPARK_PUBLIC_DNS: localhost
  ports:
    - 4040:4040
    - 6066:6066
    - 7077:7077
    - 8080:8080
  volumes:
    - ./data:/tmp/data

worker:
  image: semantive/spark
  command: bin/spark-class org.apache.spark.deploy.worker.Worker spark://master:7077
  hostname: worker1
  environment:
    SPARK_CONF_DIR: /conf
    SPARK_WORKER_CORES: 4
    SPARK_WORKER_MEMORY: 2g
    SPARK_WORKER_PORT: 8881
    SPARK_WORKER_WEBUI_PORT: 8081
    SPARK_PUBLIC_DNS: localhost
  links:
    - master
  ports:
    - 8081:8081
  volumes:
    - ./data:/tmp/data

application:
  image: semantive/sbt-application:1.0
  links:
    - master

The file usually does not require many changes. However, two options (SPARK_WORKER_CORES and SPARK_WORKER_MEMORY) should be tuned to match the developer machine’s resources and the submitted applications – setting insufficient values may cause some jobs not to run on the cluster at all. This can be verified in the Spark UI (served by the master on port 8080).
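As an illustration (the values below are arbitrary and only have to fit within the worker settings above), the resources a job requests per executor must not exceed what a single worker offers, otherwise the application stays in the WAITING state without executors:

import org.apache.spark.sql.SparkSession

object ResourceDemo {
  def main(args: Array[String]): Unit = {
    // Requested executor resources must fit within a single worker's
    // SPARK_WORKER_MEMORY / SPARK_WORKER_CORES defined in docker-compose.
    val spark = SparkSession.builder()
      .appName("resource-demo")
      .config("spark.executor.memory", "1g") // <= SPARK_WORKER_MEMORY (2g)
      .config("spark.executor.cores", "2")   // <= SPARK_WORKER_CORES (4)
      .getOrCreate()

    println(spark.range(1000).count())
    spark.stop()
  }
}

The same settings can equally be passed on the spark-submit command line instead of being hard-coded in the application.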

We hope you find our Docker image for Spark useful in your projects too. Issue reports and pull requests are appreciated and welcome!
