gitweixin
  • 首页
  • 小程序代码
    • 资讯读书
    • 工具类
    • O2O
    • 地图定位
    • 社交
    • 行业软件
    • 电商类
    • 互联网类
    • 企业类
    • UI控件
  • 大数据开发
    • Hadoop
    • Spark
    • Hbase
    • Elasticsearch
    • Kafka
    • Flink
    • 数据仓库
    • 数据挖掘
    • flume
    • Kafka
    • Hive
    • shardingsphere
    • solr
  • 开发博客
    • Android
    • php
    • python
    • 运维
    • 技术架构
    • 数据库
  • 程序员网赚
  • bug清单
  • 量化投资
  • 在线查询工具
    • 去行号
    • 在线时间戳转换工具
    • 免费图片批量修改尺寸在线工具
    • SVG转JPG在线工具

分类归档大数据开发

精品微信小程序开发门户,代码全部亲测可用

  • 首页   /  
  • 分类归档: "大数据开发"
  • ( 页面34 )
Flink 10月 26,2020

Flink 向Kafka生产并消费数据程序

场景说明

假定某个Flink业务每秒就会收到1个消息记录。

基于某些业务要求,开发的Flink应用程序实现功能:实时输出带有前缀的消息内容。

数据规划

Flink样例工程的数据存储在Kafka组件中。向Kafka组件发送数据(需要有Kafka权限用户),并从Kafka组件接收数据。

  1. 确保集群安装完成,包括HDFS、Yarn、Flink和Kafka。
  2. 创建Topic。 创建topic的命令格式: bin/kafka-topics.sh –create –zookeeper {zkQuorum}/kafka –partitions {partitionNum} –replication-factor {replicationNum} –topic {Topic} 表1 参数说明 参数名 说明 {zkQuorum} ZooKeeper集群信息,格式为IP:port。 {partitionNum} topic的分区数。 {replicationNum} topic中每个partition数据的副本数。 {Topic} topic名称。 示例:在Kafka的客户端路径下执行命令,此处以ZooKeeper集群的IP:port是10.96.101.32:24002,10.96.101.251:24002,10.96.101.177:24002,10.91.8.160:24002,Topic名称为topic1的数据为例。bin/kafka-topics.sh –create –zookeeper 10.96.101.32:24002,10.96.101.251:24002,10.96.101.177:24002,10.91.8.160:24002/kafka –partitions 5 –replication-factor 1 –topic topic1

开发思路

  1. 启动Flink Kafka Producer应用向Kafka发送数据。
  2. 启动Flink Kafka Consumer应用从Kafka接收数据,保证topic与producer一致。
  3. 在数据内容中增加前缀并进行打印。

Java样例代码

功能介绍

在Flink应用中,调用flink-connector-kafka模块的接口,生产并消费数据。

代码样例

下面列出producer和consumer主要逻辑代码作为演示。

完整代码参见com.huawei.bigdata.flink.examples.WriteIntoKafka和com.huawei.bigdata.flink.examples.ReadFromKafka

//producer代码
public class WriteIntoKafka {
  public static void main(String[] args) throws Exception {
    // 打印出执行flink run的参考命令
    System.out.println("use command as: ");
    System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka" +
        " /opt/test.jar --topic topic-test --bootstrap.servers 10.91.8.218:21005");
    System.out.println("******************************************************************************************");
    System.out.println("<topic> is the kafka topic name");
    System.out.println("<bootstrap.servers> is the ip:port list of brokers");
    System.out.println("******************************************************************************************");

    // 构造执行环境
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // 设置并发度
    env.setParallelism(1);
    // 解析运行参数
    ParameterTool paraTool = ParameterTool.fromArgs(args);
    // 构造流图,将自定义Source生成的数据写入Kafka
    DataStream<String> messageStream = env.addSource(new SimpleStringGenerator());
    messageStream.addSink(new FlinkKafkaProducer010<>(paraTool.get("topic"),
        new SimpleStringSchema(),
        paraTool.getProperties()));
    // 调用execute触发执行
    env.execute();
  }

  // 自定义Source,每隔1s持续产生消息
  public static class SimpleStringGenerator implements SourceFunction<String> {
    private static final long serialVersionUID = 2174904787118597072L;
    boolean running = true;
    long i = 0;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
      while (running) {
        ctx.collect("element-" + (i++));
        Thread.sleep(1000);
      }
    }

    @Override
    public void cancel() {
      running = false;
    }
  }
}

//consumer代码
public class ReadFromKafka {
  public static void main(String[] args) throws Exception {
    // 打印出执行flink run的参考命令
    System.out.println("use command as: ");
    System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.ReadFromKafka" +
        " /opt/test.jar --topic topic-test -bootstrap.servers 10.91.8.218:21005");
    System.out.println("******************************************************************************************");
    System.out.println("<topic> is the kafka topic name");
    System.out.println("<bootstrap.servers> is the ip:port list of brokers");
    System.out.println("******************************************************************************************");

    // 构造执行环境
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // 设置并发度
    env.setParallelism(1);
    // 解析运行参数
    ParameterTool paraTool = ParameterTool.fromArgs(args);
    // 构造流图,从Kafka读取数据并换行打印
    DataStream<String> messageStream = env.addSource(new FlinkKafkaConsumer010<>(paraTool.get("topic"),
        new SimpleStringSchema(),
        paraTool.getProperties()));
    messageStream.rebalance().map(new MapFunction<String, String>() {
      @Override
      public String map(String s) throws Exception {
        return "Flink says " + s + System.getProperty("line.separator");
      }
    }).print();
    // 调用execute触发执行
    env.execute();
  }
}

Scala样例代码

功能介绍

在Flink应用中,调用flink-connector-kafka模块的接口,生产并消费数据。

代码样例

下面列出producer和consumer主要逻辑代码作为演示。 完整代码参见com.huawei.bigdata.flink.examples.WriteIntoKafka和com.huawei.bigdata.flink.examples.ReadFromKafka

//producer代码
object WriteIntoKafka {
  def main(args: Array[String]) {
    // 打印出执行flink run的参考命令
    System.out.println("use command as: ")
    System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka" +
      " /opt/test.jar --topic topic-test --bootstrap.servers 10.91.8.218:21005")
    System.out.println("******************************************************************************************")
    System.out.println("<topic> is the kafka topic name")
    System.out.println("<bootstrap.servers> is the ip:port list of brokers")
    System.out.println("******************************************************************************************")

    // 构造执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // 设置并发度
    env.setParallelism(1)
    // 解析运行参数
    val paraTool = ParameterTool.fromArgs(args)
    // 构造流图,将自定义Source生成的数据写入Kafka
    val messageStream: DataStream[String] = env.addSource(new SimpleStringGenerator)
    messageStream.addSink(new FlinkKafkaProducer010(
      paraTool.get("topic"), new SimpleStringSchema, paraTool.getProperties))
    // 调用execute触发执行
    env.execute
  }
}

