Alluxio本地缓存开发配置指南

Alluxio[1] 是世界上第一个面向基于云的数据分析和人工智能的开源的数据编排技术。 它为数据驱动型应用和存储系统构建了桥梁, 将数据从存储层移动到距离数据驱动型应用更近的位置从而能够更容易被访问。 这还使得应用程序能够通过一个公共接口连接到许多存储系统。 Alluxio内存至上的层次化架构使得数据的访问速度能比现有方案快几个数量级。

Alluxio本地缓存介绍

Alluxio本地缓存功能可以解决部署Alluxio服务的问题,上层使用Alluxio可以把Alluxio当做一个本地缓存,并且无需部署Alluxio Master、Worker服务,仅仅作为一个lib库即可。客户端缓存的优点显而易见,就是易于部署和集成;但是其缺点也主要有以下两点:

  1. 跨不同客户端应用程序进程的潜在数据重复;
  2. 很难实现对集群的细粒度缓存控制;

Alluxio 在2020年初提交的这个PR[1]中实现了本地Local Cache的功能。其主要部分如下所示:

C/C++访问接口

环境准备

  1. 有一个HDFS集群,可进行读写

  2. 把hadoop的jar包设置CLASSPATH

1
2
3
4
5
6
7
8
9
10
11
12
export HADOOP_HOME=/home/test/hadoop

JAR_DIRS="$HADOOP_HOME/share/hadoop/common/lib/
$HADOOP_HOME/share/hadoop/common/
$HADOOP_HOME/share/hadoop/hdfs
$HADOOP_HOME/share/hadoop/hdfs/lib/"
for d in $JAR_DIRS; do
for j in $d/*.jar; do
CLASSPATH=${CLASSPATH}:$j
done;
done;
CLASSPATH=$CLASSPATH:/data8/qh
  1. 把Alluxio本地cache的jar包设置进CLASSPATH
    1
    2
    ALLUXIO_LOCAL_CACHE_JAR=/data8/qhl/lib_hdfs_test/libhdfs-examples/path/alluxio-core-client-hdfs-2.6.0-SNAPSHOT.jar
    CLASSPATH=$ALLUXIO_LOCAL_CACHE_JAR:$CLASSPATH

Alluxio代码修改

由于alluxio.hadoop.LocalCacheFileSystem类并未提供默认构造函数,所以需要新增如下代码:

修改方案1

此方案对应用来说,还需要指明采用alluxio前缀,即fs = hdfsConnect(“alluxio://ip”, port);
此方案需要上层用户修改指明采用alluxio FileSystem,并且用户client的cache配置传递不到DistributedFileSystem上面,因为我们在这里面初始化了一个默认的DistributedFileSystem。

1
2
3
4
5
6
7
8
9
10
11
12
13
public LocalCacheFileSystem() {
org.apache.hadoop.conf.Configuration tmpConf = new Configuration();
tmpConf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
try {
org.apache.hadoop.fs.FileSystem fileSystem = FileSystem.get(tmpConf);
HadoopFileOpener fileOpener = uriStatus -> fileSystem.open(new Path(uriStatus.getPath()));
mExternalFileSystem = Preconditions.checkNotNull(fileSystem, "filesystem");
mHadoopFileOpener = Preconditions.checkNotNull(fileOpener, "fileOpener");
mAlluxioFileOpener = status -> new AlluxioHdfsInputStream(mHadoopFileOpener.open(status));
} catch (IOException e) {
LOG.error("initialize LocalCacheFileSystem failed");
}
}

修改方案2

用户client的conf可以透传到DistributedFileSystem上面,并且上层无需修改任何连接方式,还是可以采用fs = hdfsConnect(“ip”, port)的方式连接。
则需要修改如下代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public LocalCacheFileSystem() {
}

private static final Set<String> SUPPORTED_FS = new HashSet<String>() {
{
add(Constants.SCHEME);
add("ws");
add("hdfs"); // 这里是新增加的代码,目的是让LocalCacheFileSystem识别hdfs schema的连接。
}
};

@Override
public synchronized void initialize(URI uri, org.apache.hadoop.conf.Configuration conf)
throws IOException {
if (!SUPPORTED_FS.contains(uri.getScheme())) {
throw new UnsupportedOperationException(
uri.getScheme() + " is not supported as the external filesystem.");
}
super.initialize(uri, conf);
mHadoopConf = conf;
// Set statistics
setConf(conf);

// The followings are new added codes,这部分是新增加的代码
String hdfsImpl = "fs.hdfs.impl";
String origin = getConf().get(hdfsImpl);
getConf().set(hdfsImpl, DistributedFileSystem.class.getName());
try {
org.apache.hadoop.fs.FileSystem fileSystem = FileSystem.get(getConf());
HadoopFileOpener fileOpener = uriStatus -> fileSystem.open(new Path(uriStatus.getPath()));
mExternalFileSystem = Preconditions.checkNotNull(fileSystem, "filesystem");
mHadoopFileOpener = Preconditions.checkNotNull(fileOpener, "fileOpener");
mAlluxioFileOpener = status -> new AlluxioHdfsInputStream(mHadoopFileOpener.open(status));
} catch (IOException e) {
LOG.error("initialize LocalCacheFileSystem failed");
}
getConf().set(hdfsImpl, origin);
// The above are new added codes,这部分是新增加的代码

mAlluxioConf = HadoopUtils.toAlluxioConf(mHadoopConf);
// Handle metrics
Properties metricsProperties = new Properties();
for (Map.Entry<String, String> entry : conf) {
metricsProperties.setProperty(entry.getKey(), entry.getValue());
}
MetricsSystem.startSinksFromConfig(new MetricsConfig(metricsProperties));
mCacheManager = CacheManager.Factory.get(mAlluxioConf);
}

Alluxio编译请参考官网Building Alluxio From Source。即采用如下命令可以编译成功:

1
mvn -T 2C clean install -DskipTests -Dmaven.javadoc.skip -Dfindbugs.skip -Dcheckstyle.skip -Dlicense.skip -Dskip.protoc

本地cache的jar包在${ALLUXIO_HOME}/core/client/hdfs/alluxio-core-client-hdfs-${version}.jar

注意,如果单独使用此jar包,则需要把其依赖的jar包一并打包,这样其依赖的其他jar包才能都打包进来,把依赖也打包的方式请参考附录

读取测试

需要注意的是,读取的时候,hdfs地址的需要根据提供的alluxio cache jar包,填写不同的前缀;如果采用第一种方式,则前缀要写如下格式:“fs = hdfsConnect(“alluxio://ip”, port);”; 如果采用第二种方式编译的alluxio cache jar包,则不需要填写alluxio前缀

编译
gcc libhdfs_read.c -I/home/test/hadoop/include/ -L/home/test/hadoop/lib/native -lhdfs -o libhdfs_read

如果提示libjvm.so找不到,则需要把其加入到LD_LIBRARY_PATH中

1
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:${JAVA_HOME}/jre/lib/amd64/server

读取
./libhdfs_read /bii.parq 1024

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
#include "hdfs.h"

#include <stdio.h>
#include <stdlib.h>

/**
* An example of using libhdfs to read files. The usage of this file is as follows:
*
* Usage: hdfs_read <filename> <filesize> <buffersize>
*/
int main(int argc, char **argv) {
hdfsFS fs;
const char *rfile = argv[1];
tSize bufferSize = strtoul(argv[2], NULL, 10);
hdfsFile readFile;
char* buffer;
tSize curSize;
if (argc != 3) {
fprintf(stderr, "Usage: hdfs_read <filename> <buffersize>\n");
exit(-1);
}

## 根据alluxio源码修改的方式,填写不同的前缀。
// fs = hdfsConnect("alluxio://172.16.48.5", 9000);
fs = hdfsConnect("172.16.48.5", 9000);
if (!fs) {
fprintf(stderr, "Oops! Failed to connect to hdfs!\n");
exit(-1);
}

readFile = hdfsOpenFile(fs, rfile, O_RDONLY, bufferSize, 0, 0);
if (!readFile) {
fprintf(stderr, "Failed to open %s for writing!\n", rfile);
exit(-2);
}

// data to be written to the file
buffer = malloc(sizeof(char) * bufferSize);
if(buffer == NULL) {
return -2;
}

// read from the file
curSize = bufferSize;
fprintf(stdout,"bufferSize=%d, curSize=%d\n", bufferSize, curSize);
for (; curSize == bufferSize;) {
curSize = hdfsRead(fs, readFile, (void*)buffer, curSize);
fprintf(stdout, "read length=%d\n", curSize);
}

fprintf(stdout, "hahahahah=%d\n",curSize);
fprintf(stdout, "buffer=%s\n", buffer);
free(buffer);
hdfsCloseFile(fs, readFile);
hdfsDisconnect(fs);

# 如果配置的异步缓存cache,则可能由于客户端很快退出,导致缓存不能够及时刷写到磁盘中。
//sleep(1000);
return 0;
}

