Flink version used: 1.13.0
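While debugging a job recently, I needed to check the logs for one specific key. The job was running on more than two hundred TaskManagers and I had no permission to access the YARN logs, so hunting through the web UI one TaskManager at a time would have taken forever. Instead I went through the Flink source and found, in the class org.apache.flink.runtime.state.KeyGroupRangeAssignment, the method that computes which subtask index a key is assigned to: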
```java
/**
 * Assigns the given key to a parallel operator index.
 *
 * @param key the key to assign
 * @param maxParallelism the maximum supported parallelism, aka the number of key-groups.
 * @param parallelism the current parallelism of the operator
 * @return the index of the parallel operator to which the given key should be routed.
 */
public static int assignKeyToParallelOperator(Object key, int maxParallelism, int parallelism) {
    Preconditions.checkNotNull(key, "Assigned key must not be null!");
    return computeOperatorIndexForKeyGroup(
            maxParallelism, parallelism, assignToKeyGroup(key, maxParallelism));
}

/**
 * Computes a default maximum parallelism from the operator parallelism. This is used in case
 * the user has not explicitly configured a maximum parallelism to still allow a certain degree
 * of scale-up.
 *
 * @param operatorParallelism the operator parallelism as basis for computation.
 * @return the computed default maximum parallelism.
 */
public static int computeDefaultMaxParallelism(int operatorParallelism) {
    checkParallelismPreconditions(operatorParallelism);
    return Math.min(
            Math.max(
                    MathUtils.roundUpToPowerOfTwo(
                            operatorParallelism + (operatorParallelism / 2)),
                    DEFAULT_LOWER_BOUND_MAX_PARALLELISM),
            UPPER_BOUND_MAX_PARALLELISM);
}
```
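To see what computeDefaultMaxParallelism actually yields for a given parallelism, here is a minimal standalone sketch of the same rule that runs without flink-runtime on the classpath. The bounds (128 and 32768) and the power-of-two rounding follow the Flink 1.13 source as I read it (DEFAULT_LOWER_BOUND_MAX_PARALLELISM, UPPER_BOUND_MAX_PARALLELISM, MathUtils.roundUpToPowerOfTwo); the class name DefaultMaxParallelism is made up for this example, so check the constants against your own Flink version.

```java
/**
 * Hypothetical standalone re-implementation of the default max-parallelism rule
 * from KeyGroupRangeAssignment.computeDefaultMaxParallelism (Flink 1.13),
 * written so it runs without any Flink dependency.
 */
public class DefaultMaxParallelism {

    // Bounds as found in the Flink 1.13 source (assumption: verify for your version).
    static final int DEFAULT_LOWER_BOUND_MAX_PARALLELISM = 1 << 7; // 128
    static final int UPPER_BOUND_MAX_PARALLELISM = 1 << 15;        // 32768

    /** Rounds x up to the next power of two using the bit-smearing trick. */
    static int roundUpToPowerOfTwo(int x) {
        x = x - 1;
        x |= x >> 1;
        x |= x >> 2;
        x |= x >> 4;
        x |= x >> 8;
        x |= x >> 16;
        return x + 1;
    }

    /** 1.5 x parallelism, rounded up to a power of two, clamped to [128, 32768]. */
    static int computeDefaultMaxParallelism(int operatorParallelism) {
        // Same precondition Flink checks: 0 < parallelism <= upper bound.
        if (operatorParallelism <= 0 || operatorParallelism > UPPER_BOUND_MAX_PARALLELISM) {
            throw new IllegalArgumentException(
                    "Operator parallelism not within bounds: " + operatorParallelism);
        }
        return Math.min(
                Math.max(
                        roundUpToPowerOfTwo(operatorParallelism + (operatorParallelism / 2)),
                        DEFAULT_LOWER_BOUND_MAX_PARALLELISM),
                UPPER_BOUND_MAX_PARALLELISM);
    }

    public static void main(String[] args) {
        int[] samples = {1, 10, 100, 200, 300};
        for (int p : samples) {
            System.out.println(p + " -> " + computeDefaultMaxParallelism(p));
        }
    }
}
```

For example, an operator running at parallelism 200 gets a default max parallelism of 512 (200 + 100 = 300, rounded up to 512). If the max parallelism was set explicitly, e.g. with setMaxParallelism, that value is used instead and this calculation does not apply.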
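Calling assignKeyToParallelOperator returns the index of the subtask a key is routed to. Its parameters are:
key: the key used in keyBy, i.e. the exact value your KeySelector returns, since it is that object's hashCode() that gets hashed
maxParallelism: the maximum parallelism, which is also the number of key groups; if you never set it, compute the default by passing the operator's parallelism to computeDefaultMaxParallelism above
parallelism: the parallelism of the keyed operator (the one consuming the keyBy output); in the common setup where every operator uses the job-wide default, this is simply the job's parallelism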
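If flink-runtime is on your classpath you can call KeyGroupRangeAssignment.assignKeyToParallelOperator directly. When it is not (say, in a quick scratch program), the routing can be reproduced by hand: the key's hashCode() is scrambled with a Murmur3-style hash, taken modulo maxParallelism to get a key group, and the key group is scaled to a subtask index with keyGroupId * parallelism / maxParallelism. The sketch below mirrors Flink 1.13's MathUtils.murmurHash and KeyGroupRangeAssignment as I understand them; the class name KeySubtaskLocator and the sample key "user_42" are hypothetical, and the result should be cross-checked against the real method before relying on it.

```java
import java.util.Objects;

/**
 * Hypothetical standalone sketch of Flink 1.13's key -> subtask routing,
 * reimplemented so it runs without flink-runtime. Verify against the Flink
 * source of your version before trusting its output.
 */
public class KeySubtaskLocator {

    /** Murmur3-style scrambling of an int hash, following MathUtils.murmurHash. */
    static int murmurHash(int code) {
        code *= 0xcc9e2d51;
        code = Integer.rotateLeft(code, 15);
        code *= 0x1b873593;

        code = Integer.rotateLeft(code, 13);
        code = code * 5 + 0xe6546b64;

        code ^= 4;
        code = bitMix(code);

        // Map to a non-negative value (Integer.MIN_VALUE has no positive negation).
        if (code >= 0) {
            return code;
        } else if (code != Integer.MIN_VALUE) {
            return -code;
        } else {
            return 0;
        }
    }

    /** Murmur3 finalizer (fmix32), following MathUtils.bitMix. */
    static int bitMix(int in) {
        in ^= in >>> 16;
        in *= 0x85ebca6b;
        in ^= in >>> 13;
        in *= 0xc2b2ae35;
        in ^= in >>> 16;
        return in;
    }

    /** key -> key group in [0, maxParallelism). */
    static int assignToKeyGroup(Object key, int maxParallelism) {
        Objects.requireNonNull(key, "Assigned key must not be null!");
        return murmurHash(key.hashCode()) % maxParallelism;
    }

    /** key group -> subtask index in [0, parallelism). */
    static int computeOperatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroupId) {
        return keyGroupId * parallelism / maxParallelism;
    }

    static int assignKeyToParallelOperator(Object key, int maxParallelism, int parallelism) {
        return computeOperatorIndexForKeyGroup(
                maxParallelism, parallelism, assignToKeyGroup(key, maxParallelism));
    }

    public static void main(String[] args) {
        // Hypothetical values: parallelism 200 and its default max parallelism 512.
        String key = "user_42";
        int maxParallelism = 512;
        int parallelism = 200;
        System.out.println(key + " -> key group " + assignToKeyGroup(key, maxParallelism)
                + ", subtask index " + assignKeyToParallelOperator(key, maxParallelism, parallelism));
    }
}
```

The two-step design matters when rescaling: a key always stays in the same key group, and only the key-group-to-subtask mapping changes with parallelism, which is why maxParallelism must match the job exactly for the computed index to be right.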
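With the returned subtask index you can locate the matching subtask of the keyed operator, then the TaskManager it runs on, and check that TaskManager's logs. Note that the index is zero-based, while Flink's task names count subtasks from 1, so index 37 of an operator with parallelism 200 appears in the logs as "(38/200)".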
As shown in the figure: