度量
Flink公开了一个度量系统,允许收集和公开指标到外部系统。
注册指标
您可以通过调用从扩展RichFunction的任何用户函数访问度量标准系统getRuntimeContext().getMetricGroup()
。此方法返回一个MetricGroup
对象,您可以在该对象上创建和注册新指标。
度量类型
Flink支持Counters
,Gauges
,Histograms
和Meters
。
计数器
A Counter
用于计算某些东西。可以使用inc()/inc(long n)
或来Reduce当前值dec()/dec(long n)
。您可以创建并注册Counter
调用counter(String name)
上MetricGroup
。
public class MyMapper extends RichMapFunction<String, String> {
private transient Counter counter;
@Override
public void open(Configuration config) {
this.counter = getRuntimeContext()
.getMetricGroup()
.counter("myCounter");
}
@Override
public String map(String value) throws Exception {
this.counter.inc();
return value;
}
}
class MyMapper extends RichMapFunction[String,String] {
@transient private var counter: Counter = _
override def open(parameters: Configuration): Unit = {
counter = getRuntimeContext()
.getMetricGroup()
.counter("myCounter")
}
override def map(value: String): String = {
counter.inc()
value
}
}
或者,您也可以使用自己的Counter
实现:
public class MyMapper extends RichMapFunction<String, String> {
private transient Counter counter;
@Override
public void open(Configuration config) {
this.counter = getRuntimeContext()
.getMetricGroup()
.counter("myCustomCounter", new CustomCounter());
}
@Override
public String map(String value) throws Exception {
this.counter.inc();
return value;
}
}
class MyMapper extends RichMapFunction[String,String] {
@transient private var counter: Counter = _
override def open(parameters: Configuration): Unit = {
counter = getRuntimeContext()
.getMetricGroup()
.counter("myCustomCounter", new CustomCounter())
}
override def map(value: String): String = {
counter.inc()
value
}
}
测量
A根据需要Gauge
提供任何类型的值。为了使用a,Gauge
您必须首先创建一个实现该org.apache.flink.metrics.Gauge
接口的类。返回值的类型没有限制。你可以通过调用注册一个计gauge(String name, Gauge gauge)
上MetricGroup
。
public class MyMapper extends RichMapFunction<String, String> {
private transient int valueToExpose = 0;
@Override
public void open(Configuration config) {
getRuntimeContext()
.getMetricGroup()
.gauge("MyGauge", new Gauge<Integer>() {
@Override
public Integer getValue() {
return valueToExpose;
}
});
}
@Override
public String map(String value) throws Exception {
valueToExpose++;
return value;
}
}
new class MyMapper extends RichMapFunction[String,String] {
@transient private var valueToExpose = 0
override def open(parameters: Configuration): Unit = {
getRuntimeContext()
.getMetricGroup()
.gauge[Int, ScalaGauge[Int]]("MyGauge", ScalaGauge[Int]( () => valueToExpose ) )
}
override def map(value: String): String = {
valueToExpose += 1
value
}
}
请注意,报告会将公开的对象转换为a String
,这意味着需要进行有意义的toString()
实现。
直方图
A Histogram
衡量长值的分布。你可以通过调用注册一个histogram(String name, Histogram histogram)
上一个MetricGroup
。
public class MyMapper extends RichMapFunction<Long, Long> {
private transient Histogram histogram;
@Override
public void open(Configuration config) {
this.histogram = getRuntimeContext()
.getMetricGroup()
.histogram("myHistogram", new MyHistogram());
}
@Override
public Long map(Long value) throws Exception {
this.histogram.update(value);
return value;
}
}
class MyMapper extends RichMapFunction[Long,Long] {
@transient private var histogram: Histogram = _
override def open(parameters: Configuration): Unit = {
histogram = getRuntimeContext()
.getMetricGroup()
.histogram("myHistogram", new MyHistogram())
}
override def map(value: Long): Long = {
histogram.update(value)
value
}
}
Flink没有提供默认实现Histogram
,但提供了一个允许使用Codahale / DropWizard直方图的Wrapper。要使用此打包,请在以下内容中添加以下依赖项pom.xml
:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-metrics-dropwizard</artifactId>
<version>1.7-SNAPSHOT</version>
</dependency>
然后你可以像这样注册一个Codahale / DropWizard直方图:
public class MyMapper extends RichMapFunction<Long, Long> {
private transient Histogram histogram;
@Override
public void open(Configuration config) {
com.codahale.metrics.Histogram dropwizardHistogram =
new com.codahale.metrics.Histogram(new SlidingWindowReservoir(500));
this.histogram = getRuntimeContext()
.getMetricGroup()
.histogram("myHistogram", new DropwizardHistogramWrapper(dropwizardHistogram));
}
@Override
public Long map(Long value) throws Exception {
this.histogram.update(value);
return value;
}
}
class MyMapper extends RichMapFunction[Long, Long] {
@transient private var histogram: Histogram = _
override def open(config: Configuration): Unit = {
com.codahale.metrics.Histogram dropwizardHistogram =
new com.codahale.metrics.Histogram(new SlidingWindowReservoir(500))
histogram = getRuntimeContext()
.getMetricGroup()
.histogram("myHistogram", new DropwizardHistogramWrapper(dropwizardHistogram))
}
override def map(value: Long): Long = {
histogram.update(value)
value
}
}
仪表
A Meter
衡量平均吞吐量。可以使用该markEvent()
方法注册事件的发生。可以使用markEvent(long n)
方法注册同时发生多个事件。你可以通过调用注册一个仪表meter(String name, Meter meter)
上MetricGroup
。
public class MyMapper extends RichMapFunction<Long, Long> {
private transient Meter meter;
@Override
public void open(Configuration config) {
this.meter = getRuntimeContext()
.getMetricGroup()
.meter("myMeter", new MyMeter());
}
@Override
public Long map(Long value) throws Exception {
this.meter.markEvent();
return value;
}
}
class MyMapper extends RichMapFunction[Long,Long] {
@transient private var meter: Meter = _
override def open(config: Configuration): Unit = {
meter = getRuntimeContext()
.getMetricGroup()
.meter("myMeter", new MyMeter())
}
override def map(value: Long): Long = {
meter.markEvent()
value
}
}
Flink提供了一个允许使用Codahale / DropWizard表的打包器。要使用此打包,请在以下内容中添加以下依赖项pom.xml
:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-metrics-dropwizard</artifactId>
<version>1.7-SNAPSHOT</version>
</dependency>
然后你可以像这样注册一个Codahale / DropWizard仪表:
public class MyMapper extends RichMapFunction<Long, Long> {
private transient Meter meter;
@Override
public void open(Configuration config) {
com.codahale.metrics.Meter dropwizardMeter = new com.codahale.metrics.Meter();
this.meter = getRuntimeContext()
.getMetricGroup()
.meter("myMeter", new DropwizardMeterWrapper(dropwizardMeter));
}
@Override
public Long map(Long value) throws Exception {
this.meter.markEvent();
return value;
}
}
class MyMapper extends RichMapFunction[Long,Long] {
@transient private var meter: Meter = _
override def open(config: Configuration): Unit = {
com.codahale.metrics.Meter dropwizardMeter = new com.codahale.metrics.Meter()
meter = getRuntimeContext()
.getMetricGroup()
.meter("myMeter", new DropwizardMeterWrapper(dropwizardMeter))
}
override def map(value: Long): Long = {
meter.markEvent()
value
}
}
范围
为每个度量标准分配一个标识符和一组键值对,在该键值对下将报告度量标准。
标识符基于3个组件:注册度量标准时的用户定义名称,可选的用户定义范围和系统提供的范围。例如,如果A.B
是系统范围,C.D
用户范围和E
名称,则度量标识符将是A.B.C.D.E
。
您可以.
通过设置metrics.scope.delimiter
Keys来配置要用于标识符的分隔符(默认值:) conf/flink-conf.yaml
。
用户范围
你可以通过调用定义用户范围MetricGroup#addGroup(String name)
,MetricGroup#addGroup(int name)
或Metric#addGroup(String key, String value)
。这些方法影响什么MetricGroup#getMetricIdentifier
和MetricGroup#getScopeComponents
返回。
counter = getRuntimeContext()
.getMetricGroup()
.addGroup("MyMetrics")
.counter("myCounter");
counter = getRuntimeContext()
.getMetricGroup()
.addGroup("MyMetricsKey", "MyMetricsValue")
.counter("myCounter");
counter = getRuntimeContext()
.getMetricGroup()
.addGroup("MyMetrics")
.counter("myCounter")
counter = getRuntimeContext()
.getMetricGroup()
.addGroup("MyMetricsKey", "MyMetricsValue")
.counter("myCounter")
系统范围
系统范围包含有关度量标准的上下文信息,例如,它在哪个任务中注册或该任务属于哪个作业。
可以通过设置以下键来配置应包含哪些上下文信息conf/flink-conf.yaml
。这些键中的每一个都期望一个格式字符串可能包含常量(例如“taskmanager”)和变量(例如“<task_id>”),它们将在运行时被替换。
metrics.scope.jm
- 默认值:<host> .jobmanager
- 应用于作用域JobManager的所有指标。
metrics.scope.jm.job
- 默认值:<host> .jobmanager。<job_name>
- 应用于作用于JobManager和作业的所有度量标准。
metrics.scope.tm
- 默认值:<host> .taskmanager。<tm_id>
- 应用于作用于TaskManager的所有度量标准。
metrics.scope.tm.job
- 默认值:<host> .taskmanager。<tm_id>。<job_name>
- 应用于作用于TaskManager和作业的所有度量标准。
metrics.scope.task
- 默认值:<host> .taskmanager。<tm_id>。<job_name>。<task_name>。<subtask_index>
- 应用于作用于任务的所有指标。
metrics.scope.operator
- 默认值:<host> .taskmanager。<tm_id>。<job_name>。<operator_name>。<subtask_index>
- 应用于作用于算子的所有指标。
变量的数量或顺序没有限制。变量区分大小写。
算子指标的默认范围将产生类似于的标识符 localhost.taskmanager.1234.MyJob.MyOperator.0.MyMetric
如果您还想包含任务名称但省略TaskManager信息,则可以指定以下格式:
metrics.scope.operator: <host>.<job_name>.<task_name>.<operator_name>.<subtask_index>
这可以创建标识符localhost.MyJob.MySource_->_MyOperator.MyOperator.0.MyMetric
。
请注意,对于此格式字符串,如果同时多次运行同一作业,则可能发生标识符冲突,这可能导致度量标准数据不一致。因此,建议使用通过包含ID(例如<job_id>)或通过为作业和 算子分配唯一名称来提供一定程度的唯一性的格式字符串。
所有变量列表
- JobManager:<host>
- TaskManager:<host>,<tm_id>
- 作业:<job_id>,<作业名称>
- 任务:<task_id>,<task_name>,<task_attempt_id>,<task_attempt_num>,<subtask_index>
- 算子:<operator_id>,<operator_name>,<subtask_index>
要点:对于Batch API,<operator_id>始终等于<task_id>。
用户变量
您可以通过调用来定义用户变量MetricGroup#addGroup(String key, String value)
。这种方法会影响什么MetricGroup#getMetricIdentifier
,MetricGroup#getScopeComponents
并MetricGroup#getAllVariables()
返回。
重要提示:用户变量不能用于范围格式。
counter = getRuntimeContext()
.getMetricGroup()
.addGroup("MyMetricsKey", "MyMetricsValue")
.counter("myCounter");
counter = getRuntimeContext()
.getMetricGroup()
.addGroup("MyMetricsKey", "MyMetricsValue")
.counter("myCounter")
报告
通过配置一个或多个报告,可以将度量标准暴露给外部系统conf/flink-conf.yaml
。这些报告将在每个工作和TaskManager启动时进行实例化。
metrics.reporter.<name>.<config>
:<config>
报告的通用设置命名<name>
。metrics.reporter.<name>.class
:报告类用于为报告命名<name>
。metrics.reporter.<name>.interval
:报告间隔用于报告的名字<name>
。metrics.reporter.<name>.scope.delimiter
:用于名称的报告者的标识符(默认值使用metrics.scope.delimiter
)的分隔符<name>
。metrics.reporters
:(可选)以逗号分隔的包含报告名称列表。默认情况下,将使用所有已配置的报告。
所有报告必须至少拥有该class
财产,其中一些允许指定报告interval
。下面,我们将列出针对每位报告的更多设置。
示例报表配置,指定多个报告:
metrics.reporters: my_jmx_reporter,my_other_reporter
metrics.reporter.my_jmx_reporter.class: org.apache.flink.metrics.jmx.JMXReporter
metrics.reporter.my_jmx_reporter.port: 9020-9040
metrics.reporter.my_other_reporter.class: org.apache.flink.metrics.graphite.GraphiteReporter
metrics.reporter.my_other_reporter.host: 192.168.1.1
metrics.reporter.my_other_reporter.port: 10000
重要说明:启动Flink时,通过将其放在/ lib文件夹中,可以访问包含报告者的jar。
您可以Reporter
通过实现org.apache.flink.metrics.reporter.MetricReporter
接口编写自己的。如果Reporter应定期发送报告,您还必须实现该Scheduled
接口。
以下部分列出了受支持的报告。
JMX(org.apache.flink.metrics.jmx.JMXReporter)
您不必包含其他依赖项,因为默认情况下JMX报告器可用但未激活。
参数:
port
- (可选)JMX侦听连接的端口。为了能够在一个主机上运行多个报告实例(例如,当一个TaskManager与JobManager共同使用时),建议使用类似的端口范围9250-9260
。指定范围时,实际端口将显示在相关作业或TaskManager日志中。如果设置此设置,Flink将为给定的端口/范围启动额外的JMX连接器。度量标准始终在默认的本地JMX界面上可用。
配置示例:
metrics.reporter.jmx.class: org.apache.flink.metrics.jmx.JMXReporter
metrics.reporter.jmx.port: 8789
通过JMX公开的度量标准由域和键属性列表标识,这些键属性一起形成对象名称。
域始终以org.apache.flink
广义度量标识符开头。与通常的标识符相反,它不受作用域格式的影响,不包含任何变量,并且在作业中保持不变。这种域的一个例子是org.apache.flink.job.task.numBytesOut
。
键属性列表包含与给定度量关联的所有变量的值,无论配置的范围格式如何。这样一个列表的一个例子是host=localhost,job_name=MyJob,task_name=MyTask
。
因此,域标识度量标准类,而关键属性列表标识该度量标准的一个(或多个)实例。
Ganglia(org.apache.flink.metrics.ganglia.GangliaReporter)
要使用此报告,您必须复制/opt/flink-metrics-ganglia-1.7-SNAPSHOT.jar
到/lib
Flink发行版的文件夹中。
参数:
host
-下配置的的gmond主机地址udp_recv_channel.bind
在gmond.conf
port
-下配置的端口的gmondudp_recv_channel.port
在gmond.conf
tmax
- 应保存旧度量标准的软限制dmax
- 应保存旧指标多长时间的硬限制ttl
- 传输的UDP数据包的生存时间addressingMode
- 要使用的UDP寻址模式(UNICAST / MULTICAST)
配置示例:
metrics.reporter.gang.class: org.apache.flink.metrics.ganglia.GangliaReporter
metrics.reporter.gang.host: localhost
metrics.reporter.gang.port: 8649
metrics.reporter.gang.tmax: 60
metrics.reporter.gang.dmax: 0
metrics.reporter.gang.ttl: 1
metrics.reporter.gang.addressingMode: MULTICAST
Graphite(org.apache.flink.metrics.graphite.GraphiteReporter)
要使用此报告,您必须复制/opt/flink-metrics-graphite-1.7-SNAPSHOT.jar
到/lib
Flink发行版的文件夹中。
参数:
host
- Graphite服务器主机port
- Graphite服务器端口protocol
- 使用协议(TCP / UDP)
配置示例:
metrics.reporter.grph.class: org.apache.flink.metrics.graphite.GraphiteReporter
metrics.reporter.grph.host: localhost
metrics.reporter.grph.port: 2003
metrics.reporter.grph.protocol: TCP
Prometheus (org.apache.flink.metrics.prometheus.PrometheusReporter)
要使用此报告,您必须复制/opt/flink-metrics-prometheus-1.7-SNAPSHOT.jar
到/lib
Flink发行版的文件夹中。
参数:
port
- (可选)Prometheus导出器侦听的端口,默认为9249。为了能够在一个主机上运行多个报告实例(例如,当一个TaskManager与JobManager共同使用时),建议使用类似的端口范围9250-9260
。
配置示例:
metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
Flink度量标准类型映射到Prometheus度量标准类型,如下所示:
Flink | Prometheus | 注意 |
---|---|---|
计数器 | 测量 | Prometheus 计数器不能Reduce。 |
测量 | 测量 | 仅支持数字和布尔值。 |
直方图 | 概要 | 分位数.5,.75,.95,.98,.99和.999 |
仪表 | 测量 | 仪表输出仪表的速率。 |
所有Flink度量变量(请参阅所有变量列表)都将作为标签导出到Prometheus。
PrometheusPushGateway(org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter)
要使用此报告,您必须复制/opt/flink-metrics-prometheus-1.7-SNAPSHOT.jar
到/lib
Flink发行版的文件夹中。
参数:
键 | 默认 | 描述 |
---|---|---|
deleteOnShutdown |
true | 指定是否在关闭时从PushGateway中删除指标。 |
Host |
(none) | PushGateway服务器主机。 |
jobName |
(none) | 将推送指标的作业名称 |
port |
-1 | PushGateway服务器端口。 |
randomJobNameSuffix |
true | 指定是否应将随机后缀附加到作业名称。 |
配置示例:
metrics.reporter.promgateway.class: org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter
metrics.reporter.promgateway.host: localhost
metrics.reporter.promgateway.port: 9091
metrics.reporter.promgateway.jobName: myJob
metrics.reporter.promgateway.randomJobNameSuffix: true
metrics.reporter.promgateway.deleteOnShutdown: false
PrometheusPushGatewayReporter将指标推送到Pushgateway,可由Prometheus 抓取。
有关用例,请参阅Prometheus文档。
StatsD(org.apache.flink.metrics.statsd.StatsDReporter)
要使用此报告,您必须复制/opt/flink-metrics-statsd-1.7-SNAPSHOT.jar
到/lib
Flink发行版的文件夹中。
参数:
host
- StatsD服务器主机port
- StatsD服务器端口
配置示例:
metrics.reporter.stsd.class: org.apache.flink.metrics.statsd.StatsDReporter
metrics.reporter.stsd.host: localhost
metrics.reporter.stsd.port: 8125
Datadog(org.apache.flink.metrics.datadog.DatadogHttpReporter)
要使用此报告,您必须复制/opt/flink-metrics-datadog-1.7-SNAPSHOT.jar
到/lib
Flink发行版的文件夹中。
注意Flink指标,如任何变量<host>
,<job_name>
,<tm_id>
,<subtask_index>
,<task_name>
,和<operator_name>
,将被发送到Datadog的标签。标签看起来像host:localhost
和job_name:myjobname
。
参数:
apikey
- Datadog APIKeystags
- (可选)发送到Datadog时将应用于度量标准的全局标记。标签应仅以逗号分隔
配置示例:
metrics.reporter.dghttp.class: org.apache.flink.metrics.datadog.DatadogHttpReporter
metrics.reporter.dghttp.apikey: xxx
metrics.reporter.dghttp.tags: myflinkapp,prod
Slf4j(org.apache.flink.metrics.slf4j.Slf4jReporter)
要使用此报告,您必须复制/opt/flink-metrics-slf4j-1.7-SNAPSHOT.jar
到/lib
Flink发行版的文件夹中。
配置示例:
metrics.reporter.slf4j.class: org.apache.flink.metrics.slf4j.Slf4jReporter
metrics.reporter.slf4j.interval: 60 SECONDS
系统指标
默认情况下,Flink会收集几个指标,这些指标可以提供有关当前状态的深入见解。本节是所有这些指标的参考。
下表通常包含5列:
-
“范围”列描述了用于生成系统范围的范围格式。例如,如果单元格包含“Operator”,则使用“metrics.scope.operator”的范围格式。如果单元格包含多个值(以斜杠分隔),则会针对不同的实体多次报告度量标准,例如作业和TaskManager。
-
(可选)“Infix”列描述了哪个中缀附加到系统范围。
-
“度量标准”列列出了为给定范围和中缀注册的所有度量标准的名称。
-
“描述”列提供有关给定度量正在测量的信息。
-
“类型”列描述了用于测量的度量类型。
请注意,中缀/指标名称列中的所有点仍受“metrics.delimiter”设置的约束。
因此,为了推断度量标识符:
- 根据“范围”列获取范围格式
- 如果存在,则将值附加到“中缀”列中,并考虑“metrics.delimiter”设置
- 附加指标名称。
中央处理器
范围 | 中缀 | 度量 | 描述 | 类型 |
---|---|---|---|---|
Job-/TaskManager | Status.JVM.CPU | 加载 | JVM最近的CPU使用情况。 | 测量 |
时间 | JVM使用的CPU时间。 | 测量 |
内存
范围 | 中缀 | 度量 | 描述 | 类型 |
---|---|---|---|---|
Job-/TaskManager | Status.JVM.Memory | Heap.Used | 当前使用的堆内存量(以字节为单位)。 | 测量 |
Heap.Committed | 保证可供JVM使用的堆内存量(以字节为单位)。 | 测量 | ||
Heap.Max | 可用于内存管理的最大堆内存量(以字节为单位)。 | 测量 | ||
NonHeap.Used | 当前使用的非堆内存量(以字节为单位)。 | 测量 | ||
NonHeap.Committed | 保证JVM可用的非堆内存量(以字节为单位)。 | 测量 | ||
NonHeap.Max | 可用于内存管理的最大非堆内存量(以字节为单位)。 | 测量 | ||
Direct.Count | 直接缓冲池中的缓冲区数。 | 测量 | ||
Direct.MemoryUsed | JVM用于直接缓冲池的内存量(以字节为单位)。 | 测量 | ||
Direct.TotalCapacity | 直接缓冲池中所有缓冲区的总容量(以字节为单位)。 | 测量 | ||
Mapped.Count | 映射缓冲池中的缓冲区数。 | 测量 | ||
Mapped.MemoryUsed | JVM用于映射缓冲池的内存量(以字节为单位)。 | 测量 | ||
Mapped.TotalCapacity | 映射缓冲池中的缓冲区数(以字节为单位)。 | 测量 |
线程
范围 | 中缀 | 度量 | 描述 | 类型 |
---|---|---|---|---|
Job-/TaskManager | Status.JVM.Threads | 计数 | 活动线程总数。 | 测量 |
垃圾收集
范围 | 中缀 | 度量 | 描述 | 类型 |
---|---|---|---|---|
Job-/TaskManager | Status.JVM.GarbageCollector | <GarbageCollector> .Count之间 | 已发生的集合总数。 | 测量 |
<GarbageCollector>。时间 | 执行垃圾收集所花费的总时间。 | 测量 |
类加载器
范围 | 中缀 | 度量 | 描述 | 类型 |
---|---|---|---|---|
Job-/TaskManager | Status.JVM.ClassLoader | ClassesLoaded | 自JVM启动以来加载的类总数。 | 测量 |
ClassesUnloaded | 自JVM启动以来卸载的类总数。 | 测量 |
网络
范围 | 中缀 | 度量 | 描述 | 类型 |
---|---|---|---|---|
TaskManager | Status.Network | AvailableMemorySegments | 未使用的内存段数。 | 测量 |
TotalMemorySegments | 分配的内存段数。 | 测量 | ||
Task | buffers | inputQueueLength | 排队的输入缓冲区数。 | 测量 |
outputQueueLength | 排队输出缓冲区的数量。 | 测量 | ||
inPoolUsage | 估计输入缓冲区的使用情况。 | 测量 | ||
outPoolUsage | 估计输出缓冲区的使用情况。 | 测量 | ||
Network.<Input|Output>.<gate>(only available if taskmanager.net.detailed-metrics config option is set) | totalQueueLen | 所有输入/输出通道中排队缓冲区的总数。 | 测量 | |
minQueueLen | 所有输入/输出通道中的最小排队缓冲区数。 | 测量 | ||
maxQueueLen | 所有输入/输出通道中的最大排队缓冲区数。 | 测量 | ||
avgQueueLen | 所有输入/输出通道中的平均缓冲区数。 | 测量 |
集群
范围 | 度量 | 描述 | 类型 |
---|---|---|---|
JobManager | numRegisteredTaskManagers | 注册任务管理员的数量。 | 测量 |
numRunningJobs | 正在运行的作业数量。 | 测量 | |
taskSlotsAvailable | 可用任务槽的数量。 | 测量 | |
taskSlotsTotal | 任务槽的总数。 | 测量 |
可用性
范围 | 度量 | 描述 | 类型 |
---|---|---|---|
Job (only available on JobManager) | restartingTime | 重新启动作业所花费的时间,或当前重新启动的持续时间(以毫秒为单位)。 | 测量 |
uptime | 作业运行的时间不间断。对于已完成的作业,返回-1(以毫秒为单位)。 | 测量 | |
downtime | 对于当前处于故障/恢复状态的作业,在此中断期间经过的时间。对于正在运行的作业返回0,对于已完成的作业返回-1(以毫秒为单位)。 | 测量 | |
fullRestarts | 自提交此作业以来完全重新启动的总次数。 | 测量 |
检查点
范围 | 度量 | 描述 | 类型 |
---|---|---|---|
Job (only available on JobManager) | lastCheckpointDuration | 完成最后一个检查点所花费的时间(以毫秒为单位)。 | 测量 |
lastCheckpointSize | 最后一个检查点的总大小(以字节为单位)。 | 测量 | |
lastCheckpointExternalPath | 存储最后一个外部检查点的路径。 | 测量 | |
lastCheckpointRestoreTimestamp | 在协调器上恢复最后一个检查点时的时间戳(以毫秒为单位)。 | 测量 | |
lastCheckpointAlignmentBuffered | 在最后一个检查点的所有子任务上进行对齐期间的缓冲字节数(以字节为单位)。 | 测量 | |
numberOfInProgressCheckpoints | 进行中检查点的数量。 | 测量 | |
numberOfCompletedCheckpoints | 成功完成检查点的数量。 | 测量 | |
numberOfFailedCheckpoints | 失败检查点的数量。 | 测量 | |
totalNumberOfCheckpoints | 总检查点的数量(正在进行,已完成,失败)。 | 测量 | |
Task | checkpointAlignmentTime | 最后一次屏障对齐完成所花费的时间(以纳秒为单位),或当前对齐到目前为止所用的时间(以纳秒为单位)。 | 测量 |
IO
范围 | 度量 | 描述 | 类型 |
---|---|---|---|
Job (only available on TaskManager) | <SOURCE_ID> <source_subtask_index> <operator_id> <operator_subtask_index> .latency | 从给定源子任务到算子子任务的延迟分布(以毫秒为单位)。 | 直方图 |
任务 | numBytesInLocal | 此任务从本地源读取的总字节数。 | 计数器 |
numBytesInLocalPerSecond | 此任务每秒从本地源读取的字节数。 | 仪表 | |
numBytesInRemote | 此任务从远程源读取的总字节数。 | 计数器 | |
numBytesInRemotePerSecond | 此任务每秒从远程源读取的字节数。 | 仪表 | |
numBuffersInLocal | 此任务从本地源读取的网络缓冲区总数。 | 计数器 | |
numBuffersInLocalPerSecond | 此任务每秒从本地源读取的网络缓冲区数。 | 仪表 | |
numBuffersInRemote | 此任务从远程源读取的网络缓冲区总数。 | 计数器 | |
numBuffersInRemotePerSecond | 此任务每秒从远程源读取的网络缓冲区数。 | 仪表 | |
numBytesOut | 此任务已发出的总字节数。 | 计数器 | |
numBytesOutPerSecond | 此任务每秒发出的字节数。 | 仪表 | |
numBuffersOut | 此任务已发出的网络缓冲区总数。 | 计数器 | |
numBuffersOutPerSecond | 此任务每秒发出的网络缓冲区数。 | 仪表 | |
任务/算子 | numRecordsIn | 此 算子/任务已收到的记录总数。 | 计数器 |
numRecordsInPerSecond | 此 算子/任务每秒接收的记录数。 | 仪表 | |
numRecordsOut | 此 算子/任务已发出的记录总数。 | 计数器 | |
numRecordsOutPerSecond | 此 算子/任务每秒发送的记录数。 | 仪表 | |
numLateRecordsDropped | 此算子/任务因迟到而丢失的记录数。 | 计数器 | |
currentInputWatermark | 此 算子/任务收到的最后一个水印(以毫秒为单位)。注意:对于具有2个输入的算子/任务,这是最后收到的水印的最小值。 | 测量 | |
算子 | currentInput1Watermark | 此 算子在其第一个输入(毫秒)中收到的最后一个水印。注意:仅适用于具有2个输入的算子。 | 测量 |
currentInput2Watermark | 此 算子在其第二个输入中接收的最后一个水印(以毫秒为单位)。注意:仅适用于具有2个输入的算子。 | 测量 | |
currentOutputWatermark | 此 算子发出的最后一个水印(以毫秒为单位)。 | 测量 | |
numSplitsProcessed | 此数据源已处理的InputSplits总数(如果 算子是数据源)。 | 测量 |
连接器
Kafka连接器
范围 | 度量 | 用户变量 | 描述 | 类型 |
---|---|---|---|---|
算子 | commitsSucceeded | N / A | 如果启用了偏移提交并且启用了检查点,则成功向Kafka提交的偏移提交总数。 | 计数器 |
算子 | commitsFailed | N / A | 如果启用了偏移提交并且启用了检查点,则Kafka的偏移提交失败总数。请注意,将偏移量提交回Kafka只是暴露消费者进度的一种方法,因此提交失败不会影响Flink的检查点分区偏移的完整性。 | 计数器 |
算子 | committedOffsets | Topic,分区 | 对于每个分区,最后成功提交到Kafka的偏移量。可以通过主题名称和分区ID指定特定分区的度量标准。 | 测量 |
算子 | currentOffsets | Topic,分区 | 消费者对每个分区的当前读取偏移量。可以通过主题名称和分区ID指定特定分区的度量标准。 | 测量 |
Kinesis连接器
范围 | 度量 | 用户变量 | 描述 | 类型 |
---|---|---|---|---|
算子 | millisBehindLatest | stream,shardId | 对于每个Kinesis分片,消费者在流的头部后面的毫秒数,表示消费者当前时间落后多少。可以通过流名称和分片标识指定特定分片的度量标准。值为0表示记录处理被捕获,此时没有要处理的新记录。值-1表示该度量标准尚未报告。 | 测量 |
算子 | sleepTimeMillis | stream,shardId | 消费者在从Kinesis获取记录之前花费的毫秒数。可以通过流名称和分片标识指定特定分片的度量标准。 | 测量 |
算子 | maxNumberOfRecordsPerFetch | stream,shardId | 消费者在单个getRecords调用Kinesis时请求的最大记录数。如果ConsumerConfigConstants.SHARD_USE_ADAPTIVE_READS设置为true,则自适应地计算此值以最大化Kinesis的2 Mbps读取限制。 | 测量 |
算子 | numberOfAggregatedRecordsPerFetch | stream,shardId | 消费者在单个getRecords调用Kinesis时获取的聚合Kinesis记录数。 | 测量 |
算子 | numberOfDeggregatedRecordsPerFetch | stream,shardId | 消费者在单个getRecords调用Kinesis时获取的分解Kinesis记录的数量。 | 测量 |
算子 | averageRecordSizeBytes | stream,shardId | Kinesis记录的平均大小(以字节为单位),由消费者在单个getRecords调用中获取。 | 测量 |
算子 | runLoopTimeNanos | stream,shardId | 消费者在运行循环中花费的实际时间(以纳秒为单位)。 | 测量 |
算子 | loopFrequencyHz | stream,shardId | 一秒钟内调用getRecords的次数。 | 测量 |
算子 | bytesRequestedPerFetch | stream,shardId | 在一次调用getRecords中请求的字节数(2 Mbps / loopFrequencyHz)。 | 测量 |
系统资源
默认情况下禁用系统资源报告。当metrics.system-resource
启用下面列出的指标将是可利用的作业-与TaskManager。系统资源度量标准会定期更新,并显示已配置间隔(metrics.system-resource-probing-interval
)的平均值。
系统资源报告要求在类路径上存在可选的依赖项(例如,放在Flink的lib
目录中):
com.github.oshi:oshi-core:3.4.0
(根据EPL 1.0许可证授权)
包括它的传递依赖:
net.java.dev.jna:jna-platform:jar:4.2.2
net.java.dev.jna:jna:jar:4.2.2
这方面的失败将被报告为启动期间NoClassDefFoundError
记录的警告消息SystemResourcesMetricsInitializer
。
系统CPU
范围 | 中缀 | 度量 | 描述 |
---|---|---|---|
Job-/TaskManager | System.CPU | 用法 | 机器上CPU使用率的总体百分比。 |
闲 | 机器上CPU空闲使用率的百分比。 | ||
SYS | 计算机上系统CPU使用率的百分比。 | ||
用户 | 计算机上用户CPU使用率的百分比。 | ||
IOWAIT | 计算机上IOWait CPU使用率的百分比。 | ||
IRQ | 机器上Irq CPU使用率的百分比。 | ||
软中断 | 计算机上SoftIrq CPU使用率的百分比。 | ||
尼斯 | 在机器上使用Nice Idle的百分比。 | ||
Load1min | 平均CPU负载超过1分钟 | ||
Load5min | 平均CPU负载超过5分钟 | ||
Load15min | 平均CPU负载超过15分钟 | ||
UsageCPU * | 每个处理器的CPU使用率百分比 |
系统内存
范围 | 中缀 | 度量 | 描述 |
---|---|---|---|
Job-/TaskManager | System.Memory | 可得到 | 可用内存字节数 |
总 | 总内存(字节) | ||
System.Swap | 用过的 | 使用的交换字节 | |
总 | 总交换字节数 |
系统网络
范围 | 中缀 | 度量 | 描述 |
---|---|---|---|
Job-/TaskManager | System.Network.INTERFACE_NAME | ReceiveRate | 平均接收速率,以每秒字节数为单位 |
SendRate | 平均发送速率,以字节/秒为单位 |
延迟跟踪
Flink允许跟踪通过系统传输的记录的延迟。默认情况下禁用此函数。为了使延迟跟踪你必须设置latencyTrackingInterval
在无论是正数 Flink配置或ExecutionConfig
。
在latencyTrackingInterval
,源将定期发出一个特殊的记录,称为LatencyMarker
。标记包含从源发出记录时的时间戳。延迟标记不能超过常规用户记录,因此如果记录在算子面前排队,则会增加标记跟踪的延迟。
请注意,延迟标记不会考虑用户记录在算子中绕过它们的时间。特别是标记不考虑记录在窗口缓冲区中花费的时间。只有当算子无法接受新记录,因此他们排队时,使用标记测量的延迟才会反映出来。
所有中间 算子都会保存n
每个源的最后一个延迟列表,以计算延迟分布。接收器算子保存每个源的列表,以及每个并行源实例,以允许检测由各个机器引起的延迟问题。
目前,Flink假定群集中所有计算机的时钟都是同步的。我们建议设置自动时钟同步服务(如NTP)以避免错误的延迟结果。
警告启用延迟指标可能会显着影响群集的性能。强烈建议仅将它们用于调试目的。
REST API集成
可以通过Monitoring REST API查询度量标准。
下面是可用端点列表,带有示例JSON响应。所有端点都是样本表单http://hostname:8081/jobmanager/metrics
,下面我们仅列出URL 的_路径_部分。
尖括号中的值是变量,例如http://hostname:8081/jobs/<jobid>/metrics
,必须请求变量http://hostname:8081/jobs/7684be6004e4e955c2a558a9bc463f65/metrics
。
请求特定实体的指标:
/jobmanager/metrics
/taskmanagers/<taskmanagerid>/metrics
/jobs/<jobid>/metrics
/jobs/<jobid>/vertices/<vertexid>/subtasks/<subtaskindex>
请求在相应类型的所有实体之间聚合的指标:
/taskmanagers/metrics
/jobs/metrics
/jobs/<jobid>/vertices/<vertexid>/subtasks/metrics
请求在相应类型的所有实体的子集上聚合的度量标准:
/taskmanagers/metrics?taskmanagers=A,B,C
/jobs/metrics?jobs=D,E,F
/jobs/<jobid>/vertices/<vertexid>/subtasks/metrics?subtask=1,2,3
请求可用指标列表:
GET /jobmanager/metrics
[ { "id": "metric1" }, { "id": "metric2" } ]
请求特定(未聚合)指标的值:
GET taskmanagers/ABCDE/metrics?get=metric1,metric2
[ { "id": "metric1", "value": "34" }, { "id": "metric2", "value": "2" } ]
请求特定指标的汇总值:
GET /taskmanagers/metrics?get=metric1,metric2
[ { "id": "metric1", "min": 1, "max": 34, "avg": 15, "sum": 45 }, { "id": "metric2", "min": 2, "max": 14, "avg": 7, "sum": 16 } ]
请求特定指标的特定聚合值:
GET /taskmanagers/metrics?get=metric1,metric2&agg=min,max
[ { "id": "metric1", "min": 1, "max": 34, }, { "id": "metric2", "min": 2, "max": 14, } ]
仪表板集成
为每个任务或算子收集的度量标准也可以在仪表板中显示。在作业的主页面上,选择Metrics
选项卡。选择顶部图表中的一个任务后,您可以使用Add Metric
下拉菜单选择要显示的指标。
- 任务指标列为
<subtask_index>.<metric_name>
。 - 算子指标列为
<subtask_index>.<operator_name>.<metric_name>
。
每个度量将可视化为单独的图形,x轴表示时间,y轴表示测量值。所有图表每10秒自动更新一次,并在导航到另一页时继续更新。
可视化指标的数量没有限制; 但是只能显示数字指标。