gitweixin
  • 首页
  • 小程序代码
    • 资讯读书
    • 工具类
    • O2O
    • 地图定位
    • 社交
    • 行业软件
    • 电商类
    • 互联网类
    • 企业类
    • UI控件
  • 大数据开发
    • Hadoop
    • Spark
    • Hbase
    • Elasticsearch
    • Kafka
    • Flink
    • 数据仓库
    • 数据挖掘
    • flume
    • Kafka
    • Hive
    • shardingsphere
    • solr
  • 开发博客
    • Android
    • php
    • python
    • 运维
    • 技术架构
    • 数据库
  • 程序员网赚
  • bug清单
  • 量化投资
  • 在线查询工具
    • 去行号
    • 在线时间戳转换工具
    • 免费图片批量修改尺寸在线工具
    • SVG转JPG在线工具

分类归档大数据开发

精品微信小程序开发门户,代码全部亲测可用

  • 首页   /  
  • 分类归档: "大数据开发"
  • ( 页面40 )
Spark 2月 11,2019

Spark ML机器学习:连续型数据处理之给定边界离散化-Bucketizer

Bucketizer将连续的特征列转换成特征桶(buckets)列。这些桶由用户指定。它拥有一个splits参数。 例如商城的人群,觉得把人分为50以上和50以下太不精准了,应该分为20岁以下,20-30岁,30-40岁,36-50岁,50以上,那么就得用到数值离散化的处理方法了。离散化就是把特征进行适当的离散处理,比如上面所说的年龄是个连续的特征,但是我把它分为不同的年龄阶段就是把它离散化了,这样更利于我们分析用户行为进行精准推荐。Bucketizer能方便的将一堆数据分成不同的区间。

  • splits:如果有n+1个splits,那么将有n个桶。桶将由split x和split y共同确定,它的值范围为[x,y),如果是最后 一个桶,范围将是[x,y]。splits应该严格递增。负无穷和正无穷必须明确的提供用来覆盖所有的双精度值,否则,超出splits的值将会被 认为是一个错误。splits的两个例子是Array(Double.NegativeInfinity, 0.0, 1.0, Double.PositiveInfinity) 和 Array(0.0, 1.0, 2.0)。

  注意,如果你并不知道目标列的上界和下界,你应该添加Double.NegativeInfinity和Double.PositiveInfinity作为边界从而防止潜在的 超过边界的异常。下面是程序调用的例子。

object BucketizerDemo {
  def main(args: Array[String]): Unit = {
    var spark = SparkSession.builder().appName("BucketizerDemo").master("local[2]").getOrCreate();
    val array = Array((1,13.0),(2,16.0),(3,23.0),(4,35.0),(5,56.0),(6,44.0))
    //将数组转为DataFrame
    val df = spark.createDataFrame(array).toDF("id","age")
    // 设定边界,分为5个年龄组:[0,20),[20,30),[30,40),[40,50),[50,正无穷)
    // 注:人的年龄当然不可能正无穷,我只是为了给大家演示正无穷PositiveInfinity的用法,负无穷是NegativeInfinity。
    val splits = Array(0, 20, 30, 40, 50, Double.PositiveInfinity)
    //初始化Bucketizer对象并进行设定:setSplits是设置我们的划分依据
    val bucketizer = new Bucketizer().setSplits(splits).setInputCol("age").setOutputCol("bucketizer_feature")
    //transform方法将DataFrame二值化。
    val bucketizerdf = bucketizer.transform(df)
    //show是用于展示结果
    bucketizerdf.show
  }

}

输出结果:

+---+----+------------------+
| id| age|bucketizer_feature|
+---+----+------------------+
|  1|13.0|               0.0|
|  2|16.0|               0.0|
|  3|23.0|               1.0|
|  4|35.0|               2.0|
|  5|56.0|               4.0|
|  6|44.0|               3.0|
+---+----+------------------+
作者 east
Spark 2月 11,2019

Spark ML机器学习:连续型数据处理之二值化-Binarizer


Binarization是一个将数值特征转换为二值特征的处理过程。threshold参数表示决定二值化的阈值。 值大于阈值的特征二值化为1,否则二值化为0。 例如商城有个需求, 根据年龄来进行物品推荐,把50以上的人分为老年,50以下分为非老年人,那么我们根据二值化可以很简单的把50以上的定为1,50以下的定为0。这样就方便我们后续的推荐了。Binarizer就是根据阈值进行二值化,大于阈值的为1.0,小于等于阈值的为0.0


// $example on$
import org.apache.spark.SparkConf
import org.apache.spark.ml.feature.Binarizer
// $example off$
import org.apache.spark.sql.SparkSession

/**
* 二值化
*/
object BinarizerExample {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf();
sparkConf.setMaster("local[*]").setAppName(this.getClass.getSimpleName)
val spark = SparkSession
.builder
.config(sparkConf)
.appName("BinarizerExample")
.getOrCreate()

// $example on$
val data = Array((0, 0.1), (1, 0.8), (2, 0.6))
val dataFrame = spark.createDataFrame(data).toDF("id", "feature")
// transform 开始转换,将该列数据二值化,大于阈值的为1.0,否则为0.0
val binarizer: Binarizer = new Binarizer()
.setInputCol("feature")
.setOutputCol("binarized_feature")
.setThreshold(0.5)

val binarizedDataFrame = binarizer.transform(dataFrame)

println(s"Binarizer output with Threshold = ${binarizer.getThreshold}")
binarizedDataFrame.show()
// $example off$

spark.stop()
}
}

输出结果:

+---+----+-----------------+
| id| age|binarized_feature|
+---+----+-----------------+
|  1|34.0|              0.0|
|  2|56.0|              1.0|
|  3|58.0|              1.0|
|  4|23.0|              0.0|
+---+----+-----------------+
作者 east
Spark 1月 8,2019

Spark数据挖掘实例1:基于 Audioscrobbler 数据集音乐推荐

