gitweixin
  • 首页
  • 小程序代码
    • 资讯读书
    • 工具类
    • O2O
    • 地图定位
    • 社交
    • 行业软件
    • 电商类
    • 互联网类
    • 企业类
    • UI控件
  • 大数据开发
    • Hadoop
    • Spark
    • Hbase
    • Elasticsearch
    • Kafka
    • Flink
    • 数据仓库
    • 数据挖掘
    • flume
    • Kafka
    • Hive
    • shardingsphere
    • solr
  • 开发博客
    • Android
    • php
    • python
    • 运维
    • 技术架构
    • 数据库
  • 程序员网赚
  • bug清单
  • 量化投资
  • 在线查询工具
    • 去行号
    • 在线时间戳转换工具
    • 免费图片批量修改尺寸在线工具
    • SVG转JPG在线工具
    • SVG转PDF/Word
    • SVG转Draw.io可二次编辑格式
    • js代码混淆
    • json格式化及任意折叠展开
    • PDF常用工具

Flink统计连续网购时间超过2个小时的女性网民信息例子

精品微信小程序开发门户,代码全部亲测可用

  • 首页   /  
  • 作者: east
  • ( 页面63 )
Flink 3月 1,2021

Flink统计连续网购时间超过2个小时的女性网民信息例子

Java样例代码

场景说明

场景说明

假定用户有某个网站周末网民网购停留时间的日志文本,基于某些业务要求,要求开发Flink的DataStream应用程序实现如下功能:

说明:

DataStream应用程序可以在Windows环境和Linux环境中运行。

  • 实时统计总计网购时间超过2个小时的女性网民信息。
  • 周末两天的日志文件第一列为姓名,第二列为性别,第三列为本次停留时间,单位为分钟,分隔符为“,”。 log1.txt:周六网民停留日志。LiuYang,female,20 YuanJing,male,10 GuoYijun,male,5 CaiXuyu,female,50 Liyuan,male,20 FangBo,female,50 LiuYang,female,20 YuanJing,male,10 GuoYijun,male,50 CaiXuyu,female,50 FangBo,female,60
  • log2.txt:周日网民停留日志。LiuYang,female,20 YuanJing,male,10 CaiXuyu,female,50 FangBo,female,50 GuoYijun,male,5 CaiXuyu,female,50 Liyuan,male,20 CaiXuyu,female,50 FangBo,female,50 LiuYang,female,20 YuanJing,male,10 FangBo,female,50 GuoYijun,male,50 CaiXuyu,female,50 FangBo,female,60

数据规划

DataStream样例工程的数据存储在文本中。

将log1.txt和log2.txt放置在某路径下,例如”/opt/log1.txt”和”/opt/log2.txt”。

开发思路

统计日志文件中本周末网购停留总时间超过2个小时的女性网民信息。

主要分为四个部分:

  1. 读取文本数据,生成相应DataStream,解析数据生成UserRecord信息。
  2. 筛选女性网民上网时间数据信息。
  3. 按照姓名、性别进行keyby操作,并汇总在一个时间窗口内每个女性上网时间。
  4. 筛选连续上网时间超过阈值的用户,并获取结果。

功能介绍

统计连续网购时间超过2个小时的女性网民信息,将统计结果直接打印。

java版代码:

Java样例代码











// 参数解析: // <filePath>为文本读取路径,用逗号分隔。 // <windowTime>为统计数据的窗口跨度,时间单位都是分。 public class FlinkStreamJavaExample { public static void main(String[] args) throws Exception { // 打印出执行flink run的参考命令 System.out.println("use command as: "); System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.FlinkStreamJavaExample /opt/test.jar --filePath /opt/log1.txt,/opt/log2.txt --windowTime 2"); System.out.println("******************************************************************************************"); System.out.println("<filePath> is for text file to read data, use comma to separate"); System.out.println("<windowTime> is the width of the window, time as minutes"); System.out.println("******************************************************************************************"); // 读取文本路径信息,并使用逗号分隔 final String[] filePaths = ParameterTool.fromArgs(args).get("filePath", "/opt/log1.txt,/opt/log2.txt").split(","); assert filePaths.length > 0; // windowTime设置窗口时间大小,默认2分钟一个窗口足够读取文本内的所有数据了 final int windowTime = ParameterTool.fromArgs(args).getInt("windowTime", 2); // 构造执行环境,使用eventTime处理窗口数据 final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); env.setParallelism(1); // 读取文本数据流 DataStream<String> unionStream = env.readTextFile(filePaths[0]); if (filePaths.length > 1) { for (int i = 1; i < filePaths.length; i++) { unionStream = unionStream.union(env.readTextFile(filePaths[i])); } } // 数据转换,构造整个数据处理的逻辑,计算并得出结果打印出来 unionStream.map(new MapFunction<String, UserRecord>() { @Override public UserRecord map(String value) throws Exception { return getRecord(value); } }).assignTimestampsAndWatermarks( new Record2TimestampExtractor() ).filter(new FilterFunction<UserRecord>() { @Override public boolean filter(UserRecord value) throws Exception { return value.sexy.equals("female"); } }).keyBy( new UserRecordSelector() ).window( TumblingEventTimeWindows.of(Time.minutes(windowTime)) ).reduce(new ReduceFunction<UserRecord>() { @Override public UserRecord reduce(UserRecord value1, UserRecord value2) throws Exception { value1.shoppingTime += value2.shoppingTime; return value1; } }).filter(new FilterFunction<UserRecord>() { @Override public boolean filter(UserRecord value) throws Exception { return value.shoppingTime > 120; } }).print(); // 调用execute触发执行 env.execute("FemaleInfoCollectionPrint java"); } // 构造keyBy的关键字作为分组依据 private static class UserRecordSelector implements KeySelector<UserRecord, Tuple2<String, String>> { @Override public Tuple2<String, String> getKey(UserRecord value) throws Exception { return Tuple2.of(value.name, value.sexy); } } // 解析文本行数据,构造UserRecord数据结构 private static UserRecord getRecord(String line) { String[] elems = line.split(","); assert elems.length == 3; return new UserRecord(elems[0], elems[1], Integer.parseInt(elems[2])); } // UserRecord数据结构的定义,并重写了toString打印方法 public static class UserRecord { private String name; private String sexy; private int shoppingTime; public UserRecord(String n, String s, int t) { name = n; sexy = s; shoppingTime = t; } public String toString() { return "name: " + name + " sexy: " + sexy + " shoppingTime: " + shoppingTime; } } // 构造继承AssignerWithPunctuatedWatermarks的类,用于设置eventTime以及waterMark private static class Record2TimestampExtractor implements AssignerWithPunctuatedWatermarks<UserRecord> { // add tag in the data of datastream elements @Override public long extractTimestamp(UserRecord element, long previousTimestamp) { return System.currentTimeMillis(); } // give the watermark to trigger the window to execute, and use the value to check if the window elements is ready @Override public Watermark checkAndGetNextWatermark(UserRecord element, long extractedTimestamp) { return new Watermark(extractedTimestamp - 1); } } }

