1.需求的出现
当我们在driver端调度spark作用的过程中,需要向各个节点发送任务“数据”--Rdd,一个般一个Rdd会对应多个任务,没一个任务可以交给一个excutor执行,而一个excutor可以开启多个线程去计算,那么此时每个线程都要从Driver端获取Rdd,那样就会产生大量的副本,当需要向excutor传递大型变量的时候,就会产生大量的网络占用,而且多次序列化,与反序列化都会占用资源。
2.解决方案
Spark采用了广播变量的方案,解决了产生副本过多的问题,driver会将在任务执行过程中需要发送的序列化变量对象进行切割,形成多个chunk,储存在BlockkManager中,每个excutor一样都会有一个blockManager,当excutor需要变量的时候首先会从自身的BlockManager中去寻找,如果没有才去Driver或者其他执行器进行抓取,这样就可以确保在一个excutor中只需要一份变量副本。也就减少了大量变量副本而产生的网络占用了。
验证实例:
一、不采用广播变量
1)定义方法可以监控Spark任务执行端的信息
def sendInfo(obj: Object, m: String, param: String) = {val ip = java.net.InetAddress.getLocalHost.getHostAddressval pid = java.lang.management.ManagementFactory.getRuntimeMXBean.getName.split("@")(0)val tname = Thread.currentThread().getNameval classname = obj.getClass.getSimpleNameval objHash = obj.hashCode()val info = ip + "/" + pid + "/" + tname + "/" + classname + "@" + objHash + "/" + m + "(" + param + ")" + "\r\n"//发送数据给nc 服务器val sock = new java.net.Socket("s101", 8888)val out = sock.getOutputStreamout.write(info.getBytes())out.flush()out.close()}
2)首先创建一个可序列化的dog类,(entends Serializable)
scala> class dog extends Serializable
3)创建一个dog对象,并将其作为变量传入spark作业任务中
//创建对象 scala> val d = new dogd: dog = dog@321b8863 //创建rddscala> val rdd1 = sc.makeRDD(1 to 10 ,10)rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at:24val rdd2 = rdd1.map(e=>{sendInfo(d,"x","x");e})rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map at :31 //触发Spark任务
scala> rdd2.collect
res1: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)4)监控结果:
192.168.48.101/4140/Executor task launch worker-0/dog@1920051793/x(x)192.168.48.102/3355/Executor task launch worker-0/dog@867775714/x(x)192.168.48.102/3360/Executor task launch worker-0/dog@634533975/x(x)192.168.48.102/3355/Executor task launch worker-1/dog@1176307278/x(x)192.168.48.104/3364/Executor task launch worker-0/dog@1762941990/x(x)192.168.48.104/3357/Executor task launch worker-0/dog@1176451488/x(x)192.168.48.103/3450/Executor task launch worker-0/dog@2076792302/x(x)192.168.48.103/3445/Executor task launch worker-0/dog@1844856176/x(x)192.168.48.103/3445/Executor task launch worker-1/dog@1152883024/x(x)192.168.48.103/3450/Executor task launch worker-1/dog@1619414885/x(x)
通过dog对象的地址可以看出,每个executor的每个线程都会创建(反序列化,从Driver端抓取(是否会从其他executor抓取还有待验证))一个新的对象。
二、广播变量的实现
1)定义方法可以监控Spark任务执行端的信息
def sendInfo(obj: Object, m: String, param: String) = {val ip = java.net.InetAddress.getLocalHost.getHostAddressval pid = java.lang.management.ManagementFactory.getRuntimeMXBean.getName.split("@")(0)val tname = Thread.currentThread().getNameval classname = obj.getClass.getSimpleNameval objHash = obj.hashCode() val info = ip + "/" + pid + "/" + tname + "/" + classname + "@" + objHash + "/" + m + "(" + param + ")" + "\r\n" //发送数据给nc 服务器 val sock = new java.net.Socket("s101", 8888) val out = sock.getOutputStream out.write(info.getBytes()) out.flush() out.close() }
2)首先创建一个可序列化的dog类,(entends Serializable)
scala> class dog extends Serializable
3)创建一个dog对象,并将其作为变量传入spark作业任务中
//创建对象 scala> val d = new dogd: dog = dog@321b8863 //创建广播变量 scala> val d1 = sc.broadcast(d) //创建rddscala> val rdd1 = sc.makeRDD(1 to 10 ,10)rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at:24val rdd2 = rdd1.map(e=>{sendInfo(d1.value,"x","x");e}) rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map at :31 //触发Spark任务
scala> rdd2.collect
res1: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)4)监控结果:(由于集群设计的问题导致很难看出结论)
192.168.48.104/3559/Executor task launch worker-0/dog@1092230054/x(x)192.168.48.104/3559/Executor task launch worker-1/dog@1092230054/x(x)192.168.48.104/3562/Executor task launch worker-0/dog@2084510396/x(x) 192.168.48.103/3646/Executor task launch worker-0/dog@613174275/x(x)192.168.48.103/3655/Executor task launch worker-0/dog@603565101/x(x) 192.168.48.102/3561/Executor task launch worker-0/dog@369123253/x(x)192.168.48.102/3561/Executor task launch worker-1/dog@369123253/x(x)192.168.48.102/3546/Executor task launch worker-1/dog@2120472001/x(x)192.168.48.102/3546/Executor task launch worker-0/dog@2120472001/x(x) 192.168.48.101/5281/Executor task launch worker-0/dog@976431494/x(x)
看到这个结果开始我是有点懵的,为什么同一个节点的 worker-0和worker-1是同一个dog,但是同样的worker-0却是不同的dog。后来才想起来之前修改过配置文件spark-env.sh。
在每个节点启动了两个executor。也就是说 两个worker-0是属于不同的executor的所以,是不同的dog。
5)通过修改启动spark-shell时的参数配置,改变资源配置,实现一个节点只启动一个executor,一个executor启动多个线程。重复上述同样步骤
spark-shell --master spark://s101:7077 --executor-cores 4 --total-executor-cores 40
6)重复上述同样步骤,得到如下结果
192.168.48.104/4293/Executor task launch worker-2/dog@1746435851/x(x)192.168.48.104/4293/Executor task launch worker-0/dog@1746435851/x(x)192.168.48.104/4293/Executor task launch worker-1/dog@1746435851/x(x)192.168.48.102/4305/Executor task launch worker-1/dog@744462004/x(x)192.168.48.102/4305/Executor task launch worker-0/dog@744462004/x(x)192.168.48.103/4374/Executor task launch worker-0/dog@1750215217/x(x)192.168.48.103/4374/Executor task launch worker-1/dog@1750215217/x(x)192.168.48.103/4374/Executor task launch worker-2/dog@1750215217/x(x)192.168.48.101/7610/Executor task launch worker-1/dog@1611268215/x(x)192.168.48.101/7610/Executor task launch worker-2/dog@1611268215/x(x)
可以看出一个executor多个线程共享一个dog对象。