本实例来源于《Spark高级数据分析》,这是一个很好的spark数据挖掘的实例。从经验上讲,推荐引擎属于大规模机器学习,在日常购物中大家或许深有体会,比如:你在淘宝上浏览了一些商品,或者购买了一些商品,那么淘宝就会根据你的偏好给你推荐一些其他类似的商品。然而,相比较其他机器学习算法,推荐引擎的输出更加的直观,有时候的推荐效果让人吃惊。作为机器学习开篇文章,本篇文章会系统的介绍基于Audioscrobbler数据集的音乐推荐。

数据集介绍

Audioscrobbler数据集是一个公开发布的数据集,读者可以在(https://github.com/libaoquan95/aasPractice/tree/master/c3/profiledata_06-May-2005)网站获取。数据集主要有三部分组成,user_artist_data.txt文件是主要的数据集文件记录了约2420条用户id、艺术家id以及用户收听艺术家歌曲的次数数据,包含141000个用户和160万个艺术家;artist_data.txt文件记录了艺术家id和对应的名字;artist_alias.txt记录了艺术家id和对应的别称id。

推荐算法介绍

由于所选取的数据集只记录了用户和歌曲之间的交互情况,除了艺术家名字之外没有其他信息。因此要找的学习算法不需要用户和艺术家的属性信息,这类算法通常被称为协同过滤。如果根据两个用户的年龄相同来判断他们可能具有相似的偏好,这不叫协同过滤。相反,根据两个用户播放过许多相同歌曲来判断他们可能都喜欢某首歌,这是协调过滤。

本篇所用的算法在数学上称为迭代最小二乘,把用户播放数据当成矩阵A,矩阵低i行第j列上的元素的值,代表用户i播放艺术家j的音乐。矩阵A是稀疏的,绝大多数元素是0,算法将A分解成两个小矩阵X和Y,既A=XYT,X代表用户特征矩阵,Y代表特征艺术家矩阵。两个矩阵的乘积当做用户-艺术家关系矩阵的估计。可以通过下边一组图直观的反映:

现在假如有5个听众,音乐有5首,那么A是一个5*5的矩阵,假如评分如下:

图2.1 用户订阅矩阵

假如d是三个属性,那么X的矩阵如下:

 

图2.2 用户-特征矩阵

Y的矩阵如下:

图2.3 特征-电影矩阵

实际的求解过程中通常先随机的固定矩阵Y,则,为提高计算效率,通常采用并行计算X的每一行,既。得到X之后,再反求出Y,不断的交替迭代,最终使得XYT与A的平方误差小于指定阈值,停止迭代,得到最终的X(代表用户特征矩阵)和Y矩阵(代表特征艺术家矩阵)。在根据最终X和Y矩阵结果,向用户进行推荐。

 

数据准备

首先将样例数据上传到HDFS,如果想要在本地测试这些功能的话,需要内存数量至少 6g, 当然可以通过减少数据量来达到通用的测试。

object RunRecommender {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf();
    conf.setMaster("local[*]")
    val spark = SparkSession.builder().config(conf).getOrCreate()

    // Optional, but may help avoid errors due to long lineage
   // spark.sparkContext.setCheckpointDir("hdfs:///tmp/")
    spark.sparkContext.setCheckpointDir("d:///tmp/")

    //val base = "hdfs:///user/ds/"
    val base =  "E:/newcode/spark/aas/data/";
    val rawUserArtistData = spark.read.textFile(base + "user_artist_data.txt")
    val rawArtistData = spark.read.textFile(base + "artist_data.txt")
    val rawArtistAlias = spark.read.textFile(base + "artist_alias.txt")

    val runRecommender = new RunRecommender(spark)
    runRecommender.preparation(rawUserArtistData, rawArtistData, rawArtistAlias)
    runRecommender.model(rawUserArtistData, rawArtistData, rawArtistAlias)
    runRecommender.evaluate(rawUserArtistData, rawArtistAlias)
    runRecommender.recommend(rawUserArtistData, rawArtistData, rawArtistAlias)
  }

}


def preparation(
    rawUserArtistData: Dataset[String],
    rawArtistData: Dataset[String],
    rawArtistAlias: Dataset[String]): Unit = {

  rawUserArtistData.take(5).foreach(println)

  val userArtistDF = rawUserArtistData.map { line =>
    val Array(user, artist, _*) = line.split(' ')
    (user.toInt, artist.toInt)
  }.toDF("user", "artist")

  userArtistDF.agg(min("user"), max("user"), min("artist"), max("artist")).show()

  val artistByID = buildArtistByID(rawArtistData)
  val artistAlias = buildArtistAlias(rawArtistAlias)

  val (badID, goodID) = artistAlias.head
  artistByID.filter($"id" isin (badID, goodID)).show()
}

/**
  * 过滤无效的用户艺术家ID和名字行,将格式不正确的数据行剔除掉。
  * @param rawArtistData
  * @return
  */
def buildArtistByID(rawArtistData: Dataset[String]): DataFrame = {
  rawArtistData.flatMap { line =>
    val (id, name) = line.span(_ != '\t')
    if (name.isEmpty) {
      None
    } else {
      try {
        Some((id.toInt, name.trim))
      } catch {
        case _: NumberFormatException => None
      }
    }
  }.toDF("id", "name")
}

/**
  * 过滤艺术家id和对应的别名id,将格式拼写错误的行剔除掉。
  * @param rawArtistAlias
  * @return
  */
def buildArtistAlias(rawArtistAlias: Dataset[String]): Map[Int,Int] = {
  rawArtistAlias.flatMap { line =>
    val Array(artist, alias) = line.split('\t')
    if (artist.isEmpty) {
      None
    } else {
      Some((artist.toInt, alias.toInt))
    }
  }.collect().toMap
}

代码中模型训练好之后,预测了用户 2093760 的推荐结果,我测试结果如下,由于里面代码使用了随机生成初始矩阵,每个人的结果都有可能不一样。

