spark 2.x中使用 scala和spark 实现:订单中 门店下的用户数统计,并且计算平均消费金额,交易笔数

这是命令行的简单操作案例,借用spark里面的API:flatMap、reduceByKey来实现,步骤如下:
1)启动spark shell:
./spark-shell
注:spark shell 是spark自带的一个快速开发工具,在spark目录下bin目录里面
2)读入需要处理的单词文件
val file = sc.textFile("file:///home/hadoop/data/hello.txt")
3)将文件中的每一行单词按照分隔符(这里是空格)分隔
val a = file.flatMap(line =& line.split(" "))
4)给每个单词计数为1
val b = a.map(word =& (word,1))
或者:val b = a.map((_,1))
5)统计相同单词的数量
val c = b.reduceByKey(_ + _)
或者:val c = b.reduceByKey((A,B)=&(A+B))
合并成一行代码:
sc.textFile("file:///home/hadoop/data/hello.txt").flatMap(line =& line.split(" ")).map(word =& (word,1)) .reduceByKey(_ + _).collect
Spark学习—统计文件单词出现次数
上一节我们简单介绍了RDD中转化和执行操作的用法,本节将通过一个具体的示例来加深对RDD的认识。
统计本地文件中单词出现次数
二.操作流程
1.读取外部文件创建JavaRDD;
Spark案例:Scala版统计单词个数
利用Spark的RDD可以对本机或集群上的文件进行词频统计。1、创建Scala项目SparkScalaWordCount2、创建lib目录,添加spark的jar,并添加作为项目的库3、在项目根目录创...
简单SparkRDD单词计数操作
1 工具IDEA
2 添加jar依赖
3 spark的local模式
在自己电脑下创建文件:
E://sparktext//text.txt然后写入:
Spark 加强版WordCount ,统计日志中文件访问数量
原文地址:http://blog.csdn.net/whzhaochao/article/details/写在前面学习Scala和Spark基本语法比较枯燥无味,搞搞简单的实际运用可以...
spark 单词统计
maven 项目 前提是装好hadoop集群和spark集群 并上传好文件到hdfs
pom.xml 如下
使用spark的dataframe实现单词统计
package com.org.spark.
import com.google.common.collect.L
import org.apache.spark.Spark...
上篇文章搭建了spark,并给出了java版本的WordCount示例,但是总感觉spark程序用scala语言编写才更好,因为scala语言会让spark程序很简洁,能在很大程度上提高开发效率,下面...
SparkRDD练习,加深理解。
没有更多推荐了,
(window.slotbydup=window.slotbydup || []).push({
id: "5865575",
container: s,
size: "300,250",
display: "inlay-fix"101.scala实现相似度计算购买该课程后可享受以下付费特权:价  格:
¥598.00(0份评价)章节1大数据案例第一天章节2大数据案例第二天章节3大数据案例第三天章节4大数据案例第四天章节5大数据案例第五天章节6大数据案例第六天章节7大数据案例第七天章节8大数据案例第八天章节9大数据案例第九天章节10大数据案例第十天还可以输入200字共有0条笔记,参加学习后可见·····相关推荐·····精品推荐··········欢迎登录课课家教育下次自动登录登录还没账号?后使用快捷导航没有帐号?
查看: 482|回复: 5
python、java、scala在spark开发中哪个有前途?
新手上路, 积分 48, 距离下一级还需 2 积分
论坛徽章:8
python、、在spark开发中哪个有前途?
新手上路, 积分 48, 距离下一级还需 2 积分
论坛徽章:8
python、java、scala在spark开发中哪个有前途?
新手上路, 积分 35, 距离下一级还需 15 积分
论坛徽章:1
谈不上哪个有前途,只要你牛逼,精一个,能做出事情来就ok了,不知道哪个合适,就用它原生的
金牌会员, 积分 1549, 距离下一级还需 1451 积分
论坛徽章:25
应该scala才是原配,性能较好
金牌会员, 积分 2323, 距离下一级还需 677 积分
论坛徽章:8
也觉得Scala 更好点,Python 支持慢一点
论坛徽章:132
应该都差不多吧,主要看实际项目环境和喜好
dataguru.cn All Right Reserved.
扫一扫加入本版微信群Access denied | www.somtvso.com used Cloudflare to restrict access
Please enable cookies.
What happened?
The owner of this website (www.somtvso.com) has banned your access based on your browser's signature (44cff87cd1834070-ua98).SparkSQL结合SparkStreaming,使用SQL完成实时计算中的数据统计
关键字:SparkSQL、Spark Streaming、SQL、实时计算
Flume+Kafka+SparkStreaming已经发展为一个比较成熟的实时日志收集与计算架构,利用Kafka,即可以支持将用于离线分析的数据流到HDFS,又可以同时支撑多个消费者实时消费数据,包括SparkStreaming。然而,在SparkStreaming程序中如果有复杂业务逻辑的统计,使用scala代码实现起来比较困难,也不易于别人理解。但如果在SparkSteaming中也使用SQL来做统计分析,是不是就简单的多呢?
本文介绍将SparkSQL与SparkStreaming结合起来,使用SQL完成实时的日志数据统计。SparkStreaming程序以yarn-cluster模式运行在YARN上,不单独部署Spark集群。
Hadoop-2.3.0-cdh5.0.0(YARN)
spark-1.5.0-bin-hadoop2.3
kafka_2.10-0.8.2.1
另外,还编译了SparkStreaming用于读取Kafka数据的插件:
spark-streaming-kafka_2.10-1.5.0.jar
相关环境的部署本文不做介绍,请参考文章最后的相关阅读。
实时统计需求
以60秒为间隔,统计60秒内的pv,ip数,uv
最终结果包括:
时间点:pv:ips:uv
原始日志格式
2015-11-11T14:59:59|~|xxx|~|202.109.201.181|~|xxx|~|xxx|~|xxx|~|B5C96DCA
2015-11-11T14:59:59|~|xxx|~|125.119.144.252|~|xxx|~|xxx|~|xxx|~|B
2015-11-11T14:59:59|~|xxx|~|125.119.144.252|~|xxx|~|xxx|~|xxx|~|1555BDE76F
2015-11-11T15:00:00|~|xxx|~|125.119.144.252|~|xxx|~|xxx|~|xxx|~|C0EAE70E
2015-11-11T15:00:00|~|xxx|~|125.119.144.252|~|xxx|~|xxx|~|xxx|~|C0EAE70E
2015-11-11T15:00:01|~|xxx|~|125.119.144.252|~|xxx|~|xxx|~|xxx|~|4E9FDB9
每条日志包含7个字段,分隔符为|~|,其中,第3列为ip,第7列为cookieid。假设原始日志已经由Flume流到Kafka中。
SparkStreaming程序代码
程序中使用下面的SQL语句完成对一个批次的数据统计:
SELECT date_format(current_timestamp(),'yyyy-MM-dd HH:mm:ss') AS time,
COUNT(1) AS pv,
COUNT(DISTINCT ip) AS ips,
COUNT(DISTINCT cookieid) as uv
FROM daplog
SparkStreaming程序代码:
package com.lxw.test
import scala.reflect.runtime.universe
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Time
import org.apache.spark.streaming.kafka.KafkaUtils
object DapLogStreaming {
def main (args : Array[String]) {
val sparkConf = new SparkConf().setMaster("yarn-cluster").setAppName("DapLogStreaming")
val ssc = new StreamingContext(sparkConf, Seconds(60))
val kafkaStream = KafkaUtils.createStream(
"bj11-65:2181",
"group_spark_streaming",
Map[String, Int]("daplog" -& 0,"daplog" -& 1),
StorageLevel.MEMORY_AND_DISK_SER)
.map(x =& x._2.split("\\|~\\|", -1))
kafkaStream.foreachRDD((rdd: RDD[Array[String]], time: Time) =& {
val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
import sqlContext.implicits._
val logDataFrame = rdd.map(w =& DapLog(w(0).substring(0, 10),w(2),w(6))).toDF()
logDataFrame.registerTempTable("daplog")
val logCountsDataFrame =
sqlContext.sql("select date_format(current_timestamp(),'yyyy-MM-dd HH:mm:ss') as time,count(1) as pv,count(distinct ip) as ips,count(distinct cookieid) as uv from daplog")
logCountsDataFrame.show()
ssc.start()
ssc.awaitTermination()
case class DapLog(day:String, ip:String, cookieid:String)
object SQLContextSingleton {
@transient
private var instance: SQLContext = _
def getInstance(sparkContext: SparkContext): SQLContext = {
if (instance == null) {
instance = new SQLContext(sparkContext)
示例中只是将实时统计的结果打印到标准输出,真实场景一般是将结果持久化到数据库中。
将该程序打包成DapLogStreaming.jar,上传至网关机。
运行SparkStreaming程序
进入$SPARK_HOME/bin执行下面的命令,将SparkStreaming程序提交到YARN:
./spark-submit \
--class com.lxw.test.DapLogStreaming \
--master yarn-cluster \
--executor-memory 2G \
--num-executors 6 \
--jars /home/liuxiaowen/kafka-clients-0.8.2.1.jar,/home/liuxiaowen/metrics-core-2.2.0.jar,/home/liuxiaowen/zkclient-0.3.jar,/home/liuxiaowen/spark-streaming-kafka_2.10-1.5.0.jar,/home/liuxiaowen/kafka_2.10-0.8.2.1.jar \
/home/liuxiaowen/DapLogStreaming.jar
注意:SparkStreaming及Kafka插件运行时候需要依赖相应的jar包。
查看运行结果
进入YARN ResourceManager的WEB界面,找到该程序对应的Application,点击ApplicationMaster的链接,进入SparkMaster界面:
每个批次(60秒),会生成一个Job。
点击TAB页”Streaming”,进入Streaming的监控页面:
在最下方,显示正在处理的批次和已经完成的批次,包括每个批次的events数量。
最后,最主要的,点击ApplicationMaster的logs链接,查看stdout标准输出:
已经按照SQL中统计的字段,打印出统计结果,每60秒一个批次打印一次。
由于kafka_2.10-0.8.2.1是基于Scala2.10的,因此Spark、Spark的Kafka插件、SparkStreaming应用程序都需要使用Scala2.10,如果使用Scala2.11,运行时候会报出因Scala版本不一致而造成的错误,比如:
15/11/11 15:36:26 ERROR yarn.ApplicationMaster: User class threw exception: java.lang.NoSuchMethodError: scala.Predef$.ArrowAssoc(Ljava/lang/Object;)Ljava/lang/Object;
java.lang.NoSuchMethodError: scala.Predef$.ArrowAssoc(Ljava/lang/Object;)Ljava/lang/Object;
at org.apache.spark.streaming.kafka.KafkaUtils$.createStream(KafkaUtils.scala:59)
at com.lxw.test.DapLogStreaming$.main(DapLogStreaming.scala:23)
at com.lxw.test.DapLogStreaming.main(DapLogStreaming.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.sc
sparkstreaming结合sparksql-2.x实时向hive中写数据
今天主要来介绍一下SparkSql,2.x新版本操作hive的一个写法.
Spark SQL 的功能之一是执行 SQL 查询.Spark SQL 也能够被用于从已存在的 Hive 环境中读取数据
第114课:SparkStreaming+Kafka+Spark SQL+TopN+Mysql+KafkaOffsetMonitor电商广告点击综合案例实战(详细内幕版本)
第114课:Spark Streaming电商广告点击综合案例实战(测试版本)
/* 王家林老师授课http://weibo.com/ilovepains
每天晚上20:00YY频道现场授课频道68...
使用Spark Streaming+Spark SQL实现在线动态计算出特定时间窗口下的不同种类商品中的热门商品排名
第97课: 使用Spark Streaming+Spark SQL实现在线动态计算出特定时间窗口下的不同种类商品中的热门商品排名
本节课将在之前学习的Spark SQL和 DataFra...
SparkStreaming转SparkSql
上篇讲述不同topic之间Join,很多聚合操作如group by,不如SparkSql灵活.所以想将join后topic转变成DataSet格式.
发现官网有现成demo例子 words.fore...
spark streaming 中使用 spark sql
package com.immooc.spark
import org.apache.spark.{SparkConf, rdd}
import org.apache.spark.rdd.RDD
Spark(四)————SparkSQL和SparkStreaming
1、什么是SparkSQL
SparkSQL模块能构建在Spark上运行sql语句,主要有DataFrame(数据框、表),它类似于构建在hadoop上的hive以及构建在hbase上的pheo...
Spark Streaming 结合Spark SQL 案例
本博文主要包含以下内容:
String+SQL技术实现解析
Streaming+SQL实现实战
一:SparkString+SparkSQL技术实现解析:使用Spark Streaming + Spa...
基于Spark streaming的SQL服务实时自动化运维
设计背景spark thriftserver目前线上有10个实例,以往通过监控端口存活的方式很不准确,当出故障时进程不退出情况很多,而手动去查看日志再重启处理服务这个过程很低效,故设计利用Spark ...
SparkSQL和Spark Streaming结合统计热词
pom文件: 4.0.0 com.it18zhang.spark.streaming spark-nc 1.0-SNAPSHOT org.apache.maven.plugins maven-comp...
Spark SQL+Spark Streaming案例
package SparkStreaming
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Row, SQLConte...
没有更多推荐了,
(window.slotbydup=window.slotbydup || []).push({
id: "5865575",
container: s,
size: "300,250",
display: "inlay-fix"

我要回帖

更多关于 scala和spark 的文章

 

随机推荐