scala版本:

Scala样例代码











// 参数解析: // filePath为文本读取路径,用逗号分隔。 // windowTime;为统计数据的窗口跨度,时间单位都是分。 object FlinkStreamScalaExample { def main(args: Array[String]) { // 打印出执行flink run的参考命令 System.out.println("use command as: ") System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.FlinkStreamScalaExample /opt/test.jar --filePath /opt/log1.txt,/opt/log2.txt --windowTime 2") System.out.println("******************************************************************************************") System.out.println("<filePath> is for text file to read data, use comma to separate") System.out.println("<windowTime> is the width of the window, time as minutes") System.out.println("******************************************************************************************") // 读取文本路径信息,并使用逗号分隔 val filePaths = ParameterTool.fromArgs(args).get("filePath", "/opt/log1.txt,/opt/log2.txt").split(",").map(_.trim) assert(filePaths.length > 0) // windowTime设置窗口时间大小,默认2分钟一个窗口足够读取文本内的所有数据了 val windowTime = ParameterTool.fromArgs(args).getInt("windowTime", 2) // 构造执行环境,使用eventTime处理窗口数据 val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) env.setParallelism(1) // 读取文本数据流 val unionStream = if (filePaths.length > 1) { val firstStream = env.readTextFile(filePaths.apply(0)) firstStream.union(filePaths.drop(1).map(it => env.readTextFile(it)): _*) } else { env.readTextFile(filePaths.apply(0)) } // 数据转换,构造整个数据处理的逻辑,计算并得出结果打印出来 unionStream.map(getRecord(_)) .assignTimestampsAndWatermarks(new Record2TimestampExtractor) .filter(_.sexy == "female") .keyBy("name", "sexy") .window(TumblingEventTimeWindows.of(Time.minutes(windowTime))) .reduce((e1, e2) => UserRecord(e1.name, e1.sexy, e1.shoppingTime + e2.shoppingTime)) .filter(_.shoppingTime > 120).print() // 调用execute触发执行 env.execute("FemaleInfoCollectionPrint scala") } // 解析文本行数据,构造UserRecord数据结构 def getRecord(line: String): UserRecord = { val elems = line.split(",") assert(elems.length == 3) val name = elems(0) val sexy = elems(1) val time = elems(2).toInt UserRecord(name, sexy, time) } // UserRecord数据结构的定义 case class UserRecord(name: String, sexy: String, shoppingTime: Int) // 构造继承AssignerWithPunctuatedWatermarks的类,用于设置eventTime以及waterMark private class Record2TimestampExtractor extends AssignerWithPunctuatedWatermarks[UserRecord] { // add tag in the data of datastream elements override def extractTimestamp(element: UserRecord, previousTimestamp: Long): Long = { System.currentTimeMillis() } // give the watermark to trigger the window to execute, and use the value to check if the window elements is ready def checkAndGetNextWatermark(lastElement: UserRecord, extractedTimestamp: Long): Watermark = { new Watermark(extractedTimestamp - 1) } } }
作者 east
solr 3月 1,2021

Solr增删改查例子

Solr初始化

Solr初始化

功能简介

Solr初始化是指在使用Solr提供的API之前,需要做的必要工作。目的是取得与SolrCoud的连接。

说明:

在进行完Solr操作后,需要调用cloudSolrClient.close()关闭所申请的资源。

Solr初始化