Some((2814,50 Cent))
Some((829,Nas))
Some((1003249,Ludacris))
Some((1001819,2Pac))
Some((1300642,The Game))

代码中也给出了该用户以前听过的艺术家的名字如下:

Some((1180,David Gray))
Some((378,Blackalicious))
Some((813,Jurassic 5))
Some((1255340,The Saw Doctors))
Some((942,Xzibit))

模型评价

auc评价方法

def areaUnderCurve(
    positiveData: DataFrame,
    bAllArtistIDs: Broadcast[Array[Int]],
    predictFunction: (DataFrame => DataFrame)): Double = {

  // What this actually computes is AUC, per user. The result is actually something
  // that might be called "mean AUC".

  // Take held-out data as the "positive".
  // Make predictions for each of them, including a numeric score
  val positivePredictions = predictFunction(positiveData.select("user", "artist")).
    withColumnRenamed("prediction", "positivePrediction")

  // BinaryClassificationMetrics.areaUnderROC is not used here since there are really lots of
  // small AUC problems, and it would be inefficient, when a direct computation is available.

  // Create a set of "negative" products for each user. These are randomly chosen
  // from among all of the other artists, excluding those that are "positive" for the user.
  val negativeData = positiveData.select("user", "artist").as[(Int,Int)].
    groupByKey { case (user, _) => user }.
    flatMapGroups { case (userID, userIDAndPosArtistIDs) =>
      val random = new Random()
      val posItemIDSet = userIDAndPosArtistIDs.map { case (_, artist) => artist }.toSet
      val negative = new ArrayBuffer[Int]()
      val allArtistIDs = bAllArtistIDs.value
      var i = 0
      // Make at most one pass over all artists to avoid an infinite loop.
      // Also stop when number of negative equals positive set size
      while (i < allArtistIDs.length && negative.size < posItemIDSet.size) {
        val artistID = allArtistIDs(random.nextInt(allArtistIDs.length))
        // Only add new distinct IDs
        if (!posItemIDSet.contains(artistID)) {
          negative += artistID
        }
        i += 1
      }
      // Return the set with user ID added back
      negative.map(artistID => (userID, artistID))
    }.toDF("user", "artist")

  // Make predictions on the rest:
  val negativePredictions = predictFunction(negativeData).
    withColumnRenamed("prediction", "negativePrediction")

  // Join positive predictions to negative predictions by user, only.
  // This will result in a row for every possible pairing of positive and negative
  // predictions within each user.
  val joinedPredictions = positivePredictions.join(negativePredictions, "user").
    select("user", "positivePrediction", "negativePrediction").cache()

  // Count the number of pairs per user
  val allCounts = joinedPredictions.
    groupBy("user").agg(count(lit("1")).as("total")).
    select("user", "total")
  // Count the number of correctly ordered pairs per user
  val correctCounts = joinedPredictions.
    filter($"positivePrediction" > $"negativePrediction").
    groupBy("user").agg(count("user").as("correct")).
    select("user", "correct")

  // Combine these, compute their ratio, and average over all users
  val meanAUC = allCounts.join(correctCounts, Seq("user"), "left_outer").
    select($"user", (coalesce($"correct", lit(0)) / $"total").as("auc")).
    agg(mean("auc")).
    as[Double].first()

  joinedPredictions.unpersist()

  meanAUC
}

完整代码下载:RunRecommender.scala


						
作者 east
Hadoop 12月 29,2018

hadoop权威指南第四版中英文代码资料合集一键下载

Hadoop权威指南从Hadoop的缘起开始,由浅入深,结合理论和实践,全方位地介绍Hadoop这一高性能处理海量数据集的理想工具。涉及的主题包括:Haddoop简介;MapReduce简介:Hadoop分布式文件系统;Hadoop的I/O、MapReduee应用程序开发:MapReduee的工作机制;MapReduee的类型和格式;MapReduce的特性;如何安装Hadoop集群,如何管理Hadoop:Pig简介;Hbase简介;ZooKeeper简介,最后还提供了丰富的案例分析。

hadoop权威指南第四版英文原版、中文版以及配套的代码,内容以Hadoop2.x为主,包含一些hadoop的stable版本的新特性,与第三版本比较增加了介绍YARN , Parquet , Flume, Crunch , Spark的章节。

百度网盘下载地址:

链接:https://pan.baidu.com/s/19yvs8qUSR-0K2tOwDbQ4yQ
提取码:vsy4

作者 east
Spark 12月 27,2018

Spark集群的不同启动模式

1、Yarn-Client
spark-submit \
–master spark://SparkMaster:7077 \
–executor-memory 1g \
–total-executor-cores 2 \
–class MySpark \
MyFirstProject.jar
yarn-client可以在终端看到交互结果
2、 Yarn-Cluster
spark-submit \
–master yarn-cluster \
–executor-memory 1g \
–total-executor-cores 2 \
–class MySpark \
MyFirstProject.jar
yarn-cluster不能在终端看到结果,要在http://sparkmaster:8088/cluster,点左侧的FINISHED,找到执行的ApplicationID -> Logs -> stdout

作者 east
运维 12月 20,2018

备份mysql数据库到远程机器

本网站文章《定时自动备份mysql数据库并删除过期备份》

介绍的备份mysql方法,这样的做法还不够安全,万一本地磁盘坏了,造成的损失将无法弥补。

远程手动备份数据费时费力且不及时。最好的方法就是通过脚本实现远程自动互备。但远程无论是通过SSH登陆,还是通过scp拷贝文件都需要输入密码。为了克服这个问题,首先需要实现不需要密码的SSH登陆,这样就可以使用 rsync,scp,rexec等命令来做的远程备份了。

前提:本地服务器:A, 远程服务器:B

生成密钥对
假设A,B两服务器,现在需要在A机上用root登陆B机,而不需要输入密码。那我们可按照下面的步骤来做:

