Kafka
1. The differences between Kafka and JMS
2. What a Kafka topic is
A topic is a logical category of messages. Producers publish messages to a topic; a partitioner decides which partition each message is written to, and the partitions are spread across the brokers, which is where the messages are physically stored.
3. Partition
A partition is the physical storage unit of a topic: each topic is divided into one or more partitions.
4. Segment
Each partition is made up of segment files; a segment has a default maximum size of 1 GB, and a message's position inside it is located by its offset.
5. Why Kafka is fast
(1) Writes go to the OS page cache first and are then flushed to disk sequentially; sequential writes are far faster than random writes.
Sequential disk writes can reach roughly 600 MB/s, whereas random writes are only on the order of 100 KB/s.
(2) Reads use the sendfile zero-copy mechanism: data is sent from the OS cache straight to the network card, avoiding the extra copies between kernel space and user space.
6. How to implement a custom partitioner (a sketch follows below)
7. How data is routed by the partitioner to a partition and appended to its segments
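A minimal sketch for item 6, assuming the new producer API (org.apache.kafka.clients.producer.Partitioner); the class name KeyHashPartitioner and the hashing scheme are just for illustration, not the notes' own code:

import java.util.{Map => JMap}
import org.apache.kafka.clients.producer.Partitioner
import org.apache.kafka.common.Cluster

// route messages with the same key to the same partition by hashing the key bytes
class KeyHashPartitioner extends Partitioner {
  override def partition(topic: String, key: Any, keyBytes: Array[Byte],
                         value: Any, valueBytes: Array[Byte], cluster: Cluster): Int = {
    val numPartitions: Int = cluster.partitionCountForTopic(topic)
    if (keyBytes == null) 0
    else math.abs(java.util.Arrays.hashCode(keyBytes)) % numPartitions
  }
  override def close(): Unit = ()
  override def configure(configs: JMap[String, _]): Unit = ()
}

The partitioner is registered on the producer through the "partitioner.class" property.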
8. Kafka HA
Each partition has several replicas, and the replica metadata is kept in ZooKeeper. One replica is elected leader; all reads and writes go to the leader, the other replicas sync data from it through replication, and a new leader is elected if the current one fails.
9. Common Kafka shell commands (worth practicing by hand)
10. Common Kafka Java APIs (producer and consumer); a sketch follows below
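A hedged sketch of item 10, calling the kafka-clients producer and consumer from Scala; the broker address, topic name and group id are placeholders I made up:

import java.util.{Arrays, Properties}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.clients.consumer.KafkaConsumer
import scala.collection.JavaConverters._

// producer: send one message to a topic
val producerProps = new Properties()
producerProps.put("bootstrap.servers", "localhost:9092")
producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
val producer = new KafkaProducer[String, String](producerProps)
producer.send(new ProducerRecord[String, String]("test-topic", "key1", "hello kafka"))
producer.close()

// consumer: poll the same topic and print whatever comes back
val consumerProps = new Properties()
consumerProps.put("bootstrap.servers", "localhost:9092")
consumerProps.put("group.id", "demo-group")
consumerProps.put("auto.offset.reset", "earliest")
consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
val consumer = new KafkaConsumer[String, String](consumerProps)
consumer.subscribe(Arrays.asList("test-topic"))
for (record <- consumer.poll(1000).asScala) {
  println(s"offset=${record.offset()} key=${record.key()} value=${record.value()}")
}
consumer.close()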
The rest of today's notes cover Scala and Spark.
Scala
1. How to define variables
2. How to define functions and anonymous functions, and the difference between methods and functions
3. Conditional expressions and loops
4. Collections: Array, List, Set, Tuple, Map and their common operations
5. Classes and objects
6. Traits
7. Pattern matching
8. Implicit conversions in Scala
An implicit conversion function is used to enhance an existing class with new functionality without modifying it.
For the conversion to apply, it has to be imported (or otherwise visible) in the scope where the enhanced functionality is used.
import java.io.File

object MyPredef {
  // the implicit conversion: wrap a File in a RichFile to give it extra methods
  implicit def fileReadToRichFile(file: File) = new RichFile(file)
}
Usage:
import MyPredef._
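RichFile itself is not shown in these notes; a minimal sketch of what such a wrapper might look like, assuming it only adds a read method (the class body, the method name and the file path are illustrative, not the notes' own code):

import java.io.File
import scala.io.Source

// hypothetical wrapper that the implicit conversion above targets
class RichFile(val file: File) {
  def read(): String = Source.fromFile(file).mkString
}

// with import MyPredef._ in scope, a plain File silently gains the read method
val content = new File("/tmp/demo.txt").read()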
9. Actor
Write a simple communication program on top of sockets as practice.
10. Akka
ActorSystem.actorOf() is used to create an Actor.
Initialization work can be done in the Actor's preStart method, which runs before the actor starts handling messages.
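A small sketch for items 9 and 10 using classic Akka actors; the Worker class, the system name and the message are made up for illustration:

import akka.actor.{Actor, ActorSystem, Props}

class Worker extends Actor {
  // preStart runs once before the actor processes any message,
  // a good place for initialization such as opening connections
  override def preStart(): Unit = println("worker initializing")

  def receive = {
    case msg: String => println(s"received: $msg")
  }
}

object AkkaDemo extends App {
  val system = ActorSystem("demo-system")
  val worker = system.actorOf(Props[Worker], "worker")
  worker ! "hello"
}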
Spark RDD
1. A Spark RDD is a resilient distributed dataset. The "resilient" part refers to fault tolerance: the lineage of transformations is recorded, so if the data of a partition is lost it can be recomputed from its parent RDDs instead of being replicated or re-read.
2. The properties of an RDD
A list of partitions
A function for computing each partition
A list of dependencies on other RDDs
A list of preferred locations for each partition (for example, the block locations of an HDFS file)
3. The two kinds of RDD operators: Transformation and Action
Transformations are lazy: they only record how the new RDD is derived from its parents and do not trigger any computation by themselves.
Actions trigger execution: every Action submits a job, the DAGScheduler splits the job into stages, and the TaskScheduler sends the resulting tasks through the driver (DriverActor) to the executors to run. A tiny sketch follows below.
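A word-count sketch of the lazy/eager split; sc is assumed to be a SparkContext and the HDFS paths are placeholders:

// transformations only build up the lineage, nothing runs yet
val lines  = sc.textFile("hdfs://nn:9000/input/words.txt")
val counts = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)

// the action submits a job: DAGScheduler splits it into stages, TaskScheduler ships the tasks
counts.saveAsTextFile("hdfs://nn:9000/output/wordcount")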
4. Common RDD Transformations
-> combineByKey((x: String) => List(x), (a: List[String], b: String) => a :+ b,
   (m: List[String], n: List[String]) => m ++ n)
The first function (createCombiner) decides how the first value seen for a key in a partition is turned into the initial combiner.
The second function (mergeValue) decides how each further value for that key in the same partition is merged into the combiner.
The third function (mergeCombiners) works like the reduce after a map: it decides how the per-partition lists are merged across partitions.
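A concrete run of the same three functions, assuming a small made-up pair RDD of (key, String) records:

val rdd = sc.parallelize(List(("spark", "a"), ("spark", "b"), ("kafka", "c")), 2)
val grouped = rdd.combineByKey(
  (x: String) => List(x),                          // createCombiner: first value of a key in a partition
  (a: List[String], b: String) => a :+ b,          // mergeValue: fold later values into the partition-local list
  (m: List[String], n: List[String]) => m ++ n)    // mergeCombiners: merge lists from different partitions
grouped.collect()   // e.g. Array((spark,List(a, b)), (kafka,List(c)))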
-> aggregate(zeroValue)(seqOp, combOp), where the zero value can be a String, an Int, or any other type
The first function aggregates within each partition (roughly the map side); the second function combines the per-partition results (roughly the reduce side).
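For example, with a made-up Int RDD and an Int zero value:

val nums = sc.parallelize(List(1, 2, 3, 4, 5, 6), 2)
// seqOp sums within each partition, combOp sums the partition results
nums.aggregate(0)(_ + _, _ + _)                       // 21
// a different seqOp: take the max inside each partition, then add the maxima
nums.aggregate(0)((a, b) => math.max(a, b), _ + _)    // 3 + 6 = 9 with this partitioning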
-> reduceByKey(_ + _) performs a local aggregation on the map side before the shuffle and then aggregates again on the reduce side.
Both reduceByKey and aggregateByKey are built on top of combineByKey (the createCombiner / mergeValue / mergeCombiners functions described above).
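Side by side on a small made-up pair RDD:

val pairs = sc.parallelize(List(("a", 1), ("a", 3), ("b", 2), ("a", 5)), 2)
pairs.reduceByKey(_ + _).collect()                          // (a,9), (b,2)
// aggregateByKey: max per key inside each partition, then sum the per-partition maxima
pairs.aggregateByKey(0)((a, b) => math.max(a, b), _ + _).collect()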
-> mapPartitions
Operates on a whole partition at a time: the function receives an iterator over the partition's records and returns a new iterator.
This is useful when some setup is expensive, for example creating a database connection: the connection is created once per partition instead of once per record (see the sketch after the example below).
rdd1.mapPartitions((it: Iterator[String]) => {
  it.toList.map(x => (x, 1)).iterator
})
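And the connection-per-partition pattern mentioned above, where createConnection and save are hypothetical helpers rather than real APIs:

rdd1.mapPartitions { it =>
  val conn = createConnection()                               // hypothetical: opened once per partition
  val results = it.map(record => save(conn, record)).toList   // materialize before closing the connection
  conn.close()
  results.iterator
}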
-> mapPartitionsWithIndex
Like mapPartitions, but the function also receives the index of the partition being processed.
val func = (index: Int, it: Iterator[Int]) => {
  it.toList.map(x => ("index:" + index, x)).iterator
}
rdd1.mapPartitionsWithIndex(func).collect
5. Custom RDD Partitioner
// A custom partitioner extends Partitioner and overrides getPartition and numPartitions.
// Example: partition by host so that records with the same host land in the same partition.
import org.apache.spark.Partitioner
import scala.collection.mutable

class HostPartition(hostArr: Array[String]) extends Partitioner {
  // map each host to a partition index
  val map = new mutable.HashMap[String, Int]()
  for (index <- 0 until hostArr.length) {
    map.put(hostArr(index), index)
  }
  // return the partition index for a key; unknown hosts go to partition 0
  override def getPartition(key: Any): Int = {
    map.getOrElse(key.toString, 0)
  }
  override def numPartitions: Int = hostArr.length
}
Usage:
val hostPartition: HostPartition = new HostPartition(hostList)
val allPartitionRDD: RDD[(String, (String, Int))] = host_url_count.partitionBy(hostPartition)
6. Custom sorting ==> secondary sort
case class Gril(yanzhi: Int, nianling: Int) extends Ordered[Gril] with Serializable {
  // sort by yanzhi first; when equal, fall back to nianling
  override def compare(that: Gril): Int = {
    val yanzhiResult: Int = this.yanzhi.compareTo(that.yanzhi)
    if (yanzhiResult == 0) {
      return this.nianling.compareTo(that.nianling)
    }
    yanzhiResult
  }
}
Usage:
val rdd2: RDD[(String, Int, Int)] = rdd1.sortBy(msg => Gril(msg._2, msg._3))
Spark SQLContext
1. Integrating Spark with Hive and HDFS: copy Hive's hive-site.xml and Hadoop's core-site.xml and hdfs-site.xml into Spark's conf directory. Spark can then read Hive table data and reach the underlying files on HDFS through the NameNode.
2. What a DataFrame is
A distributed dataset similar to an RDD, except that the records carry schema (column) information.
3. How to create a DataFrame
Three ways (a sketch of the first one follows below):
-> RDD + case class
-> RDD + StructType
-> sqlContext.read.format(...).options(Map(...))
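A sketch of the RDD + case class approach with the Spark 1.x SQLContext API used in these notes; the Person class, the input path and the column layout are placeholders of my own:

case class Person(name: String, age: Int)

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._

// parse each line into a Person and convert the RDD to a DataFrame
val personDF = sc.textFile("hdfs://nn:9000/input/person.txt")
  .map(_.split(","))
  .map(f => Person(f(0), f(1).toInt))
  .toDF()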
4. Register the DataFrame as a temporary table so that it can be queried through sqlContext:
dF.registerTempTable("person")
5. Query through sqlContext ==> the result comes back as a DataFrame
sqlContext.sql("select * from person")
6. Writing a DataFrame out to HDFS or MySQL
import java.util.Properties

val prop = new Properties()
prop.put("user", "root")
prop.put("password", "815325")
// Note: when writing to MySQL you also need to supply the JDBC URL, the table name, and these connection properties.
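A possible completion, assuming the personDF built in the earlier sketch and a local MySQL instance; the JDBC URL, database name, table name and output path are placeholders:

import org.apache.spark.sql.SaveMode

// write to MySQL over JDBC using the properties above
personDF.write.mode(SaveMode.Append)
  .jdbc("jdbc:mysql://localhost:3306/test", "person", prop)

// or write to HDFS, for example as JSON
personDF.write.json("hdfs://nn:9000/output/person")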