Classification of Text into Topics - Debug with pio-shell

Author : Rajdeep Dua
Last Updated : Jan 30 2017

Launch PIO Shell

./bin/pio-shell --with-spark

The --with-spark flag starts the shell with a SparkContext bound to sc, which the commands below rely on.

Execute the Following Commands

  1. Import the required classes from Spark and PredictionIO
import org.apache.predictionio.controller._
import org.apache.predictionio.data.storage.{Event, PropertyMap, Storage}
import org.apache.predictionio.data.store.PEventStore
import org.apache.spark.SparkContext
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame
  2. Create the WikiPage class
case class WikiPage(content: String, category: String)
  3. Create the TrainingData class
import org.apache.predictionio.controller.{Params, SanityCheck}
case class DataSourceParam(appName: String, evalK: Int) extends Params
class TrainingData(val contentAndcategory: RDD[WikiPage])
  extends Serializable with SanityCheck {

  def sanityCheck(): Unit = {
    try {
      // sample a couple of observations to confirm the RDD is non-empty
      val obs = contentAndcategory.takeSample(false, 2)
      println(s"total observations: ${obs.length}")
      println()
    } catch {
      case e: ArrayIndexOutOfBoundsException =>
        println()
        println("Data set is empty, " +
          "make sure event fields match imported data.")
        println()
    }
  }
}

  4. Create the DataSource class

import org.apache.predictionio.controller.{PDataSource, EmptyActualResult, EmptyEvaluationInfo}

// Query appears in DataSource's type signature, so it must be defined before
// this class compiles; it is instantiated later, in step 13.
case class Query(topics: Seq[Array[String]]) extends Serializable
class DataSource(val dsp: DataSourceParam) extends
  PDataSource[TrainingData, EmptyEvaluationInfo, Query, EmptyActualResult] {

  // read "train" events for the wiki_page entity type from the event store
  def readEvent(sc: SparkContext): RDD[WikiPage] = {
    val eventRDD: RDD[Event] = PEventStore.find(
      appName = dsp.appName,
      entityType = Some("wiki_page"),
      eventNames = Some(List("train"))
    )(sc)
    val sentimentRDD: RDD[WikiPage] = eventRDD.map { event =>
      val text = event.properties.get[String]("content")
      val category = event.properties.get[String]("category")
      WikiPage(text, category)
    }
    sentimentRDD
  }

  override def readTraining(sc: SparkContext): TrainingData = {
    new TrainingData(readEvent(sc))
  }
}
  5. Create the Tokenizer_new class, which transforms text into a unigram sequence
import org.apache.spark.ml.feature.Tokenizer

class Tokenizer_new extends Tokenizer() {

  // strip punctuation, URLs, and numeric tokens, then lowercase and
  // split on whitespace to produce unigrams
  override def createTransformFunc: (String) => Seq[String] = { str =>
    str.replaceAll("[.!?=()|*]", "")
      .replaceAll("(www\\.[^\\s]+)|(https?://[^\\s]+)", "")
      .replaceAll("[0-9]+[A-Za-z]*[.:]*", "")
      .toLowerCase().split("\\s+").toSeq
  }
}
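
As a quick spot-check (the sample string below is made up for illustration; the overridden createTransformFunc is public here, so it can be called directly):

val tok = new Tokenizer_new()
tok.createTransformFunc("Politics 101: see http://example.com NOW")
// expected, roughly: Seq(politics, see, now)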
  6. Create the PreparedData class
case class PreparedData(trainingData: TrainingData,
  labeledpoints: RDD[LabeledPoint]) extends Serializable
  7. Create a DataSource instance
val dsp = DataSourceParam("topics_wikipedia", 1)
val ds = new DataSource(dsp)

Output

ds: DataSource = $iwC$$iwC$$iwC$DataSource@1eb306f9

Populate trainingData by reading events from the event store.

val trainingData = ds.readTraining(sc)
//trainingData: TrainingData = $iwC$$iwC$TrainingData@571b6751
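
With a concrete TrainingData in hand, the sanityCheck defined in step 3 can be exercised directly; it should print the sampled observation count rather than the empty-dataset warning:

trainingData.sanityCheck()
// prints something like: total observations: 2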
  8. Create eventRDD from PredictionIO storage
val eventDB = Storage.getPEvents()
//eventDB: org.apache.predictionio.data.storage.PEvents
// = org.apache.predictionio.data.storage.jdbc.JDBCPEvents@be1f3d6



val eventRDD: RDD[Event] = PEventStore.find(
  appName = dsp.appName,
  entityType = Some("wiki_page"),
  eventNames = Some(List("train"))
)(sc)

eventRDD.first()
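
Since the goal here is debugging, a couple of extra inspections (illustrative, not part of the original flow) help confirm the import actually populated the app:

eventRDD.count()
// total "train" events for entity type wiki_page
eventRDD.map(_.event).countByValue()
// breakdown by event name, e.g. Map(train -> 2)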

Instantiate sentimentRDD from eventRDD using the WikiPage class

val sentimentRDD: RDD[WikiPage] = eventRDD.map { event =>
  val text = event.properties.get[String]("content")
  val category = event.properties.get[String]("category")
  WikiPage(text, category)
}
sentimentRDD.first()

Output

res2: WikiPage =
WikiPage(Politics (from Greek: Politiká: Politika, definition
"affairs of the cities") is the process of making decisions applying
to all members of each group. More narrowly, it refers to achieving
and exercising positions of governance — organized control over a
human community, particularly a state. Furthermore, politics is the
study or practice of the distribution of power and resources within
a given community (a usually hierarchically organized population) as
well as the interrelationship(s) between communities.
A variety of methods are deployed in politics, which include promoting
or forcing one's own political views among people, negotiation with
other political subjects, making laws, and exercising force, including
warfare against adversaries. Politics is exer...
val trainingData = new TrainingData(sentimentRDD)

Output

trainingData: TrainingData = $iwC$$iwC$TrainingData@25ae5f6b
  9. Create phraseDataframe
import org.apache.spark.ml.feature._
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
def processPhrase(phraseDataframe: DataFrame): DataFrame = {

  // split raw content into unigrams using the custom tokenizer
  val tokenizer =
    new Tokenizer_new().setInputCol("content").setOutputCol("unigram")
  val unigram = tokenizer.transform(phraseDataframe)

  // drop common stop words
  val remover =
    new StopWordsRemover().setInputCol("unigram").setOutputCol("filtered")
  val stopRemoveDF = remover.transform(unigram)

  // hash the filtered terms into a term-frequency feature vector
  val htf = new HashingTF().setInputCol("filtered").setOutputCol("rowFeatures")
  htf.transform(stopRemoveDF)
}
val obs = trainingData.contentAndcategory

Output

obs: org.apache.spark.rdd.RDD[WikiPage] =
 MapPartitionsRDD[3] at map at <console>:50
val sqlContext = SQLContext.getOrCreate(sc)
//sqlContext: org.apache.spark.sql.SQLContext
// = org.apache.spark.sql.hive.HiveContext@38ebf8dc
val phraseDataframe =
  sqlContext.createDataFrame(obs).toDF("content", "category")
val categories: Map[String, Int] =
  phraseDataframe.map(row => row.getAs[String]("category"))
    .collect().zipWithIndex.toMap
//categories: Map[String,Int] = Map(Politics -> 0, Sport -> 1)
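
Note that zipWithIndex over the raw category column only works because this sample has exactly one page per category; with several pages per topic the same label would map to multiple indices. A safer variant for larger datasets (an assumption beyond the original two-row sample) de-duplicates first:

val categories: Map[String, Int] =
  phraseDataframe.map(_.getAs[String]("category"))
    .distinct().collect().zipWithIndex.toMap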
  10. Create the term-frequency (tf) DataFrame
val tf = processPhrase(phraseDataframe)
//tf: org.apache.spark.sql.DataFrame
// = [content: string, category: string, unigram: array<string>,
// filtered: array<string>, rowFeatures: vector]
  11. Create labeledpoints from the tf DataFrame, then a PreparedData instance
val labeledpoints = tf.map(row => new LabeledPoint(
  categories(row.getAs[String]("category")).toDouble,
  row.getAs[Vector]("rowFeatures")))

//labeledpoints: org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint] = MapPartitionsRDD[18] at map at <console>:88
case class AlgorithmParams(lambda: Double) extends Params
val ap = AlgorithmParams(1)

Output

ap: AlgorithmParams = AlgorithmParams(1.0)

import org.apache.predictionio.controller.{IPersistentModel,
  IPersistentModelLoader}
import org.apache.spark.mllib.classification.{NaiveBayes, NaiveBayesModel}


val pd = PreparedData(trainingData, labeledpoints)

Output

pd: PreparedData = PreparedData($iwC$$iwC$TrainingData@25ae5f6b,MapPartitionsRDD[18] at map at <console>:88)
pd.labeledpoints

Output

res10: org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint] = MapPartitionsRDD[18] at map at <console>:88
val nbModel = NaiveBayes.train(pd.labeledpoints, lambda = ap.lambda)

//nbModel: org.apache.spark.mllib.classification.NaiveBayesModel
// = org.apache.spark.mllib.classification.NaiveBayesModel@69f50fb5
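
As a crude self-check (with only two training documents the number is not meaningful, but the pattern is useful on real data), count how many training points the model reproduces:

val correct = pd.labeledpoints
  .filter(lp => nbModel.predict(lp.features) == lp.label)
  .count()
println(s"training accuracy: ${correct.toDouble / pd.labeledpoints.count()}")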

val obs = pd.trainingData.contentAndcategory


val sqlContext = SQLContext.getOrCreate(sc)
//sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.hive.HiveContext@38ebf8dc

val phraseDataframe = sqlContext.createDataFrame(obs).toDF("content", "category")

Output

phraseDataframe: org.apache.spark.sql.DataFrame =
  [content: string, category: string]

phraseDataframe.show()

Output

+--------------------+--------+
|             content|category|
+--------------------+--------+
|Politics (from Gr...|Politics|
|Sport (UK) or spo...|   Sport|
+--------------------+--------+
val categories: Map[String, Int] =
  phraseDataframe.map(row => row.getAs[String]("category"))
    .collect().zipWithIndex.toMap

Output

categories: Map[String,Int] = Map(Politics -> 0, Sport -> 1)
  12. Create a PredictionIO Model class from NaiveBayesModel

scala> :paste
// Entering paste mode (ctrl-D to finish)

// Model bundles the trained NaiveBayesModel with the category index;
// the IPersistentModel/IPersistentModelLoader pair lets PredictionIO
// save the model to disk and reload it later.
case class Model(nbModel: NaiveBayesModel,
                 categories: Map[String, Int],
                 sc: SparkContext
                ) extends IPersistentModel[AlgorithmParams] with Serializable {
  def save(id: String, params: AlgorithmParams, sc: SparkContext): Boolean = {
    nbModel.save(sc, s"/tmp/${id}/nbmodel")
    sc.parallelize(Seq(categories)).saveAsObjectFile(s"/tmp/${id}/categories")
    true
  }
}

object Model extends IPersistentModelLoader[AlgorithmParams, Model] {
  def apply(id: String, params: AlgorithmParams, sc: Option[SparkContext]) = {
    new Model(
      NaiveBayesModel.load(sc.get, s"/tmp/${id}/nbmodel"),
      sc.get.objectFile[Map[String, Int]](s"/tmp/${id}/categories").first,
      sc.get
    )
  }
}

val model = new Model(nbModel, categories, sc)

Output

model: Model = Model(org.apache.spark.mllib.classification.NaiveBayesModel@69f50fb5,Map(Politics -> 0, Sport -> 1),org.apache.spark.SparkContext@356fa0d1)
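
To verify the persistence path end to end, the save method and loader above can be exercised with an arbitrary id (the id "debug" and the resulting /tmp paths are illustrative):

model.save("debug", ap, sc)
// writes /tmp/debug/nbmodel and /tmp/debug/categories
val restored = Model("debug", ap, Some(sc))
restored.categories
// should echo Map(Politics -> 0, Sport -> 1)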
    
  13. Instantiate the Query class (defined in step 4) and build the query features

val topics = Seq(Array("hockey"), Array("baseball"))

Output

topics: Seq[Array[String]] = List(Array(hockey), Array(baseball))
val query = Query(topics)
val qryInd = query.topics.zipWithIndex
val df = sqlContext.createDataFrame(qryInd).toDF("words", "id")
// hash the query words with the same HashingTF defaults used in training
val htf = new HashingTF().setInputCol("words").setOutputCol("feature")
val hm = htf.transform(df)
val featureSet = hm.map(x => x.getAs[Vector]("feature"))
val categories = model.categories.map(_.swap)

Output

categories: scala.collection.immutable.Map[Int,String]
  = Map(0 -> Politics, 1 -> Sport)
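
One debugging pitfall worth noting: the HashingTF used for the query must hash into the same feature space as the one used in processPhrase at training time. Both calls above rely on the library default size, which keeps them consistent; if you ever set it explicitly, set it on both sides (the 1 << 18 below is only an example value, not from the original session):

val htfTrain = new HashingTF().setNumFeatures(1 << 18)
  .setInputCol("filtered").setOutputCol("rowFeatures")
val htfQuery = new HashingTF().setNumFeatures(1 << 18)
  .setInputCol("words").setOutputCol("feature")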
  14. Predict using the NaiveBayesModel
val prediction = model.nbModel.predict(featureSet).first()

Output

prediction: Double = 1.0
val cat = categories(prediction.toInt)
//cat: String = Sport
val prob = model.nbModel.predictProbabilities(featureSet)

Output

prob: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector] = MapPartitionsRDD[32] at mapPartitions at NaiveBayes.scala:116
prob.first()

Output

res12: org.apache.spark.mllib.linalg.Vector =
  [0.33277732913894537,0.6672226708610547]
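
To make the probability vector readable, pair each component with its category name via the swapped categories map; with the run above this labels the two values:

prob.first().toArray.zipWithIndex.foreach { case (p, i) =>
  println(s"${categories(i)} -> $p")
}
// Politics -> 0.33277732913894537
// Sport -> 0.6672226708610547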