在本地服务器A上生成rsa证书
在本地服务器A上生成rsa证书,运行命令:

ssh-keygen -t rsa
cp生成rsa公钥证书到远程服务器B
使用scp命令进行远程复制,将A机生成的id_rsa.pub.A拷贝到远程服务器B的/root/.ssh目录下

scp /root/.ssh/id_rsa.pub.A root@远程服务器ip:/root/.ssh/

这里使用scp命令需要输入密码,当我们把下面的第三步执行完毕之后,以后本地服务器A使用scp命令复制文件到远程服务器B的话,就不需要再次输入密码。

密钥配对
1、 创建authorized_keys文件
当上面将服务器A上的id_rsa.pub.A 文件copy到了服务器B后,现在我们在 B 的/root/.ssh下创建authorized_keys文件,使用如下命令

touch authorized_keys

2、 将id_rsa.pub.A文件内容追加到authorized_keys 文件中
通过 cat 命令 把id_rsa.pub.A 追写到 authorized_keys 文件中,命令依次如下:

cat id_rsa.pub.A >> authorized_keys

3、修改authorized_keys文件的权限
执行如下命令,修改authorized_keys文件的权限

chmod 400 authorized_keys

authorized_keys文件的权限很重要,如果设置为777,那么登录的时候,还是需要提供密码的。

4、 测试
测试服务器A使用scp命令复制文件到服务器B是否还需要密码

在服务A上,再次使用刚才的命令,发现已经可以不需要输入密码。

最后修改mysql_backup.sh的脚本,达到自动备份数据库到远程服务器。

最终脚本如下:

#备份文件后缀时间
time=_` date +%Y_%m_%d_%H_%M_%S `
#需要备份的数据库名称
db_name=user1
#mysql 用户名
db_user=root
#mysql 密码
db_pass=123456
# 远程备份服务器 gitlab备份文件存放路径
RemoteBackDir=/home/rangfeiBackup
# 远程备份服务器 登录账户
RemoteUser=root
# 远程备份服务器 IP地址
RemoteIP=120.79.28.12
#本地备份路径
localBackDir=$backupdir/$time.sql.gz
#mysqldump命令使用绝对路径
mysqldump -u $db_user -p$db_pass $db_name | gzip > $localBackDir
scp $localBackDir $RemoteUser@$RemoteIP:$RemoteBackDir
#删除7天之前的备份文件
find $backupdir -name “*.sql.gz” -type f -mtime +7 -exec rm -rf {} \; > /dev/null 2>&1

作者 east
Spark 12月 17,2018

spark实例9:Spark Streaming小例子

在服务端安装nc

yum install nmap-ncat.x86_64

并启动

nc -lk 9999

 

客户端代码如下:

import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket
import java.nio.charset.StandardCharsets

import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.receiver.Receiver

/**
 * Custom Receiver that receives data over a socket. Received bytes are interpreted as
 * text and \n delimited lines are considered as records. They are then counted and printed.
 *
 * To run this on your local machine, you need to first run a Netcat server
 *    `$ nc -lk 9999`
 * and then run the example
 *    `$ bin/run-example org.apache.spark.examples.streaming.CustomReceiver localhost 9999`
 */
object CustomReceiver {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println("Usage: CustomReceiver <hostname> <port>")
      System.exit(1)
    }

    StreamingExamples.setStreamingLogLevels()

    // Create the context with a 1 second batch size
    val sparkConf = new SparkConf().setAppName("CustomReceiver").setMaster("local[*]")
    val ssc = new StreamingContext(sparkConf, Seconds(1))

    // Create an input stream with the custom receiver on target ip:port and count the
    // words in input stream of \n delimited text (eg. generated by 'nc')
    val lines = ssc.receiverStream(new CustomReceiver(args(0), args(1).toInt))
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}


class CustomReceiver(host: String, port: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging {

  def onStart() {
    // Start the thread that receives data over a connection
    new Thread("Socket Receiver") {
      override def run() { receive() }
    }.start()
  }

  def onStop() {
   // There is nothing much to do as the thread calling receive()
   // is designed to stop by itself isStopped() returns false
  }

  /** Create a socket connection and receive data until receiver is stopped */
  private def receive() {
   var socket: Socket = null
   var userInput: String = null
   try {
     logInfo(s"Connecting to $host : $port")
     socket = new Socket(host, port)
     logInfo(s"Connected to $host : $port")
     val reader = new BufferedReader(
       new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8))
     userInput = reader.readLine()
     while(!isStopped && userInput != null) {
       store(userInput)
       userInput = reader.readLine()
     }
     reader.close()
     socket.close()
     logInfo("Stopped receiving")
     restart("Trying to connect again")
   } catch {
     case e: java.net.ConnectException =>
       restart(s"Error connecting to $host : $port", e)
     case t: Throwable =>
       restart("Error receiving data", t)
   }
  }
}
作者 east
运维 12月 17,2018

定时自动备份mysql数据库并删除过期备份

在实际项目中,遇到有客户机房断电导致数据库数据丢失的问题,又因为备份容灾不及时,导致部分数据恢复不了。使用Linux的自动定时任务命令crontab对数据库进行定时备份可以减少这种情况发生。

 

新建备份文件并赋予可以执行的权限

mkdir -p /home/mysql_backup/
touch /home/mysql_backup/mysql_backup.sh
chmod 551 /home/mysql_backup/mysql_backup.sh

编辑/home/mysql_backup/mysql_backup.sh

vim /home/mysql_backup/mysql_backup.sh

写入以下内容

# Name:mysql_backup.sh
# This is a ShellScript For Auto DB Backup and Delete old Backup
#备份地址
backupdir=/home/mysql_backup
#备份文件后缀时间
time=_` date +%Y_%m_%d_%H_%M_%S `
#需要备份的数据库名称
db_name=test
#mysql 用户名
#db_user=
#mysql 密码
#db_pass=
#mysqldump命令使用绝对路径
mysqldump --all-databases -u $db_user -p$db_pass | gzip > $backupdir/$time.sql.gz
#删除7天之前的备份文件
find $backupdir -name $db_name"*.sql.gz" -type f -mtime +7 -exec rm -rf {} \; > /dev/null 2>&1

