flink 最后一个窗口一直没有新数据,窗口不关闭问题

news/2024/6/18 3:25:21 标签: flink, 大数据

flink 最后一个窗口一直没有新数据,窗口不关闭问题

  • 自定义实现 WatermarkStrategy接口

自定义实现 WatermarkStrategy接口

代码:

    public static class WatermarkDemoFunction implements WatermarkStrategy<JSONObject>{

        private Tuple2<Long,Boolean> state = Tuple2.of(0L,true);

        @Override
        public WatermarkGenerator<JSONObject> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
            return new WatermarkGenerator<JSONObject>() {
                private long maxWatermark;

                @Override
                public void onEvent(JSONObject waterSensor, long l, WatermarkOutput watermarkOutput) {
                    maxWatermark = Math.max(maxWatermark,waterSensor.getLong("ts"));
                    state.f0 = System.currentTimeMillis();
                    System.out.println("maxWatermark is " + maxWatermark);
                    state.f1 = false;
                }
                @Override
                public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
                    //乱序时间
                    long outOfTime = 3000L;
                    if (maxWatermark - outOfTime <=0){
                    } else {
                        // 10s内没有数据则关闭当前窗口
                        System.out.println("System.currentTimeMillis() - state.f0:" + (System.currentTimeMillis() - state.f0));
                        System.out.println("state.f1:" + state.f1);
                        if (System.currentTimeMillis() - state.f0 >= 9000L && !state.f1){
                            watermarkOutput.emitWatermark(new Watermark(maxWatermark  + 6000L));
                            state.f1 = true;
                            System.out.println("触发窗口,maxWatermark  + 6000L:" + (maxWatermark  + 6000L));
                        } else {
                            System.out.println("正常发送水印");
                            watermarkOutput.emitWatermark(new Watermark(maxWatermark - outOfTime));
                        }
                    }
                }
            };
        }
    }

代码部分逻辑说明
在这里插入图片描述若设置了自动生成watermark 参数,根据打印日志,设置对应的时间(多久没新数据写入,触发窗口计算)
env.getConfig().setAutoWatermarkInterval(5000);

使用自定义的watermark:
在这里插入图片描述
参考:https://blog.csdn.net/lr131425/article/details/127422833


http://www.niftyadmin.cn/n/5332065.html

相关文章

RocketMQ Dashboard 详解

RocketMQ Dashboard 是 RocketMQ 的管控利器&#xff0c;为用户提供客户端和应用程序的各种事件、性能的统计信息&#xff0c;支持以可视化工具代替 Topic 配置、Broker 管理等命令行操作。 一、介绍​ 功能概览​ 面板功能运维修改nameserver 地址; 选用 VIPChannel驾驶舱查…

智能小程序相关名词解释(汇总)

小程序 ID 小程序 ID 是智能小程序分配给开发者的应用 ID&#xff0c;是应用的唯一标示&#xff0c;只有应用创建后才可以获取。创建小程序应用后&#xff0c;您可获得小程序应用的小程序 ID。 小程序框架 小程序提供一套简单高效的开发框架&#xff0c;帮助您开发具有原生 …

汽车用螺纹紧固件的拧紧力矩规范主要考虑哪些方面——SunTorque智能扭矩系统

在汽车制造过程中&#xff0c;螺纹紧固件是连接和固定各个零部件的重要元件。为了保证汽车的可靠性和安全性&#xff0c;对于螺纹紧固件的拧紧力矩有着严格的规定和规范。SunTorque智能扭矩系统和大家一起掌握这一重要知识点。 拧紧力矩是指将螺纹紧固件拧紧到预定位置所需的力…

「实战应用」如何用DHTMLX Gantt构建类似JIRA式的项目路线图(二)

DHTMLX Gantt是用于跨浏览器和跨平台应用程序的功能齐全的Gantt图表。可满足项目管理应用程序的所有需求&#xff0c;是最完善的甘特图图表库。 在web项目中使用DHTMLX Gantt时&#xff0c;开发人员经常需要满足与UI外观相关的各种需求。因此他们必须确定JavaScript甘特图库的…

变量、流程控制、游标-练习题

变量 0.准备工作 CREATE DATABASE test16_var_cur; use test16_var_cur; CREATE TABLE employees AS SELECT * FROM atguigudb.employees; CREATE TABLE departments AS SELECT * FROM atguigudb.departments;无参有返回 1. 创建函数get_count(),返回公司的员工个数 DELIMITER…

科技的成就(五十五)

519、Machine Learning "1959 年 7 月&#xff0c;塞缪尔首创 Machine Learning 一词。塞缪尔在“Some Studies in Machine Learning Using theGame of Checkers”一文中给 Machine Learning 下了个非正式定义&#xff1a;没有明确编程指令的情况下&#xff0c;能让计算机…

[go语言]输入输出

目录 知识结构 输入 1.Scan ​编辑 2.Scanf 3.Scanln 4.os.Stdin --标准输入&#xff0c;从键盘输入 输出 1.Print 2.Printf 3.Println 知识结构 输入 为了展示集中输入的区别&#xff0c;将直接进行代码演示。 三者区别的结论&#xff1a;Scanf格式化输入&#x…

实战 | OpenCV两种不同方法实现粘连大米粒分割计数(步骤 + 源码)

导 读 本文主要介绍基于OpenCV的两种不同方法实现粘连大米分割计数,并给详细步骤和源码。源码和图片素材见文末。 背景介绍 测试图如下,图中有个别米粒相互粘连,本文主要演示如何使用OpenCV用两种不同方法将其分割并计数。 方法一:基于分水岭算法 基于分水岭算法…