博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Spark 广播变量 TorrentBroadcast
阅读量:6229 次
发布时间:2019-06-21

本文共 5373 字,大约阅读时间需要 17 分钟。

1.需求的出现

  当我们在driver端调度spark作用的过程中,需要向各个节点发送任务“数据”--Rdd,一个般一个Rdd会对应多个任务,没一个任务可以交给一个excutor执行,而一个excutor可以开启多个线程去计算,那么此时每个线程都要从Driver端获取Rdd,那样就会产生大量的副本,当需要向excutor传递大型变量的时候,就会产生大量的网络占用,而且多次序列化,与反序列化都会占用资源。

2.解决方案

  Spark采用了广播变量的方案,解决了产生副本过多的问题,driver会将在任务执行过程中需要发送的序列化变量对象进行切割,形成多个chunk,储存在BlockkManager中,每个excutor一样都会有一个blockManager,当excutor需要变量的时候首先会从自身的BlockManager中去寻找,如果没有才去Driver或者其他执行器进行抓取,这样就可以确保在一个excutor中只需要一份变量副本。也就减少了大量变量副本而产生的网络占用了。

验证实例:

一、不采用广播变量

1)定义方法可以监控Spark任务执行端的信息

def sendInfo(obj: Object, m: String, param: String) = {val ip = java.net.InetAddress.getLocalHost.getHostAddressval pid = java.lang.management.ManagementFactory.getRuntimeMXBean.getName.split("@")(0)val tname = Thread.currentThread().getNameval classname = obj.getClass.getSimpleNameval objHash = obj.hashCode()val info = ip + "/" + pid + "/" + tname + "/" + classname + "@" + objHash + "/" + m + "(" + param + ")" + "\r\n"//发送数据给nc 服务器val sock = new java.net.Socket("s101", 8888)val out = sock.getOutputStreamout.write(info.getBytes())out.flush()out.close()}

2)首先创建一个可序列化的dog类,(entends Serializable)

scala> class dog extends Serializable

3)创建一个dog对象,并将其作为变量传入spark作业任务中

//创建对象 scala> val d = new dogd: dog = dog@321b8863 //创建rddscala> val rdd1 = sc.makeRDD(1 to 10 ,10)rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at 
:24val rdd2 = rdd1.map(e=>{sendInfo(d,"x","x");e})rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map at
:31 //触发Spark任务

 scala> rdd2.collect

 res1: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

4)监控结果:

192.168.48.101/4140/Executor task launch worker-0/dog@1920051793/x(x)192.168.48.102/3355/Executor task launch worker-0/dog@867775714/x(x)192.168.48.102/3360/Executor task launch worker-0/dog@634533975/x(x)192.168.48.102/3355/Executor task launch worker-1/dog@1176307278/x(x)192.168.48.104/3364/Executor task launch worker-0/dog@1762941990/x(x)192.168.48.104/3357/Executor task launch worker-0/dog@1176451488/x(x)192.168.48.103/3450/Executor task launch worker-0/dog@2076792302/x(x)192.168.48.103/3445/Executor task launch worker-0/dog@1844856176/x(x)192.168.48.103/3445/Executor task launch worker-1/dog@1152883024/x(x)192.168.48.103/3450/Executor task launch worker-1/dog@1619414885/x(x)

  通过dog对象的地址可以看出,每个executor的每个线程都会创建(反序列化,从Driver端抓取(是否会从其他executor抓取还有待验证))一个新的对象。

二、广播变量的实现

1)定义方法可以监控Spark任务执行端的信息

def sendInfo(obj: Object, m: String, param: String) = {val ip = java.net.InetAddress.getLocalHost.getHostAddressval pid = java.lang.management.ManagementFactory.getRuntimeMXBean.getName.split("@")(0)val tname = Thread.currentThread().getNameval classname = obj.getClass.getSimpleNameval objHash = obj.hashCode() val info = ip + "/" + pid + "/" + tname + "/" + classname + "@" + objHash + "/" + m + "(" + param + ")" + "\r\n" //发送数据给nc 服务器 val sock = new java.net.Socket("s101", 8888) val out = sock.getOutputStream out.write(info.getBytes()) out.flush() out.close() }

2)首先创建一个可序列化的dog类,(entends Serializable)