编辑crontab

vim /etc/crontab

在最后一行加入

* */1 * * * root /home/mysql_backup/mysql_backup.sh

重启crontab

service crond restart
作者 east
Hadoop 12月 14,2018

hadoop实例4:分析网站日志

public class LogCleanJob extends Configured implements Tool {

public static void main(String[] args) {
Configuration conf = new Configuration();
try {
int res = ToolRunner.run(conf, new LogCleanJob(), args);
System.exit(res);
} catch (Exception e) {
e.printStackTrace();
}
}

@Override
public int run(String[] args) throws Exception {
Configuration conf = new Configuration();
//设置mapper的配置,既就是hadoop/conf/mapred-site.xml的配置信息
conf.set(“mapred.job.tracker”, “hadoop01:9001”);
final Job job = new Job(new Configuration(),
LogCleanJob.class.getSimpleName());
// 设置为可以打包运行
job.setJarByClass(LogCleanJob.class);
// FileInputFormat.setInputPaths(job, args[0]);
FileInputFormat.addInputPath(job, new Path(“hdfs://localhost:9000/user/logclean”));
job.setMapperClass(MyMapper.class);
job.setMapOutputKeyClass(LongWritable.class);
job.setMapOutputValueClass(Text.class);
job.setReducerClass(MyReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
/*
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 清理已存在的输出文件
FileSystem fs = FileSystem.get(new URI(args[0]), getConf());
Path outPath = new Path(args[1]);
if (fs.exists(outPath)) {
fs.delete(outPath, true);
}*/

Path outputPath = new Path(“hdfs://localhost:9000/user/logcleanOutput”);
FileSystem fs = outputPath.getFileSystem(conf);
if(fs.exists(outputPath)){
fs.delete(outputPath, true);
}

FileOutputFormat.setOutputPath(job, outputPath);

boolean success = job.waitForCompletion(true);
if (success) {
System.out.println(“Clean process success!”);
} else {
System.out.println(“Clean process failed!”);
}
return 0;
}

// 静态内部类
static class MyMapper extends Mapper {
LogParser logParser = new LogParser();
Text outputValue = new Text();

protected void map(LongWritable key, Text value,
org.apache.hadoop.mapreduce.Mapper.Context context)
throws java.io.IOException, InterruptedException {
final String[] parsed = logParser.parse(value.toString());

// step1.过滤掉静态资源访问请求
if (parsed[2].startsWith(“GET /static/”)
|| parsed[2].startsWith(“GET /uc_server”)) {
return;
}
// step2.过滤掉开头的指定字符串
if (parsed[2].startsWith(“GET /”)) {
parsed[2] = parsed[2].substring(“GET /”.length());
} else if (parsed[2].startsWith(“POST /”)) {
parsed[2] = parsed[2].substring(“POST /”.length());
}
// step3.过滤掉结尾的特定字符串
if (parsed[2].endsWith(” HTTP/1.1″)) {
parsed[2] = parsed[2].substring(0, parsed[2].length()
– ” HTTP/1.1″.length());
}
// step4.只写入前三个记录类型项
outputValue.set(parsed[0] + “\t” + parsed[1] + “\t” + parsed[2]);
context.write(key, outputValue);
}
}

// 静态内部类
static class MyReducer extends Reducer {
protected void reduce(LongWritable k2, java.lang.Iterable<Text> v2s,
org.apache.hadoop.mapreduce.Reducer.Context context)
throws java.io.IOException, InterruptedException {
for (Text v2 : v2s) {
context.write(v2, NullWritable.get());
}
};
}

/*
* 日志解析类 静态内部类
*/
static class LogParser {
public static final SimpleDateFormat FORMAT = new SimpleDateFormat(
“d/MMM/yyyy:HH:mm:ss”, Locale.ENGLISH);
public static final SimpleDateFormat dateformat1 = new SimpleDateFormat(
“yyyyMMddHHmmss”);

public static void main(String[] args) throws ParseException {
final String S1 = “27.19.74.143 – – [30/May/2013:17:38:20 +0800] \”GET /static/image/common/faq.gif HTTP/1.1\” 200 1127″;
LogParser parser = new LogParser();
final String[] array = parser.parse(S1);
System.out.println(“样例数据: ” + S1);
System.out.format(
“解析结果: ip=%s, time=%s, url=%s, status=%s, traffic=%s”,
array[0], array[1], array[2], array[3], array[4]);
}

/**
* 解析英文时间字符串
*
* @param string
* @return
* @throws ParseException
*/
private Date parseDateFormat(String string) {
Date parse = null;
try {
parse = FORMAT.parse(string);
} catch (ParseException e) {
e.printStackTrace();
}
return parse;
}

/**
* 解析日志的行记录
*
* @param line
* @return 数组含有5个元素,分别是ip、时间、url、状态、流量
*/
public String[] parse(String line) {
String ip = parseIP(line);
String time = parseTime(line);
String url = parseURL(line);
String status = parseStatus(line);
String traffic = parseTraffic(line);
return new String[] { ip, time, url, status, traffic };
}

private String parseTraffic(String line) {
final String trim = line.substring(line.lastIndexOf(“\””) + 1)
.trim();
String traffic = trim.split(” “)[1];
return traffic;
}

private String parseStatus(String line) {
final String trim = line.substring(line.lastIndexOf(“\””) + 1)
.trim();
String status = trim.split(” “)[0];
return status;
}

private String parseURL(String line) {
final int first = line.indexOf(“\””);
final int last = line.lastIndexOf(“\””);
String url = line.substring(first + 1, last);
return url;
}

private String parseTime(String line) {
final int first = line.indexOf(“[“);
final int last = line.indexOf(“+0800]”);
String time = line.substring(first + 1, last).trim();
Date date = parseDateFormat(time);
return dateformat1.format(date);
}

private String parseIP(String line) {
String ip = line.split(“- -“)[0].trim();
return ip;
}
}
}

作者 east
Hadoop 12月 14,2018

hadoop实例3:合并相同数据

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.GenericOptionsParser;
/**
* 需要把相同订单id的记录放在一个文件中,并以订单id命名
* @author Administrator
*
*/
public class MultipleOutputTest {

static class SortMapper extends
Mapper<LongWritable, Text, Text, Text> {

@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
// TODO Auto-generated method stub
try {
System.out.println(“Before Mapper: ” + value + “, ” + value);
String line = value.toString();
String[] fields = line.split(“,”);
System.out.println(“fields[0]: ” + fields[0] + “, fields[2]=”
+ Double.parseDouble(fields[2]));
context.write(new Text(fields[0]), value);
System.out.println(“After Mapper: “);
} catch (Exception ex) {
ex.printStackTrace();
}
}
}

static class SortReducer extends Reducer<Text, Text, NullWritable, Text> {
private MultipleOutputs<NullWritable, Text> multipleOutputs;

@Override
protected void setup(org.apache.hadoop.mapreduce.Reducer.Context context)
throws IOException, InterruptedException {
// TODO Auto-generated method stub
multipleOutputs = new MultipleOutputs<NullWritable, Text>(context);
}

@Override
protected void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
System.out.print(“Before Reduce: ” + key + “, ” + key);
for (Text value : values) {
multipleOutputs.write(NullWritable.get(), value, key.toString());
}
}

@Override
protected void cleanup(
org.apache.hadoop.mapreduce.Reducer.Context context)
throws IOException, InterruptedException {
// TODO Auto-generated method stub
multipleOutputs.close();
}

// 主函数
public static void main(String[] args) throws Exception {
// 获取配置参数
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args)
.getRemainingArgs();
// 检查命令语法
if (otherArgs.length != 2) {
System.err.println(“Usage: Dedup <in> <out>”);
System.exit(2);
}
// 定义作业对象
Job job = new Job(conf, “multiple”);
// 注册分布式类
job.setJarByClass(MultipleOutputTest.class);
// 注册输出格式类
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
// 注册Mapper类
job.setMapperClass(SortMapper.class);
// 注册Reducer类
job.setReducerClass(SortReducer.class);
// 设置输入输出路径
FileInputFormat.setInputPaths(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
// 运行程序
System.exit(job.waitForCompletion(true) ? 0 : 1);
}

}

}

作者 east
Hadoop 12月 14,2018

hadoop实例2:分析学生成绩(最高、最低、平均分)

输入数据:

computer,huangxiaoming,85
computer,xuzheng,54
computer,huangbo,86
computer,liutao,85
computer,huanglei,99
computer,liujialing,85
computer,liuyifei,75
computer,huangdatou,48
computer,huangjiaju,88
computer,huangzitao,85
english,zhaobenshan,57
english,liuyifei,85
english,liuyifei,76
english,huangdatou,48
english,zhouqi,85
english,huangbo,85
english,huangxiaoming,96
english,huanglei,85
english,liujialing,75
algorithm,liuyifei,75
algorithm,huanglei,76
algorithm,huangjiaju,85
algorithm,liutao,85
algorithm,huangdou,42
algorithm,huangzitao,81
math,wangbaoqiang,85
math,huanglei,76
math,huangjiaju,85
math,liutao,48
math,xuzheng,54
math,huangxiaoming,85
math,liujialing,85

 

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
public class StudentBean implements Writable{
private String course;
private long maxScore;
private long minScore;
private long avgScore;
private long score;

public StudentBean(){

}

public StudentBean(String course, long score){
this.course = course;
this.score = score;
}

public String getCourse() {
return course;
}

public void setCourse(String course) {
this.course = course;
}

public long getMaxScore() {
return maxScore;
}

public void setMaxScore(long maxScore) {
this.maxScore = maxScore;
}

public long getMinScore() {
return minScore;
}

public void setMinScore(long minScore) {
this.minScore = minScore;
}

public long getAvgScore() {
return avgScore;
}

public void setAvgScore(long avgScore) {
this.avgScore = avgScore;
}

public long getScore() {
return score;
}

public void setScore(long score) {
this.score = score;
}

@Override
public void readFields(DataInput in) throws IOException {
// TODO Auto-generated method stub
try{

// course = in.readUTF();
/*maxScore = in.readLong();
minScore = in.readLong();
avgScore = in.readLong();*/
score = in.readLong();
}catch(Exception ex){
ex.printStackTrace();
}

}

@Override
public void write(DataOutput out) throws IOException {
// TODO Auto-generated method stub
//out.writeBytes(course);
// out.writeLong(maxScore);
// out.writeLong(minScore);
out.writeLong(score);
}

@Override
public String toString() {
// TODO Auto-generated method stub
return “\tmax=” + maxScore + “\tmin=” + minScore + “\tavg=” + avgScore;
}
}

 

import java.io.IOException;
/**
* 需要统计手机用户流量日志,日志内容实例:
要把同一个用户的上行流量、下行流量进行累加,并计算出综合。
*/
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.GenericOptionsParser;
import test.Dedup;
import test.Dedup.Map;
import test.Dedup.Reduce;

public class StudentScore {

static class FlowCountMapper extends
Mapper<LongWritable, Text, Text, StudentBean> {

@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
// TODO Auto-generated method stub
try {
String line = value.toString();
if (null != line && line.length() > 0) {
String[] fields = line.split(“,”);
String course = fields[0];
long score = Long.parseLong(fields[2]);
System.out.println(“map: course=” + course + ” score=”
+ score);
context.write(new Text(fields[0]), new StudentBean(course,
score));
}
} catch (Exception ex) {
ex.printStackTrace();
}
}
}

static class FlowCountReducer extends
Reducer<Text, StudentBean, Text, StudentBean> {

@Override
protected void reduce(
Text key,
Iterable<StudentBean> values,
org.apache.hadoop.mapreduce.Reducer<Text, StudentBean, Text, StudentBean>.Context context)
throws IOException, InterruptedException {
try {
// TODO Auto-generated method stub
long sum_score = 0;
long maxScore = Integer.MIN_VALUE;
long minScore = Integer.MAX_VALUE;
int index = 0;
for (StudentBean bean : values) {
System.out.println(“reduce: score=” + bean.getScore());
sum_score += bean.getScore();
index++;
if (maxScore < bean.getScore()) {
maxScore = bean.getScore();
}
if (minScore > bean.getScore()) {
minScore = bean.getScore();
}
}
System.out.println(“reduce: maxScore=” + maxScore
+ ” minScore=” + minScore);
StudentBean resultBean = new StudentBean();
resultBean.setAvgScore(sum_score / index);
resultBean.setMaxScore(maxScore);
resultBean.setMinScore(minScore);
context.write(key, resultBean);
} catch (Exception ex) {
ex.printStackTrace();
}
}

// 主函数
public static void main(String[] args) throws Exception {
// 获取配置参数
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args)
.getRemainingArgs();
// 检查命令语法
if (otherArgs.length != 2) {
System.err.println(“Usage: Dedup <in> <out>”);
System.exit(2);
}
// 定义作业对象
Job job = new Job(conf, “FlowCount”);
// 注册分布式类
job.setJarByClass(StudentScore.class);
// 注册Mapper类
job.setMapperClass(FlowCountMapper.class);
// 注册Reducer类
job.setReducerClass(FlowCountReducer.class);
// 指定我们自定义的分区器
// job.setPartitionerClass(ProvincePartitioner.class);
// 同时指定相应分区数量的reducetask
// job.setNumReduceTasks(5);
// 注册输出格式类
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(StudentBean.class);
// 设置输入输出路径
FileInputFormat.setInputPaths(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
// 运行程序
System.exit(job.waitForCompletion(true) ? 0 : 1);
}

}

}

 

作者 east
Hadoop 12月 14,2018

hadoop实例1:采集的气象数据分析最高气温

输入数据:

2014010114
2014010216
2014010317
2014010410
2014010506
2012010609
2012010732
2012010812
2012010919
2012011023
2001010116
2001010212
2001010310
2001010411
2001010529
2013010619
2013010722
2013010812
2013010929
2013011023
2008010105
2008010216
2008010337
2008010414
2008010516
2007010619
2007010712
2007010812
2007010999
2007011023
2010010114
2010010216
2010010317
2010010410
2010010506
2015010649
2015010722
2015010812
2015010999
2015011023

 

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.GenericOptionsParser;

public class Temperature {
/**
* 四个泛型类型分别代表:
* KeyIn Mapper的输入数据的Key,这里是每行文字的起始位置(0,11,…)
* ValueIn Mapper的输入数据的Value,这里是每行文字
* KeyOut Mapper的输出数据的Key,这里是每行文字中的“年份”
* ValueOut Mapper的输出数据的Value,这里是每行文字中的“气温”
*/
static class TempMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
// TODO Auto-generated method stub
// 打印样本: Before Mapper: 0, 2000010115
System.out.println(“Before Mapper: ” + key + “, ” + value);
if(null == value || value.getLength() == 0){
return;
}
String line = value.toString();
String year = line.substring(0, 4);
int temperature = Integer.parseInt(line.substring(8));
context.write(new Text(year), new IntWritable(temperature));
// 打印样本: After Mapper:2000, 15
System.out.println(
“======”+
“After Mapper:” + new Text(year) + “, ” + new IntWritable(temperature));

}
}

