The code for writing data into DataNodes is present in two files:
DFSOutputStream.java (package: org.apache.hadoop.hdfs)
The data written by the client is split into packets (typically 64 KB each). When a packet of data is ready, it gets enqueued into a Data Queue, which is picked up by the DataStreamer.
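For context, this is the client-side code that feeds that path; a minimal sketch using the standard FileSystem API (the path, buffer size, and write count here are illustrative assumptions, not taken from the question):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsWriteExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration(); // fs.defaultFS must point at the cluster
    try (FileSystem fs = FileSystem.get(conf);
         FSDataOutputStream out = fs.create(new Path("/tmp/example.dat"))) {
      byte[] buffer = new byte[64 * 1024]; // roughly one packet's worth of data
      // each write() lands in DFSOutputStream, which cuts the bytes into
      // packets and enqueues them on the Data Queue for the DataStreamer
      for (int i = 0; i < 16; i++) {
        out.write(buffer);
      }
    } // close() flushes the remaining packets and waits for acks
  }
}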
DataStreamer.java (package: org.apache.hadoop.hdfs)
It picks up the packets from the Data Queue and sends them to the DataNodes in the pipeline (typically there are 3 DataNodes in a data pipeline, because of the default replication factor of 3).
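The pipeline length follows the replication factor, which comes from the dfs.replication property in hdfs-site.xml (3 is the shipped default; a standard entry is shown here for reference):

<property>
  <name>dfs.replication</name>
  <value>3</value>
  <description>Default block replication.</description>
</property>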
It retrieves a new block ID and starts streaming the data to the DataNodes. When a block of data has been written, it closes the current block and gets a new block for writing the next set of packets.
The code where a new block is obtained is below:
// get new block from namenode.
if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
  if (LOG.isDebugEnabled()) {
    LOG.debug("Allocating new block");
  }
  setPipeline(nextBlockOutputStream());
  initDataStreaming();
}
The code where the current block gets closed is below:
// Is this block full?
if (one.isLastPacketInBlock()) {
  // wait for the close packet has been acked
  synchronized (dataQueue) {
    while (!shouldStop() && ackQueue.size() != 0) {
      dataQueue.wait(1000); // wait for acks to arrive from datanodes
    }
  }
  if (shouldStop()) {
    continue;
  }
  endBlock();
}
In the endBlock() method, the stage is again set to:
stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
which means a new pipeline is created for writing the next set of packets to a new block.
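For reference, the stages the streamer cycles through are constants of org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage; this is a trimmed view of the ones relevant here (the real enum also defines append, recovery, and transfer stages):

public enum BlockConstructionStage {
  PIPELINE_SETUP_CREATE, // ask the NameNode for a new block and build a pipeline
  DATA_STREAMING,        // pipeline is up, packets are being streamed
  PIPELINE_CLOSE         // last packet sent, block is being finalized
}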
EDIT: How is the end of a block detected?
As the DataStreamer keeps appending data to a block, it updates the number of bytes written:
/**
 * increase bytes of current block by len.
 *
 * @param len how many bytes to increase to current block
 */
void incBytesCurBlock(long len) {
  this.bytesCurBlock += len;
}
It also keeps checking whether the number of bytes written equals the block size:
// If packet is full, enqueue it for transmission
//
if (currentPacket.getNumChunks() == currentPacket.getMaxChunks() ||
    getStreamer().getBytesCurBlock() == blockSize) {
  enqueueCurrentPacketFull();
}
In the statement above, the following condition checks whether the block size has been reached (equality rather than >= works here because packet sizes are adjusted so that no packet straddles a block boundary):
getStreamer().getBytesCurBlock() == blockSize
If the block boundary is encountered, then the endBlock() method gets called:
/**
 * if encountering a block boundary, send an empty packet to
 * indicate the end of block and reset bytesCurBlock.
 *
 * @throws IOException
 */
protected void endBlock() throws IOException {
  if (getStreamer().getBytesCurBlock() == blockSize) {
    setCurrentPacketToEmpty();
    enqueueCurrentPacket();
    getStreamer().setBytesCurBlock(0);
    lastFlushOffset = 0;
  }
}
This ensures that the current block gets closed and a new block is obtained from the NameNode for writing the next data.
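As a worked example: with a 128 MB block size, writing a 300 MB file trips this boundary logic twice. endBlock() fires at the 128 MB and 256 MB offsets, and the file ends up spanning ceil(300 / 128) = 3 blocks (128 MB + 128 MB + 44 MB); the final, partial block is closed when the file itself is closed rather than by endBlock().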
The block size is determined by the dfs.blocksize parameter in the hdfs-site.xml file (on my cluster it is set to 128 MB = 134217728 bytes):
<property>
  <name>dfs.blocksize</name>
  <value>134217728</value>
  <description>The default block size for new files, in bytes.
  You can use the following suffix (case insensitive): k(kilo),
  m(mega), g(giga), t(tera), p(peta), e(exa) to specify the
  size (such as 128k, 512m, 1g, etc.), Or provide complete size
  in bytes (such as 134217728 for 128 MB).
  </description>
</property>
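A client can read the effective value through the Configuration API; a small sketch (the 300 MB file size is a made-up example, and Configuration.getLongBytes handles the k/m/g suffixes mentioned in the description above):

import org.apache.hadoop.conf.Configuration;

public class BlockSizeCheck {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.addResource("hdfs-site.xml"); // assuming it is on the classpath
    // getLongBytes understands suffixed values such as "128m" or "1g"
    long blockSize = conf.getLongBytes("dfs.blocksize", 128L * 1024 * 1024);
    long fileSize = 300L * 1024 * 1024; // hypothetical 300 MB file
    long blocks = (fileSize + blockSize - 1) / blockSize; // ceiling division
    System.out.println("block size = " + blockSize
        + " bytes, file needs " + blocks + " blocks");
  }
}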