// 自定义Source,每隔1s持续产生消息
class SimpleStringGenerator extends SourceFunction[String] {
  var running = true
  var i = 0

  override def run(ctx: SourceContext[String]) {
    while (running) {
      ctx.collect("element-" + i)
      i += 1
      Thread.sleep(1000)
    }
  }

  override def cancel() {
    running = false
  }
}

//consumer代码
object ReadFromKafka {
  def main(args: Array[String]) {
    // 打印出执行flink run的参考命令
    System.out.println("use command as: ")
    System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.ReadFromKafka" +
      " /opt/test.jar --topic topic-test -bootstrap.servers 10.91.8.218:21005")
    System.out.println("******************************************************************************************")
    System.out.println("<topic> is the kafka topic name")
    System.out.println("<bootstrap.servers> is the ip:port list of brokers")
    System.out.println("******************************************************************************************")

    // 构造执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // 设置并发度
    env.setParallelism(1)
    // 解析运行参数
    val paraTool = ParameterTool.fromArgs(args)
    // 构造流图,从Kafka读取数据并换行打印
    val messageStream = env.addSource(new FlinkKafkaConsumer010(
      paraTool.get("topic"), new SimpleStringSchema, paraTool.getProperties))
    messageStream
      .map(s => "Flink says " + s + System.getProperty("line.separator")).print()
    // 调用execute触发执行
    env.execute()
  }
}
作者 east
Flink 10月 26,2020

Spark Streaming从Kafka读取数据再写入HBase 实例

Java样例代码

功能介绍

在Spark应用中,通过使用Spark Streaming调用Kafka接口来获取数据,然后把数据经过分析后,找到对应的HBase表记录,再写到HBase表。

代码样例

下面代码片段仅为演示,具体代码参见:com.huawei.bigdata.spark.examples.SparkOnStreamingToHbase

/**  * 运行Spark Streaming任务,根据value值从hbase table1表读取数据,把两者数据做操作后,更新到hbase table1表  */ public class SparkOnStreamingToHbase {   public static void main(String[] args) throws Exception {     if (args.length < 3) {       printUsage();     }     String checkPointDir = args[0];     String topics = args[1];     final String brokers = args[2];     Duration batchDuration = Durations.seconds(5);     SparkConf sparkConf = new SparkConf().setAppName("SparkOnStreamingToHbase");     JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, batchDuration);     // 设置Spark Streaming的CheckPoint目录     if (!"nocp".equals(checkPointDir)) {       jssc.checkpoint(checkPointDir);     }     final String columnFamily = "cf";     HashMap<String, String> kafkaParams = new HashMap<String, String>();     kafkaParams.put("metadata.broker.list", brokers);     String[] topicArr = topics.split(",");     Set<String> topicSet = new HashSet<String>(Arrays.asList(topicArr));     // 通过brokers和topics直接创建kafka stream     // 接收Kafka中数据,生成相应DStream     JavaDStream<String> lines = KafkaUtils.createDirectStream(jssc, String.class, String.class,       StringDecoder.class, StringDecoder.class, kafkaParams, topicSet).map(       new Function<Tuple2<String, String>, String>() {         public String call(Tuple2<String, String> tuple2) {           // map(_._1)是消息的key, map(_._2)是消息的value           return tuple2._2();         }       }     );     lines.foreachRDD(       new Function<JavaRDD<String>, Void>() {         public Void call(JavaRDD<String> rdd) throws Exception {           rdd.foreachPartition(             new VoidFunction<Iterator<String>>() {               public void call(Iterator<String> iterator) throws Exception {                 hBaseWriter(iterator, columnFamily);               }             }           );           return null;         }       }     );     jssc.start();     jssc.awaitTermination();   }   /**    * 在executor端写入数据    * @param iterator  消息    * @param columnFamily    */   private static void hBaseWriter(Iterator<String> iterator, String columnFamily) throws IOException {     Configuration conf = HBaseConfiguration.create();     Connection connection = null;     Table table = null;     try {       connection = ConnectionFactory.createConnection(conf);       table = connection.getTable(TableName.valueOf("table1"));       List<Get> rowList = new ArrayList<Get>();       while (iterator.hasNext()) {         Get get = new Get(iterator.next().getBytes());         rowList.add(get);       }       // 获取table1的数据       Result[] resultDataBuffer = table.get(rowList);       // 设置table1的数据       List<Put> putList = new ArrayList<Put>();       for (int i = 0; i < resultDataBuffer.length; i++) {         String row = new String(rowList.get(i).getRow());         Result resultData = resultDataBuffer[i];         if (!resultData.isEmpty()) {           // 根据列簇和列,获取旧值           String aCid = Bytes.toString(resultData.getValue(columnFamily.getBytes(), "cid".getBytes()));           Put put = new Put(Bytes.toBytes(row));           // 计算结果           int resultValue = Integer.valueOf(row) + Integer.valueOf(aCid);           put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("cid"), Bytes.toBytes(String.valueOf(resultValue)));           putList.add(put);         }       }       if (putList.size() > 0) {         table.put(putList);       }     } catch (IOException e) {       e.printStackTrace();     } finally {       if (table != null) {         try {           table.close();         } catch (IOException e) {           e.printStackTrace();         }       }       if (connection != null) {         try {           // 关闭Hbase连接.           connection.close();         } catch (IOException e) {           e.printStackTrace();         }       }     }   }     private static void printUsage() {     System.out.println("Usage: {checkPointDir} {topic} {brokerList}");     System.exit(1);   } }


Scala样例代码

功能介绍

在Spark应用中,通过使用Spark Streaming调用Kafka接口来获取数据,然后把数据经过分析后,找到对应的HBase表记录,再写到HBase表。

代码样例

下面代码片段仅为演示,具体代码参见:com.huawei.bigdata.spark.examples.SparkOnStreamingToHbase

/**
  * 运行Spark Streaming任务,根据value值从hbase table1表读取数据,把两者数据做操作后,更新到hbase table1表
  */
object SparkOnStreamingToHbase {
  def main(args: Array[String]) {
    if (args.length < 3) {
      printUsage
    }

    val Array(checkPointDir, topics, brokers) = args
    val sparkConf = new SparkConf().setAppName("SparkOnStreamingToHbase")
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    // 设置Spark Streaming的CheckPoint目录
    if (!"nocp".equals(checkPointDir)) {
      ssc.checkpoint(checkPointDir)
    }

    val columnFamily = "cf"
    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> brokers
    )

    val topicArr = topics.split(",")
    val topicSet = topicArr.toSet
    // map(_._1)是消息的key, map(_._2)是消息的value
    val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicSet).map(_._2)
    lines.foreachRDD(rdd => {
      //partition运行在executor上
      rdd.foreachPartition(iterator => hBaseWriter(iterator, columnFamily))
    })

    ssc.start()
    ssc.awaitTermination()
  }

  
  /**
   * 在executor端写入数据
   * @param iterator  消息
   * @param columnFamily
   */
  def hBaseWriter(iterator: Iterator[String], columnFamily: String): Unit = {
    val conf = HBaseConfiguration.create()
    var table: Table = null
    var connection: Connection = null
    try {
      connection = ConnectionFactory.createConnection(conf)
      table = connection.getTable(TableName.valueOf("table1"))
      val iteratorArray = iterator.toArray
      val rowList = new util.ArrayList[Get]()
      for (row <- iteratorArray) {
        val get = new Get(row.getBytes)
        rowList.add(get)
      }
      // 获取table1的数据
      val resultDataBuffer = table.get(rowList)
      // 设置table1的数据
      val putList = new util.ArrayList[Put]()
      for (i <- 0 until iteratorArray.size) {
        val row = iteratorArray(i)
        val resultData = resultDataBuffer(i)
        if (!resultData.isEmpty) {
          // 根据列簇和列,获取旧值
          val aCid = Bytes.toString(resultData.getValue(columnFamily.getBytes, "cid".getBytes))
          val put = new Put(Bytes.toBytes(row))
          // 计算结果
          val resultValue = row.toInt + aCid.toInt
          put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("cid"), Bytes.toBytes(resultValue.toString))
          putList.add(put)
        }
      }
      if (putList.size() > 0) {
        table.put(putList)
      }
    } catch {
      case e: IOException =>
        e.printStackTrace();
    } finally {
      if (table != null) {
        try {
          table.close()
        } catch {
          case e: IOException =>
            e.printStackTrace();
        }
      }
      if (connection != null) {
        try {
          // 关闭Hbase连接.
          connection.close()
        } catch {
          case e: IOException =>
            e.printStackTrace()
        }
      }
    }
  }
  

  private def printUsage {
    System.out.println("Usage: {checkPointDir} {topic} {brokerList}")
    System.exit(1)
  }
}
作者 east
Hbase 10月 26,2020