/**
* 四个泛型类型分别代表:
* KeyIn Reducer的输入数据的Key,这里是每行文字中的“年份”
* ValueIn Reducer的输入数据的Value,这里是每行文字中的“气温”
* KeyOut Reducer的输出数据的Key,这里是不重复的“年份”
* ValueOut Reducer的输出数据的Value,这里是这一年中的“最高气温”
*/
static class TempReducer extends
Reducer<Text, IntWritable, Text, IntWritable> {
@Override
public void reduce(Text key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
int maxValue = Integer.MIN_VALUE;
StringBuffer sb = new StringBuffer();
for(IntWritable value: values){
maxValue = Math.max(maxValue, value.get());
sb.append(value).append(“, “);
}
// 打印样本: Before Reduce: 2000, 15, 23, 99, 12, 22,
System.out.print(“Before Reduce: ” + key + “, ” + sb.toString());
context.write(key,new IntWritable(maxValue));
// 打印样本: After Reduce: 2000, 99
System.out.println(
“======”+
“After Reduce: ” + key + “, ” + maxValue);
}
}

public static void main(String[] args) throws Exception {
Configuration hadoopConfig = new Configuration();
String[] otherArgs = new GenericOptionsParser(hadoopConfig, args).getRemainingArgs();
// 检查命令语法
if(otherArgs.length != 2){
System.err.println(“Usage: Dedup <in> <out>”);
System.exit(2);
}
//输入路径
// String dst = “hdfs://localhost:9000/user/tminput/”;
//输出路径,必须是不存在的,空文件加也不行。
// String dstOut = “hdfs://localhost:9000/user/tmoutput/”;

/* hadoopConfig.set(“fs.hdfs.impl”,
org.apache.hadoop.hdfs.DistributedFileSystem.class.getName()
);
hadoopConfig.set(“fs.file.impl”,
org.apache.hadoop.fs.LocalFileSystem.class.getName()
);*/
Job job = new Job(hadoopConfig, “Temperature”);
//如果需要打成jar运行,需要下面这句
job.setJarByClass(Temperature.class);

FileInputFormat.setInputPaths(job,new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job,new Path(otherArgs[1]));

//指定自定义的Mapper和Reducer作为两个阶段的任务处理类
job.setMapperClass(TempMapper.class);
job.setReducerClass(TempReducer.class);
//job执行作业时输入和输出文件的路径

//设置最后输出结果的Key和Value的类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);

//执行job,直到完成
job.waitForCompletion(true);
System.out.println(“Finished”);
}

}

 

