*February 8, 2018*

Spark is a framework that tries to provide answers to many problems at once. At its core, it distributes generic workloads across a cluster. On top of that it provides a SQL-friendly API to work with structured data, a streaming engine to support applications with fast-data requirements, and an ML library. The latter is the one we are interested in for this post: a distributed machine learning library with several models and general feature extraction, transformation and selection implementations. Supporting abstractions for composing ML pipelines or tuning hyperparameters, among others, are also provided.

Even though we get a lot out of the box from Spark ML, there will eventually be cases where you need to develop your own custom transformations. Maybe the data science team you are working with has come up with some new complex features that turned out to be really valuable to the problem, and now you need to implement these transformations at scale. Ideally, you will want to write them in Scala and expose a Python wrapper to facilitate their use.

For a better understanding, I recommend studying Spark’s code. Start with an easy model like the CountVectorizer and understand what is being done. It will give you all the tools you need to build your own customizations.

We will use Spark 2.2.1 and the ML API that makes use of the DataFrame abstraction. The complete example can be found on this repository. It contains the Scala code plus the Python wrapper implementation and boilerplate for testing in both languages.

### Custom Estimator/Transformer

Let’s create a custom Bucketizer that will divide the range of a continuous numerical column by an input parameter `numberBins` and then, for each row, decide the appropriate bin.
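The binning rule itself is easy to try out before involving Spark. The following is a minimal plain-Python sketch, not the Scala implementation: the helper names `fit_boundaries` and `get_bin` are hypothetical, the sample values are the ones in the example table that follows, and `number_bins=4` is my assumption of the setting that reproduces it. Each boundary is treated as the inclusive lower edge of its bin, so the maximum value lands in a bin of its own.

```python
from bisect import bisect_right


def fit_boundaries(values, number_bins):
    """Compute evenly spaced lower boundaries from min to max (both included)."""
    lo, hi = min(values), max(values)
    step = (hi - lo) / number_bins
    # number_bins + 1 boundaries: the last one is the maximum itself
    return [lo + i * step for i in range(number_bins + 1)]


def get_bin(value, boundaries):
    """Return the index of the largest boundary that is <= value."""
    return bisect_right(boundaries, value) - 1


# Sample values taken from the example table (assumed number_bins=4)
values = [1.0, 5.0, 0.0, 7.0, 4.0, 8.0, 10.0]
boundaries = fit_boundaries(values, number_bins=4)
bins = [get_bin(v, boundaries) for v in values]

print(boundaries)  # [0.0, 2.5, 5.0, 7.5, 10.0]
print(bins)        # [0, 2, 0, 2, 1, 3, 4]
```

Computing the boundaries once and then looking each value up is the same split between a fitting step and a per-row step that the Spark version makes.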
Given an input column:

```
+-----+
|input|
+-----+
|  1.0|
|  5.0|
|  0.0|
|  7.0|
|  4.0|
|  8.0|
| 10.0|
+-----+
```

We expect the following output:

```
+-----+---+
|input|bin|
+-----+---+
|  1.0|  0|
|  5.0|  2|
|  0.0|  0|
|  7.0|  2|
|  4.0|  1|
|  8.0|  3|
| 10.0|  4|
+-----+---+
```

In order to create a custom Transformer or Estimator we need to follow some contracts defined by Spark. Very briefly, a Transformer must provide a `.transform` implementation, in the same way as an Estimator must provide one for the `.fit` method.

You need an Estimator every time you need to calculate something prior to the actual application of the transformation. For instance, if you need to normalize the values of a column between 0 and 1, you must first know the maximum and the minimum of that particular column. So you would create an Estimator with a `.fit` method that calculates this data and then returns a Model that already has all it needs to apply the operation.

First of all, declare the parameters needed by our Bucketizer:

```scala
import org.apache.spark.ml.param.{IntParam, Param, ParamValidators, Params}
import org.apache.spark.sql.types.{DoubleType, IntegerType, StructField, StructType}

trait BucketizerParams extends Params {
  val inputCol = new Param[String](this, "inputCol", "The input column")
  val outputCol = new Param[String](this, "outputCol", "The output column")
  // Must be strictly positive
  val numberBins: IntParam = new IntParam(this, "numberBins",
    "Number of fixed bins to divide the continuous range", ParamValidators.gt(0))

  def getNBins: Int = $(numberBins)

  /** Validates and transforms the input schema.
   */
  protected def validateAndTransformSchema(schema: StructType): StructType = {
    require(isDefined(inputCol), s"Bucketizer requires input column parameter: $inputCol")
    require(isDefined(outputCol), s"Bucketizer requires output column parameter: $outputCol")
    val field = schema.fields(schema.fieldIndex($(inputCol)))
    if (field.dataType != DoubleType) {
      throw new Exception(
        s"Input type ${field.dataType} did not match required type DoubleType")
    }
    // Add the return field
    schema.add(StructField($(outputCol), IntegerType, false))
  }
}
```

`validateAndTransformSchema` just validates the model operating conditions, like the input type of the column: `if (field.dataType != DoubleType)`

We then declare that our Bucketizer will respect the Estimator contract, by returning a `BucketizerModel` with the transform method implemented. Additionally, `BucketizerParams` provides functionality to manage the parameters that we have defined above.

```scala
class Bucketizer(override val uid: String) extends Estimator[BucketizerModel] with BucketizerParams
```

And here is the implementation:

```scala
import org.apache.spark.ml.Estimator
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.types.StructType
import scala.collection.immutable.SortedMap

class Bucketizer(override val uid: String) extends Estimator[BucketizerModel] with BucketizerParams {

  def this() = this(Identifiable.randomUID("Bucketizer"))

  def setInputCol(value: String): this.type = set(inputCol, value)

  def setOutputCol(value: String): this.type = set(outputCol, value)

  def setNumberBins(value: Int): this.type = set(numberBins, value)

  // Single pass over the values, tracking (min, max)
  private def minMax(a: Array[Double]): (Double, Double) = {
    a.foldLeft((a(0), a(0))) { case (acc, x) => (math.min(acc._1, x), math.max(acc._2, x)) }
  }

  override def copy(extra: ParamMap): Bucketizer = {
    defaultCopy(extra)
  }

  override def transformSchema(schema: StructType): StructType = {
    validateAndTransformSchema(schema)
  }

  override def fit(dataset: Dataset[_]): BucketizerModel = {
    transformSchema(dataset.schema, logging = true)
    val numBins = $(numberBins)

    // Note: the whole input column is collected to the driver
    val input = dataset.select($(inputCol)).rdd.map(_.getAs[Double](0)).collect()
    val (min, max) = minMax(input)

    // Lower boundary of each bin -> bin index
    val bins =
      SortedMap[Double, Int]((min to max by ((max - min) / numBins)).zipWithIndex: _*)

    val model = new BucketizerModel(uid, bins)
    copyValues(model)
  }
}
```

The interesting part is the `fit` method, which calculates the minimum and maximum values of the input column, creates a SortedMap with the bin boundaries and returns a `BucketizerModel` with this precalculated data. This model, having knowledge about the boundaries, just needs to map each value to the right bin:

```scala
import java.lang.{Double => JDouble, Integer => JInt}
import java.util.{Map => JMap}

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.ml.Model
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.util.{MLWritable, MLWriter}
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.functions.{col, udf}
import org.apache.spark.sql.types.StructType
import scala.collection.JavaConverters._
import scala.collection.immutable.SortedMap

class BucketizerModel(override val uid: String, val bins: SortedMap[Double, Int])
  extends Model[BucketizerModel] with BucketizerParams with MLWritable {

  import BucketizerModel._

  /** Java-friendly version of [[bins]] */
  def javaBins: JMap[JDouble, JInt] = {
    bins.map { case (k, v) => double2Double(k) -> int2Integer(v) }.asJava
  }

  /** Returns the corresponding bin on which the input falls */
  val getBin = (a: Double, bins: SortedMap[Double, Int]) => bins.to(a).last._2

  override def copy(extra: ParamMap): BucketizerModel = {
    defaultCopy(extra)
  }

  override def transformSchema(schema: StructType): StructType = {
    validateAndTransformSchema(schema)
  }

  // Broadcast the boundaries once and reuse them across calls to transform
  private var broadcastBins: Option[Broadcast[SortedMap[Double, Int]]] = None

  override def transform(dataset: Dataset[_]): DataFrame = {
    transformSchema(dataset.schema, logging = true)
    if (broadcastBins.isEmpty) {
      val dict = bins
      broadcastBins = Some(dataset.sparkSession.sparkContext.broadcast(dict))
    }
    val binsBr = broadcastBins.get

    val vectorizer = udf { (input: Double) => getBin(input, binsBr.value) }
    dataset.withColumn($(outputCol), vectorizer(col($(inputCol))))
  }

  override def write: MLWriter = new BucketizerModelWriter(this)
}
```

`javaBins` is needed to map the bins data structure to a more Java-friendly version. Otherwise, when we ask for this structure from Python (through py4j), we cannot directly cast it to a Python dict.

In the companion object of `BucketizerModel` we provide support for model persistence to disk.
```scala
import org.apache.hadoop.fs.Path
import org.apache.spark.ml.util.{MLReadable, MLReader, MLWriter}
import scala.collection.immutable.SortedMap

object BucketizerModel extends MLReadable[BucketizerModel] {

  class BucketizerModelWriter(instance: BucketizerModel) extends MLWriter {

    private case class Data(bins: Map[Double, Int])

    override protected def saveImpl(path: String): Unit = {
      // Params and uid go to the metadata file, the bins go to a parquet file
      spark.persistence.DefaultParamsWriter.saveMetadata(instance, path, sc)
      val data = Data(instance.bins)
      val dataPath = new Path(path, "data").toString
      sparkSession.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath)
    }
  }

  private class BucketizerModelReader extends MLReader[BucketizerModel] {

    private val className = classOf[BucketizerModel].getName

    override def load(path: String): BucketizerModel = {
      val metadata = spark.persistence.DefaultParamsReader.loadMetadata(path, sc, className)
      val dataPath = new Path(path, "data").toString
      val data = sparkSession.read.parquet(dataPath)
        .select("bins")
        .head()
      // The column was written as a map, so read it back as a map
      val bins = SortedMap(data.getMap[Double, Int](0).toSeq: _*)
      val model = new BucketizerModel(metadata.uid, bins)
      spark.persistence.DefaultParamsReader.getAndSetParams(model, metadata)
      model
    }
  }

  override def read: MLReader[BucketizerModel] = new BucketizerModelReader

  override def load(path: String): BucketizerModel = super.load(path)
}
```

Spark ML has some modules that are marked as private, so we need to reimplement some behaviour. In the GitHub repository this is done in `ReadWrite.scala` and `Utils.scala`.

To create the jar:

```
sbt clean assembly
```

### Python wrapper

In case we need to provide access to our Python friends, we will need to create a wrapper on top of the Estimator. First of all, we need to add our custom jar to the Spark context.
```python
import pyspark
from pyspark import SparkConf

conf = SparkConf()
conf.set("spark.executor.memory", "1g")
conf.set("spark.cores.max", "2")
# Path to the assembly jar built with sbt
conf.set("spark.jars", 'spark-mllib-custom-models-assembly-0.1.jar')
conf.set("spark.app.name", "sparkTestApp")

spark = pyspark.sql.SparkSession.builder.config(conf=conf).getOrCreate()
```

We will need to write a wrapper on top of both the Estimator and the Model. For the Estimator, it is basically just boilerplate for the input arguments, plus specifying the fully qualified name of our Scala class in `_classpath`.

```python
from pyspark import keyword_only
from pyspark.ml.param import Param, Params, TypeConverters
from pyspark.ml.param.shared import HasInputCol, HasOutputCol
from pyspark.ml.util import JavaMLReadable, JavaMLWritable
from pyspark.ml.wrapper import JavaEstimator, JavaModel, JavaParams


class Bucketizer(JavaEstimator, HasInputCol, HasOutputCol, JavaMLReadable, JavaMLWritable):
    """
    Divides the range of a continuous column by an input parameter `numberBins` and then,
    for each row, decides the appropriate bin.
    """
    _classpath = 'com.custom.spark.feature.Bucketizer'

    numberBins = Param(
        Params._dummy(),
        "numberBins",
        "Number of fixed bins to divide the range",
        typeConverter=TypeConverters.toInt)

    @keyword_only
    def __init__(self, inputCol=None, outputCol=None, numberBins=10):
        super(Bucketizer, self).__init__()
        # Instantiate the Scala estimator on the JVM side
        self._java_obj = self._new_java_obj(Bucketizer._classpath, self.uid)
        self._setDefault(numberBins=10)
        kwargs = self._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, inputCol=None, outputCol=None, numberBins=10):
        """
        Set the params for the Bucketizer
        """
        kwargs = self._input_kwargs
        return self._set(**kwargs)

    def setNumberBins(self, value):
        return self._set(numberBins=value)

    def getNumberBins(self):
        return self.getOrDefault(self.numberBins)

    def setOutputCol(self, value):
        return self._set(outputCol=value)

    def getOutputCol(self):
        return self.getOrDefault(self.outputCol)

    def _create_model(self, java_model):
        return BucketizerModel(java_model)
```

`HasInputCol` and `HasOutputCol` save us the trouble of having to write:

```python
inputCol = Param(
    Params._dummy(),
    "inputCol",
    "The input column",
    typeConverter=TypeConverters.toString)

outputCol = Param(
    Params._dummy(),
    "outputCol",
    "The output column",
    typeConverter=TypeConverters.toString)
```

And here is the model:

```python
class BucketizerModel(JavaModel, JavaMLReadable, JavaMLWritable):
    """
    Model fitted by :py:class:`Bucketizer`.
    """
    _classpath_model = 'com.custom.spark.feature.BucketizerModel'

    @property
    def bins(self):
        """
        Map containing the boundary points for the range of the bins
        """
        return self._call_java("javaBins")

    @staticmethod
    def _from_java(java_stage):
        """
        Given a Java object, create and return a Python wrapper of it.
        Used for ML persistence.
        Meta-algorithms such as Pipeline should override this method as a classmethod.
        """
        # Generate a default new instance from the stage_name class.
        py_type = BucketizerModel
        if issubclass(py_type, JavaParams):
            # Load information from java_stage to the instance.
            py_stage = py_type()
            py_stage._java_obj = java_stage
            py_stage._resetUid(java_stage.uid())
            py_stage._transfer_params_from_java()

        return py_stage

    @classmethod
    def read(cls):
        """Returns an MLReader instance for this class."""
        return CustomJavaMLReader(cls, cls._classpath_model)
```

Note that we are calling the Java-friendly version to retrieve the bins data structure:

```
self._call_java("javaBins")
```

Additionally, we provide the fully qualified name of the class where the model is implemented: `com.custom.spark.feature.BucketizerModel`.

Finally, in the `read` method we return a `CustomJavaMLReader`. This is custom reading behaviour that we had to reimplement in order to allow for model persistence, i.e. being able to save/load the model. You can check the details in the repository.

Additional work is needed to support the persistence of this model within Spark’s Pipeline context.