Hbase使用过滤器Filter例子

使用过滤器Filter

功能简介

HBase Filter主要在Scan和Get过程中进行数据过滤,通过设置一些过滤条件来实现,如设置RowKey、列名或者列值的过滤条件。

代码样例

以下代码片段在com.huawei.bigdata.hbase.examples包的“HBaseSample”类的testSingleColumnValueFilter方法中。

public void testSingleColumnValueFilter() {    
LOG.info("Entering testSingleColumnValueFilter.");
Table table = null;
ResultScanner rScanner = null;
try {
table = conn.getTable(tableName);
Scan scan = new Scan();
scan.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"));
// Set the filter criteria.
SingleColumnValueFilter filter = new SingleColumnValueFilter( Bytes.toBytes("info"), Bytes.toBytes("name"), CompareOp.EQUAL, Bytes.toBytes("Xu Bing"));
scan.setFilter(filter);
// Submit a scan request.
rScanner = table.getScanner(scan);
// Print query results.
for (Result r = rScanner.next(); r != null; r = rScanner.next()) { for (Cell cell : r.rawCells()) { LOG.info(Bytes.toString(CellUtil.cloneRow(cell)) + ":" + Bytes.toString(CellUtil.cloneFamily(cell)) + "," + Bytes.toString(CellUtil.cloneQualifier(cell)) + "," + Bytes.toString(CellUtil.cloneValue(cell))); } } LOG.info("Single column value filter successfully."); } catch (IOException e) {
LOG.error("Single column value filter failed " ,e);
} finally {
if (rScanner != null) {
// Close the scanner object.
rScanner.close();
}
if (table != null) {
try {
// Close the HTable object.
table.close();
} catch (IOException e) {
LOG.error("Close table failed " ,e);
}
}
}
LOG.info("Exiting testSingleColumnValueFilter.");
}

注意事项

当前二级索引不支持使用SubstringComparator类定义的对象作为Filter的比较器。

例如,如下示例中的用法当前不支持:

Scan scan = new Scan();
filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL);
filterList.addFilter(new SingleColumnValueFilter(Bytes
.toBytes(columnFamily), Bytes.toBytes(qualifier),
CompareOp.EQUAL, new SubstringComparator(substring)));
scan.setFilter(filterList);
作者 east
Hive 10月 26,2020

Hive用户自定义函数

用户自定义函数

当Hive的内置函数不能满足需要时,可以通过编写用户自定义函数UDF(User-Defined Functions)插入自己的处理代码并在查询中使用它们。

按实现方式,UDF分如下分类:

  • 普通的UDF,用于操作单个数据行,且产生一个数据行作为输出。
  • 用户定义聚集函数UDAF(User-Defined Aggregating Functions),用于接受多个输入数据行,并产生一个输出数据行。
  • 用户定义表生成函数UDTF(User-Defined Table-Generating Functions),用于操作单个输入行,产生多个输出行。

按使用方法,UDF有如下分类:

  • 临时函数,只能在当前会话使用,重启会话后需要重新创建。
  • 永久函数,可以在多个会话中使用,不需要每次创建。

下面以编写一个AddDoublesUDF为例,说明UDF的编写和使用方法:

功能介绍

AddDoublesUDF主要用来对两个及多个浮点数进行相加。在该样例中可以掌握如何编写和使用UDF。

说明:

  • 一个普通UDF必须继承自“org.apache.hadoop.hive.ql.exec.UDF”。
  • 一个普通UDF必须至少实现一个evaluate()方法,evaluate函数支持重载。
  • 开发自定义函数需要在工程中添加hive-exec-1.3.0.jar依赖包,可从hive安装目录下获取。

样例代码

以下为UDF示例代码:

package com.huawei.bigdata.hive.example.udf;
import org.apache.hadoop.hive.ql.exec.UDF;

public class AddDoublesUDF extends UDF { 
 public Double evaluate(Double... a) { 
    Double total = 0.0; 
    // 处理逻辑部分. 
    for (int i = 0; i < a.length; i++) 
      if (a[i] != null) 
        total += a[i]; 
    return total; 
  } 
} 