作者 east

上一 1 … 39 40 41 下一个

关注公众号“大模型全栈程序员”回复“小程序”获取1000个小程序打包源码。回复”chatgpt”获取免注册可用chatgpt。回复“大数据”获取多本大数据电子书

标签

AIGC AI创作 bert chatgpt github GPT-3 gpt3 GTP-3 hive mysql O2O tensorflow UI控件 不含后台 交流 共享经济 出行 图像 地图定位 外卖 多媒体 娱乐 小程序 布局 带后台完整项目 开源项目 搜索 支付 效率 教育 日历 机器学习 深度学习 物流 用户系统 电商 画图 画布(canvas) 社交 签到 联网 读书 资讯 阅读 预订

官方QQ群

小程序开发群:74052405

大数据开发群: 952493060

近期文章

  • 如何在Chrome中设置启动时自动打开多个默认网页
  • spark内存溢出怎样区分是软件还是代码原因
  • MQTT完全解析和实践
  • 解决运行Selenium报错:self.driver = webdriver.Chrome(service=service) TypeError: __init__() got an unexpected keyword argument ‘service’
  • python 3.6使用mysql-connector-python报错:SyntaxError: future feature annotations is not defined
  • 详解Python当中的pip常用命令
  • AUTOSAR如何在多个供应商交付的配置中避免ARXML不兼容?
  • C++thread pool(线程池)设计应关注哪些扩展性问题?
  • 各类MCAL(Microcontroller Abstraction Layer)如何与AUTOSAR工具链解耦?
  • 如何设计AUTOSAR中的“域控制器”以支持未来扩展?

