协程 Python爬虫实战 keras casting nlp onclick jqgrid Keys.js vue安装 后台界面模板 sublime分屏快捷键 matlab向量的模 arraylist删除指定元素 vim跳到文件末尾 json转object docker启动命令 python迭代器 python线程 python中assert python正则表达式例子 python用什么ide python怎么下载 java接口 java入门编程 java集合图 java如何编写接口 linuxls命令 js四舍五入 vs2010sp1 梦幻西游手游助手 js数组删除指定元素 字幕制作软件哪个好 微信python退出程序 dg分区 英雄联盟体验服转换器 fdisk下载 ps色阶快捷键 视频解析软件 扫微信二维码诈骗原理 文明6万神殿
当前位置: 首页 > 学习教程  > 编程语言

Spark GraphX——pregel案例:求最短路径、求最小值

2020/8/11 19:57:02 文章标签:

一、Pregel API

def pregel[A] (initialMsg: A, maxIter: Int = Int.MaxValue,  activeDir: EdgeDirection = EdgeDirection.Out)
      (vprog: (VertexId, VD, A) => VD,
       sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
       mergeMsg: (A, A) => A) : Graph[VD, ED]

参数介绍:

参数名 作用
initialMsg 图初始化的时候,开始模型计算的时候,所有节点都会先收到一个消息
maxIter 最大 迭代次数
EdgeDirection 规定了发送消息的方向
vprog(VertexId,VD,A) VertexId:节点的id必须要使用Long型的,VD:节点的值,A:传来的值。节点调用该消息将聚合后的数据和本节点进行属性的合并
sendMsg(EdgeTriplet) 返回一个迭代器,迭代器里一般包含:目的地id,源节点的值和边界的值。激活态的节点调用该方法发送信息
mergeMsg 如果一个节点接收到多条信息,先用mergeMsg来将多条信息聚合称为一条信息,如果节点只收到一条信息,则不调用该函数

二、EdgeTriplet的几个属性

  • srcAttr:源节点的数据
  • dstAttr:目标节点的数据
  • attr:edge中的属性
  • srcId:源节点的id
  • dstId:目标节点的id

三、案例分析

  1. 求最短路径
package com.wang.spark_graph

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD._

import org.apache.spark.rdd.RDD

object SparkpregelDemo extends App {
  //1 创建SparkContext
  val sparkConf = new SparkConf().setAppName("pergelDemo").setMaster("local[*]")
  val sparkContext = new SparkContext(sparkConf)
  //2、创建顶点
  val vertexArray = Array(
    (1L, ("Alice", 28)),
    (2L, ("Bob", 27)),
    (3L, ("Charlie", 65)),
    (4L, ("David", 42)),
    (5L, ("Ed", 55)),
    (6L, ("Fran", 50))
  )
  val vertexRDD: RDD[(Long, (String, Int))] = sparkContext.makeRDD(vertexArray)
  //3 创建边,边的属性代表相邻两个顶点之间的距离
  val edgeArray = Array(
    Edge(2L, 1L, 7),
    Edge(2L, 4L, 2),
    Edge(3L, 2L, 4),
    Edge(3L, 6L, 3),
    Edge(4L, 1L, 1),
    Edge(2L, 5L, 2),
    Edge(5L, 3L, 8),
    Edge(5L, 6L, 3)
  )
  private val edgeRDD: RDD[Edge[PartitionID]] = sparkContext.makeRDD(edgeArray)
  //4 创建图
  val graph1 = Graph(vertexRDD,edgeRDD)

  /* ************************** 使用pregle算法计算 ,顶点5 到 各个顶点的最短距离 ************************** */
  //被计算的图中,起始点id
  val srcVertexId = 5L
  //PositiveInfinity正无穷
  val initialGraph: Graph[Double, PartitionID] = graph1.mapVertices{case(vid,(name,age))=>if (vid==srcVertexId) 0.0 else Double.PositiveInfinity}
  //5.调用pregel
  val pregelGraph: Graph[Double, PartitionID] = initialGraph.pregel(
    Double.PositiveInfinity, Int.MaxValue, EdgeDirection.Out
  )(
    (vid: VertexId, vd: Double, distMsg: Double) => {
      val minDist = math.min(vd, distMsg)
      println(s"顶点${vid},属性${vd},收到消息${distMsg},合并后的属性${minDist}")
      minDist
    },
    (edgeTriplet: EdgeTriplet[Double, PartitionID]) => {
      if (edgeTriplet.srcAttr + edgeTriplet.attr < edgeTriplet.dstAttr) {
        println(s"顶点${edgeTriplet.srcId} 给 顶点${edgeTriplet.dstId} 发送消息 ${edgeTriplet.srcAttr + edgeTriplet.attr}")
        Iterator[(VertexId, Double)]((edgeTriplet.dstId, edgeTriplet.srcAttr + edgeTriplet.attr))
      } else {
        Iterator.empty
      }
    },
    (msg1: Double, msg2: Double) => math.min(msg1, msg2)
  )
  pregelGraph

  //输出结果
  pregelGraph.triplets.collect().foreach(println)

  //7.关闭SparkContext
  sparkContext.stop()
}

  1. 求最小值
package com.wang.spark_graph

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.RDD._

object SparkpregelDemo2 extends App {
  val sparkConf = new SparkConf().setAppName("pergelDemo").setMaster("local[*]")
  val sc = new SparkContext(sparkConf)
  val vertices: RDD[(VertexId, (Int, Int))] = sc.parallelize(Array((1L, (7,-1)), (2L, (3,-1)),  (3L, (7,-1)), (4L, (6,-1))))
  val edges = sc.parallelize(Array(Edge(1L, 2L, 0), Edge(1L, 4L, 0),  Edge(2L, 4L, 0), Edge(3L, 1L, 0),  Edge(3L, 4L, 0)))
  val graph = Graph(vertices,edges)
  val initialMsg = 9999
  def vprog(vertexId:VertexId,value:(Int,Int),message:Int):(Int,Int) = { if (message == initialMsg) value else (message min value._1,value._1)}
  def sendMsg(triplet:EdgeTriplet[(Int,Int),Int]):Iterator[(VertexId,Int)] = {
    //println(triplet)//格式:((3,(2,-1)),(1,(7,-1)),0)
    val sourceVertex = triplet.srcAttr
    if(sourceVertex._1 == sourceVertex._2) Iterator.empty else Iterator((triplet.dstId,sourceVertex._1))
  }
  def mergeMsg(msg1:Int,msg2:Int):Int = msg1 min msg2
  val minGraph = graph.pregel(initialMsg,Int.MaxValue,EdgeDirection.Out)(vprog,sendMsg,mergeMsg)
  minGraph.vertices.collect().foreach(println)

}


本文链接: http://www.dtmao.cc/news_show_100269.shtml

附件下载

相关教程

    暂无相关的数据...

共有条评论 网友评论

验证码: 看不清楚?