如何使用

  1. 把以上程序打包成AddDoublesUDF.jar,并上传到HDFS指定目录下(如“/user/hive_examples_jars/”)且创建函数的用户与使用函数的用户有该文件的可读权限。示例语句: hdfs dfs -put ./hive_examples_jars /user/hive_examples_jars hdfs dfs -chmod 777 /user/hive_examples_jars
  2. 需要使用一个具有admin权限的用户登录beeline客户端,执行如下命令: kinit Hive业务用户 beeline set role admin;
  3. 在Hive Server中定义该函数,以下语句用于创建永久函数: CREATE FUNCTION addDoubles AS ‘com.bigdata.hive.example.udf.AddDoublesUDF’ using jar ‘hdfs://hacluster/user/hive_examples_jars/AddDoublesUDF.jar’; 其中addDoubles是该函数的别名,用于SELECT查询中使用。 以下语句用于创建临时函数: CREATE TEMPORARY FUNCTION addDoubles AS ‘com.bigdata.hive.example.udf.AddDoublesUDF’ using jar ‘hdfs://hacluster/user/hive_examples_jars/AddDoublesUDF.jar’;
    • addDoubles是该函数的别名,用于SELECT查询中使用。
    • 关键字TEMPORARY说明该函数只在当前这个Hive Server的会话过程中定义使用。
  4. 在Hive Server中使用该函数,执行SQL语句: SELECT addDoubles(1,2,3); 说明: 若重新连接客户端再使用函数出现[Error 10011]的错误,可执行reload function;命令后再使用该函数。
  5. 在Hive Server中删除该函数,执行SQL语句: DROP FUNCTION addDoubles;
作者 east
Flink 10月 26,2020

flink调优经验

数据倾斜

当数据发生倾斜(某一部分数据量特别大),虽然没有GC(Gabage Collection,垃圾回收),但是task执行时间严重不一致。

  • 需要重新设计key,以更小粒度的key使得task大小合理化。
  • 修改并行度。
  • 调用rebalance操作,使数据分区均匀。

缓冲区超时设置

  • 由于task在执行过程中存在数据通过网络进行交换,数据在不同服务器之间传递的缓冲区超时时间可以通过setBufferTimeout进行设置。
  • 当设置“setBufferTimeout(-1)”,会等待缓冲区满之后才会刷新,使其达到最大吞吐量;当设置“setBufferTimeout(0)”时,可以最小化延迟,数据一旦接收到就会刷新;当设置“setBufferTimeout”大于0时,缓冲区会在该时间之后超时,然后进行缓冲区的刷新。 示例可以参考如下:env.setBufferTimeout(timeoutMillis); env.generateSequence(1,10).map(new MyMapper()).setBufferTimeout(timeoutMillis);
作者 east
Spark 10月 26,2020

Spark Streaming调优经验

Spark Streaming调优

操作场景

Streaming作为一种mini-batch方式的流式处理框架,它主要的特点是:秒级时延和高吞吐量。因此Streaming调优的目标:在秒级延迟的情景下,提高Streaming的吞吐能力,在单位时间处理尽可能多的数据。

说明:

本章节适用于输入数据源为Kafka的使用场景。

操作步骤

一个简单的流处理系统由以下三部分组件组成:数据源 + 接收器 + 处理器。数据源为Kafka,接受器为Streaming中的Kafka数据源接收器,处理器为Streaming。

对Streaming调优,就必须使该三个部件的性能都最优化。

  • 数据源调优 在实际的应用场景中,数据源为了保证数据的容错性,会将数据保存在本地磁盘中,而Streaming的计算结果全部在内存中完成,数据源很有可能成为流式系统的最大瓶颈点。 对Kafka的性能调优,有以下几个点:
    • 使用Kafka-0.8.2以后版本,可以使用异步模式的新Producer接口。
    • 配置多个Broker的目录,设置多个IO线程,配置Topic合理的Partition个数。
    详情请参见Kafka开源文档中的“性能调优”部分:http://kafka.apache.org/documentation.html
  • 接收器调优 Streaming中已有多种数据源的接收器,例如Kafka、Flume、MQTT、ZeroMQ等,其中Kafka的接收器类型最多,也是最成熟一套接收器。 Kafka包括三种模式的接收器API:
    • KafkaReceiver:直接接收Kafka数据,进程异常后,可能出现数据丢失。
    • ReliableKafkaReceiver:通过ZooKeeper记录接收数据位移。
    • DirectKafka:直接通过RDD读取Kafka每个Partition中的数据,数据高可靠。
    从实现上来看,DirectKafka的性能会是最好的,实际测试上来看,DirectKafka也确实比其他两个API性能好了不少。因此推荐使用DirectKafka的API实现接收器。 数据接收器作为一个Kafka的消费者,对于它的配置优化,请参见Kafka开源文档:http://kafka.apache.org/documentation.html
  • 处理器调优 Streaming的底层由Spark执行,因此大部分对于Spark的调优措施,都可以应用在Streaming之中,例如:
    • 数据序列化
    • 配置内存
    • 设置并行度
    • 使用External Shuffle Service提升性能
    说明: 在做Spark Streaming的性能优化时需注意一点,越追求性能上的优化,Streaming整体的可靠性会越差。例如: “spark.streaming.receiver.writeAheadLog.enable”配置为“false”的时候,会明显减少磁盘的操作,提高性能,但由于缺少WAL机制,会出现异常恢复时,数据丢失。 因此,在调优Streaming的时候,这些保证数据可靠性的配置项,在生产环境中是不能关闭的。
  • 日志归档调优 参数“spark.eventLog.group.size”用来设置一个应用的JobHistory日志按照指定job个数分组,每个分组会单独创建一个文件记录日志,从而避免应用长期运行时形成单个过大日志造成JobHistory无法读取的问题,设置为“0”时表示不分组。 大部分Spark Streaming任务属于小型job,而且产生速度较快,会导致频繁的分组,产生大量日志小文件消耗磁盘I/O。建议增大此值,例如改为“1000”或更大值。
作者 east
Spark 10月 26,2020

Spark Core调优经验

使用mapPartitions,按每个分区计算结果

如果每条记录的开销太大,例:

rdd.map{x=>conn=getDBConn;conn.write(x.toString);conn.close}

则可以使用MapPartitions,按每个分区计算结果,如

rdd.mapPartitions(records => conn.getDBConn;for(item <- records)
write(item.toString); conn.close)

使用mapPartitions可以更灵活地操作数据,例如对一个很大的数据求TopN,当N不是很大时,可以先使用mapPartitions对每个partition求TopN,collect结果到本地之后再做排序取TopN。这样相比直接对全量数据做排序取TopN效率要高很多。

使用coalesce调整分片的数量

coalesce可以调整分片的数量。coalesce函数有两个参数:

coalesce(numPartitions: Int, shuffle: Boolean = false)