scala> class dog extends Serializable

3)创建一个dog对象,并将其作为变量传入spark作业任务中

//创建对象 scala> val d = new dogd: dog = dog@321b8863 //创建广播变量 scala> val d1 = sc.broadcast(d) //创建rddscala> val rdd1 = sc.makeRDD(1 to 10 ,10)rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at 
:24val rdd2 = rdd1.map(e=>{sendInfo(d1.value,"x","x");e}) rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map at
:31 //触发Spark任务

 scala> rdd2.collect

 res1: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

4)监控结果:(由于集群设计的问题导致很难看出结论)

192.168.48.104/3559/Executor task launch worker-0/dog@1092230054/x(x)192.168.48.104/3559/Executor task launch worker-1/dog@1092230054/x(x)192.168.48.104/3562/Executor task launch worker-0/dog@2084510396/x(x) 192.168.48.103/3646/Executor task launch worker-0/dog@613174275/x(x)192.168.48.103/3655/Executor task launch worker-0/dog@603565101/x(x) 192.168.48.102/3561/Executor task launch worker-0/dog@369123253/x(x)192.168.48.102/3561/Executor task launch worker-1/dog@369123253/x(x)192.168.48.102/3546/Executor task launch worker-1/dog@2120472001/x(x)192.168.48.102/3546/Executor task launch worker-0/dog@2120472001/x(x) 192.168.48.101/5281/Executor task launch worker-0/dog@976431494/x(x)

看到这个结果开始我是有点懵的,为什么同一个节点的 worker-0和worker-1是同一个dog,但是同样的worker-0却是不同的dog。后来才想起来之前修改过配置文件spark-env.sh。

在每个节点启动了两个executor。也就是说 两个worker-0是属于不同的executor的所以,是不同的dog。

5)通过修改启动spark-shell时的参数配置,改变资源配置,实现一个节点只启动一个executor,一个executor启动多个线程。重复上述同样步骤

spark-shell --master spark://s101:7077 --executor-cores 4 --total-executor-cores 40

 

6)重复上述同样步骤,得到如下结果

192.168.48.104/4293/Executor task launch worker-2/dog@1746435851/x(x)192.168.48.104/4293/Executor task launch worker-0/dog@1746435851/x(x)192.168.48.104/4293/Executor task launch worker-1/dog@1746435851/x(x)192.168.48.102/4305/Executor task launch worker-1/dog@744462004/x(x)192.168.48.102/4305/Executor task launch worker-0/dog@744462004/x(x)192.168.48.103/4374/Executor task launch worker-0/dog@1750215217/x(x)192.168.48.103/4374/Executor task launch worker-1/dog@1750215217/x(x)192.168.48.103/4374/Executor task launch worker-2/dog@1750215217/x(x)192.168.48.101/7610/Executor task launch worker-1/dog@1611268215/x(x)192.168.48.101/7610/Executor task launch worker-2/dog@1611268215/x(x)

可以看出一个executor多个线程共享一个dog对象。

 

转载于:https://www.cnblogs.com/yanxun1/p/9800645.html

你可能感兴趣的文章
一个backup exec 2012的真实故障案例,服务无法启动1068
查看>>
我的友情链接
查看>>
Linux基础
查看>>
hadoop+hive环境搭建(centos6.5)-01
查看>>
点到点子接口的帧中继配置
查看>>
计算机网络与Internet应用
查看>>
python md5
查看>>
强制转换与内存
查看>>
发送UDP应答包的思考
查看>>
ASA防火墙基本配置
查看>>
软文真的可以帮助我们的网站吗?
查看>>
现代程序设计 作业6 - 简单而有意义的题目
查看>>
70、MSTP简介
查看>>
【VMware虚拟化解决方案】构建VMware私有云 实现ITaaS
查看>>
每天一个linux命令-mkdir
查看>>
四天精通shell编程(二)
查看>>
Linux 学习笔记_8_进程管理_2_进程管理命令
查看>>
python3中实现客户端与服务端交互发送文件
查看>>
Centos yum异常问题
查看>>
标签制作软件中如何导出标签模板为PDF文件?
查看>>