/** *初始化CloudSolrClient实例,连接SolrCloud private CloudSolrClient getCloudSolrClient(String zkHost) throws SolrException { Builder builder = new CloudSolrClient.Builder(); builder.withZkHost(zkHost); CloudSolrClient cloudSolrClient = builder.build(); cloudSolrClient.setZkClientTimeout(zkClientTimeout); cloudSolrClient.setZkConnectTimeout(zkConnectTimeout); cloudSolrClient.connect(); LOG.info("The cloud Server has been connected !!!!"); ZkStateReader zkStateReader = cloudSolrClient.getZkStateReader(); ClusterState cloudState = zkStateReader.getClusterState(); LOG.info("The zookeeper state is : {}", cloudState); return cloudSolrClient; }

查询collection

查询collection

功能简介

通过调用CollectionAdminRequest.List的process(cloudSolrClient)并调用返回的response来获取所有collection的名字。

代码样例

private List<String> queryAllCollections(CloudSolrClient 
                             cloudSolrClient) throws SolrException {
        CollectionAdminRequest.List list = 
        new CollectionAdminRequest.List();
        CollectionAdminResponse listRes = null;
        try {
            listRes = list.process(cloudSolrClient);
        } catch (SolrServerException | IOException e) {
            LOG.error("Failed to list collection", e);
            throw new SolrException("Failed to list collection");
        } catch (Exception e) {
            LOG.error("Failed to list collection", e);
            throw new SolrException("unknown exception");
        }

        List<String> collectionNames = (List<String>) 
        listRes.getResponse().get("collections");
        LOG.info("All existed collections : {}", collectionNames);
        return collectionNames;
    }

删除collection

删除collection

功能简介

通过调用CollectionAdminRequest.Delete的process(cloudSolrClient)并调用返回的response来判断是否执行删除collection操作成功。

代码样例

private void deleteCollection(CloudSolrClient cloudSolrClient) 
                                               throws SolrException {
        CollectionAdminRequest.Delete delete = 
        new CollectionAdminRequest.Delete();
        delete.setCollectionName(COLLECTION_NAME);
        CollectionAdminResponse response = null;
        try {
            response = delete.process(cloudSolrClient);
        } catch (SolrServerException | IOException e) {
            LOG.error("Failed to delete collection", e);
            throw new SolrException("Failed to create collection");
        } catch (Exception e) {
            LOG.error("Failed to delete collection", e);
            throw new SolrException("unknown exception");
        }
        if (response.isSuccess()) {
            LOG.info("Success to delete collection[{}]", 
            COLLECTION_NAME);
        } else {
            LOG.error("Failed to delete collection[{}], cause : {}",             COLLECTION_NAME, response.getErrorMessages());
            throw new SolrException("Failed to delete collection");
        }
    }

创建collection

创建collection

功能简介

通过调用CollectionAdminRequest.Create的process(cloudSolrClient)并调用返回的response来判断是否执行创建collection操作成功。

代码样例

  private void createCollection(CloudSolrClient cloudSolrClient) throws SolrException {
        CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(COLLECTION_NAME, DEFAULT_CONFIG_NAME, shardNum, replicaNum);
        CollectionAdminResponse response = null;
        try {
            response = create.process(cloudSolrClient);
        } catch (SolrServerException e) {
            LOG.error("Failed to create collection", e);
            throw new SolrException("Failed to create collection");
        } catch (IOException e) {
            LOG.error("Failed to create collection", e);
            throw new SolrException("Failed to create collection");
        } catch (Exception e) {
            LOG.error("Failed to create collection", e);
            throw new SolrException("unknown exception");
        }
        if (response.isSuccess()) {
            LOG.info("Success to create collection[{}]", COLLECTION_NAME);
        } else {
            LOG.error("Failed to create collection[{}], cause : {}", COLLECTION_NAME, response.getErrorMessages());
            throw new SolrException("Failed to create collection");
        }
    }

添加Doc

添加Doc

功能简介

通过调用cloudSolrClient的add方法或者构造UpdateRequest调用cloudSolrClient的request方法来添加索引数据。

代码样例1

private void addDocs(CloudSolrClient cloudSolrClient) throws SolrException {
        Collection<SolrInputDocument> documents = new ArrayList<SolrInputDocument>();
        for (Integer i = 0; i < 5; i++) {
            SolrInputDocument doc = new SolrInputDocument();
            doc.addField("id", i.toString());
            doc.addField("name", "Luna_" + i);
            doc.addField("features", "test" + i);
            doc.addField("price", (float) i * 1.01);
            documents.add(doc);
        }
        try {
            cloudSolrClient.add(documents);
            LOG.info("success to add index");
        } catch (SolrServerException e) {
            LOG.error("Failed to add document to collection", e);
            throw new SolrException("Failed to add document to collection");
        } catch (IOException e) {
            LOG.error("Failed to add document to collection", e);
            throw new SolrException("Failed to add document to collection");
        } catch (Exception e) {
            LOG.error("Failed to add document to collection", e);
            throw new SolrException("unknown exception");
        }
    }

代码样例2

private void addDocs2(CloudSolrClient cloudSolrClient) throws 
SolrException{
    UpdateRequest request = new UpdateRequest();
    Collection<SolrInputDocument> documents = new ArrayList<>();
    for (Integer i = 5; i < 10; i++) {
        SolrInputDocument doc = new SolrInputDocument();
        doc.addField("id", i.toString());
        doc.addField("name", "张三" + i);
        doc.addField("features", "test" + i);
        doc.addField("price", (float) i * 1.01);
        documents.add(doc);
     }
     request.add(documents);
    try {
        cloudSolrClient.request(request);
        cloudSolrClient.commit();
    } catch (SolrServerException | IOException e) {
        LOG.error("Failed to add document to collection", e);
        throw new SolrException("Failed to add document to 
        collection");
    }
 }

查询Doc

查询Doc

功能简介

通过构造SolrQuery实例,并调用cloudSolrClient.query接口来查询索引数据。

样例代码

    private void queryIndex(CloudSolrClient cloudSolrClient) throws SolrException {
        SolrQuery query = new SolrQuery();
        query.setQuery("name:Luna*");

        try {
            QueryResponse response = cloudSolrClient.query(query);
            SolrDocumentList docs = response.getResults();
            LOG.info("Query wasted time : {}ms", response.getQTime());

            LOG.info("Total doc num : {}", docs.getNumFound());
            for (SolrDocument doc : docs) {
                LOG.info("doc detail : " + doc.getFieldValueMap());
            }
        } catch (SolrServerException e) {
            LOG.error("Failed to query document", e);
            throw new SolrException("Failed to query document");
        } catch (IOException e) {
            LOG.error("Failed to query document", e);
            throw new SolrException("Failed to query document");
        } catch (Exception e) {
            LOG.error("Failed to query document", e);
            throw new SolrException("unknown exception");
        }
    }

删除Doc

删除Doc

功能简介

通过调用cloudSolrClient.deleteByQuery方法删除指定匹配的索引数据。

代码样例

private void removeIndex(CloudSolrClient cloudSolrClient) throws 
SolrException {
        try {
            cloudSolrClient.deleteByQuery("*:*");
            cloudSolrClient.commit();
            LOG.info("Success to delete index");
        } catch (SolrServerException | IOException e){
            LOG.error("Failed to remove document", e);
            throw new SolrException("Failed to remove document");
        }
    }
作者 east
Hive 3月 1,2021

Hive数据查询

数据查询

功能介绍

本小节介绍了如何使用HQL对数据进行查询分析。从本节中可以掌握如下查询分析方法:

  • SELECT查询的常用特性,如JOIN等。
  • 加载数据进指定分区。
  • 如何使用Hive自带函数。
  • 如何使用自定义函数进行查询分析,如何创建、定义自定义函数请见用户自定义函数。

样例代码

-- 查看薪水支付币种为美元的雇员联系方式. 
SELECT  
a.name,  
b.tel_phone,  
b.email  
FROM employees_info a JOIN employees_contact b  ON(a.id = b.id) WHERE usd_flag='D'; 
 
-- 查询入职时间为2014年的雇员编号、姓名等字段,并将查询结果加载进表employees_info_extended中的入职时间为2014的分区中. 
INSERT OVERWRITE TABLE employees_info_extended PARTITION (entrytime = '2014')  
SELECT  
a.id,  
a.name,  
a.usd_flag,  
a.salary,  
a.deductions,  
a.address, 
b.tel_phone, 
b.email  
FROM employees_info a JOIN employees_contact b ON (a.id = b.id) WHERE a.entrytime = '2014'; 
 
-- 使用Hive中已有的函数COUNT(),统计表employees_info中有多少条记录. 
SELECT COUNT(*) FROM employees_info; 
 
-- 查询使用以“cn”结尾的邮箱的员工信息. 
SELECT a.name, b.tel_phone FROM  employees_info a JOIN employees_contact b ON (a.id = b.id) WHERE b.email like '%cn'; 

扩展使用

  • 配置Hive中间过程的数据加密 指定表的格式为RCFile(推荐使用)或SequenceFile,加密算法为ARC4Codec。SequenceFile是Hadoop特有的文件格式,RCFile是Hive优化的文件格式。RCFile优化了列存储,在对大表进行查询时,综合性能表现比SequenceFile更优。 set hive.exec.compress.output=true; set hive.exec.compress.intermediate=true; set hive.intermediate.compression.codec=org.apache.hadoop.io.encryption.arc4.ARC4Codec;
作者 east
Hive 3月 1,2021

Hive创建表示例

创建表

功能介绍

本小节介绍了如何使用HQL创建内部表、外部表的基本操作。创建表主要有以下三种方式:

  • 自定义表结构,以关键字EXTERNAL区分创建内部表和外部表。
    • 内部表,如果对数据的处理都由Hive完成,则应该使用内部表。在删除内部表时,元数据和数据一起被删除。
    • 外部表,如果数据要被多种工具(如Pig等)共同处理,则应该使用外部表,可避免对该数据的误操作。删除外部表时,只删除掉元数据。
  • 根据已有表创建新表,使用CREATE LIKE句式,完全复制原有的表结构,包括表的存储格式。
  • 根据查询结果创建新表,使用CREATE AS SELECT句式。 这种方式比较灵活,可以在复制原表表结构的同时指定要复制哪些字段,不包括表的存储格式。

样例代码

-- 创建外部表employees_info. 
CREATE EXTERNAL TABLE IF NOT EXISTS employees_info 
( 
id INT, 
name STRING, 
usd_flag STRING, 
salary DOUBLE, 
deductions MAP<STRING, DOUBLE>, 
address STRING, 
entrytime STRING 
) 
-- 指定行中各字段分隔符. 
-- "delimited fields terminated by"指定列与列之间的分隔符为',',"MAP KEYS TERMINATED BY"指定MAP中键值的分隔符为'&'. 
ROW FORMAT delimited fields terminated by ',' MAP KEYS TERMINATED BY '&'  
-- 指定表的存储格式为TEXTFILE. 
STORED AS TEXTFILE;  
 
-- 使用CREATE Like创建表. 
CREATE TABLE employees_like LIKE employees_info; 
 
-- 使用DESCRIBE查看employees_info、employees_like、 employees_as_select表结构. 
DESCRIBE employees_info; 
DESCRIBE employees_like; 

扩展应用

  • 创建分区表 一个表可以拥有一个或者多个分区,每个分区以文件夹的形式单独存在表文件夹的目录下。对分区内数据进行查询,可缩小查询范围,加快数据的检索速度和可对数据按照一定的条件进行管理。 分区是在创建表的时候用PARTITIONED BY子句定义的。 CREATE EXTERNAL TABLE IF NOT EXISTS employees_info_extended ( id INT, name STRING, usd_flag STRING, salary DOUBLE, deductions MAP<STRING, DOUBLE>, address STRING ) — 使用关键字PARTITIONED BY指定分区列名及数据类型 . PARTITIONED BY (entrytime STRING) STORED AS TEXTFILE;
  • 更新表的结构 一个表在创建完成后,还可以使用ALTER TABLE执行增、删字段,修改表属性,添加分区等操作 — 为表employees_info_extended增加tel_phone、email字段. ALTER TABLE employees_info_extended ADD COLUMNS (tel_phone STRING, email STRING);
  • 建表时配置Hive数据加密 指定表的格式为RCFile(推荐使用)或SequenceFile,加密算法为ARC4Codec。SequenceFile是Hadoop特有的文件格式,RCFile是Hive优化的文件格式。RCFile优化了列存储,在对大表进行查询时,综合性能表现比SequenceFile更优。 set hive.exec.compress.output=true; set hive.exec.compress.intermediate=true; set hive.intermediate.compression.codec=org.apache.hadoop.io.encryption.arc4.ARC4Codec; create table seq_Codec (key string, value string) stored as RCFile;
作者 east
Hbase 3月 1,2021

Hbase 基于二级索引的查询

基于二级索引的查询

功能介绍

针对添加了二级索引的用户表,您可以通过Filter来查询数据。其数据查询性能高于针对无二级索引用户表的数据查询。

二级索引的使用规则如下:

  • 针对某一列或者多列创建了单索引的场景下:
    • 当查询时使用此列进行过滤时,不管是AND还是OR操作,该索引都会被利用来提升查询性能。 例如:Filter_Condition(IndexCol1) AND/OR Filter_Condition(IndexCol2)
    • 当查询时使用“索引列AND非索引列”过滤时,此索引会被利用来提升查询性能。 例如:Filter_Condition(IndexCol1) AND Filter_Condition(IndexCol2) AND Filter_Condition(NonIndexCol1)
    • 当查询时使用“索引列OR非索引列”过滤时,此索引将不会被使用,查询性能不会因为索引得到提升。 例如:Filter_Condition(IndexCol1) AND/OR Filter_Condition(IndexCol2) OR Filter_Condition(NonIndexCol1)
  • 针对多个列创建的联合索引场景下:
    • 当查询时使用的列(多个),是联合索引所有对应列的一部分或者全部,且列的顺序与联合索引一致时,此索引会被利用来提升查询性能。 例如,针对C1、C2、C3列创建了联合索引,生效的场景包括: Filter_Condition(IndexCol1) AND Filter_Condition(IndexCol2) AND Filter_Condition(IndexCol3) Filter_Condition(IndexCol1) AND Filter_Condition(IndexCol2) Filter_Condition(IndexCol1) 不生效的场景包括: Filter_Condition(IndexCol2) AND Filter_Condition(IndexCol3) Filter_Condition(IndexCol1) AND Filter_Condition(IndexCol3) Filter_Condition(IndexCol2) Filter_Condition(IndexCol3)
    • 当查询时使用“索引列AND非索引列”过滤时,此索引会被利用来提升查询性能。 例如: Filter_Condition(IndexCol1) AND Filter_Condition(NonIndexCol1) Filter_Condition(IndexCol1) AND Filter_Condition(IndexCol2) AND Filter_Condition(NonIndexCol1)
    • 当查询时使用“索引列OR非索引列”过滤时,此索引不会被使用,查询性能不会因为索引得到提升。 例如: Filter_Condition(IndexCol1) OR Filter_Condition(NonIndexCol1) (Filter_Condition(IndexCol1) AND Filter_Condition(IndexCol2))OR ( Filter_Condition(NonIndexCol1))
    • 当查询时使用多个列进行范围查询时,只有联合索引中最后一个列可指定取值范围,前面的列只能设置为“=”。 例如:针对C1、C2、C3列创建了联合索引,需要进行范围查询时,只能针对C3设置取值范围,过滤条件为“C1=XXX,C2=XXX,C3=取值范围”。
  • 针对添加了二级索引的用户表,可以通过Filter来查询数据,在单列索引和复合列索引上进行过滤查询,查询结果都与无索引结果相同,且其数据查询性能高于无二级索引用户表的数据查询性能。

代码样例

下面代码片段在com.huawei.hadoop.hbase.example包的“HBaseSample”类的testScanDataByIndex方法中:

样例:使用二级索引查找数据

  public void testScanDataByIndex() {
    LOG.info("Entering testScanDataByIndex.");
    Table table = null;
    ResultScanner scanner = null;
    try {
      table = conn.getTable(tableName);
      
      // Create a filter for indexed column.
      Filter filter = new SingleColumnValueFilter(Bytes.toBytes("info"), Bytes.toBytes("name"),
          CompareOp.EQUAL, "Li Gang".getBytes());
      Scan scan = new Scan();
      scan.setFilter(filter);
      scanner = table.getScanner(scan);
      LOG.info("Scan indexed data.");
      
      for (Result result : scanner) {
        for (Cell cell : result.rawCells()) {
          LOG.info(Bytes.toString(CellUtil.cloneRow(cell)) + ":"
              + Bytes.toString(CellUtil.cloneFamily(cell)) + ","
              + Bytes.toString(CellUtil.cloneQualifier(cell)) + ","
              + Bytes.toString(CellUtil.cloneValue(cell)));
        }
      }
      LOG.info("Scan data by index successfully.");
    } catch (IOException e) {
      LOG.error("Scan data by index failed.");
    } finally {
      if (scanner != null) {
        // Close the scanner object.
        scanner.close();
      }
      try {
        if (table != null) {
          table.close();
        }
      } catch (IOException e) {
        LOG.error("Close table failed.");
      }
    }
    
    LOG.info("Exiting testScanDataByIndex.");
  }

注意事项

需要预先对字段name创建二级索引。

相关操作

基于二级索引表查询。

查询样例如下:

用户在hbase_sample_table的info列族的name列添加一个索引,在客户端执行,

hbase org.apache.hadoop.hbase.hindex.mapreduce.TableIndexer -Dtablename.to.index=hbase_sample_table -Dindexspecs.to.add='IDX1=>info:[name->String]'

然后用户需要查询“info:name”,在hbase shell执行如下命令:

>scan ‘hbase_sample_table’,{FILTER=>”SingleColumnValueFilter(family,qualifier,compareOp,comparator,filterIfMissing,latestVersionOnly)”}

说明:

hbase shell下面做复杂的查询请使用API进行处理。

参数说明:

  • family:需要查询的列所在的列族,例如info;
  • qualifier:需要查询的列,例如name;
  • compareOp:比较符,例如=、>等;
  • comparator:需要查找的目标值,例如binary:Zhang San;
  • filterIfMissing:如果某一行不存在该列,是否过滤,默认值为false;
  • latestVersionOnly:是否仅查询最新版本的值,默认值为false。

例如:

>scan 'hbase_sample_table',{FILTER=>"SingleColumnValueFilter('info','name',=,'binary:Zhang San',true,true)"}
作者 east
Hbase 3月 1,2021

Hbase创建二级索引

创建二级索引

功能简介

一般都通过调用org.apache.hadoop.hbase.hindex.client.HIndexAdmin中方法进行HBase二级索引的管理,该类中提供了创建索引的方法。

说明:

二级索引不支持修改,如果需要修改,请先删除旧的然后重新创建。

代码样例

以下代码片段在com.huawei.bigdata.hbase.examples包的“HBaseSample”类的createIndex方法中。

public void createIndex() {     
LOG.info("Entering createIndex.");
String indexName = "index_name";
// Create index instance
TableIndices tableIndices = new TableIndices();
IndexSpecification iSpec = new IndexSpecification(indexName); iSpec.addIndexColumn(new HColumnDescriptor("info"), "name", ValueType.String);//注[1]
tableIndices.addIndex(iSpec);
HIndexAdmin iAdmin = null;
Admin admin = null;
try {
admin = conn.getAdmin();
iAdmin = new IndexAdmin(conf);
// add index to the table
iAdmin.addIndices(tableName, tableIndices);
LOG.info("Create index successfully.");
} catch (IOException e) {
LOG.error("Create index failed " ,e);
} finally {
if (admin != null) {
try {
admin.close();
} catch (IOException e) {
LOG.error("Close admin failed " ,e);
}
}
if (iAdmin != null) {
try {
// Close IndexAdmin Object
iAdmin.close();
} catch (IOException e) {
LOG.error("Close admin failed " ,e);
}
}
}
LOG.info("Exiting createIndex.");
}

新创建的二级索引默认是不启用的,如果需要启用指定的二级索引,可以参考如下代码片段。该代码片段在com.huawei.bigdata.hbase.examples包的“HBaseSample”类的enableIndex方法中。

  public void enableIndex() {
    LOG.info("Entering createIndex.");

    // Name of the index to be enabled
    String indexName = "index_name";

    List<String> indexNameList = new ArrayList<String>();
    indexNameList.add(indexName);
    HIndexAdmin iAdmin = null;
    try {
      iAdmin = HIndexClient.newHIndexAdmin(conn.getAdmin());
      // Alternately, enable the specified indices
      iAdmin.enableIndices(tableName, indexNameList);
      System.out.println("Successfully enable indices " + indexNameList + " of the table " + tableName);
    } catch (IOException e) {
      System.out.println("Failed to enable indices " + indexNameList + " of the table " + tableName + "." + e);
    } finally {
      if (iAdmin != null) {
        try {
          iAdmin.close();
        } catch (IOException e) {
          LOG.error("Close admin failed ", e);
        }
      }
    }
  }

注意事项

注[1]:创建联合索引

HBase支持在多个字段上创建二级索引,例如在列name和age上。

HIndexSpecification iSpecUnite = new HIndexSpecification(indexName); 
 iSpecUnite.addIndexColumn(new HColumnDescriptor("info"), "name", ValueType.String); 
 iSpecUnite.addIndexColumn(new HColumnDescriptor("info"), "age", ValueType.String);

相关操作

使用命令创建索引表。

您还可以通过TableIndexer工具在已有用户表中创建索引。

说明:

<table_name>用户表必须存在。

hbase org.apache.hadoop.hbase.index.mapreduce.TableIndexer -Dindexspecs.to.add=<table_name> -Dtable.columns.index='IDX1=>cf1:[q1->datatype&length];cf2:[q1->datatype],[q2->datatype],[q3->datatype]#IDX2=>cf1:[q5->datatype&length]

“#”用于区分不同的索引,“;”用于区分不同的列族,“,”用于区分不同的列。

tablename.to.index:创建索引的用户表表名。

indexspecs.to.add:创建索引对应的用户表列。

其中命令中各参数的含义如下:

  • IDX1:索引名称
  • cf1:列族名称。
  • q1:列名。
  • datatype:数据类型。数据类型仅支持Integer、String、Double、Float、Long、Short、Byte、Char类型。
作者 east
Hbase 3月 1,2021

HBase支持全文索引

HBase支持全文索引

功能简介

通过org.apache.luna.client.LunaAdmin对象的createTable方法来创建表和索引,并指定表名、列族名、索引创建请求,mapping文件所在目录路径。也可通过addCollection往已有表中添加索引。查询时通过org.apache.luna.client.LunaAdmin对象的getTable方法来获取Table对象进行scan操作。

说明:

表的列名以及列族名不能包含特殊字符,可以由字母、数字以及下划线组成。

带有全文索引的HBase表限制:

1、不支持多实例;

2、不支持容灾备份恢复;

3、不支持删除行/列族操作;

4、Solr侧查询不支持强一致性;

代码样例片段

以下代码片段在com.huawei.bigdata.hbase.examples包的“LunaSample”类的testFullTextScan方法中。

  public static void testFullTextScan() throws Exception {
    /**
     * Create create request of Solr. Specify collection name, confset name,
     * number of shards, and number of replication factor.
     */
    Create create = new Create();
    create.setCollectionName(COLLECTION_NAME);
    create.setConfigName(CONFSET_NAME);
    create.setNumShards(NUM_OF_SHARDS);
    create.setReplicationFactor(NUM_OF_REPLICATIONFACTOR);
    /**
     * Create mapping. Specify index fields(mandatory) and non-index
     * fields(optional).
     */
    List<ColumnField> indexedFields = new ArrayList<ColumnField>();
    indexedFields.add(new ColumnField("name", "f:n"));
    indexedFields.add(new ColumnField("cat", "f:t"));
    indexedFields.add(new ColumnField("features", "f:d"));
    Mapping mapping = new Mapping(indexedFields);
    /**
     * Create table descriptor of HBase.
     */
    HTableDescriptor desc = new HTableDescriptor(HBASE_TABLE);
    desc.addFamily(new HColumnDescriptor(TABLE_FAMILY));
    /**
     * Create table and collection at the same time.
     */
    LunaAdmin admin = null;
    try {
      admin = new AdminSingleton().getAdmin();
      admin.deleteTable(HBASE_TABLE);
      if (!admin.tableExists(HBASE_TABLE)) {
        admin.createTable(desc, Bytes.toByteArrays(new String[] { "0", "1", "2", "3", "4" }),
            create, mapping);
      }
      /**
       * Put data.
       */
      Table table = admin.getTable(HBASE_TABLE);
      int i = 0;
      while (i < 5) {
        byte[] row = Bytes.toBytes(i + "+sohrowkey");
        Put put = new Put(row);
        put.addColumn(TABLE_FAMILY, Bytes.toBytes("n"), Bytes.toBytes("ZhangSan" + i));
        put.addColumn(TABLE_FAMILY, Bytes.toBytes("t"), Bytes.toBytes("CO" + i));
        put.addColumn(TABLE_FAMILY, Bytes.toBytes("d"), Bytes.toBytes("Male, Leader of M.O" + i));
        table.put(put);
        i++;
      }

      /**
       * Scan table.
       */
      Scan scan = new Scan();
      SolrQuery query = new SolrQuery();
      query.setQuery("name:ZhangSan1 AND cat:CO1");
      Filter filter = new FullTextFilter(query, COLLECTION_NAME);
      scan.setFilter(filter);
      ResultScanner scanner = table.getScanner(scan);
      LOG.info("-----------------records----------------");
      for (Result r = scanner.next(); r != null; r = scanner.next()) {
        for (Cell cell : r.rawCells()) {
          LOG.info(Bytes.toString(CellUtil.cloneRow(cell)) + ":"
              + Bytes.toString(CellUtil.cloneFamily(cell)) + ","
              + Bytes.toString(CellUtil.cloneQualifier(cell)) + ","
              + Bytes.toString(CellUtil.cloneValue(cell)));
        }
      }
      LOG.info("-------------------end------------------");
      /**
       * Delete collection.
       */
      admin.deleteCollection(HBASE_TABLE, COLLECTION_NAME);

      /**
       * Delete table.
       */
      admin.deleteTable(HBASE_TABLE);
    } catch (IOException e) {
      e.printStackTrace();
    } finally {
      /**
       * When everything done, close LunaAdmin.
       */
      admin.close();
    }
  }

