Flink version used: 1.13.0

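While debugging a job recently, I needed to look at the logs for one specific key. The job had more than two hundred TaskManagers and I had no access to the YARN logs, so checking them one by one in the web UI would have taken forever. I therefore dug into the Flink source and found, in the class org.apache.flink.runtime.state.KeyGroupRangeAssignment, the method that computes which subtask index a key is assigned to: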

/**
 * Assigns the given key to a parallel operator index.
 *
 * @param key the key to assign
 * @param maxParallelism the maximum supported parallelism, aka the number of key-groups.
 * @param parallelism the current parallelism of the operator
 * @return the index of the parallel operator to which the given key should be routed.
 */
public static int assignKeyToParallelOperator(Object key, int maxParallelism, int parallelism) {
    Preconditions.checkNotNull(key, "Assigned key must not be null!");
    return computeOperatorIndexForKeyGroup(
            maxParallelism, parallelism, assignToKeyGroup(key, maxParallelism));
}

/**
 * Computes a default maximum parallelism from the operator parallelism. This is used in case
 * the user has not explicitly configured a maximum parallelism to still allow a certain degree
 * of scale-up.
 *
 * @param operatorParallelism the operator parallelism as basis for computation.
 * @return the computed default maximum parallelism.
 */
public static int computeDefaultMaxParallelism(int operatorParallelism) {
    checkParallelismPreconditions(operatorParallelism);
    return Math.min(
            Math.max(
                    MathUtils.roundUpToPowerOfTwo(
                            operatorParallelism + (operatorParallelism / 2)),
                    DEFAULT_LOWER_BOUND_MAX_PARALLELISM),
            UPPER_BOUND_MAX_PARALLELISM);
}
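
To make the default concrete, here is a minimal standalone sketch of the same formula, so it can be run without Flink on the classpath. The two constants (128 and 32768), the bounds check, and the power-of-two rounding are my assumptions about what DEFAULT_LOWER_BOUND_MAX_PARALLELISM, UPPER_BOUND_MAX_PARALLELISM, checkParallelismPreconditions and MathUtils.roundUpToPowerOfTwo do in Flink 1.13; please verify them against the source of your version. The class name DefaultMaxParallelismSketch is made up for this example.

```java
// Standalone sketch of computeDefaultMaxParallelism; NOT the Flink class itself.
final class DefaultMaxParallelismSketch {

    // Assumed values of the Flink constants (check your Flink version).
    static final int DEFAULT_LOWER_BOUND_MAX_PARALLELISM = 1 << 7;  // 128
    static final int UPPER_BOUND_MAX_PARALLELISM = 1 << 15;         // 32768

    // Smallest power of two that is >= x, for 1 <= x <= 2^30.
    static int roundUpToPowerOfTwo(int x) {
        int highest = Integer.highestOneBit(x);
        return highest == x ? x : highest << 1;
    }

    static int computeDefaultMaxParallelism(int operatorParallelism) {
        // Assumed equivalent of checkParallelismPreconditions.
        if (operatorParallelism <= 0 || operatorParallelism > UPPER_BOUND_MAX_PARALLELISM) {
            throw new IllegalArgumentException(
                    "Operator parallelism not within bounds: " + operatorParallelism);
        }
        // 1.5 x parallelism, rounded up to a power of two, clamped to [128, 32768].
        return Math.min(
                Math.max(
                        roundUpToPowerOfTwo(operatorParallelism + (operatorParallelism / 2)),
                        DEFAULT_LOWER_BOUND_MAX_PARALLELISM),
                UPPER_BOUND_MAX_PARALLELISM);
    }

    public static void main(String[] args) {
        for (int p : new int[] {1, 200, 30000}) {
            System.out.println(p + " -> " + computeDefaultMaxParallelism(p));
        }
        // prints:
        // 1 -> 128
        // 200 -> 512
        // 30000 -> 32768
    }
}
```

Under these assumptions, an operator with parallelism 200 gets 200 + 100 = 300, rounded up to 512, as its default maximum parallelism.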

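Calling assignKeyToParallelOperator gives you the subtask index that a key is assigned to.
The parameters are:
key: the key used in keyBy. Pass the same object that your KeySelector returns, because the assignment is derived from its hashCode().
maxParallelism: the maximum parallelism. If you have not set it explicitly, you can compute the default by calling computeDefaultMaxParallelism (shown above) with the operator's parallelism.
parallelism: the current parallelism of the operator that receives the keyed stream, i.e. the operator right after keyBy. If every operator in the job runs with the same parallelism, this is simply the job parallelism.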
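
The second half of the calculation, mapping a key group to a subtask index, is plain integer arithmetic, so here is a small standalone sketch of it. It assumes that computeOperatorIndexForKeyGroup in Flink 1.13 is keyGroupId * parallelism / maxParallelism; the first half (assignToKeyGroup, which hashes key.hashCode() into a key group) is deliberately not reproduced, so for a real key you should still call Flink's own assignKeyToParallelOperator. The class name KeyGroupToSubtaskSketch and the sample key groups are made up for illustration.

```java
// Standalone sketch of the key-group -> subtask mapping; NOT the Flink class itself.
final class KeyGroupToSubtaskSketch {

    // Assumed to mirror KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup:
    // key groups [0, maxParallelism) are split into `parallelism` contiguous ranges.
    static int operatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroupId) {
        return keyGroupId * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int parallelism = 200;     // parallelism of the keyed operator
        int maxParallelism = 512;  // default max parallelism for parallelism 200

        for (int keyGroup : new int[] {0, 2, 3, 300, 511}) {
            int subtask = operatorIndexForKeyGroup(maxParallelism, parallelism, keyGroup);
            System.out.println("key group " + keyGroup + " -> subtask " + subtask);
        }
        // prints:
        // key group 0 -> subtask 0
        // key group 2 -> subtask 0
        // key group 3 -> subtask 1
        // key group 300 -> subtask 117
        // key group 511 -> subtask 199
    }
}
```

Note that the returned index is 0-based. As far as I know, Flink's task names in the logs are 1-based, so index 117 would show up as "(118/200)".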

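With the returned subtask index you can locate the corresponding subtask of the operator in the web UI, and from there the TaskManager it runs on, to view its logs.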

As shown in the screenshot:
F581FA04F96D4AFB96CF6B5BEB725594.png