当shuffle为true的时候,函数作用与repartition(numPartitions: Int)相同,会将数据通过Shuffle的方式重新分区;当shuffle为false的时候,则只是简单的将父RDD的多个partition合并到同一个task进行计算,shuffle为false时,如果numPartitions大于父RDD的切片数,那么分区不会重新调整。

遇到下列场景,可选择使用coalesce算子:

  • 当之前的操作有很多filter时,使用coalesce减少空运行的任务数量。此时使用coalesce(numPartitions, false),numPartitions小于父RDD切片数。
  • 当输入切片个数太大,导致程序无法正常运行时使用。
  • 当任务数过大时候Shuffle压力太大导致程序挂住不动,或者出现linux资源受限的问题。此时需要对数据重新进行分区,使用coalesce(numPartitions, true)。

localDir配置

Spark的Shuffle过程需要写本地磁盘,Shuffle是Spark性能的瓶颈,I/O是Shuffle的瓶颈。配置多个磁盘则可以并行的把数据写入磁盘。如果节点中挂载多个磁盘,则在每个磁盘配置一个Spark的localDir,这将有效分散Shuffle文件的存放,提高磁盘I/O的效率。如果只有一个磁盘,配置了多个目录,性能提升效果不明显。

Collect小数据

大数据量不适用collect操作。

collect操作会将Executor的数据发送到Driver端,因此使用collect前需要确保Driver端内存足够,以免Driver进程发生OutOfMemory异常。当不确定数据量大小时,可使用saveAsTextFile等操作把数据写入HDFS中。只有在能够大致确定数据大小且driver内存充足的时候,才能使用collect。

使用reduceByKey

reduceByKey会在Map端做本地聚合,使得Shuffle过程更加平缓,而groupByKey等Shuffle操作不会在Map端做聚合。因此能使用reduceByKey的地方尽量使用该算子,避免出现groupByKey().map(x=>(x._1,x._2.size))这类实现方式。

广播map代替数组

当每条记录需要查表,如果是Driver端用广播方式传递的数据,数据结构优先采用set/map而不是Iterator,因为Set/Map的查询速率接近O(1),而Iterator是O(n)。

数据倾斜

当数据发生倾斜(某一部分数据量特别大),虽然没有GC(Gabage Collection,垃圾回收),但是task执行时间严重不一致。

  • 需要重新设计key,以更小粒度的key使得task大小合理化。
  • 修改并行度。

优化数据结构

  • 把数据按列存放,读取数据时就可以只扫描需要的列。
  • 使用Hash Shuffle时,通过设置spark.shuffle.consolidateFiles为true,来合并shuffle中间文件,减少shuffle文件的数量,减少文件IO操作以提升性能。最终文件数为reduce tasks数目。
作者 east
python, 人工智能, 数据挖掘 10月 8,2020

python多项式回归代码实现

多项式回归是在上文python源码实现线性回归并绘图

基础上实现的,要实现下面的多项式

可以用矩阵相乘来实现

代码如下:

import numpy as np
import matplotlib.pyplot as plt

# 读入训练数据
train = np.loadtxt('click.csv', delimiter=',', dtype='int', skiprows=1)
train_x = train[:,0]
train_y = train[:,1]

# 标准化
mu = train_x.mean()
sigma = train_x.std()
def standardize(x):
    return (x - mu) / sigma

train_z = standardize(train_x)

# 参数初始化
theta = np.random.rand(3)

# 创建训练数据的矩阵
def to_matrix(x):
    return np.vstack([np.ones(x.size), x, x ** 2]).T

X = to_matrix(train_z)

# 预测函数
def f(x):
    return np.dot(x, theta)

# 目标函数
def E(x, y):
    return 0.5 * np.sum((y - f(x)) ** 2)

# 学习率
ETA = 1e-3

# 误差的差值
diff = 1

# 更新次数
count = 0

# 直到误差的差值小于 0.01 为止,重复参数更新
error = E(X, train_y)
while diff > 1e-2:
    # 更新结果保存到临时变量
    theta = theta - ETA * np.dot(f(X) - train_y, X)

    # 计算与上一次误差的差值
    current_error = E(X, train_y)
    diff = error - current_error
    error = current_error

    # 输出日志
    count += 1
    log = '第 {} 次 : theta = {}, 差值 = {:.4f}'
    print(log.format(count, theta, diff))

# 绘图确认
x = np.linspace(-3, 3, 100)
plt.plot(train_z, train_y, 'o')
plt.plot(x, f(to_matrix(x)))
plt.show()

最后输出效果如下:

作者 east
python, 数据挖掘 10月 8,2020

python源码实现线性回归并绘图

用python实现一次函数 的线性回归。把fθ(x)作为一次函数来实现吧。我们要实现下面这样的fθ(x)和目标函数E(θ)。

要把下面的训练数据变成平均值为0、方差为1的数据。作用是参数的收敛会更快。这种做法也被称为标准化或者z-score规范化,变换表达式是这样的。µ是训练数据的平均值,σ是标准差。

参数更新表达式如下:

训练数据如下:

x	y
235	591
216	539
148	413
35	310
85	308
204	519
49	325
25	332
173	498
191	498
134	392
99	334
117	385
112	387
162	425
272	659
159	400
159	427
59	319
198	522

import numpy as np
import matplotlib.pyplot as plt

# 读入训练数据
train = np.loadtxt('click.csv', delimiter=',', dtype='int', skiprows=1)
train_x = train[:,0]
train_y = train[:,1]

# 标准化
mu = train_x.mean()
sigma = train_x.std()
def standardize(x):
    return (x - mu) / sigma

train_z = standardize(train_x)

# 参数初始化
theta0 = np.random.rand()
theta1 = np.random.rand()

# 预测函数
def f(x):
    return theta0 + theta1 * x

# 目标函数
def E(x, y):
    return 0.5 * np.sum((y - f(x)) ** 2)

# 学习率
ETA = 1e-3

# 误差的差值
diff = 1

# 更新次数
count = 0

# 直到误差的差值小于 0.01 为止,重复参数更新
error = E(train_z, train_y)
while diff > 1e-2:
    # 更新结果保存到临时变量
    tmp_theta0 = theta0 - ETA * np.sum((f(train_z) - train_y))
    tmp_theta1 = theta1 - ETA * np.sum((f(train_z) - train_y) * train_z)

    # 更新参数
    theta0 = tmp_theta0
    theta1 = tmp_theta1

    # 计算与上一次误差的差值
    current_error = E(train_z, train_y)
    diff = error - current_error
    error = current_error

    # 输出日志
    count += 1
    log = '第 {} 次 : theta0 = {:.3f}, theta1 = {:.3f}, 差值 = {:.4f}'
    print(log.format(count, theta0, theta1, diff))

# 绘图确认
x = np.linspace(-3, 3, 100)
plt.plot(train_z, train_y, 'o')
plt.plot(x, f(x))
plt.show()

