Structure of DataFusion
0.config
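All of DataFusion's config options are declared through a single macro_rules! macro, config_namespace!. For each namespace the macro generates the struct with its fields, a ConfigField impl (set / visit), and a Default impl built from the declared defaults.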
macro_rules! config_namespace {
    (
        $(#[doc = $struct_d:tt])*
        $vis:vis struct $struct_name:ident {
            $(
                $(#[doc = $d:tt])*
                $field_vis:vis $field_name:ident : $field_type:ty, default = $default:expr
            )*$(,)*
        }
    ) => {
        $(#[doc = $struct_d])*
        #[derive(Debug, Clone)]
        #[non_exhaustive]
        $vis struct $struct_name{
            $(
                $(#[doc = $d])*
                $field_vis $field_name : $field_type,
            )*
        }
        impl ConfigField for $struct_name {
            fn set(&mut self, key: &str, value: &str) -> Result<()> {
                let (key, rem) = key.split_once('.').unwrap_or((key, ""));
                match key {
                    $(
                        stringify!($field_name) => self.$field_name.set(rem, value),
                    )*
                    _ => _internal_err!(
                        "Config value \"{}\" not found on {}", key, stringify!($struct_name)
                    )
                }
            }
            fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, _description: &'static str) {
                $(
                    let key = format!(concat!("{}.", stringify!($field_name)), key_prefix);
                    let desc = concat!($($d),*).trim();
                    self.$field_name.visit(v, key.as_str(), desc);
                )*
            }
        }
        impl Default for $struct_name {
            fn default() -> Self {
                Self {
                    $($field_name: $default),*
                }
            }
        }
    }
}
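To make the pattern easier to see, here is a stripped-down sketch of the same idea with no DataFusion dependency. The macro name mini_config!, the struct ExecutionOptions, and the two fields are illustrative choices, not the real definitions. It also assumes every field type implements FromStr, whereas the real macro delegates to the ConfigField trait so that namespaces can nest and doc comments can be collected by visit.

```rust
// Hypothetical, simplified version of the config_namespace! pattern:
// one declaration produces the struct, a string-keyed setter, and Default.
macro_rules! mini_config {
    (
        $vis:vis struct $name:ident {
            $( $field:ident : $ty:ty, default = $default:expr ),* $(,)?
        }
    ) => {
        #[derive(Debug, Clone, PartialEq)]
        $vis struct $name {
            $( pub $field: $ty, )*
        }

        impl $name {
            /// Set a field by its name, parsing the value from a string.
            pub fn set(&mut self, key: &str, value: &str) -> Result<(), String> {
                match key {
                    $(
                        // stringify! turns the field identifier into the match key.
                        stringify!($field) => {
                            self.$field = value
                                .parse::<$ty>()
                                .map_err(|e| format!("bad value for {}: {}", key, e))?;
                            Ok(())
                        }
                    )*
                    _ => Err(format!(
                        "Config value \"{}\" not found on {}",
                        key,
                        stringify!($name)
                    )),
                }
            }
        }

        impl Default for $name {
            fn default() -> Self {
                Self { $( $field: $default ),* }
            }
        }
    };
}

// Illustrative namespace; the field names and defaults are assumptions.
mini_config! {
    pub struct ExecutionOptions {
        batch_size: usize, default = 8192,
        coalesce_batches: bool, default = true,
    }
}
```

With this sketch, ExecutionOptions::default() yields the declared defaults, and calling set("batch_size", "1024") updates the field by name, which is roughly what lets a string key/value pair (for example from a SET statement) reach a typed field.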
1.SessionContext
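A SessionContext is initialized from a SessionConfig; construction then proceeds step by step, setting up the catalog_list and the SessionState (to be analyzed further later; details unknown for now).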
pub struct SessionContext {
    /// UUID for the session
    session_id: String,
    /// Session start time
    session_start_time: DateTime<Utc>,
    /// Shared session state for the session
    state: Arc<RwLock<SessionState>>,
}
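The key design point in the struct above is that the state sits behind Arc<RwLock<...>>, so clones of the context share one SessionState. A minimal sketch of that flow follows, using only the standard library. MiniConfig, MiniState, MiniContext, and their methods are hypothetical names, the "datafusion" default catalog name is used purely for illustration, and the real code may use a different RwLock implementation than std's.

```rust
use std::sync::{Arc, RwLock};

/// Hypothetical stand-in for SessionConfig.
#[derive(Debug, Clone, Default)]
struct MiniConfig {
    target_partitions: usize,
}

/// Hypothetical stand-in for SessionState: owns the config and the catalogs.
#[derive(Debug)]
struct MiniState {
    config: MiniConfig,
    catalogs: Vec<String>,
}

/// Hypothetical stand-in for SessionContext: a cheap handle to shared state.
#[derive(Clone)]
struct MiniContext {
    session_id: String,
    state: Arc<RwLock<MiniState>>,
}

impl MiniContext {
    /// Config first, then state (with a default catalog), then the context.
    fn new_with_config(session_id: &str, config: MiniConfig) -> Self {
        let state = MiniState {
            config,
            catalogs: vec!["datafusion".to_string()],
        };
        Self {
            session_id: session_id.to_string(),
            state: Arc::new(RwLock::new(state)),
        }
    }

    fn session_id(&self) -> &str {
        &self.session_id
    }

    /// Writers take the write lock; every clone of the context sees the change.
    fn register_catalog(&self, name: &str) {
        self.state.write().unwrap().catalogs.push(name.to_string());
    }

    fn catalog_names(&self) -> Vec<String> {
        self.state.read().unwrap().catalogs.clone()
    }

    fn target_partitions(&self) -> usize {
        self.state.read().unwrap().config.target_partitions
    }
}
```

Cloning a MiniContext copies only the id and the Arc pointer, so a catalog registered through one clone is visible through all the others.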
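Similar to MySQL's con*: it is responsible for one session's connection and the associated user variables.
options covers all of the corresponding config (physical plan, logical plan, etc.).
extensions is a HashMap; what it maps to is not yet clear.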
pub struct SessionConfig {
    /// Configuration options
    options: ConfigOptions,
    /// Opaque extensions.
    extensions: AnyMap,
}
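There is not much to analyze here; the comments cover basically everything, so just read through them. It works just like THD: stash things in it and pull them out when needed.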
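On the open question about extensions: the name AnyMap suggests a map keyed by Rust type, holding at most one opaque value per type. The sketch below shows that general technique under this assumption. Extensions, TenantInfo, and the method names are hypothetical, and the real AnyMap definition may differ in its hasher and bounds.

```rust
use std::any::{Any, TypeId};
use std::collections::HashMap;
use std::sync::Arc;

/// Hypothetical type-keyed map: stores one shared value per concrete type.
#[derive(Default)]
struct Extensions {
    map: HashMap<TypeId, Arc<dyn Any + Send + Sync>>,
}

impl Extensions {
    /// Insert (or replace) the value stored for type T.
    fn insert<T: Any + Send + Sync>(&mut self, ext: Arc<T>) {
        // Erase the concrete type; the TypeId key remembers it for lookup.
        let erased: Arc<dyn Any + Send + Sync> = ext;
        self.map.insert(TypeId::of::<T>(), erased);
    }

    /// Look up the value stored for type T, if any.
    fn get<T: Any + Send + Sync>(&self) -> Option<Arc<T>> {
        self.map
            .get(&TypeId::of::<T>())
            .cloned()
            .and_then(|ext| ext.downcast::<T>().ok())
    }
}

/// Illustrative user-defined extension carried along with the session config.
struct TenantInfo {
    name: String,
}
```

The caller stores an Arc<TenantInfo> once and later asks for get::<TenantInfo>(); no string key is needed because the type itself is the key, which fits the "put things in, take things out" usage described above.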
SessionState
pub struct SessionState {
    /// A unique UUID that identifies the session
    session_id: String,
    /// Responsible for analyzing and rewrite a logical plan before optimization
    analyzer: Analyzer,
    /// Responsible for optimizing a logical plan
    optimizer: Optimizer,
    /// Responsible for optimizing a physical execution plan
    physical_optimizers: PhysicalOptimizer,
    /// Responsible for planning `LogicalPlan`s, and `ExecutionPlan`
    query_planner: Arc<dyn QueryPlanner + Send + Sync>,
    /// Collection of catalogs containing schemas and ultimately TableProviders
    catalog_list: Arc<dyn CatalogList>,
    /// Table Functions
    table_functions: HashMap<String, Arc<TableFunction>>,
    /// Scalar functions that are registered with the context
    scalar_functions: HashMap<String, Arc<ScalarUDF>>,
    /// Aggregate functions registered in the context
    aggregate_functions: HashMap<String, Arc<AggregateUDF>>,
    /// Window functions registered in the context
    window_functions: HashMap<String, Arc<WindowUDF>>,
    /// Deserializer registry for extensions.
    serializer_registry: Arc<dyn SerializerRegistry>,
    /// Session configuration
    config: SessionConfig,
    /// Execution properties
    execution_props: ExecutionProps,
    /// TableProviderFactories for different file formats.
    ///
    /// Maps strings like "JSON" to an instance of [`TableProviderFactory`]
    ///
    /// This is used to create [`TableProvider`] instances for the
    /// `CREATE EXTERNAL TABLE ... STORED AS <FORMAT>` for custom file
    /// formats other than those built into DataFusion
    table_factories: HashMap<String, Arc<dyn TableProviderFactory>>,
    /// Runtime environment
    runtime_env: Arc<RuntimeEnv>,
}
OPTIMIZER
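The Analyzer rule types seen so far. The first four appear to be built in; MyAnalyzerRule looks like a user-defined example rule rather than a built-in one: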
InlineTableScan
CountWildcardRule
OperatorToFunction
TypeCoercion
MyAnalyzerRule
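The rules listed above can be pictured as a pipeline: each rule takes a plan and returns a rewritten plan, and the analyzer applies them in order. The sketch below is a toy model of that idea only. The plan is a plain String instead of a real LogicalPlan tree, the trait is a simplified stand-in for the real AnalyzerRule (which, as far as I can tell, also receives the config options), and both rules, including the COUNT(*) to COUNT(1) rewrite, are illustrative assumptions.

```rust
/// Toy stand-in for a logical plan; the real LogicalPlan is a tree, not a string.
type Plan = String;

/// Simplified, hypothetical version of an analyzer rule.
trait AnalyzerRule {
    fn name(&self) -> &str;
    fn analyze(&self, plan: Plan) -> Result<Plan, String>;
}

/// Illustrative rewrite in the spirit of a count-wildcard rule.
struct CountWildcardToLiteral;

impl AnalyzerRule for CountWildcardToLiteral {
    fn name(&self) -> &str {
        "count_wildcard_to_literal"
    }
    fn analyze(&self, plan: Plan) -> Result<Plan, String> {
        Ok(plan.replace("COUNT(*)", "COUNT(1)"))
    }
}

/// Illustrative validation rule: analysis can also reject a plan.
struct RejectEmptyPlan;

impl AnalyzerRule for RejectEmptyPlan {
    fn name(&self) -> &str {
        "reject_empty_plan"
    }
    fn analyze(&self, plan: Plan) -> Result<Plan, String> {
        if plan.trim().is_empty() {
            Err("empty plan".to_string())
        } else {
            Ok(plan)
        }
    }
}

/// Applies every rule in order, stopping at the first error.
struct Analyzer {
    rules: Vec<Box<dyn AnalyzerRule>>,
}

impl Analyzer {
    fn run(&self, plan: Plan) -> Result<Plan, String> {
        self.rules.iter().try_fold(plan, |plan, rule| {
            rule.analyze(plan)
                .map_err(|e| format!("{}: {}", rule.name(), e))
        })
    }
}

/// Builds an analyzer with the two illustrative rules above.
fn default_analyzer() -> Analyzer {
    let mut rules: Vec<Box<dyn AnalyzerRule>> = Vec::new();
    rules.push(Box::new(RejectEmptyPlan));
    rules.push(Box::new(CountWildcardToLiteral));
    Analyzer { rules }
}
```

A user-defined rule such as MyAnalyzerRule would slot into the same list by implementing the trait and being pushed onto rules, which is presumably why it shows up next to the built-in ones.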
LOGICAL OPTIMIZER
Summarizing at this pace is too slow; I need to find a concrete, related task to work on instead.