HDFS troubleshooting – why does a tier get blacklisted?

13/05/2019

Over the last two years, G-Research has embraced open source technologies like Hadoop and Spark. One of the benefits of open source technology is that access to the source code can make debugging complex problems a little easier. In this blog post I’ll describe how we used the HDFS source code to identify the root cause of a problem where applications in our production Hadoop cluster would intermittently fail with HDFS write errors:

java.io.IOException:
File could only be replicated to 0 nodes instead of minReplication (=).
There are datanode(s) running and no node(s) are excluded in this operation.

And it happened very often.

Files on HDFS are divided into blocks. To maintain availability, each block is stored on several Datanodes; the number of copies is determined by the replication factor. The above message indicates that the block could not be written to even the minimum number of Datanodes, and therefore the write failed. All of the Datanodes were healthy, and we had not seen any network issues that would prevent communication between the Namenodes and the Datanodes. Everything seemed all right, yet the write failures were rising. Reviewing the Datanode logs did not reveal any additional information either; the Datanodes were heartbeating regularly and had plenty of free space available in their data directories.

Initial findings

However, while we were trying to reproduce the write failures in a more controlled way, we noticed that they only happened in specific folders on HDFS. Suspiciously, each of the affected target folders had the same storage policy setting.

HDFS supports multiple predefined storage types (RAM_DISK, SSD, DISK and ARCHIVE), which differ in performance, capacity, cost, etc. The allocation of blocks to storage types is controlled by a set of predefined storage policies. The default policy is HOT (all replicas are placed on the DISK storage type). [1]
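
To make the relationship between policies and storage types concrete, here is a minimal sketch of the mapping as described in the archival storage documentation [1]. The class and method names are hypothetical and only model the documented behaviour; fallback storage types and any policies added in later Hadoop versions are left out.

```java
import java.util.Arrays;

final class StoragePolicySketch {
    // Storage type used for each replica under the named policy.
    // The first replica may go to a different tier than the remaining ones.
    static String[] placement(String policy, int replicas) {
        String first;
        String rest;
        switch (policy) {
            case "HOT":          first = "DISK";     rest = "DISK";    break;
            case "COLD":         first = "ARCHIVE";  rest = "ARCHIVE"; break;
            case "WARM":         first = "DISK";     rest = "ARCHIVE"; break;
            case "ALL_SSD":      first = "SSD";      rest = "SSD";     break;
            case "ONE_SSD":      first = "SSD";      rest = "DISK";    break;
            case "LAZY_PERSIST": first = "RAM_DISK"; rest = "DISK";    break;
            default:
                throw new IllegalArgumentException("Unknown policy: " + policy);
        }
        String[] types = new String[replicas];
        Arrays.fill(types, rest);
        if (replicas > 0) {
            types[0] = first;
        }
        return types;
    }

    public static void main(String[] args) {
        // Show where three replicas would land under the WARM and COLD policies.
        System.out.println(Arrays.toString(placement("WARM", 3)));
        System.out.println(Arrays.toString(placement("COLD", 3)));
    }
}
```

A folder with the COLD policy therefore depends entirely on Datanodes that offer ARCHIVE storage, which is why a problem with one tier can show up as failures in specific folders only.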

The storage policy of the affected folders pointed us to the storage type that was having problems, and in turn to the Datanodes to suspect. Our initial thought was that we were somehow losing those nodes, making the tier unavailable. We decided to restart those Datanodes one by one to see how it affected the situation and whether we could spot anything unusual in the Datanode logs. The intermittent write problem magically disappeared and everything went back to normal. However, within about 10 minutes we were seeing failures again.

The root cause

Having just restarted the Datanodes and found nothing wrong with them, we turned to the Namenodes, whose logs contained the following warning:

WARN blockmanagement.BlockPlacementPolicy (BlockPlacementPolicyDefault.java:chooseTarget()) - 
Failed to place enough replicas, still in need of 3 to reach 3 (...) For more information, 
please enable DEBUG log level on org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy and org.apache.hadoop.net.NetworkTopology

In the hope of getting more details about the issue, we changed the Namenode log level to DEBUG and targeted the BlockPlacementPolicy as the log message suggested.

hadoop daemonlog -setlevel <namenode>:<port> org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy DEBUG

The Active Namenode’s logs made it clear that some Datanodes were not considered eligible for block placement:

Datanode <ip>:<port> is not chosen since the node is too busy (load: node_load > load_threshold)

Datanodes with the required storage type were being skipped as a result of a load check performed by the Namenode, often making the whole tier unavailable. With the help of the vendor and by looking at the source code, we gained some understanding of what the message means.

BlockPlacementPolicyDefault.java

// check the communication traffic of the target machine
if (considerLoad) {
  final double maxLoad = considerLoadFactor *
      stats.getInServiceXceiverAverage();
  final int nodeLoad = node.getXceiverCount();
  if (nodeLoad > maxLoad) {
    logNodeIsNotChosen(node, NodeNotChosenReason.NODE_TOO_BUSY,
        "(load: " + nodeLoad + " > " + maxLoad + ")");
    return false;
  }
}

One of the criteria for a Datanode to be part of a write pipeline is that its current load (node_load) must not exceed the load threshold (load_threshold). The node_load is the number of Datanode transfer threads (Xceivers). These are server-side threads used for data connections, and they run within a single thread group. The load threshold is calculated by the Namenode as the average node_load across all the live Datanodes multiplied by a load factor (default value is 2.0). It is recalculated every time a heartbeat arrives.

DatanodeManager.java

public double getInServiceXceiverAverage() {
    double avgLoad = 0;
    final int nodes = getNumDatanodesInService();
    if (nodes != 0) {
        final int xceivers = heartbeatManager.getInServiceXceiverCount();
        avgLoad = (double)xceivers/nodes;
    }
    return avgLoad;
}

Selecting nodes based on the Xceiver count is a reasonable idea. However, the Namenode compares each node’s load to the average over all nodes, regardless of their storage type. Disregarding the storage type leads to the problem we saw.

Here is an example of how it can go wrong. To keep the calculation simple, say a cluster has 100 Datanodes: 90 of them have only the DISK storage type and 10 have only the ARCHIVE storage type, and the average load across all of them is 10 Xceivers. The average load of the DISK nodes remains steady, while the 10 ARCHIVE nodes become heavily utilized. When those 10 Datanodes reach an average of 23 Xceivers each, the Namenode will filter them out:

(90*10 + 10*23)/100 = 11.3 (average xceivers count)

The load_threshold (based on the default load factor of 2.0) becomes 22.6. All 10 of our ARCHIVE nodes now have a node_load more than twice the cluster average, so they become unavailable for block placement.
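
The arithmetic of this example can be replayed with a few lines of standalone Java. This is only a sketch that mirrors the two snippets quoted above; the class and method names are made up for illustration and nothing here calls into Hadoop.

```java
import java.util.Arrays;

final class LoadCheckExample {
    // Same idea as getInServiceXceiverAverage(): total Xceivers / number of nodes.
    static double averageLoad(int[] xceiverCounts) {
        if (xceiverCounts.length == 0) {
            return 0;
        }
        long total = 0;
        for (int count : xceiverCounts) {
            total += count;
        }
        return (double) total / xceiverCounts.length;
    }

    // Same comparison as the considerLoad branch: reject when load > factor * average.
    static boolean isTooBusy(int nodeLoad, double loadFactor, double averageLoad) {
        return nodeLoad > loadFactor * averageLoad;
    }

    public static void main(String[] args) {
        int[] loads = new int[100];
        Arrays.fill(loads, 0, 90, 10);   // 90 DISK nodes at 10 Xceivers
        Arrays.fill(loads, 90, 100, 23); // 10 ARCHIVE nodes at 23 Xceivers

        double average = averageLoad(loads);
        double threshold = 2.0 * average; // default load factor
        System.out.printf("average=%.1f threshold=%.1f%n", average, threshold);
        System.out.println("DISK node too busy: " + isTooBusy(10, 2.0, average));
        System.out.println("ARCHIVE node too busy: " + isTooBusy(23, 2.0, average));
    }
}
```

With the numbers from the example, only the ARCHIVE nodes fail the check, although they are all equally loaded relative to each other.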

Possible workarounds

  • Increase dfs.namenode.replication.considerLoad.factor. In our example, setting it to 3 would have helped; however, it could increase the risk of wide variation in load across the cluster.
  • Set dfs.namenode.replication.considerLoad to false, which disables the load check entirely. Without this hard limit, however, the nodes could be even more unevenly utilized.
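
To get a feeling for how much headroom a larger load factor buys, the sketch below solves the load check for the break-even point: the load at which a group of equally busy nodes starts to exceed the threshold, while the rest of the cluster stays at a fixed baseline. It assumes the simplified two-group cluster from the example above; the class and method names are hypothetical.

```java
final class BreakEvenLoad {
    /**
     * Load above which `busyNodes` equally loaded nodes are rejected, given that
     * the other nodes stay at `baselineLoad`.
     *
     * Derived from: load > factor * ((total - busy) * baseline + busy * load) / total
     * Returns positive infinity when the busy group can never exceed the threshold.
     */
    static double breakEven(double factor, int totalNodes, int busyNodes, double baselineLoad) {
        double denominator = totalNodes - factor * busyNodes;
        if (denominator <= 0) {
            return Double.POSITIVE_INFINITY;
        }
        return factor * (totalNodes - busyNodes) * baselineLoad / denominator;
    }

    public static void main(String[] args) {
        // 100 nodes, 10 of them busy, the other 90 steady at 10 Xceivers.
        for (double factor : new double[] {2.0, 3.0}) {
            System.out.printf("factor=%.1f break-even load=%.2f%n",
                    factor, breakEven(factor, 100, 10, 10));
        }
    }
}
```

For the example cluster the break-even load is 22.5 Xceivers with a factor of 2, which is why 23 was enough to exclude the tier, and roughly 38.6 with a factor of 3. A higher factor therefore postpones the problem rather than removing it.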

Solution

As a follow-up to the investigation, this problem has been identified as a major bug and is tracked under HDFS-14383 – Compute datanode load based on StoragePolicy [2], which aims to prevent this issue from happening when dfs.namenode.replication.considerLoad is set. To avoid the problem while it is being fixed, we decided to disable considerLoad and set up additional monitoring to make sure we can spot and handle possible Datanode hotspotting.
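
The idea in the title of HDFS-14383 can be illustrated with a small sketch that compares each node only against the average of nodes with the same storage type. This is not the actual patch, just an illustration under the assumption that every node offers a single storage type, as in the example earlier in this post; all names below are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

final class PerTierLoadCheck {
    static final class Node {
        final String storageType;
        final int xceivers;

        Node(String storageType, int xceivers) {
            this.storageType = storageType;
            this.xceivers = xceivers;
        }
    }

    // Average Xceiver count over the nodes that offer the given storage type.
    static double tierAverage(List<Node> nodes, String storageType) {
        long total = 0;
        int count = 0;
        for (Node node : nodes) {
            if (node.storageType.equals(storageType)) {
                total += node.xceivers;
                count++;
            }
        }
        return count == 0 ? 0 : (double) total / count;
    }

    // Load check against the node's own tier instead of the whole cluster.
    static boolean isTooBusy(Node node, List<Node> nodes, double loadFactor) {
        return node.xceivers > loadFactor * tierAverage(nodes, node.storageType);
    }

    // The cluster from the example: 90 DISK nodes at 10, 10 ARCHIVE nodes at 23.
    static List<Node> exampleCluster() {
        List<Node> nodes = new ArrayList<>();
        for (int i = 0; i < 90; i++) {
            nodes.add(new Node("DISK", 10));
        }
        for (int i = 0; i < 10; i++) {
            nodes.add(new Node("ARCHIVE", 23));
        }
        return nodes;
    }

    public static void main(String[] args) {
        List<Node> nodes = exampleCluster();
        Node archiveNode = nodes.get(99);
        System.out.println("ARCHIVE average: " + tierAverage(nodes, "ARCHIVE"));
        System.out.println("ARCHIVE node too busy: " + isTooBusy(archiveNode, nodes, 2.0));
    }
}
```

Measured against their own tier, the ARCHIVE nodes from the example are no longer outliers, so none of them would be rejected by the load check.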

[1] https://github.com/apache/hadoop/blob/trunk/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/ArchivalStorage.md
[2] https://issues.apache.org/jira/browse/HDFS-14383

Jozsef Balogh – Big Data Platform Engineering Team