最终输出图形如下:

作者 east
Kafka, Spark 9月 5,2020

Spark Streaming读取kafka直连案例

Spark Streaming读取kafka有2种方式:基于Receiver的方式和 基于Direct的方式 。 Direct 更高效, 负责追踪消费的offset 可以用redis或mysql来保存。

 
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
import kafka.serializer.StringDecoder
import org.apache.spark.streaming._
import org.apache.spark.{SparkConf, SparkContext}
object DirectKafka {
def main(args: Array[String]): Unit = {
if (args.length < 2){ System.err.println( s""" |DirectKafka
| is a list of one or more kafka brokers
| is a list of one or more kafka topics
""".stripMargin)
System.exit(1)
}


/*启动zookeeper
cd /root/kafka/kafka_2.11-1.0.0/bin
./zookeeper-server-start.sh /root/kafka/kafka_2.11-1.0.0/config/zookeeper.properties

启动kafka
cd /root/kafka/kafka_2.11-1.0.0/bin
./kafka-server-start.sh /root/kafka/kafka_2.11-1.0.0/config/server.properties

启动kafka生产者
./kafka-console-producer.sh --broker-list master:9092 --topic kafka_test

提交任务
spark-submit --class com.kafka.DirectKafka \
--master spark://master:7077 \
--deploy-mode client \
--driver-memory 512m \
--executor-memory 512m \
--executor-cores 1 \
/root/datafile/SparkStreamingKafka-1.0-SNAPSHOT-jar-with-dependencies.jar \
master:9092 kafka_test
*/

val sparkConf = new SparkConf().setAppName("SparkStreaming-DirectKafka")
val sc = new SparkContext(sparkConf)
val Array(brokers, topics) = args
val ssc = new StreamingContext(sc, Seconds(2))
val topicset = topics.split(",").toSet
val KafkaParams = Map[String,String]("metadata.broker.list" -> brokers)
val directKafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc, KafkaParams, topicset)
directKafkaStream.print()
var offsetRanges = Array.empty[OffsetRange]
directKafkaStream.transform { rdd =>
offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
rdd
}.map(._2) .flatMap(.split(" "))
.map(x => (x, 1L))
.reduceByKey(_ + _)
.foreachRDD { rdd =>
for (o <- offsetRanges) {
println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
}
rdd.take(10).foreach(println)
}
ssc.start()
ssc.awaitTermination()
}
}


import com.alibaba.fastjson.JSON
import main.scala.until.dataschema
import org.apache.spark.sql.SaveMode
import org.apache.spark.streaming._
import org.apache.spark.{SparkConf, SparkContext}
import org.joda.time.DateTime
import scalikejdbc.config.{DBs, DBsWithEnv}
import scalikejdbc._
import main.scala.until.ParamsUtils
import main.scala.until.SparkUtils

object readkafka {
  def main(args: Array[String]): Unit = {
    if (args.length != 1){
      System.err.println("please input data args")
      System.exit(1)
    }

    val sparkConf = new SparkConf().setAppName("SparkStreaming-test")
      .setMaster("local[*]")
      .set("spark.testing.memory","2147480000")
    val sc = new SparkContext(sparkConf)

//topic : spark_example_topic , countly_event ,countly_imp
//broker : 172.31.2.6:9292,172.31.2.7:9292,172.31.2.8:9292

//    val ssc = new StreamingContext(sc, Seconds(2))
    val ssc = new StreamingContext(sc, Seconds(2))

    val messages = new SparkUtils(ssc).getDirectStream(ParamsUtils.kafka.KAFKA_TOPIC)
    messages.print()

//    SparkUtils.apply(null).getDirectStream()

//-------------------------------------------------------------------------------

    // messages 从kafka获取数据,将数据转为RDD
    messages.foreachRDD((rdd, batchTime) => {
      import org.apache.spark.streaming.kafka.HasOffsetRanges
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges   // 获取偏移量信息
      /**
        * OffsetRange 是对topic name,partition id,fromOffset(当前消费的开始偏移),untilOffset(当前消费的结束偏移)的封装。
        * *  所以OffsetRange 包含信息有:topic名字,分区Id,开始偏移,结束偏移
        */
      println("===========================> count: " + rdd.map(x => x + "1").count())
     // offsetRanges.foreach(offset => println(offset.topic, offset.partition, offset.fromOffset, offset.untilOffset))
      for (offset <- offsetRanges) {
        // 遍历offsetRanges,里面有多个partition
        println(offset.topic, offset.partition, offset.fromOffset, offset.untilOffset)
        DBs.setupAll()
        // 将partition及对应的untilOffset存到MySQL中
        val saveoffset = DB localTx {
          implicit session =>
           sql"DELETE FROM offsetinfo WHERE topic = ${offset.topic} AND partitionname = ${offset.partition}".update.apply()
            sql"INSERT INTO offsetinfo (topic, partitionname, untilOffset) VALUES (${offset.topic},${offset.partition},${offset.untilOffset})".update.apply()
        }
      }
    })

    // 处理从kafka获取的message信息
    val parameter = messages.flatMap(line => {
      //获取服务端事件日期 reqts_day
      val reqts_day = try {
        new DateTime(JSON.parseObject(line._2).getJSONObject("i").getLong("timestamp") * 1000).toDateTime.toString("yyyy-MM-dd HH:mm:ss")
      } catch {
        case ex: Exception => "(unknown)"
      }

      //获取 设备号
      val cookieid = try {
        JSON.parseObject(line._2).getJSONObject("d").get("d")    //将Json字符串转化为相应的对象  .getString("kid")
      } catch {
        case ex: Exception => "(unknown)"
      }

      //组合成一个字符串
      val data = reqts_day + "##" + cookieid
      Some(data)       //some是一定有值的, some.get获取值,如果没有值,会报异常
    }).map(_.split("##")).map(x => (x(0),x(1)))

    println("------------------")

    parameter.foreachRDD{ rdd =>

      val sqlContext = new org.apache.spark.sql.SQLContext(sc)
      import sqlContext.implicits._
      // 转换成DataFrame
      val SaveParameter = rdd.map(w => dataschema(w._1.toString,w._2.toString)).toDF("data_date","cookies_num")
      // 注册视图
      SaveParameter.createOrReplaceTempView("dau_tmp_table")
      val insertsql =sqlContext.sql("select * from dau_tmp_table")
      insertsql.write.mode(SaveMode.Append).jdbc("jdbc:mysql://localhost:3306/userprofile_test","dau_tmp_table",ParamsUtils.mysql.mysqlProp)

    }

    messages.print()
    ssc.start()
    ssc.awaitTermination()

  }
}
作者 east
Elasticsearch 8月 31,2020

