Use a new thread for listing

Nbiba Bedis 2021-12-09 08:21:53 +01:00
parent b1667bd474
commit 569f39c6c9
3 changed files with 72 additions and 39 deletions
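
The change replaces the old leak-the-archive workaround with a worker thread that reads archive entries and streams them back over an mpsc channel, exposed to the caller as a plain Iterator. A minimal, self-contained sketch of that pattern, using FileInArchive, a String error type, and list_entries as illustrative stand-ins rather than ouch's real API:

use std::sync::mpsc::{self, Receiver};
use std::thread;

// Stand-in for ouch's `FileInArchive`, only here to illustrate the pattern.
#[derive(Debug)]
struct FileInArchive {
    path: String,
    is_dir: bool,
}

// Iterator that pulls entries out of the channel fed by the worker thread.
struct Files(Receiver<Result<FileInArchive, String>>);

impl Iterator for Files {
    type Item = Result<FileInArchive, String>;

    fn next(&mut self) -> Option<Self::Item> {
        // `recv` blocks until the worker sends the next entry and returns an
        // error once the sender is dropped, which is what ends the iteration.
        self.0.recv().ok()
    }
}

// Walk `names` on a separate thread and stream the results back lazily.
fn list_entries(names: Vec<String>) -> impl Iterator<Item = Result<FileInArchive, String>> {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        for name in names {
            let is_dir = name.ends_with('/');
            let entry = Ok(FileInArchive { path: name, is_dir });
            // A send error only means the consumer stopped listening.
            let _ = tx.send(entry);
        }
    });
    Files(rx)
}

fn main() {
    for entry in list_entries(vec!["a.txt".into(), "dir/".into()]) {
        println!("{:?}", entry);
    }
}

The consumer never sees the thread or the channel; it just drives an Iterator, which is the shape both list_archive functions below now return.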

src/archive/tar.rs

@@ -4,6 +4,8 @@ use std::{
     env,
     io::prelude::*,
     path::{Path, PathBuf},
+    sync::mpsc::{self, Receiver},
+    thread,
 };
 use fs_err as fs;
@@ -48,21 +50,31 @@ pub fn unpack_archive(
 
 /// List contents of `archive`, returning a vector of archive entries
 pub fn list_archive(
-    archive: tar::Archive<impl Read + 'static>,
-) -> crate::Result<impl Iterator<Item = crate::Result<FileInArchive>>> {
-    // NOTE: tar::Archive::entries takes a &mut self
-    // This makes returning an iterator impossible
-    // Current workaround is just to leak the archive
-    let archive = Box::leak(Box::new(archive));
+    mut archive: tar::Archive<impl Read + Send + 'static>,
+) -> impl Iterator<Item = crate::Result<FileInArchive>> {
+    struct Files(Receiver<crate::Result<FileInArchive>>);
+    impl Iterator for Files {
+        type Item = crate::Result<FileInArchive>;
 
-    Ok(archive.entries()?.map(|file| {
-        let file = file?;
+        fn next(&mut self) -> Option<Self::Item> {
+            self.0.recv().ok()
+        }
+    }
 
-        let path = file.path()?.into_owned();
-        let is_dir = file.header().entry_type().is_dir();
+    let (tx, rx) = mpsc::channel();
+    thread::spawn(move || {
+        for file in archive.entries().expect("entries is only used once") {
+            let file_in_archive = (|| {
+                let file = file?;
+                let path = file.path()?.into_owned();
+                let is_dir = file.header().entry_type().is_dir();
+                Ok(FileInArchive { path, is_dir })
+            })();
+            tx.send(file_in_archive).unwrap();
+        }
+    });
 
-        Ok(FileInArchive { path, is_dir })
-    }))
+    Files(rx)
 }
 
 /// Compresses the archives given by `input_filenames` into the file given previously to `writer`.
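
The (|| { ... })() expression inside the spawned loop above is an immediately invoked closure: `?` short-circuits out of that closure only, so each entry yields its own crate::Result instead of the first error aborting the whole listing thread. A standalone illustration of the same trick; parse_entry and its error strings are hypothetical, not part of ouch:

// Hypothetical per-entry work that can fail.
fn parse_entry(raw: &str) -> Result<(String, bool), String> {
    let name = raw.strip_prefix("entry:").ok_or("missing `entry:` prefix")?;
    Ok((name.to_string(), name.ends_with('/')))
}

fn main() {
    let raw_entries = ["entry:src/", "entry:src/main.rs", "broken"];

    for raw in raw_entries {
        // The closure is called on the spot; `?` inside it only exits the
        // closure, so the loop keeps going after a failed entry.
        let result: Result<(String, bool), String> = (|| {
            let (path, is_dir) = parse_entry(raw)?;
            Ok((path, is_dir))
        })();
        println!("{:?}", result);
    }
}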

src/archive/zip.rs

@@ -4,6 +4,8 @@ use std::{
     env,
     io::{self, prelude::*},
     path::{Path, PathBuf},
+    sync::mpsc,
+    thread,
 };
 use fs_err as fs;
@@ -82,22 +84,41 @@ where
 /// List contents of `archive`, returning a vector of archive entries
 pub fn list_archive<R>(mut archive: ZipArchive<R>) -> impl Iterator<Item = crate::Result<FileInArchive>>
 where
-    R: Read + Seek + 'static,
+    R: Read + Seek + Send + 'static,
 {
-    (0..archive.len()).filter_map(move |idx| {
-        let file = match archive.by_index(idx) {
-            Ok(f) => f,
-            Err(e) => return Some(Err(e.into())),
-        };
+    struct Files(mpsc::Receiver<crate::Result<FileInArchive>>);
+    impl Iterator for Files {
+        type Item = crate::Result<FileInArchive>;
 
-        let path = match file.enclosed_name() {
-            Some(path) => path.to_owned(),
-            None => return None,
-        };
-        let is_dir = file.is_dir();
+        fn next(&mut self) -> Option<Self::Item> {
+            self.0.recv().ok()
+        }
+    }
 
-        Some(Ok(FileInArchive { path, is_dir }))
-    })
+    let (tx, rx) = mpsc::channel();
+    thread::spawn(move || {
+        for idx in 0..archive.len() {
+            let maybe_file_in_archive = (|| {
+                let file = match archive.by_index(idx) {
+                    Ok(f) => f,
+                    Err(e) => return Some(Err(e.into())),
+                };
+
+                let path = match file.enclosed_name() {
+                    Some(path) => path.to_owned(),
+                    None => return None,
+                };
+                let is_dir = file.is_dir();
+
+                Some(Ok(FileInArchive { path, is_dir }))
+            })();
+            if let Some(file_in_archive) = maybe_file_in_archive {
+                tx.send(file_in_archive).unwrap();
+            }
+        }
+    });
+
+    Files(rx)
 }
 
 /// Compresses the archives given by `input_filenames` into the file given previously to `writer`.
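
Unlike the tar version, the zip loop above produces an Option<Result<...>>: entries whose enclosed_name() is None are skipped silently, while genuine failures are still forwarded through the channel, which is what the `if let Some(file_in_archive)` guard expresses. A rough standalone sketch of that shape; classify and its rules are invented for illustration and are not ouch's logic:

// `None` means "skip this entry quietly", `Some(Err(_))` forwards an error,
// `Some(Ok(_))` is a usable entry.
fn classify(raw: &str) -> Option<Result<String, String>> {
    if raw.is_empty() {
        // No usable name, comparable to `enclosed_name()` returning `None`.
        return None;
    }
    if raw.contains("..") {
        return Some(Err(format!("suspicious path: {raw}")));
    }
    Some(Ok(raw.to_string()))
}

fn main() {
    let entries = ["src/main.rs", "", "../etc/passwd", "README.md"];
    let (tx, rx) = std::sync::mpsc::channel();

    std::thread::spawn(move || {
        for raw in entries {
            // Only forward entries worth reporting, as in the zip diff above.
            if let Some(result) = classify(raw) {
                let _ = tx.send(result);
            }
        }
    });

    // The receiver ends iteration once the worker drops the sender.
    for result in rx {
        println!("{:?}", result);
    }
}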

src/commands.rs

@@ -309,7 +309,6 @@ fn compress_files(
         Bzip => Box::new(bzip2::write::BzEncoder::new(encoder, Default::default())),
         Lz4 => Box::new(lzzzz::lz4f::WriteCompressor::new(encoder, Default::default())?),
         Lzma => Box::new(xz2::write::XzEncoder::new(encoder, 6)),
-        Snappy => Box::new(snap::write::FrameEncoder::new(encoder)),
         Zstd => {
             let zstd_encoder = zstd::stream::write::Encoder::new(encoder, Default::default());
             // Safety:
@@ -463,7 +462,6 @@ fn decompress_file(
         Bzip => Box::new(bzip2::read::BzDecoder::new(decoder)),
         Lz4 => Box::new(lzzzz::lz4f::ReadDecompressor::new(decoder)?),
         Lzma => Box::new(xz2::read::XzDecoder::new(decoder)),
-        Snappy => Box::new(snap::read::FrameDecoder::new(decoder)),
         Zstd => Box::new(zstd::stream::Decoder::new(decoder)?),
         Tar | Zip => unreachable!(),
     };
@@ -567,7 +565,6 @@ fn list_archive_contents(
     archive_path: &Path,
     formats: Vec<CompressionFormat>,
     list_options: ListOptions,
-    question_policy: QuestionPolicy,
 ) -> crate::Result<()> {
     let reader = fs::File::open(&archive_path)?;
@@ -588,18 +585,21 @@ fn list_archive_contents(
     // Will be used in decoder chaining
     let reader = BufReader::with_capacity(BUFFER_CAPACITY, reader);
-    let mut reader: Box<dyn Read> = Box::new(reader);
+    let mut reader: Box<dyn Read + Send> = Box::new(reader);
 
     // Grab previous decoder and wrap it inside of a new one
-    let chain_reader_decoder = |format: &CompressionFormat, decoder: Box<dyn Read>| -> crate::Result<Box<dyn Read>> {
-        let decoder: Box<dyn Read> = match format {
-            Gzip => Box::new(flate2::read::GzDecoder::new(decoder)),
-            Bzip => Box::new(bzip2::read::BzDecoder::new(decoder)),
-            Lz4 => Box::new(lzzzz::lz4f::ReadDecompressor::new(decoder)?),
-            Lzma => Box::new(xz2::read::XzDecoder::new(decoder)),
-            Snappy => Box::new(snap::read::FrameDecoder::new(decoder)),
-            Zstd => Box::new(zstd::stream::Decoder::new(decoder)?),
-            Tar | Zip => unreachable!(),
-        };
-        Ok(decoder)
-    };
+    let chain_reader_decoder =
+        |format: &CompressionFormat, decoder: Box<dyn Read + Send>| -> crate::Result<Box<dyn Read + Send>> {
+            let decoder: Box<dyn Read + Send> = match format {
+                Gzip => Box::new(flate2::read::GzDecoder::new(decoder)),
+                Bzip => Box::new(bzip2::read::BzDecoder::new(decoder)),
+                Lz4 => Box::new(lzzzz::lz4f::ReadDecompressor::new(decoder)?),
+                Lzma => Box::new(xz2::read::XzDecoder::new(decoder)),
+                Snappy => Box::new(snap::read::FrameDecoder::new(decoder)),
+                Zstd => Box::new(zstd::stream::Decoder::new(decoder)?),
+                Tar | Zip => unreachable!(),
+            };
+            Ok(decoder)
+        };
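
The switch from Box<dyn Read> to Box<dyn Read + Send> matters because the fully chained decoder is eventually handed to list_archive, which moves it into a spawned thread, and thread::spawn only accepts captures that are Send. A small sketch of that constraint; spawn_reader is a made-up helper, not part of ouch:

use std::io::{Cursor, Read};
use std::thread;

// The reader is moved into the worker thread, so it must be `Send` (and
// `'static`, which boxed trait objects are by default).
fn spawn_reader(reader: Box<dyn Read + Send>) {
    thread::spawn(move || {
        let mut reader = reader;
        let mut buf = Vec::new();
        // With a plain `Box<dyn Read>` parameter this function would not compile.
        let n = reader.read_to_end(&mut buf).expect("read failed");
        println!("read {n} bytes on a worker thread");
    })
    .join()
    .expect("worker thread panicked");
}

fn main() {
    // A Cursor over an owned buffer is both `Read` and `Send`.
    let data: Box<dyn Read + Send> = Box::new(Cursor::new(b"hello".to_vec()));
    spawn_reader(data);
}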
@@ -609,7 +609,7 @@ fn list_archive_contents(
     }
 
     let files: Box<dyn Iterator<Item = crate::Result<FileInArchive>>> = match formats[0] {
-        Tar => Box::new(crate::archive::tar::list_archive(tar::Archive::new(reader))?),
+        Tar => Box::new(crate::archive::tar::list_archive(tar::Archive::new(reader))),
         Zip => {
             eprintln!("{orange}[WARNING]{reset}", orange = *colors::ORANGE, reset = *colors::RESET);
             eprintln!(