解释

(1)创建索引请求

(2)创建表描述符

(3)获取LunaAdmin对象,LunaAdmin提供了建表和索引、添加索引、检查表是否存在、检查索引是否存在、删除索引和删除表等功能。

(4)调用LunaAdmin的建表方法。

(5)往表中插入数据。

(6)构造全文索引条件,设置FullTextFilter,进行查询。

(7)删除索引。

(8)删除表。

(9)关闭admin资源。

注意事项

  • 创建表和索引都必须不存在。
  • 必须使用LunaAdmin获取Table对象进行scan操作。
作者 east
bug清单 2月 28,2021

ZooKeeper客户端无法使用

ZooKeeper客户端无法使用

现象描述

当往ZooKeeper节点写入超过4MB数据的文件时,ZooKeeper客户端无法使用,出现如下信息。

2014-11-07 15:23:34,237 | WARN | NIOServerCxn.Factory:/10.18.51.157:24002 | 
Exception causing close of session 0xe4985ef3128000d due to java.io.IOException: Len error 1080037 | 
org.apache.zookeeper.server.NIOServerCnxn.doIO(NIOServerCnxn.java:362)
2014-11-07 15:23:34,238 | INFO | NIOServerCxn.Factory:/10.18.51.157:24002 | 
Closed socket connection for client /10.18.51.156:44987 which had sessionid 0xe4985ef3128000d | 
org.apache.zookeeper.server.NIOServerCnxn.closeSock(NIOServerCnxn.java:1007)