Elsticsearch 数据查询流程

Elasticsearch在存储数据的时候默认是根据每条记录的_id字段做路由分发的,这意味着es服务端是准确知道每个document分布在那个shard上的。

相对比于CURD上操作,search一个比较复杂的执行模式,因为我们不知道那些document会被匹配到,任何一个shard上都有可能,所以一个search请求必须查询一个索引或多个索引里面的所有shard才能完整的查询到我们想要的结果。

找到所有匹配的结果是查询的第一步,来自多个shard上的数据集在分页返回到客户端的之前会被合并到一个排序后的list列表,由于需要经过一步取top N的操作,所以search需要进过两个阶段才能完成,分别是query和fetch。

如下图所示:

(一)query(查询阶段)

当一个search请求发出的时候,这个query会被广播到索引里面的每一个shard(主shard或副本shard),每个shard会在本地执行查询请求后会生成一个命中文档的优先级队列。

这个队列是一个排序好的top N数据的列表,它的size等于from+size的和,也就是说如果你的from是10,size是10,那么这个队列的size就是20,所以这也是为什么深度分页不能用from+size这种方式,因为from越大,性能就越低。

es里面分布式search的查询流程如下:

1,客户端发送一个search请求到Node 3上,然后Node 3会创建一个优先级队列它的大小=from+size

2,接着Node 3转发这个search请求到索引里面每一个主shard或者副本shard上,每个shard会在本地查询然后添加结果到本地的排序好的优先级队列里面。

3,每个shard返回docId和所有参与排序字段的值例如_score到优先级队列里面,然后再返回给coordinating节点也就是Node 3,然后Node 3负责将所有shard里面的数据给合并到一个全局的排序的列表。

上面提到一个术语叫coordinating node,这个节点是当search请求随机负载的发送到一个节点上,然后这个节点就会成为一个coordinating node,它的职责是广播search请求到所有相关的shard上,然后合并他们的响应结果到一个全局的排序列表中然后进行第二个fetch阶段,注意这个结果集仅仅包含docId和所有排序的字段值,search请求可以被主shard或者副本shard处理,这也是为什么我们说增加副本的个数就能增加搜索吞吐量的原因,coordinating节点将会通过round-robin的方式自动负载均衡。

(二)fetch(读取阶段)

query阶段标识了那些文档满足了该次的search请求,但是我们仍然需要检索回document整条数据,这个阶段称为fetch。

流程如下:

1,coordinating 节点标识了那些document需要被拉取出来,并发送一个批量的mutil get请求到相关的shard上。

2,每个shard加载相关document,如果需要他们将会被返回到coordinating 节点上。

3,一旦所有的document被拉取回来,coordinating节点将会返回结果集到客户端上。

作者 east
Elasticsearch 8月 31,2020

Elsticsearch 数据写入流程

前言

由于Elasticsearch使用Lucene来处理shard级别的索引和查询,因此数据目录中的文件由Elasticsearch和Lucene共同编写。

Lucene负责编写和维护Lucene索引文件,而Elasticsearch在Lucene之上编写与功能相关的元数据,例如字段映射,索引设置和其他集群元数据,用户和支持功能由Elasticsearch提供。

ES数据

Node Data

只需启动ES就会有生成下面的目录树:

node.lock:为了确保一次只有一个ES应用从这个数据目录读取/写入。

global-0.st:global-prefix表示这是一个全局状态的文件,而.st扩展名表示这是一个包含元数据状态的文件。这个二进制文件是包含集群的全局元数据,前缀后的数字表示集群元数据版本,这是一个严格增加的版本控制方案。版本号跟随集群而变。

不要去编辑这些文件,因为有可能会导致数据丢失。

Index Data

当我们创建index,文件目录发生变化:

此时可以看出,已经创建了与索引名称对应的新目录。目录有两类子文件夹:_state和0.。

_state包含state-{version}.st索引状态文件,包含了索引的元数据,例如:创建的时间戳。还包含唯一标识符以及索引的settings和mappings。

数字0表示的shard编号,这个文件夹包含了shard相关的数据。

Shard Data

_state目录包含shard的状态文件,其中包含版本控制以及有关分片是主分片还是副本的信息。

index目录包含Lucene拥有的文件。ES通常不直接写入此文件夹。这些文件构成了ES数据目录的大小。

translog:ES事物日志。ES事物日志确保数据可以安全的写入到索引中,而无需为每个文档执行Lucene commit。提交Lucene index会产生一个新的segment,即fsync()-ed,会导致大量的磁盘I/O,从而影响性能。

Lucene数据

Lucene文件如下表所示:

Name Extension Brief Description
Segemnts File segments_N 存储lucene数据文件的元信息,记录所有segment的元数据信息。主要记录了当前有多少个segment,每个segment有一些基本信息,更新这些信息能定位到每个segment的元信息文件。
Lock File write.lock 防止多个IndexWriters写入同一个文件
Segement Info .si 存储有关段的元数据
Compound File .cfs,.cfe 一个segment包含了如下表的各个文件,为减少打开文件的数量,在segment小的时候,segment的所有文件内容都保存在cfs文件中,cfe文件保存了lucene各文件在cfs文件的位置信息
Fields .fnm 存储fileds的相关信息
Fields Index .fdx 正排存储文件的元数据信息
Fields Data .fdt 存储了正排存储数据,写入的原文存储在这
Term Dictionary .tim 倒排索引的元数据信息
Term Index .tip 倒排索引文件,存储了所有的倒排索引数据
Frequencies .doc 保存了每个term的doc id列表和term在doc中位置
Positions .pos 全文索引的字段,会有该文件,保存了term在doc中的位置
Playloads .pay Stores additional per-position metadata information such as character offsets and user payloads全文索引的字段,使用了一些像payloads的高级特性会有该文件,保存了term在doc中的一些高级特性
Norms .nvd,.nvm 存储索引字段加权数据
Per-Document Values .dvd,.dvm Lucene的docvalues文件,即列式存储,用作聚合和排序
Term Vector Data .tvx,.tvd,.tvf 保存索引字段的矢量信息,用在对term进行高亮,计算文本相关性中使用
Live Documents .liv 记录了segment中删除的doc

数据写入流程

1.segment file: 存储倒排索引的文件,每个segment本质上就是一个倒排索引,每秒都会生成一个segment文件,当文件过多时es会自动进行segment merge(合并文件),合并时会同时将已经标注删除的文档物理删除;

2.commit point(重点理解): 记录当前所有可用的segment,每个commit point都会维护一个.del文件(es删除数据本质是不属于物理删除),当es做删改操作时首先会在.del文件中声明某个document已经被删除,文件内记录了在某个segment内某个文档已经被删除,当查询请求过来时在segment中被删除的文件是能够查出来的,但是当返回结果时会根据commit point维护的那个.del文件把已经删除的文档过滤掉;

