基于Flink实时数仓——DWS 层-地区主题表(8)

news/2024/5/20 1:56:30 标签: flink, hdfs, big data

在这里插入图片描述
这个主题使用FlinkSQL实现:数据直接从dwm_order_wide主题获取
代码实现:

public class ProvinceStatsSqlApp {

    public static void main(String[] args) throws Exception {

        //TODO 1.获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        //1.1 设置CK&状态后端
        //env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/gmall-flink-210325/ck"));
        //env.enableCheckpointing(5000L);
        //env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        //env.getCheckpointConfig().setCheckpointTimeout(10000L);
        //env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);
        //env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000);

        //env.setRestartStrategy(RestartStrategies.fixedDelayRestart());

        //TODO 2.使用DDL创建表 提取时间戳生成WaterMark
        String groupId = "province_stats";
        String orderWideTopic = "dwm_order_wide";
        tableEnv.executeSql("CREATE TABLE order_wide ( " +
                "  `province_id` BIGINT, " +
                "  `province_name` STRING, " +
                "  `province_area_code` STRING, " +
                "  `province_iso_code` STRING, " +
                "  `province_3166_2_code` STRING, " +
                "  `order_id` BIGINT, " +
                "  `split_total_amount` DECIMAL, " +
                "  `create_time` STRING, " +
                "  `rt` as TO_TIMESTAMP(create_time), " +
                "  WATERMARK FOR rt AS rt - INTERVAL '1' SECOND ) with(" +
                MyKafkaUtil.getKafkaDDL(orderWideTopic, groupId) + ")");

        //TODO 3.查询数据  分组、开窗、聚合
        Table table = tableEnv.sqlQuery("select " +
                "    DATE_FORMAT(TUMBLE_START(rt, INTERVAL '10' SECOND), 'yyyy-MM-dd HH:mm:ss') stt, " +
                "    DATE_FORMAT(TUMBLE_END(rt, INTERVAL '10' SECOND), 'yyyy-MM-dd HH:mm:ss') edt, " +
                "    province_id, " +
                "    province_name, " +
                "    province_area_code, " +
                "    province_iso_code, " +
                "    province_3166_2_code, " +
                "    count(distinct order_id) order_count, " +
                "    sum(split_total_amount) order_amount, " +
                "    UNIX_TIMESTAMP()*1000 ts " +
                "from " +
                "    order_wide " +
                "group by " +
                "    province_id, " +
                "    province_name, " +
                "    province_area_code, " +
                "    province_iso_code, " +
                "    province_3166_2_code, " +
                "    TUMBLE(rt, INTERVAL '10' SECOND)");

        //TODO 4.将动态表转换为流
        DataStream<ProvinceStats> provinceStatsDataStream = tableEnv.toAppendStream(table, ProvinceStats.class);

        //TODO 5.打印数据并写入ClickHouse
        provinceStatsDataStream.print();
        provinceStatsDataStream.addSink(ClickHouseUtil.getSink("insert into province_stats_210325 values(?,?,?,?,?,?,?,?,?,?)"));

        //TODO 6.启动任务
        env.execute("ProvinceStatsSqlApp");

    }

}

http://www.niftyadmin.cn/n/1204453.html

相关文章

运行在不同端系统上的进程间的通信

运行在不同端系统上的进程间的通信 一个进程可以被认为是运行在端系统中的一个程序。在两个不同端系统上的进程&#xff0c;通过跨越计算机网络交换报文&#xff08;message&#xff09;而相互通信。发送进程生成并向网络中发送报文;接收进程接收这些报文并可能通过将报文发送回…

运输层协议为应用层所提供的服务(TCP、UDP)

一个运输层协议能够为调用它的应用程序提供什么样的服务呢?我们大体能够从四个方面对应用程序服务要求进行分类:可靠数据传输、吞吐量、定时和安全性。 可靠数据传输 如第1章讨论的那样&#xff0c;分组在计算机网络中可能丢失。例如&#xff0c;分组能够使路由器中的缓存溢出…

HTTP概况

Web的应用层协议是超文本传输协议&#xff08;HyperText Transfer Protocol, HTTP)&#xff0c;它是Web的核心。HTTP由两个程序实现:一个客户程序和一个服务器程序。客户程序和服务器程序运行在不同的端系统中&#xff0c;通过交换HTTP报文进行会话。HTTP定义了这些报文的结构以…

基于Flink实时数仓——DWS层-关键词主题表FlinkSQL(9)

需求分析与思路&#xff1a; 关键词主题这个主要是为了大屏展示中的字符云的展示效果&#xff0c;用于感性的让大屏观看者感知目前的用户都更关心的那些商品和关键词。 关键词的展示也是一种维度聚合的结果&#xff0c;根据聚合的大小来决定关键词的大小。 关键词的第一重要来…

Web缓存(浏览器的缓存)

Web缓存 Web缓存器( Web cache)也叫代理服务器&#xff08;proxy server),它是能够代表初始Web服务器来满足HTTP请求的网络实体。Web缓存器有自己的磁盘存储空间,并在存储空间中保存最近请求过的对象的副本。可以配置用户的浏览器,使得用户的所有HTTP请求首先指向 Web缓存器。…

文件传输协议:FTP(和HTTP的异同)

一个典型的FTP会话 用户坐在一台主机&#xff08;本地主机&#xff09;前面&#xff0c;向一台远程主机传输&#xff08;或接收来自远程主机的)文件。为使用户能访问它的远程账户&#xff0c;用户必须提供一个用户标识和口令。在提供了这种授权信息后&#xff0c;用户就能从本…

SMTP说明(与HTTP的对比)

SMTP简单说明 下图是因特网电子邮件系统的总体情况&#xff1a; 从该图中我们可以看到它有3个主要组成部分:用户代理&#xff08;user agent)、邮件服务器&#xff08;mail server&#xff09;和简单邮件传输协议( Simple Mail Transfer Protocol &#xff0c;SMTP)。用户代…

关于DNS

对于机器而言区分它们的方式是IP地址或者主机名(hostname),前者有严格的层次结构&#xff0c;更容易被路由器区分&#xff0c;后者主机名如“www.baidu.com”更容易被人区分。那么DNS就是用来进行将主机名(hostname)和IP地址进行转换的。 DNS是什么&#xff1f; 是&#xff1…