可能原因

ZooKeeper的数据大小是由“jute.maxbuffer”参数的值决定的。如果数据超过配置的值,服务端会拒绝访问并出现以上异常。该参数的默认值为4MB,且该参数不能修改。在集群的服务端和客户端必须保持这两个参数的一致性。

定位思路

无。

处理步骤

建议用户不要去修改“jute.maxbuffer”参数的值,在ZooKeeper节点写入数据时,确保单个文件的大小不超过4MB。为确保ZooKeeper的性能,建议不要将大量的数据写入到ZooKeeper节点中。

作者 east
bug清单 2月 28,2021

NodeManager出现DBException导致无法启动

NodeManager出现DBException导致无法启动

现象描述

NodeManager无法启动。NodeManager日志中显示如下错误信息。

org.fusesource.leveldbjni.internal.NativeDB$DBException: Corruption: 1 missing files;

可能原因

当“yarn.nodemanager.recovery.enabled”=“true”时,并且由于磁盘空间不足或文件句柄用尽,使“levelDB”作为NodeManager恢复被损坏时,会发生此异常情况。

处理步骤

  1. 删除在“yarn-site.xml”中“{yarn.nodemanager.recovery.dir}/yarn-nm-state”所指定的文件夹。 其产生结果如下:
    • 对于已经在这个节点上完成的应用程序,其日志聚合可能会受影响。您需要手动删除已完成应用残留的文件或文件夹。进入{yarn.nodemanager.log-dirs}参数指定的路径,删除应用ID对应的文件夹。
    • 在此节点上已本地化的临时文件和container临时文件不会被清理。您需要手动删除已完成应用残留的文件和文件夹。进入{yarn.nodemanager.local-dirs}参数指定的路径,删除应用ID对应的文件夹。
    • 由于Container-tokens在NodeManager停用之后没有刷新,当前container可能会失败。
  2. 启动NodeManager。