上面的代码中有两处需要注意的点:

  1. 上文提到的hdfs地址,采用不同的allixo源码修改方式,前缀是不一样的。第二种修改源码方式对用户使用友好。
  2. 如果配置了异步缓存cache,则如果客户端退出很快,则会造成文件不能够及时的刷写到磁盘缓存中。

配置文件

libhdfs_read同级别目录下面需要有一个core-site.xml文件,下面列出一些比较重要的配置,具体每个配置的含义请参考Alluxio的官方网站

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
<configuration>
<property>
<name>fs.defaultFS</name>
<value>hdfs://172.16.48.5:9000</value> // hdfs的ip和port
</property>

<property>
// 第一种alluxio修改方式填写fs.alluxio.impl,第二种源码方式修改,则需要填写fs.hdfs.impl
<name>fs.alluxio.impl</name>
<value>alluxio.hadoop.LocalCacheFileSystem</value>
</property>

<property>
<name>alluxio.user.client.cache.enabled</name>
<value>true</value>
</property>

<property>
<name>alluxio.user.client.cache.dir</name>
<value>/data8/qhl/lib_hdfs_test/libhdfs-examples/alluxio_cache</value>
</property>

<property>
<name>alluxio.user.client.cache.page.size</name>
<value>1KB</value>
</property>

<property>
<name>alluxio.user.client.cache.async.write.enabled</name>
<value>false</value>
</property>

<property>
<name>alluxio.user.client.cache.store.type</name>
<value>LOCAL</value>
</property
</configuration>

采用rocksdb作为本地缓存

rocksdb无需编译。alluxio cache jar包中已经包含了rocksdb的二进制。

参考文献

[1]Implement a User-Side Alluxio Local Cache

附录

  1. alluxio.core.client.hdfs pom文件修改,使其能够打包依赖的jar包
    打包命令如下所示,-Dhadoop.version后面是hadoop的版本。
    1
    mvn package assembly:single -Dhadoop.version=2.8.5 -Dfindbugs.skip -Dcheckstyle.skip -Dlicense.skip -Dskip.test=true -DskipTests
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
<!---->
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<appendAssemblyId>false</appendAssemblyId>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>assembly</goal>
</goals>
</execution>
</executions>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.1.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
</execution>
</executions>
</plugin>