文章归档

  • 2025年7月
  • 2025年6月
  • 2025年5月
  • 2025年4月
  • 2025年3月
  • 2025年2月
  • 2025年1月
  • 2024年12月
  • 2024年11月
  • 2024年10月
  • 2024年9月
  • 2024年8月
  • 2024年7月
  • 2024年6月
  • 2024年5月
  • 2024年4月
  • 2024年3月
  • 2023年11月
  • 2023年10月
  • 2023年9月
  • 2023年8月
  • 2023年7月
  • 2023年6月
  • 2023年5月
  • 2023年4月
  • 2023年3月
  • 2023年1月
  • 2022年11月
  • 2022年10月
  • 2022年9月
  • 2022年8月
  • 2022年7月
  • 2022年6月
  • 2022年5月
  • 2022年4月
  • 2022年3月
  • 2022年2月
  • 2022年1月
  • 2021年12月
  • 2021年11月
  • 2021年9月
  • 2021年8月
  • 2021年7月
  • 2021年6月
  • 2021年5月
  • 2021年4月
  • 2021年3月
  • 2021年2月
  • 2021年1月
  • 2020年12月
  • 2020年11月
  • 2020年10月
  • 2020年9月
  • 2020年8月
  • 2020年7月
  • 2020年6月
  • 2020年5月
  • 2020年4月
  • 2020年3月
  • 2020年2月
  • 2020年1月
  • 2019年7月
  • 2019年6月
  • 2019年5月
  • 2019年4月
  • 2019年3月
  • 2019年2月
  • 2019年1月
  • 2018年12月
  • 2018年7月
  • 2018年6月

分类目录

  • Android (73)
  • bug清单 (79)
  • C++ (34)
  • Fuchsia (15)
  • php (4)
  • python (45)
  • sklearn (1)
  • 云计算 (20)
  • 人工智能 (61)
    • chatgpt (21)
      • 提示词 (6)
    • Keras (1)
    • Tensorflow (3)
    • 大模型 (1)
    • 智能体 (4)
    • 深度学习 (14)
  • 储能 (44)
  • 前端 (5)
  • 大数据开发 (491)
    • CDH (6)
    • datax (4)
    • doris (31)
    • Elasticsearch (15)
    • Flink (78)
    • flume (7)
    • Hadoop (19)
    • Hbase (23)
    • Hive (41)
    • Impala (2)
    • Java (71)
    • Kafka (10)
    • neo4j (5)
    • shardingsphere (6)
    • solr (5)
    • Spark (100)
    • spring (11)
    • 数据仓库 (9)
    • 数据挖掘 (7)
    • 海豚调度器 (10)
    • 运维 (34)
      • Docker (3)
  • 小游戏代码 (1)
  • 小程序代码 (139)
    • O2O (16)
    • UI控件 (5)
    • 互联网类 (23)
    • 企业类 (6)
    • 地图定位 (9)
    • 多媒体 (6)
    • 工具类 (25)
    • 电商类 (22)
    • 社交 (7)
    • 行业软件 (7)
    • 资讯读书 (11)
  • 嵌入式 (71)
    • autosar (63)
    • RTOS (1)
    • 总线 (1)
  • 开发博客 (16)
    • Harmony (9)
  • 技术架构 (6)
  • 数据库 (32)
    • mongodb (1)
    • mysql (13)
    • pgsql (2)
    • redis (1)
    • tdengine (4)
  • 未分类 (7)
  • 程序员网赚 (20)
    • 广告联盟 (3)
    • 私域流量 (5)
    • 自媒体 (5)
  • 量化投资 (4)
  • 面试 (14)

功能

  • 登录
  • 文章RSS
  • 评论RSS
  • WordPress.org

All Rights Reserved by Gitweixin.本站收集网友上传代码, 如有侵犯版权,请发邮件联系yiyuyos@gmail.com删除.