作者 east
bug清单 2月 28,2021

Hive 执行动态插入分区时,在MapReduce日志中报“java.lang.OutOfMemoryError: GC overhead limit exceeded”错误

执行动态插入分区时,在MapReduce日志中报“java.lang.OutOfMemoryError: GC overhead limit exceeded”错误

现象描述

在HiveServer服务正常的情况下,执行动态插入分区时,在MapReduce日志中报“java.lang.OutOfMemoryError: GC overhead limit exceeded”错误。

可能原因

产生OOM的原因是单个任务处理的分区数过多,需要针对具体场景,减少单个task处理的分区数。

定位思路

参照如下样例进行操作。

样例建表语句如下:

create table test(id int )partitioned by (dt int);

create table test1(id int, dt int);

正常的动态插入分区语句为:

insert overwrite table test partition (dt) select id, dt from test;

处理步骤

  1. 由于dt是分区字段,减少单个task处理分区数的办法是,将分区字段distribute到不同的task来处理。 修改后的语句: insert overwrite table test partition (dt) select id, dt from test1 distribute by dt;
  2. 当distribute by的分区字段存在倾斜时,比如值为NULL的占了很大部分,那么还可以将其打散处理。 存在倾斜字段为NULL时的优化后语句: insert overwrite table test partition (dt) select id, dt from test1 distribute by nvl(dt,round(rand()*50)); 说明: nvl函数是一个将null转换为需要的值的hive内置udf。内置udf的使用,可参考https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF。其中rand返回一个0-1之间的随机数,乘以一个常数50(也可以是其他数字,根据自己任务的并发度合理选取,以能在合理的时间处理完为宜)。 然后通过round函数取整,就能够将值为NULL的分区,分散到多个不同的task中处理。
