Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 59 additions & 17 deletions datafusion/core/tests/datasource/object_store_access.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,15 +117,40 @@ async fn multi_query_multi_file_csv_file() {
+---------+-------+-------+
------- Object Store Request Summary -------
RequestCountingObjectStore()
Total Requests: 4
- LIST prefix=data
Total Requests: 3
- GET (opts) path=data/file_0.csv
- GET (opts) path=data/file_1.csv
- GET (opts) path=data/file_2.csv
"
);

// the second query should re-use the cached LIST results and should not reissue LIST
// Force a cache eviction by removing the data limit for the cache
assert_snapshot!(
test.query("set datafusion.runtime.list_files_cache_limit=\"0K\"").await,
@r"
------- Query Output (0 rows) -------
++
++
------- Object Store Request Summary -------
RequestCountingObjectStore()
Total Requests: 0
"
);

// Then re-enable the cache
assert_snapshot!(
test.query("set datafusion.runtime.list_files_cache_limit=\"1M\"").await,
@r"
------- Query Output (0 rows) -------
++
++
------- Object Store Request Summary -------
RequestCountingObjectStore()
Total Requests: 0
"
);

// this query should list the table since the cache entries were evicted
assert_snapshot!(
test.query("select * from csv_table").await,
@r"
Expand All @@ -149,6 +174,30 @@ async fn multi_query_multi_file_csv_file() {
- GET (opts) path=data/file_2.csv
"
);

// this query should not list the table since the entries were added in the previous query
assert_snapshot!(
test.query("select * from csv_table").await,
@r"
------- Query Output (6 rows) -------
+---------+-------+-------+
| c1 | c2 | c3 |
+---------+-------+-------+
| 0.0 | 0.0 | true |
| 0.00003 | 5e-12 | false |
| 0.00001 | 1e-12 | true |
| 0.00003 | 5e-12 | false |
| 0.00002 | 2e-12 | true |
| 0.00003 | 5e-12 | false |
+---------+-------+-------+
------- Object Store Request Summary -------
RequestCountingObjectStore()
Total Requests: 3
- GET (opts) path=data/file_0.csv
- GET (opts) path=data/file_1.csv
- GET (opts) path=data/file_2.csv
"
);
}