3.translog日志文件: 为了防止elasticsearch宕机造成数据丢失保证可靠存储,es会将每次写入数据同时写到translog日志中。

translog的写入也可以设置,默认是request,每次请求都会写入磁盘(fsync),这样就保证所有数据不会丢,但写入性能会受影响;如果改成async,则按照配置触发trangslog写入磁盘,注意这里说的只是trangslog本身的写盘。

4. flush:translog什么时候清空?默认是512mb,或30分钟。这个动作就是flush,同时伴随着segment提交(写入磁盘)。flush之后,这段translog的使命就完成了,因为segment已经写入磁盘,就算故障,也可以从segment文件恢复。

5.fsync:另外,有一个/_flush/sync命令,在做数据节点维护时很有用。其逻辑就是flush translog并且将sync_id同步到各个分片。可以实现快速恢复。

综述:fsync指的是translog本身被写入磁盘的动作;flush指的是逻辑上的刷新,包含一系列逻辑操作。

其实flush API的作用是强制停止translog的fsync行为,提交segment并清除translog。间隔要尽量大一点(需要优化写速度的话,就稍微调大一点flush间隔。)

作者 east

上一 1 … 33 34 35 … 41 下一个

关注公众号“大模型全栈程序员”回复“小程序”获取1000个小程序打包源码。回复”chatgpt”获取免注册可用chatgpt。回复“大数据”获取多本大数据电子书

标签

AIGC AI创作 bert chatgpt github GPT-3 gpt3 GTP-3 hive mysql O2O tensorflow UI控件 不含后台 交流 共享经济 出行 图像 地图定位 外卖 多媒体 娱乐 小程序 布局 带后台完整项目 开源项目 搜索 支付 效率 教育 日历 机器学习 深度学习 物流 用户系统 电商 画图 画布(canvas) 社交 签到 联网 读书 资讯 阅读 预订

官方QQ群

小程序开发群:74052405

大数据开发群: 952493060

近期文章

  • 如何在Chrome中设置启动时自动打开多个默认网页
  • spark内存溢出怎样区分是软件还是代码原因
  • MQTT完全解析和实践
  • 解决运行Selenium报错:self.driver = webdriver.Chrome(service=service) TypeError: __init__() got an unexpected keyword argument ‘service’
  • python 3.6使用mysql-connector-python报错:SyntaxError: future feature annotations is not defined
  • 详解Python当中的pip常用命令
  • AUTOSAR如何在多个供应商交付的配置中避免ARXML不兼容?
  • C++thread pool(线程池)设计应关注哪些扩展性问题?
  • 各类MCAL(Microcontroller Abstraction Layer)如何与AUTOSAR工具链解耦?
  • 如何设计AUTOSAR中的“域控制器”以支持未来扩展?

文章归档

  • 2025年7月
  • 2025年6月
  • 2025年5月
  • 2025年4月
  • 2025年3月
  • 2025年2月
  • 2025年1月
  • 2024年12月
  • 2024年11月
  • 2024年10月
  • 2024年9月
  • 2024年8月
  • 2024年7月
  • 2024年6月
  • 2024年5月
  • 2024年4月
  • 2024年3月
  • 2023年11月
  • 2023年10月
  • 2023年9月
  • 2023年8月
  • 2023年7月
  • 2023年6月
  • 2023年5月
  • 2023年4月
  • 2023年3月
  • 2023年1月
  • 2022年11月
  • 2022年10月
  • 2022年9月
  • 2022年8月
  • 2022年7月
  • 2022年6月
  • 2022年5月
  • 2022年4月
  • 2022年3月
  • 2022年2月
  • 2022年1月
  • 2021年12月
  • 2021年11月
  • 2021年9月
  • 2021年8月
  • 2021年7月
  • 2021年6月
  • 2021年5月
  • 2021年4月
  • 2021年3月
  • 2021年2月
  • 2021年1月
  • 2020年12月
  • 2020年11月
  • 2020年10月
  • 2020年9月
  • 2020年8月
  • 2020年7月
  • 2020年6月
  • 2020年5月
  • 2020年4月
  • 2020年3月
  • 2020年2月
  • 2020年1月
  • 2019年7月
  • 2019年6月
  • 2019年5月
  • 2019年4月
  • 2019年3月
  • 2019年2月
  • 2019年1月
  • 2018年12月
  • 2018年7月
  • 2018年6月

分类目录

  • Android (73)
  • bug清单 (79)
  • C++ (34)
  • Fuchsia (15)
  • php (4)
  • python (45)
  • sklearn (1)
  • 云计算 (20)
  • 人工智能 (61)
    • chatgpt (21)
      • 提示词 (6)
    • Keras (1)
    • Tensorflow (3)
    • 大模型 (1)
    • 智能体 (4)
    • 深度学习 (14)
  • 储能 (44)
  • 前端 (5)
  • 大数据开发 (491)
    • CDH (6)
    • datax (4)
    • doris (31)
    • Elasticsearch (15)
    • Flink (78)
    • flume (7)
    • Hadoop (19)
    • Hbase (23)
    • Hive (41)
    • Impala (2)
    • Java (71)
    • Kafka (10)
    • neo4j (5)
    • shardingsphere (6)
    • solr (5)
    • Spark (100)
    • spring (11)
    • 数据仓库 (9)
    • 数据挖掘 (7)
    • 海豚调度器 (10)
    • 运维 (34)
      • Docker (3)
  • 小游戏代码 (1)
  • 小程序代码 (139)
    • O2O (16)
    • UI控件 (5)
    • 互联网类 (23)
    • 企业类 (6)
    • 地图定位 (9)
    • 多媒体 (6)
    • 工具类 (25)
    • 电商类 (22)
    • 社交 (7)
    • 行业软件 (7)
    • 资讯读书 (11)
  • 嵌入式 (71)
    • autosar (63)
    • RTOS (1)
    • 总线 (1)
  • 开发博客 (16)
    • Harmony (9)
  • 技术架构 (6)
  • 数据库 (32)
    • mongodb (1)
    • mysql (13)
    • pgsql (2)
    • redis (1)
    • tdengine (4)
  • 未分类 (7)
  • 程序员网赚 (20)
    • 广告联盟 (3)
    • 私域流量 (5)
    • 自媒体 (5)
  • 量化投资 (4)
  • 面试 (14)

功能

  • 登录
  • 文章RSS
  • 评论RSS
  • WordPress.org

All Rights Reserved by Gitweixin.本站收集网友上传代码, 如有侵犯版权,请发邮件联系yiyuyos@gmail.com删除.