作者 east
bug清单 2月 28,2021

Hive TEXTFILE类型文件ARC4压缩Select时乱码

TEXTFILE类型文件ARC4压缩Select时乱码

现象描述

Hive查询结果表做压缩存储(ARC4),对结果表做select * 查询时返回结果为乱码。

可能原因

TEXTFILE文件非块文件,使用ARC4按条加密后,读取文件会异常,无法解析内容,导致乱码。

定位思路

  1. 设置压缩类:org.apache.hadoop.io.encryption.arc4.ARC4BlockCodec,按块对TEXTFILE类型文件加密。
  2. 正常设置耗时1分钟以内。

处理步骤

  1. 查询出乱码以后,在beeline客户端执行以下命令。 set mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.encryption.arc4.ARC4BlockCodec; set hive.exec.compress.output=true; 输入admin用户的密码完成登录。
  2. 重新导入数据到结果表。 insert overwrite table tbl_result select * from tbl_source;
  3. 执行select命令检查内容是否为乱码。 select * from tbl_result; 没有乱码证明问题得到解决。
作者 east
bug清单 2月 28,2021

Hive客户端连接失败,提示“Read timed out”

客户端连接失败,提示“Read timed out”

现象描述

安全版本的集群中,HiveServer服务正常的情况下,使用Shell客户端或二次开发工具登录HiveServer失败,日志异常提“Read timed out”示关键字,具体信息如下:

org.apache.hadoop.hive.ha.client.HAConnectMonitor.<init>(HAConnectMonitor.java:54) 
 at org.apache.hadoop.hive.ha.client.HATTransport.open(HATTransport.java:158) 
 at org.apache.hadoop.hive.jdbc.ha.HAHiveConnection.<init>(HAHiveConnection.java:60) 
 ... 6 more 
 Caused by: java.net.SocketTimeoutException: Read timed out
 at java.net.SocketInputStream.socketRead0(Native Method) 
 at java.net.SocketInputStream.read(SocketInputStream.java:152) 
 at java.net.SocketInputStream.read(SocketInputStream.java:122) 
 at org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:127)

