macos canvas razor random pyspark ros 打印 clojure vb6 python相对路径怎么写 docker创建容器 python编程 python关键字 python函数参数 python建站 java基础语言 java函数 java设置 java开发环境安装 java获取 java循环list linux命令 linux系统命令大全 opengl编程指南 路由器辐射大吗 js删除节点 js格式化时间 vnc客户端 视频字幕提取器 html特殊符号 电脑密码查看器 winterboard 还原软件哪个好 dnf95b套 pr加速视频 4k对齐是什么意思 python爬取图片 ps怎么做动画 bat转exe 伤害显示宏 苹果手机怎么添加邮箱
当前位置: 首页 > 学习教程  > 编程语言

使用Spark实现统计连续登陆的三天及以上的用户

2020/8/11 19:48:05 文章标签:

--   这个问题可以扩展到很多相似的问题:连续几个月充值会员、连续天数有商品卖出、连续打滴滴、连续逾期。

数据如下:

guid01,2018-02-28
guid01,2018-03-01
guid01,2018-03-02
guid01,2018-03-04
guid01,2018-03-05
guid01,2018-03-06
guid01,2018-03-07
guid02,2018-03-01
guid02,2018-03-02
guid02,2018-03-03
guid02,2018-03-06 

    uid| 连续登录天数 |  起始日期  |  结束日期 |
±--------±-------±------------±------------±-+
| guid01 |       4     | 2018-03-04   | 2018-03-07 |
| guid02 |       3      | 2018-03-01 | 2018-03-03 | 

def main(args: Array[String]): Unit = {

    val isLocal = args(0).toBoolean
    val conf = new SparkConf().setAppName(this.getClass.getSimpleName)
    if(isLocal){
      conf.setMaster("local[*]")
    }
    val sc = new SparkContext(conf)
    //指定从哪里读取数据,并创建RDD
    val lines: RDD[String] = sc.textFile(args(1))
    val uidAndDate: RDD[(String, String)] = lines.map(line => {
      val fields: Array[String] = line.split(",")
      val uid = fields(0)
      val date = fields(1)
      (uid, date)
    })

    //根据uid进行分组,将同一个用户的登录数据,搞到同一个组内
    val grouped: RDD[(String, Iterable[String])] = uidAndDate.groupByKey()

    //在组内进行排序
    val uidAndDateDatedif = grouped.flatMapValues(it => {
      //toSet可以将日期去重,再tolist,进行排序
      //留意如果数据特别大,toset 或者toList 可能会内存溢出
      val sorted = it.toSet.toList.sorted

      //定义一个日期的工具类
      val calendar = Calendar.getInstance()
      val sdf = new SimpleDateFormat("yyyy-MM-dd")

      var index = 0
      sorted.map(dateStr => {
        val date = sdf.parse(dateStr)
        calendar.setTime(date)
        val difdate = calendar.add(Calendar.DATE, -index)
        index += 1

        (dateStr, sdf.format(calendar.getTime))
      })
    })
    //println(uidAndDateDatedif.collect().toBuffer)
    val result = uidAndDateDatedif.map(t => {
      ((t._1, t._2._2), t._2._1)
    }).groupByKey().mapValues(it => {
      val list: List[String] = it.toList.sorted
      val times = list.size
      val begintime = list.head
      val endtime = list.last
      (times, begintime, endtime)
    }).filter(t=>{t._2._1>=3}).map(t => {
      (t._1._1, t._2._1, t._2._2, t._2._3)
    })

    val res = result.collect()
println(res.toBuffer)
 sc.stop()
  }

 


本文链接: http://www.dtmao.cc/news_show_100199.shtml

附件下载

相关教程

    暂无相关的数据...

共有条评论 网友评论

验证码: 看不清楚?