Fluss Catalog
在上一篇 搭建Fluss本地开发环境 中我们成功搭建了Fluss本地的运行环境并启动了集群,接下来尝试通过flink创建Fluss Catalog。
Flink环境准备
在Flink官网上下载Flink包,此处以flink-1.20.0版本为例,解压后运行命令启动flink集群以及SQL client。
# 启动flink集群
flink-1.20.0/bin/start-cluster.sh
# 启动SQL Client
flink-1.20.0/bin/sql-client.sh创建Fluss Catalog
在SQL Client中运行DDL创建Fluss Catalog。
CREATE CATALOG fluss_catalog WITH (
'type' = 'fluss',
'bootstrap.servers' = 'fluss-server-1:9123'
);
USE CATALOG fluss_catalog;创建成功后接下来,就可以通过catalog去访问fluss的数据内容了。
Fluss Catalog源码
在Fluss的源码中,Catalog的实现类是 FlinkCatalog , 它继承了标准的Flink的Catalog接口,实现了接口的方法,从而能够完成对Fluss集群的访问操作。
/** A Flink Catalog for fluss. */
public class FlinkCatalog implements Catalog {
protected final String catalogName;
protected final @Nullable String defaultDatabase;
protected final String bootstrapServers;
protected Connection connection;
protected Admin admin;
private volatile @Nullable LakeCatalog lakeCatalog;
...
}类里面总共只有6个属性,其中 catalogName , defaultDatabase,以及 bootstrapServers分别用来记录catalog的名称,默认数据库名称,以及链接的Fluss集群地址。这三个属性比较直观,不再赘述。
比较重要剩下三个属性 connection , admin 以及 lakeCatalog 。他们主要负责与Fluss集群沟通,完成相应的数据操作。
Connection
Connection是一个接口,抽象表示了一个与Fluss集群的链接。Admin也是一个接口,用来抽象表示一个管理员Client客户端,它能够对集群做一些管理员操作。
public interface Connection extends AutoCloseable {
/** Retrieve the configuration used to create this connection. */
Configuration getConfiguration();
/** Retrieve a new Admin client to administer a Fluss cluster. */
Admin getAdmin();
/** Retrieve a new Table client to operate data in table. */
Table getTable(TablePath tablePath);
/** Close the connection and release all resources. */
@Override
void close() throws Exception;
}Connection接口只有一个实现类,叫 FlussConnection 。类里面属性很多,有各种不同的Client,如RpcClient, WriterClient以及LookupClient。MetadataUpdater用来维护metadata数据。还有用于remote文件操作的RemoteFileDownloader,以及与metrics相关的MetricsRegistry和ClientMetricGroup。
public final class FlussConnection implements Connection {
private final Configuration conf;
private final RpcClient rpcClient;
private final MetadataUpdater metadataUpdater;
private final MetricRegistry metricRegistry;
private final ClientMetricGroup clientMetricGroup;
private volatile WriterClient writerClient;
private volatile LookupClient lookupClient;
private volatile RemoteFileDownloader remoteFileDownloader;
private volatile SecurityTokenManager securityTokenManager;
...
}这里我们先不详细研究其中的每一个组件。现在大致了解FlussConnection的组成之后,先暂时知道它是用来与Fluss交流的核心的类就可以了。
Admin
Admin也是一个接口,它表示一个管理员Client,用来管理操作Fluss集群的数据,表,配置以及ACL等等。
其中定义了很多方法,诸如 createDatabase, listDatabase, getTableSchema等等。
public interface Admin extends AutoCloseable {
CompletableFuture<SchemaInfo> getTableSchema(TablePath tablePath);
CompletableFuture<Void> createDatabase(
String databaseName, DatabaseDescriptor databaseDescriptor, boolean ignoreIfExists);
...
}Admin接口有一个实现类 FlussAdmin, 只有三个属性,其中AdminGateway承担了与集群的沟通工作。
public class FlussAdmin implements Admin {
private final AdminGateway gateway;
private final MetadataUpdater metadataUpdater;
private final RpcClient client;
}在FlussConnection中,Admin Client由 getAdmin() 方法创建。
@Override
public Admin getAdmin() {
return new FlussAdmin(rpcClient, metadataUpdater);
}Catalog方法实现
Fluss Catalog中实现Catalog接口的方法主要依赖于Admin,这里我们以 listDatabases() 方法为例,几乎都是通过调用admin client的对应方法来实现catalog的功能。
@Override
public List<String> listDatabases() throws CatalogException {
try {
return admin.listDatabases().get();
} catch (Exception e) {
throw new CatalogException(
String.format("Failed to list all databases in %s", getName()),
ExceptionUtils.stripExecutionException(e));
}
}几乎所有的Catalog方法都是这种实现方式,将真正的功能委托给了Admin Client。
在Catelog中,Admin在 open() 方法中通过Connection创建。
/** In FlinkCatalog **/
@Override
public void open() throws CatalogException {
Configuration flussConfigs = new Configuration();
flussConfigs.setString(ConfigOptions.BOOTSTRAP_SERVERS.key(), bootstrapServers);
connection = ConnectionFactory.createConnection(flussConfigs);
admin = connection.getAdmin();
}LakeCatalog
Fluss架构里数据会存在Remote Storage里,要去访问数据湖里的数据,就需要创建湖结构的Catalog。lakeCatalog就是用来访问湖中的表。
protected CatalogBaseTable getLakeTable(String databaseName, String tableName)
throws TableNotExistException, CatalogException {
mayInitLakeCatalogCatalog();
String[] tableComponents = tableName.split("\\" + LAKE_TABLE_SPLITTER);
if (tableComponents.length == 1) {
// should be pattern like table_name$lake
tableName = tableComponents[0];
} else {
// be some thing like table_name$lake$snapshot
tableName = String.join("", tableComponents);
}
return lakeCatalog.getTable(new ObjectPath(databaseName, tableName));
}在Fluss中有一个 getLakeTable() 方法,通过lakeCatalog去访问湖中数据。
小结
这篇文章创建了Fluss Catalog,并通过源码简单了解了Fluss Catalog的实现,主要是通过调用Admin的相应方法实现Catalog的功能。