#[tokio::test]
Expand All @@ -170,8 +219,7 @@ async fn query_multi_csv_file() {
+---------+-------+-------+
------- Object Store Request Summary -------
RequestCountingObjectStore()
Total Requests: 4
- LIST prefix=data
Total Requests: 3
- GET (opts) path=data/file_0.csv
- GET (opts) path=data/file_1.csv
- GET (opts) path=data/file_2.csv
Expand All @@ -198,8 +246,7 @@ async fn query_partitioned_csv_file() {
+---------+-------+-------+---+----+-----+
------- Object Store Request Summary -------
RequestCountingObjectStore()
Total Requests: 4
- LIST prefix=data
Total Requests: 3
- GET (opts) path=data/a=1/b=10/c=100/file_1.csv
- GET (opts) path=data/a=2/b=20/c=200/file_2.csv
- GET (opts) path=data/a=3/b=30/c=300/file_3.csv
Expand All @@ -218,8 +265,7 @@ async fn query_partitioned_csv_file() {
+---------+-------+-------+---+----+-----+
------- Object Store Request Summary -------
RequestCountingObjectStore()
Total Requests: 2
- LIST prefix=data/a=2
Total Requests: 1
- GET (opts) path=data/a=2/b=20/c=200/file_2.csv
"
);
Expand All @@ -236,8 +282,7 @@ async fn query_partitioned_csv_file() {
+---------+-------+-------+---+----+-----+
------- Object Store Request Summary -------
RequestCountingObjectStore()
Total Requests: 2
- LIST prefix=data
Total Requests: 1
- GET (opts) path=data/a=2/b=20/c=200/file_2.csv
"
);
Expand All @@ -254,8 +299,7 @@ async fn query_partitioned_csv_file() {
+---------+-------+-------+---+----+-----+
------- Object Store Request Summary -------
RequestCountingObjectStore()
Total Requests: 2
- LIST prefix=data
Total Requests: 1
- GET (opts) path=data/a=2/b=20/c=200/file_2.csv
"
);
Expand All @@ -272,8 +316,7 @@ async fn query_partitioned_csv_file() {
+---------+-------+-------+---+----+-----+
------- Object Store Request Summary -------
RequestCountingObjectStore()
Total Requests: 2
- LIST prefix=data/a=2/b=20
Total Requests: 1
- GET (opts) path=data/a=2/b=20/c=200/file_2.csv
"
);
Expand All @@ -290,8 +333,7 @@ async fn query_partitioned_csv_file() {
+---------+-------+-------+---+----+-----+
------- Object Store Request Summary -------
RequestCountingObjectStore()
Total Requests: 2
- LIST prefix=data
Total Requests: 1
- GET (opts) path=data/a=1/b=10/c=100/file_1.csv
"
);
Expand Down
27 changes: 17 additions & 10 deletions datafusion/execution/src/cache/cache_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
// specific language governing permissions and limitations
// under the License.

use crate::cache::CacheAccessor;
use crate::cache::cache_unit::DefaultFilesMetadataCache;
use crate::cache::{CacheAccessor, DefaultListFilesCache};
use datafusion_common::stats::Precision;
use datafusion_common::{Result, Statistics};
use object_store::ObjectMeta;
Expand Down Expand Up @@ -190,18 +190,25 @@ impl CacheManager {
let file_statistic_cache =
config.table_files_statistics_cache.as_ref().map(Arc::clone);

let list_files_cache = config
.list_files_cache
.as_ref()
.inspect(|c| {
let list_files_cache = match &config.list_files_cache {
Some(lfc) if config.list_files_cache_limit > 0 => {
// the cache memory limit or ttl might have changed, ensure they are updated
c.update_cache_limit(config.list_files_cache_limit);
lfc.update_cache_limit(config.list_files_cache_limit);
// Only update TTL if explicitly set in config, otherwise preserve the cache's existing TTL
if let Some(ttl) = config.list_files_cache_ttl {
c.update_cache_ttl(Some(ttl));
lfc.update_cache_ttl(Some(ttl));
}
})
.map(Arc::clone);
Some(Arc::clone(lfc))
}
None if config.list_files_cache_limit > 0 => {
let lfc: Arc<dyn ListFilesCache> = Arc::new(DefaultListFilesCache::new(
config.list_files_cache_limit,
config.list_files_cache_ttl,
));
Some(lfc)
}
_ => None,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the user has disabled caching with list_files_cache_limit = "0K" then None will be returned here, but in this case get_list_files_cache_limit() will return "1M"

};

let file_metadata_cache = config
.file_metadata_cache
Expand Down Expand Up @@ -235,7 +242,7 @@ impl CacheManager {
pub fn get_list_files_cache_limit(&self) -> usize {
self.list_files_cache
.as_ref()
.map_or(DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT, |c| c.cache_limit())
.map_or(0, |c| c.cache_limit())
}

/// Get the TTL (time-to-live) of the list files cache.
Expand Down
4 changes: 4 additions & 0 deletions datafusion/sqllogictest/test_files/parquet.slt
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@
statement ok
set datafusion.execution.target_partitions = 2;

# disable the listing cache so DataFusion picks up changes from COPY statements
statement ok
set datafusion.runtime.list_files_cache_limit = "0K";

# Create a table as a data source
statement ok
CREATE TABLE src_table (
Expand Down
4 changes: 4 additions & 0 deletions datafusion/sqllogictest/test_files/repartition_scan.slt
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ set datafusion.execution.target_partitions = 4;
statement ok
set datafusion.optimizer.repartition_file_min_size = 1;

# disable the listing cache so DataFusion picks up changes from COPY statements
statement ok
set datafusion.runtime.list_files_cache_limit = "0K";

###################
### Parquet tests
###################
Expand Down
18 changes: 18 additions & 0 deletions datafusion/sqllogictest/test_files/set_variable.slt
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,24 @@ SHOW datafusion.runtime.metadata_cache_limit
----
datafusion.runtime.metadata_cache_limit 200M

# Test SET and SHOW runtime.list_files_cache_limit
statement ok
SET datafusion.runtime.list_files_cache_limit = '2M'

query TT
SHOW datafusion.runtime.list_files_cache_limit
----
datafusion.runtime.list_files_cache_limit 2M

# Test SET and SHOW runtime.list_files_cache_ttl
statement ok
SET datafusion.runtime.list_files_cache_ttl = '90s'

query TT
SHOW datafusion.runtime.list_files_cache_ttl
----
datafusion.runtime.list_files_cache_ttl 1m30s

# Note: runtime.temp_directory shows the actual temp directory path with a unique suffix,
# so we cannot test the exact value. We verify it exists in information_schema instead.

Expand Down
17 changes: 14 additions & 3 deletions docs/source/library-user-guide/upgrading.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,26 @@ directly on the `Field`. For example:
In prior versions, `ListingTableProvider` would issue `LIST` commands to
the underlying object store each time it needed to list files for a query.
To improve performance, `ListingTableProvider` now caches the results of
`LIST` commands for the lifetime of the `ListingTableProvider` instance.
`LIST` commands for the lifetime of the `ListingTableProvider` instance or
until a cache entry expires.

Note that by default the cache has no expiration time, so if files are added or removed
from the underlying object store, the `ListingTableProvider` will not see
those changes until the `ListingTableProvider` instance is dropped and recreated.

You will be able to configure the maximum cache size and cache expiration time via a configuration option:
You can configure the maximum cache size and cache entry expiration time via configuration options:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍


See <https://github.com/apache/datafusion/issues/19056> for more details.
- `datafusion.runtime.list_files_cache_limit` - Sets the maximum size of the cache in bytes
- `datafusion.runtime.list_files_cache_ttl` - Sets the TTL (time-to-live) of a cache entry in seconds

Detailed configuration information can be found in the [DataFusion Runtime
Configuration](https://datafusion.apache.org/user-guide/configs.html#runtime-configuration-settings) user's guide.

Caching can be disabled by setting the cache size limit to zero:

```sql
SET datafusion.runtime.list_files_cache_limit TO "0K";
```

Note that the internal API has changed to use a trait `ListFilesCache` instead of a type alias.

Expand Down