When Flume writes to HDFS, every event it receives triggers a call to BucketWriter.append(event) in BucketWriter.java, and each call to append() checks shouldRotate(). Let's look at that logic:
private boolean shouldRotate() {
  boolean doRotate = false;
  if (writer.isUnderReplicated()) {
    this.isUnderReplicated = true;
    doRotate = true;
  } else {
    this.isUnderReplicated = false;
  }
  if ((rollCount > 0) && (rollCount <= eventCounter)) {
    LOG.debug("rolling: rollCount: {}, events: {}", rollCount, eventCounter);
    doRotate = true;
  }
  if ((rollSize > 0) && (rollSize <= processSize)) {
    LOG.debug("rolling: rollSize: {}, bytes: {}", rollSize, processSize);
    doRotate = true;
  }
  return doRotate;
}
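For reference, the rollCount and rollSize checks above correspond to the sink's hdfs.rollCount and hdfs.rollSize settings, where a value of 0 disables that trigger. A minimal configuration sketch (the values are illustrative, not taken from our actual setup) that disables both and rolls purely by time would look like:

# 0 = never roll by event count or by file size; roll only by time
agent.sinks.sk_cloudui.hdfs.rollCount = 0
agent.sinks.sk_cloudui.hdfs.rollSize = 0
agent.sinks.sk_cloudui.hdfs.rollInterval = 600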
We can see that writer.isUnderReplicated() is called on every append: if the replica count of the file currently being written is below the expected value, the file is rotated regardless of whether rollCount or rollSize has been reached. (Presumably the idea is that if the previous event's block is under-replicated, the sink should close the file and start writing to a new one, but why it was designed this way is not clear to me.)
public boolean isUnderReplicated() {
  try {
    int numBlocks = getNumCurrentReplicas();
    if (numBlocks == -1) {
      return false;
    }
    int desiredBlocks;
    if (configuredMinReplicas != null) {
      desiredBlocks = configuredMinReplicas;
    } else {
      desiredBlocks = getFsDesiredReplication();
    }
    return numBlocks < desiredBlocks;
  } catch (IllegalAccessException e) {
    logger.error("Unexpected error while checking replication factor", e);
  } catch (InvocationTargetException e) {
    logger.error("Unexpected error while checking replication factor", e);
  } catch (IllegalArgumentException e) {
    logger.error("Unexpected error while checking replication factor", e);
  }
  return false;
}
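The configuredMinReplicas above comes from the hdfs.minBlockReplicas setting; when it is not set, the file system's desired replication factor is used instead. A common way to keep under-replication from triggering rolls (a sketch, reusing the sink name from the config shown at the end of this post) is:

# treat the file as healthy as long as at least one replica is in the write pipeline
agent.sinks.sk_cloudui.hdfs.minBlockReplicas = 1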
The code that retrieves the replica count looks like this:
/**
 * This method gets the datanode replication count for the current open file.
 *
 * If the pipeline isn't started yet or is empty, you will get the default
 * replication factor.
 *
 * <p/>If this function returns -1, it means you
 * are not properly running with the HDFS-826 patch.
 * @throws InvocationTargetException
 * @throws IllegalAccessException
 * @throws IllegalArgumentException
 */
public int getNumCurrentReplicas()
    throws IllegalArgumentException, IllegalAccessException,
    InvocationTargetException {
  if (refGetNumCurrentReplicas != null && outputStream != null) {
    OutputStream dfsOutputStream = outputStream.getWrappedStream();
    if (dfsOutputStream != null) {
      Object repl = refGetNumCurrentReplicas.invoke(dfsOutputStream, NO_ARGS);
      if (repl instanceof Integer) {
        return ((Integer) repl).intValue();
      }
    }
  }
  return -1;
}
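Here refGetNumCurrentReplicas is a java.lang.reflect.Method handle that is looked up once when the stream is opened, because older Hadoop client APIs do not expose getNumCurrentReplicas() as a public method. A minimal sketch of how such a handle can be obtained (the class and method name below are mine, not Flume's):

import java.lang.reflect.Method;
import org.apache.hadoop.fs.FSDataOutputStream;

class ReplicaCountReflection {
  // Look up getNumCurrentReplicas() on the wrapped DFSOutputStream, if this
  // Hadoop build exposes it (HDFS-826); return null otherwise.
  static Method findGetNumCurrentReplicas(FSDataOutputStream os) {
    try {
      Method m = os.getWrappedStream().getClass()
          .getMethod("getNumCurrentReplicas");
      m.setAccessible(true);
      return m;
    } catch (NoSuchMethodException e) {
      return null; // running against an HDFS client without this API
    }
  }
}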
So every single event that is appended goes through this replication check against HDFS, which is an expensive thing to do per event. In our own setup we do not use rollCount or rollSize at all; we only use rollInterval. The rollInterval logic does not live in shouldRotate(); instead, open() contains this piece of logic:
if (rollInterval > 0) {
  Callable<Void> action = new Callable<Void>() {
    public Void call() throws Exception {
      LOG.debug("Rolling file ({}): Roll scheduled after {} sec elapsed.",
          bucketPath, rollInterval);
      try {
        // Roll the file and remove reference from sfWriters map.
        close(true);
      } catch (Throwable t) {
        LOG.error("Unexpected error", t);
      }
      return null;
    }
  };
  timedRollFuture = timedRollerPool.schedule(action, rollInterval, TimeUnit.SECONDS);
}
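In other words, whenever a file is opened a one-shot close task is put on a shared ScheduledExecutorService, and closing the file by any other means should cancel it. A self-contained sketch of that pattern (class and method names are illustrative, not Flume's):

import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Illustrative sketch of the timed-roll pattern: one shared scheduler,
// one pending close task per open file.
class TimedRoller {
  private final ScheduledExecutorService timedRollerPool =
      Executors.newScheduledThreadPool(1);
  private ScheduledFuture<Void> timedRollFuture;

  // Called from open(): schedule a one-shot close after rollInterval seconds.
  void scheduleRoll(long rollIntervalSec, Callable<Void> closeFile) {
    timedRollFuture = timedRollerPool.schedule(closeFile, rollIntervalSec,
        TimeUnit.SECONDS);
  }

  // Called when the file is closed for some other reason (e.g. shutdown),
  // so the stale roll task does not fire against the next file.
  void cancelRoll() {
    if (timedRollFuture != null && !timedRollFuture.isDone()) {
      timedRollFuture.cancel(false);
    }
  }
}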
Now let's go back and look at the process() method of HDFSEventSink:
public Status process() throws EventDeliveryException {
  Channel channel = getChannel();
  Transaction transaction = channel.getTransaction();
  List<BucketWriter> writers = Lists.newArrayList();
  transaction.begin();
  try {
    int txnEventCount = 0;
    for (txnEventCount = 0; txnEventCount < batchSize; txnEventCount++) {
      Event event = channel.take();
      if (event == null) {
        break;
      }
      // reconstruct the path name by substituting place holders
      String realPath = BucketPath.escapeString(filePath, event.getHeaders(),
          timeZone, needRounding, roundUnit, roundValue, useLocalTime);
      String realName = BucketPath.escapeString(fileName, event.getHeaders(),
          timeZone, needRounding, roundUnit, roundValue, useLocalTime);
      String lookupPath = realPath + DIRECTORY_DELIMITER + realName;
      BucketWriter bucketWriter;
      HDFSWriter hdfsWriter = null;
      // Callback to remove the reference to the bucket writer from the
      // sfWriters map so that all buffers used by the HDFS file
      // handles are garbage collected.
      WriterCallback closeCallback = new WriterCallback() {
        @Override
        public void run(String bucketPath) {
          LOG.info("Writer callback called.");
          synchronized (sfWritersLock) {
            sfWriters.remove(bucketPath);
          }
        }
      };
      synchronized (sfWritersLock) {
        bucketWriter = sfWriters.get(lookupPath);
        // we haven't seen this file yet, so open it and cache the handle
        if (bucketWriter == null) {
          hdfsWriter = writerFactory.getWriter(fileType);
          bucketWriter = initializeBucketWriter(realPath, realName,
              lookupPath, hdfsWriter, closeCallback);
          sfWriters.put(lookupPath, bucketWriter);
        }
      }
      // track the buckets getting written in this transaction
      if (!writers.contains(bucketWriter)) {
        writers.add(bucketWriter);
      }
      // Write the data to HDFS
      try {
        bucketWriter.append(event);
      } catch (BucketClosedException ex) {
        LOG.info("Bucket was closed while trying to append, " +
            "reinitializing bucket and writing event.");
        hdfsWriter = writerFactory.getWriter(fileType);
        bucketWriter = initializeBucketWriter(realPath, realName,
            lookupPath, hdfsWriter, closeCallback);
        synchronized (sfWritersLock) {
          sfWriters.put(lookupPath, bucketWriter);
        }
        bucketWriter.append(event);
      }
    }
    if (txnEventCount == 0) {
      sinkCounter.incrementBatchEmptyCount();
    } else if (txnEventCount == batchSize) {
      sinkCounter.incrementBatchCompleteCount();
    } else {
      sinkCounter.incrementBatchUnderflowCount();
    }
    // flush all pending buckets before committing the transaction
    for (BucketWriter bucketWriter : writers) {
      bucketWriter.flush();
    }
    transaction.commit();
    if (txnEventCount < 1) {
      return Status.BACKOFF;
    } else {
      sinkCounter.addToEventDrainSuccessCount(txnEventCount);
      return Status.READY;
    }
  } catch (IOException eIO) {
    transaction.rollback();
    LOG.warn("HDFS IO error", eIO);
    return Status.BACKOFF;
  } catch (Throwable th) {
    transaction.rollback();
    LOG.error("process failed", th);
    if (th instanceof Error) {
      throw (Error) th;
    } else {
      throw new EventDeliveryException(th);
    }
  } finally {
    transaction.close();
  }
}
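Stripped of the bucket handling, process() follows the standard Flume sink transaction pattern: take up to batchSize events inside one channel transaction, flush every BucketWriter touched, then commit, or roll back so the events are retried. A condensed sketch of just that skeleton (appendToBucket and flushBuckets are placeholders for the logic above, not real Flume methods):

import java.util.List;

import org.apache.flume.Channel;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Sink.Status;
import org.apache.flume.Transaction;

// Condensed view of the transaction skeleton that process() follows.
abstract class BatchingSinkSkeleton {
  protected int batchSize = 1000;

  abstract Channel getChannel();
  abstract void appendToBucket(Event event) throws Exception;
  abstract void flushBuckets() throws Exception;

  public Status process() throws EventDeliveryException {
    Channel channel = getChannel();
    Transaction txn = channel.getTransaction();
    txn.begin();
    try {
      int count = 0;
      for (; count < batchSize; count++) {
        Event event = channel.take();  // null means the channel is drained
        if (event == null) {
          break;
        }
        appendToBucket(event);         // resolve path, find/create BucketWriter, append
      }
      flushBuckets();                  // flush every writer touched in this batch
      txn.commit();                    // only now are the events "consumed"
      return count < 1 ? Status.BACKOFF : Status.READY;
    } catch (Throwable t) {
      txn.rollback();                  // events stay in the channel and are retried
      throw new EventDeliveryException(t);
    } finally {
      txn.close();
    }
  }
}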
HDFSEventSink maintains a map (sfWriters) keyed by the resolved path, lookupPath, so that each bucket file has exactly one BucketWriter. Take a configuration like the following:
agent.sinks.sk_cloudui.hdfs.path = hdfs://bigfile/om/anticrack/cloudui.log/%Y%m%d/%k%M/
agent.sinks.sk_cloudui.hdfs.filePrefix = qd01-pcsdata45.qd01.baidu.com
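With this configuration, BucketPath.escapeString() fills in the time escapes from the event's timestamp header, and the result becomes the sfWriters key. As a worked example (the timestamp is made up), an event stamped 2016-11-04 15:37 local time resolves roughly to:

realPath   = hdfs://bigfile/om/anticrack/cloudui.log/20161104/1537/
realName   = qd01-pcsdata45.qd01.baidu.com
lookupPath = realPath + "/" + realName

So every new minute yields a new lookupPath, and therefore a new BucketWriter and a new HDFS file, unless rounding (hdfs.round / hdfs.roundUnit / hdfs.roundValue) is configured to coarsen the time bucket.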