可能原因

网络原因导致Hive客户端连接HiveServer时,Socket超时,连接失败。

定位思路

  1. Hive客户端连接HiveServer时,是建立Socket连接,当网络丢包时,就有可能导致Socket超时,使Hive客户端连接HiveServer失败。
  2. 执行ping命令检查客户端到HiveServer所在节点的网络连通性与稳定性。
  3. 当网络经过排查和修复达到稳定后,正常连接耗时在1分钟以内。

处理步骤

  1. 在客户端所在机器上,执行ping HiveServer所在节点IP地址命令检查Hive客户端与集群网络质量。 如果执行命令后,如果网络不通或者延迟较大,证明网络状况较差。 请联系网络管理员排查网络问题,以保证满足业务使用。
  2. 增加网络闪断的保护,使用Hive客户端的应用层,增加连接失败重试机
作者 east

上一 1 … 62 63 64 … 93 下一个

关注公众号“大模型全栈程序员”回复“小程序”获取1000个小程序打包源码。回复”chatgpt”获取免注册可用chatgpt。回复“大数据”获取多本大数据电子书

标签

AIGC AI创作 bert chatgpt github GPT-3 gpt3 GTP-3 hive mysql O2O tensorflow UI控件 不含后台 交流 共享经济 出行 图像 地图定位 外卖 多媒体 娱乐 小程序 布局 带后台完整项目 开源项目 搜索 支付 效率 教育 日历 机器学习 深度学习 物流 用户系统 电商 画图 画布(canvas) 社交 签到 联网 读书 资讯 阅读 预订

官方QQ群

小程序开发群:74052405

大数据开发群: 952493060

近期文章

  • 解决gitlab配置Webhooks,提示 Invalid url given的问题
  • 如何在Chrome中设置启动时自动打开多个默认网页
  • spark内存溢出怎样区分是软件还是代码原因
  • MQTT完全解析和实践
  • 解决运行Selenium报错:self.driver = webdriver.Chrome(service=service) TypeError: __init__() got an unexpected keyword argument ‘service’
  • python 3.6使用mysql-connector-python报错:SyntaxError: future feature annotations is not defined
  • 详解Python当中的pip常用命令
  • AUTOSAR如何在多个供应商交付的配置中避免ARXML不兼容?
  • C++thread pool(线程池)设计应关注哪些扩展性问题?
  • 各类MCAL(Microcontroller Abstraction Layer)如何与AUTOSAR工具链解耦?

文章归档

  • 2025年12月
  • 2025年10月
  • 2025年8月
  • 2025年7月
  • 2025年6月
  • 2025年5月
  • 2025年4月
  • 2025年3月
  • 2025年2月
  • 2025年1月
  • 2024年12月
  • 2024年11月
  • 2024年10月
  • 2024年9月
  • 2024年8月
  • 2024年7月
  • 2024年6月
  • 2024年5月
  • 2024年4月
  • 2024年3月
  • 2023年11月
  • 2023年10月
  • 2023年9月
  • 2023年8月
  • 2023年7月
  • 2023年6月
  • 2023年5月
  • 2023年4月
  • 2023年3月
  • 2023年1月
  • 2022年11月
  • 2022年10月
  • 2022年9月
  • 2022年8月
  • 2022年7月
  • 2022年6月
  • 2022年5月
  • 2022年4月
  • 2022年3月
  • 2022年2月
  • 2022年1月
  • 2021年12月
  • 2021年11月
  • 2021年9月
  • 2021年8月
  • 2021年7月
  • 2021年6月
  • 2021年5月
  • 2021年4月
  • 2021年3月
  • 2021年2月
  • 2021年1月
  • 2020年12月
  • 2020年11月
  • 2020年10月
  • 2020年9月
  • 2020年8月
  • 2020年7月
  • 2020年6月
  • 2020年5月
  • 2020年4月
  • 2020年3月
  • 2020年2月
  • 2020年1月
  • 2019年7月
  • 2019年6月
  • 2019年5月
  • 2019年4月
  • 2019年3月
  • 2019年2月
  • 2019年1月
  • 2018年12月
  • 2018年7月
  • 2018年6月

分类目录

  • Android (73)
  • bug清单 (79)
  • C++ (34)
  • Fuchsia (15)
  • php (4)
  • python (45)
  • sklearn (1)
  • 云计算 (20)
  • 人工智能 (61)
    • chatgpt (21)
      • 提示词 (6)
    • Keras (1)
    • Tensorflow (3)
    • 大模型 (1)
    • 智能体 (4)
    • 深度学习 (14)
  • 储能 (44)
  • 前端 (5)
  • 大数据开发 (497)
    • CDH (6)
    • datax (4)
    • doris (31)
    • Elasticsearch (15)
    • Flink (79)
    • flume (7)
    • Hadoop (19)
    • Hbase (23)
    • Hive (41)
    • Impala (2)
    • Java (71)
    • Kafka (10)
    • neo4j (5)
    • shardingsphere (6)
    • solr (5)
    • Spark (100)
    • spring (11)
    • 数据仓库 (9)
    • 数据挖掘 (7)
    • 海豚调度器 (10)
    • 运维 (39)
      • Docker (3)
  • 小游戏代码 (1)
  • 小程序代码 (139)
    • O2O (16)
    • UI控件 (5)
    • 互联网类 (23)
    • 企业类 (6)
    • 地图定位 (9)
    • 多媒体 (6)
    • 工具类 (25)
    • 电商类 (22)
    • 社交 (7)
    • 行业软件 (7)
    • 资讯读书 (11)
  • 嵌入式 (71)
    • autosar (63)
    • RTOS (1)
    • 总线 (1)
  • 开发博客 (16)
    • Harmony (9)
  • 技术架构 (6)
  • 数据库 (32)
    • mongodb (1)
    • mysql (13)
    • pgsql (2)
    • redis (1)
    • tdengine (4)
  • 未分类 (8)
  • 程序员网赚 (20)
    • 广告联盟 (3)
    • 私域流量 (5)
    • 自媒体 (5)
  • 量化投资 (4)
  • 面试 (14)

功能

  • 登录
  • 文章RSS
  • 评论RSS
  • WordPress.org

All Rights Reserved by Gitweixin.本站收集网友上传代码, 如有侵犯版权,请发邮件联系yiyuyos@gmail.com删除.