티스토리 뷰

반응형

// 이전 글의 세션 헬퍼를 먼저 참조해 주세요.


package sample.scala.spark.mongodb


import com.mongodb.spark.MongoSpark

import com.mongodb.spark.config.ReadConfig

import org.bson.Document


/**
 * Sample job that runs a MongoDB aggregation pipeline through the
 * MongoDB Spark Connector and prints the resulting documents.
 *
 * The pipeline is applied to the `orders` collection:
 *   1. `$match` keeps documents whose `name` is `'kim'`.
 *   2. `$group` with `_id: null` counts the matched documents into `cnt`.
 *
 * The Spark session comes from `getSparkSession`, inherited from `MongoHelper`
 * (defined outside this file; presumably it also carries the MongoDB
 * connection settings such as the input URI -- confirm against that helper).
 */
object MongoAggregate extends MongoHelper {

  /**
   * Entry point: loads `orders`, runs the aggregation and prints the result.
   *
   * @param args command-line arguments, forwarded unchanged to `getSparkSession`
   */
  def main(args: Array[String]): Unit = {
    val sparkSession = getSparkSession(args)

    // Close the session even when the load or the aggregation fails.
    try {
      // Override only the collection and read preference; every other setting
      // falls back to the ReadConfig derived from the session.
      val readConfig = ReadConfig(
        Map("collection" -> "orders", "readPreference.name" -> "primaryPreferred"),
        Some(ReadConfig(sparkSession))
      )

      val orderRdd = MongoSpark.load(sparkSession.sparkContext, readConfig)

      val matchQry = "{$match:{name:'kim'}}"
      val groupQry = "{$group:{_id:null, cnt:{$sum:1}}}"

      // The pipeline stages are handed to the connector as BSON documents.
      val resRdd = orderRdd.withPipeline(Seq(Document.parse(matchQry), Document.parse(groupQry)))

      // The grouped result is small, so bring it to the driver before printing;
      // RDD.foreach(println) would print on the executors when run on a cluster.
      resRdd.collect().foreach(println)
    } finally {
      sparkSession.close()
    }
  }

}



* 오타가 있을 수 있습니다



반응형