Rewrote indexer (#107)

* Update index without rayon

* Use crossbeam channels

* Use a single thread for DB insertions

* Better use of rayon in clean()

* Index rewrite

* Parallelize traverser

* Don't swallow send error

* Use Drop trait to flush Inserter work

* Configurable number of traverser threads

* Use channels to manage the work queue instead of Mutex

* Removed unusable profiling feature
Antoine Gersant 2020-12-07 20:07:10 -08:00 committed by GitHub
parent 8524c7d5fe
commit b6c446fa02
14 changed files with 632 additions and 497 deletions
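
As a rough sketch of the pipeline this commit introduces: traverser worker threads walk the filesystem and send each directory to a collector, which derives directory-level metadata and forwards insertable items to a single database-insertion thread, all over crossbeam channels. The snippet below illustrates that shape in miniature; the Directory and Item types, the "/music" path, and the println! calls are simplified stand-ins for the real traverser::Directory, inserter::Item, and batched diesel inserts.

use crossbeam_channel::unbounded;
use std::thread;

// Simplified stand-in for traverser::Directory.
struct Directory {
    path: String,
}

// Simplified stand-in for inserter::Item.
enum Item {
    Directory(String),
    Song(String),
}

fn main() {
    let (dir_tx, dir_rx) = unbounded::<Directory>();
    let (item_tx, item_rx) = unbounded::<Item>();

    // Single insertion thread: the real Inserter buffers items and flushes
    // them to the database in batches (and once more on Drop).
    let inserter = thread::spawn(move || {
        for item in item_rx {
            match item {
                Item::Directory(path) => println!("insert directory {}", path),
                Item::Song(path) => println!("insert song {}", path),
            }
        }
    });

    // Collector thread: turns traversed directories into insertable items.
    let collector = thread::spawn(move || {
        for dir in dir_rx {
            item_tx.send(Item::Song(format!("{}/track.mp3", dir.path))).unwrap();
            item_tx.send(Item::Directory(dir.path)).unwrap();
        }
        // item_tx drops here, which disconnects the channel and ends the inserter loop.
    });

    // Traverser side (a single producer in this sketch): emit directories as they are found.
    dir_tx.send(Directory { path: "/music".to_owned() }).unwrap();
    drop(dir_tx); // disconnect so the collector loop can finish

    collector.join().unwrap();
    inserter.join().unwrap();
}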

.gitignore (vendored): 3 changed lines

@@ -4,9 +4,6 @@ target
# Test output
test-output
# Profiler output when using the `profile-index` feature
index-flame-graph.html
# Local config for quick iteration
TestConfig.toml

Cargo.lock (generated): 1 changed line

@@ -1854,6 +1854,7 @@ dependencies = [
"metaflac",
"mp3-duration",
"mp4ameta",
"num_cpus",
"opus_headers",
"pbkdf2",
"percent-encoding 2.1.0",


@@ -7,7 +7,6 @@ edition = "2018"
[features]
default = ["service-rocket"]
ui = ["uuid", "winapi"]
profile-index = ["flame", "flamer"]
service-rocket = ["rocket", "rocket_contrib"]
[dependencies]
@@ -29,6 +28,7 @@ log = "0.4.5"
metaflac = "0.2.3"
mp3-duration = "0.1.9"
mp4ameta = "0.7.1"
num_cpus = "1.13.0"
opus_headers = "0.1.2"
pbkdf2 = "0.4"
rand = "0.7"


@@ -27,7 +27,6 @@ pub struct SongTags {
pub has_artwork: bool,
}
#[cfg_attr(feature = "profile-index", flame)]
pub fn read(path: &Path) -> Option<SongTags> {
let data = match utils::get_audio_format(path) {
Some(AudioFormat::APE) => Some(read_ape(path)),
@@ -49,11 +48,8 @@ pub fn read(path: &Path) -> Option<SongTags> {
}
}
#[cfg_attr(feature = "profile-index", flame)]
fn read_id3(path: &Path) -> Result<SongTags> {
let tag = {
#[cfg(feature = "profile-index")]
let _guard = flame::start_guard("id3_tag_read");
match id3::Tag::read_from_path(&path) {
Ok(t) => Ok(t),
Err(e) => {
@@ -66,8 +62,6 @@ fn read_id3(path: &Path) -> Result<SongTags> {
}?
};
let duration = {
#[cfg(feature = "profile-index")]
let _guard = flame::start_guard("mp3_duration");
mp3_duration::from_path(&path)
.map(|d| d.as_secs() as u32)
.ok()
@@ -127,7 +121,6 @@ fn read_ape_x_of_y(item: &ape::Item) -> Option<u32> {
}
}
#[cfg_attr(feature = "profile-index", flame)]
fn read_ape(path: &Path) -> Result<SongTags> {
let tag = ape::read(path)?;
let artist = tag.item("Artist").and_then(read_ape_string);
@@ -150,7 +143,6 @@ fn read_ape(path: &Path) -> Result<SongTags> {
})
}
#[cfg_attr(feature = "profile-index", flame)]
fn read_vorbis(path: &Path) -> Result<SongTags> {
let file = fs::File::open(path)?;
let source = OggStreamReader::new(file)?;
@@ -185,7 +177,6 @@ fn read_vorbis(path: &Path) -> Result<SongTags> {
Ok(tags)
}
#[cfg_attr(feature = "profile-index", flame)]
fn read_opus(path: &Path) -> Result<SongTags> {
let headers = opus_headers::parse_from_path(path)?;
@@ -219,7 +210,6 @@ fn read_opus(path: &Path) -> Result<SongTags> {
Ok(tags)
}
#[cfg_attr(feature = "profile-index", flame)]
fn read_flac(path: &Path) -> Result<SongTags> {
let tag = metaflac::Tag::read_from_path(path)?;
let vorbis = tag
@@ -251,7 +241,6 @@ fn read_flac(path: &Path) -> Result<SongTags> {
})
}
#[cfg_attr(feature = "profile-index", flame)]
fn read_mp4(path: &Path) -> Result<SongTags> {
let mut tag = mp4ameta::Tag::read_from_path(path)?;


@@ -1,8 +1,6 @@
use anyhow::*;
use diesel;
use diesel::prelude::*;
#[cfg(feature = "profile-index")]
use flame;
use log::error;
use std::sync::{Arc, Condvar, Mutex};
use std::time;


@@ -3,8 +3,6 @@ use diesel;
use diesel::dsl::sql;
use diesel::prelude::*;
use diesel::sql_types;
#[cfg(feature = "profile-index")]
use flame;
use std::path::Path;
use crate::db::{directories, songs, DB};
@@ -31,7 +29,6 @@ no_arg_sql_function!(
"Represents the SQL RANDOM() function"
);
#[cfg_attr(feature = "profile-index", flame)]
pub fn virtualize_song(vfs: &VFS, mut song: Song) -> Option<Song> {
song.path = match vfs.real_to_virtual(Path::new(&song.path)) {
Ok(p) => p.to_string_lossy().into_owned(),
@@ -46,7 +43,6 @@ pub fn virtualize_song(vfs: &VFS, mut song: Song) -> Option<Song> {
Some(song)
}
#[cfg_attr(feature = "profile-index", flame)]
fn virtualize_directory(vfs: &VFS, mut directory: Directory) -> Option<Directory> {
directory.path = match vfs.real_to_virtual(Path::new(&directory.path)) {
Ok(p) => p.to_string_lossy().into_owned(),


@@ -1,472 +0,0 @@
use anyhow::*;
use crossbeam_channel::{Receiver, Sender};
use diesel;
use diesel::prelude::*;
#[cfg(feature = "profile-index")]
use flame;
use log::{error, info};
use rayon::prelude::*;
use regex::Regex;
use std::fs;
use std::path::{Path, PathBuf};
use std::time;
use crate::config::MiscSettings;
use crate::db::{directories, misc_settings, songs, DB};
use crate::index::metadata;
use crate::vfs::VFSSource;
use metadata::SongTags;
const INDEX_BUILDING_INSERT_BUFFER_SIZE: usize = 1000; // Insertions in each transaction
const INDEX_BUILDING_CLEAN_BUFFER_SIZE: usize = 500; // Insertions in each transaction
pub fn update(db: &DB) -> Result<()> {
let start = time::Instant::now();
info!("Beginning library index update");
clean(db)?;
populate(db)?;
info!(
"Library index update took {} seconds",
start.elapsed().as_millis() as f32 / 1000.0
);
#[cfg(feature = "profile-index")]
flame::dump_html(&mut fs::File::create("index-flame-graph.html").unwrap()).unwrap();
Ok(())
}
#[derive(Debug, Insertable)]
#[table_name = "songs"]
struct NewSong {
path: String,
parent: String,
track_number: Option<i32>,
disc_number: Option<i32>,
title: Option<String>,
artist: Option<String>,
album_artist: Option<String>,
year: Option<i32>,
album: Option<String>,
artwork: Option<String>,
duration: Option<i32>,
}
#[derive(Debug, Insertable)]
#[table_name = "directories"]
struct NewDirectory {
path: String,
parent: Option<String>,
artist: Option<String>,
year: Option<i32>,
album: Option<String>,
artwork: Option<String>,
date_added: i32,
}
struct IndexUpdater {
directory_sender: Sender<NewDirectory>,
song_sender: Sender<NewSong>,
album_art_pattern: Regex,
}
impl IndexUpdater {
#[cfg_attr(feature = "profile-index", flame)]
fn new(
album_art_pattern: Regex,
directory_sender: Sender<NewDirectory>,
song_sender: Sender<NewSong>,
) -> Result<IndexUpdater> {
Ok(IndexUpdater {
directory_sender,
song_sender,
album_art_pattern,
})
}
#[cfg_attr(feature = "profile-index", flame)]
fn push_song(&self, song: NewSong) -> Result<()> {
self.song_sender.send(song).map_err(Error::new)
}
#[cfg_attr(feature = "profile-index", flame)]
fn push_directory(&self, directory: NewDirectory) -> Result<()> {
self.directory_sender.send(directory).map_err(Error::new)
}
fn get_artwork(&self, dir: &Path) -> Result<Option<String>> {
for file in fs::read_dir(dir)? {
let file = file?;
if let Some(name_string) = file.file_name().to_str() {
if self.album_art_pattern.is_match(name_string) {
return Ok(file.path().to_str().map(|p| p.to_owned()));
}
}
}
Ok(None)
}
fn populate_directory(&self, parent: Option<&Path>, path: &Path) -> Result<()> {
#[cfg(feature = "profile-index")]
let _guard = flame::start_guard(format!(
"dir: {}",
path.file_name()
.map(|s| { s.to_string_lossy().into_owned() })
.unwrap_or("Unknown".to_owned())
));
// Find artwork
let mut directory_artwork = {
#[cfg(feature = "profile-index")]
let _guard = flame::start_guard("artwork");
self.get_artwork(path).unwrap_or(None)
};
// Extract path and parent path
let parent_string = parent.and_then(|p| p.to_str()).map(|s| s.to_owned());
let path_string = path.to_str().ok_or(anyhow!("Invalid directory path"))?;
// Find date added
let metadata = {
#[cfg(feature = "profile-index")]
let _guard = flame::start_guard("metadata");
fs::metadata(path_string)?
};
let created = {
#[cfg(feature = "profile-index")]
let _guard = flame::start_guard("created_date");
metadata
.created()
.or_else(|_| metadata.modified())?
.duration_since(time::UNIX_EPOCH)?
.as_secs() as i32
};
let mut directory_album = None;
let mut directory_year = None;
let mut directory_artist = None;
let mut inconsistent_directory_album = false;
let mut inconsistent_directory_year = false;
let mut inconsistent_directory_artist = false;
// Sub directories
let mut sub_directories = Vec::new();
let mut song_files = Vec::new();
let files = match fs::read_dir(path) {
Ok(files) => files,
Err(e) => {
error!("Directory read error for `{}`: {}", path.display(), e);
return Err(e.into());
}
};
// Insert content
for file in files {
let file_path = match file {
Ok(ref f) => f.path(),
Err(e) => {
error!("File read error within `{}`: {}", path_string, e);
break;
}
};
#[cfg(feature = "profile-index")]
let _guard = flame::start_guard(format!(
"file: {}",
file_path
.as_path()
.file_name()
.map(|s| { s.to_string_lossy().into_owned() })
.unwrap_or("Unknown".to_owned())
));
if file_path.is_dir() {
sub_directories.push(file_path.to_path_buf());
continue;
}
song_files.push(file_path);
}
let song_metadata = |path: PathBuf| -> Option<(String, SongTags)> {
#[cfg(feature = "profile-index")]
let _guard = flame::start_guard("song_metadata");
path.to_str().and_then(|file_path_string| {
metadata::read(&path).map(|m| (file_path_string.to_owned(), m))
})
};
let song_tags = song_files
.into_par_iter()
.filter_map(song_metadata)
.collect::<Vec<_>>();
if directory_artwork.is_none() {
directory_artwork = song_tags
.iter()
.find(|(_, t)| t.has_artwork)
.map(|(p, _)| p.to_owned());
}
for (file_path_string, tags) in song_tags {
if tags.year.is_some() {
inconsistent_directory_year |=
directory_year.is_some() && directory_year != tags.year;
directory_year = tags.year;
}
if tags.album.is_some() {
inconsistent_directory_album |=
directory_album.is_some() && directory_album != tags.album;
directory_album = tags.album.as_ref().cloned();
}
if tags.album_artist.is_some() {
inconsistent_directory_artist |=
directory_artist.is_some() && directory_artist != tags.album_artist;
directory_artist = tags.album_artist.as_ref().cloned();
} else if tags.artist.is_some() {
inconsistent_directory_artist |=
directory_artist.is_some() && directory_artist != tags.artist;
directory_artist = tags.artist.as_ref().cloned();
}
let artwork_path = if tags.has_artwork {
Some(file_path_string.to_owned())
} else {
directory_artwork.as_ref().cloned()
};
let song = NewSong {
path: file_path_string.to_owned(),
parent: path_string.to_owned(),
disc_number: tags.disc_number.map(|n| n as i32),
track_number: tags.track_number.map(|n| n as i32),
title: tags.title,
duration: tags.duration.map(|n| n as i32),
artist: tags.artist,
album_artist: tags.album_artist,
album: tags.album,
year: tags.year,
artwork: artwork_path,
};
self.push_song(song)?;
}
// Insert directory
let directory = {
if inconsistent_directory_year {
directory_year = None;
}
if inconsistent_directory_album {
directory_album = None;
}
if inconsistent_directory_artist {
directory_artist = None;
}
NewDirectory {
path: path_string.to_owned(),
parent: parent_string,
artwork: directory_artwork,
album: directory_album,
artist: directory_artist,
year: directory_year,
date_added: created,
}
};
self.push_directory(directory)?;
// Populate subdirectories
sub_directories
.into_par_iter()
.map(|sub_directory| self.populate_directory(Some(path), &sub_directory))
.collect() // propagate an error to the caller if one of them failed
}
}
#[cfg_attr(feature = "profile-index", flame)]
pub fn clean(db: &DB) -> Result<()> {
let vfs = db.get_vfs()?;
{
let all_songs: Vec<String>;
{
let connection = db.connect()?;
all_songs = songs::table.select(songs::path).load(&connection)?;
}
let missing_songs = all_songs
.par_iter()
.filter(|ref song_path| {
let path = Path::new(&song_path);
!path.exists() || vfs.real_to_virtual(path).is_err()
})
.collect::<Vec<_>>();
{
let connection = db.connect()?;
for chunk in missing_songs[..].chunks(INDEX_BUILDING_CLEAN_BUFFER_SIZE) {
diesel::delete(songs::table.filter(songs::path.eq_any(chunk)))
.execute(&connection)?;
}
}
}
{
let all_directories: Vec<String>;
{
let connection = db.connect()?;
all_directories = directories::table
.select(directories::path)
.load(&connection)?;
}
let missing_directories = all_directories
.par_iter()
.filter(|ref directory_path| {
let path = Path::new(&directory_path);
!path.exists() || vfs.real_to_virtual(path).is_err()
})
.collect::<Vec<_>>();
{
let connection = db.connect()?;
for chunk in missing_directories[..].chunks(INDEX_BUILDING_CLEAN_BUFFER_SIZE) {
diesel::delete(directories::table.filter(directories::path.eq_any(chunk)))
.execute(&connection)?;
}
}
}
Ok(())
}
#[cfg_attr(feature = "profile-index", flame)]
pub fn populate(db: &DB) -> Result<()> {
let vfs = db.get_vfs()?;
let mount_points = vfs.get_mount_points();
let album_art_pattern = {
let connection = db.connect()?;
let settings: MiscSettings = misc_settings::table.get_result(&connection)?;
Regex::new(&settings.index_album_art_pattern)?
};
let (directory_sender, directory_receiver) = crossbeam_channel::unbounded();
let (song_sender, song_receiver) = crossbeam_channel::unbounded();
let songs_db = db.clone();
let directories_db = db.clone();
let directories_thread = std::thread::spawn(move || {
insert_directories(directory_receiver, directories_db);
});
let songs_thread = std::thread::spawn(move || {
insert_songs(song_receiver, songs_db);
});
{
let updater = IndexUpdater::new(album_art_pattern, directory_sender, song_sender)?;
let mount_points = mount_points.values().collect::<Vec<_>>();
mount_points
.iter()
.par_bridge()
.map(|target| updater.populate_directory(None, target.as_path()))
.collect::<Result<()>>()?;
}
match directories_thread.join() {
Err(e) => error!(
"Error while waiting for directory insertions to complete: {:?}",
e
),
_ => (),
}
match songs_thread.join() {
Err(e) => error!(
"Error while waiting for song insertions to complete: {:?}",
e
),
_ => (),
}
Ok(())
}
fn flush_directories(db: &DB, entries: &Vec<NewDirectory>) {
if db
.connect()
.and_then(|connection| {
diesel::insert_into(directories::table)
.values(entries)
.execute(&*connection) // TODO https://github.com/diesel-rs/diesel/issues/1822
.map_err(Error::new)
})
.is_err()
{
error!("Could not insert new directories in database");
}
}
fn flush_songs(db: &DB, entries: &Vec<NewSong>) {
if db
.connect()
.and_then(|connection| {
diesel::insert_into(songs::table)
.values(entries)
.execute(&*connection) // TODO https://github.com/diesel-rs/diesel/issues/1822
.map_err(Error::new)
})
.is_err()
{
error!("Could not insert new songs in database");
}
}
fn insert_directories(receiver: Receiver<NewDirectory>, db: DB) {
let mut new_entries = Vec::new();
new_entries.reserve_exact(INDEX_BUILDING_INSERT_BUFFER_SIZE);
loop {
match receiver.recv() {
Ok(s) => {
new_entries.push(s);
if new_entries.len() >= INDEX_BUILDING_INSERT_BUFFER_SIZE {
flush_directories(&db, &new_entries);
new_entries.clear();
}
}
Err(_) => break,
}
}
if new_entries.len() > 0 {
flush_directories(&db, &new_entries);
}
}
fn insert_songs(receiver: Receiver<NewSong>, db: DB) {
let mut new_entries = Vec::new();
new_entries.reserve_exact(INDEX_BUILDING_INSERT_BUFFER_SIZE);
loop {
match receiver.recv() {
Ok(s) => {
new_entries.push(s);
if new_entries.len() >= INDEX_BUILDING_INSERT_BUFFER_SIZE {
flush_songs(&db, &new_entries);
new_entries.clear();
}
}
Err(_) => break,
}
}
if new_entries.len() > 0 {
flush_songs(&db, &new_entries);
}
}


@@ -0,0 +1,74 @@
use anyhow::*;
use diesel;
use diesel::prelude::*;
use rayon::prelude::*;
use std::path::Path;
use crate::db::{directories, songs, DB};
use crate::vfs::VFSSource;
const INDEX_BUILDING_CLEAN_BUFFER_SIZE: usize = 500; // Deletions in each transaction
pub struct Cleaner {
db: DB,
}
impl Cleaner {
pub fn new(db: DB) -> Self {
Self { db }
}
pub fn clean(&self) -> Result<()> {
let vfs = self.db.get_vfs()?;
let all_directories: Vec<String> = {
let connection = self.db.connect()?;
directories::table
.select(directories::path)
.load(&connection)?
};
let all_songs: Vec<String> = {
let connection = self.db.connect()?;
songs::table.select(songs::path).load(&connection)?
};
let list_missing_directories = || {
all_directories
.par_iter()
.filter(|ref directory_path| {
let path = Path::new(&directory_path);
!path.exists() || vfs.real_to_virtual(path).is_err()
})
.collect::<Vec<_>>()
};
let list_missing_songs = || {
all_songs
.par_iter()
.filter(|ref song_path| {
let path = Path::new(&song_path);
!path.exists() || vfs.real_to_virtual(path).is_err()
})
.collect::<Vec<_>>()
};
let thread_pool = rayon::ThreadPoolBuilder::new().build()?;
let (missing_songs, missing_directories) =
thread_pool.join(list_missing_directories, list_missing_songs);
{
let connection = self.db.connect()?;
for chunk in missing_directories[..].chunks(INDEX_BUILDING_CLEAN_BUFFER_SIZE) {
diesel::delete(directories::table.filter(directories::path.eq_any(chunk)))
.execute(&connection)?;
}
for chunk in missing_songs[..].chunks(INDEX_BUILDING_CLEAN_BUFFER_SIZE) {
diesel::delete(songs::table.filter(songs::path.eq_any(chunk)))
.execute(&connection)?;
}
}
Ok(())
}
}


@@ -0,0 +1,144 @@
use crate::index::update::{inserter, traverser};
use crossbeam_channel::{Receiver, Sender};
use log::error;
use regex::Regex;
pub struct Collector {
receiver: Receiver<traverser::Directory>,
sender: Sender<inserter::Item>,
album_art_pattern: Regex,
}
impl Collector {
pub fn new(
receiver: Receiver<traverser::Directory>,
sender: Sender<inserter::Item>,
album_art_pattern: Regex,
) -> Self {
Self {
receiver,
sender,
album_art_pattern,
}
}
pub fn collect(&self) {
loop {
match self.receiver.recv() {
Ok(directory) => self.collect_directory(directory),
Err(_) => break,
}
}
}
fn collect_directory(&self, directory: traverser::Directory) {
let mut directory_album = None;
let mut directory_year = None;
let mut directory_artist = None;
let mut inconsistent_directory_album = false;
let mut inconsistent_directory_year = false;
let mut inconsistent_directory_artist = false;
let directory_artwork = self.get_artwork(&directory);
let directory_path_string = directory.path.to_string_lossy().to_string();
let directory_parent_string = directory.parent.map(|p| p.to_string_lossy().to_string());
for song in directory.songs {
let tags = song.metadata;
let path_string = song.path.to_string_lossy().to_string();
if tags.year.is_some() {
inconsistent_directory_year |=
directory_year.is_some() && directory_year != tags.year;
directory_year = tags.year;
}
if tags.album.is_some() {
inconsistent_directory_album |=
directory_album.is_some() && directory_album != tags.album;
directory_album = tags.album.as_ref().cloned();
}
if tags.album_artist.is_some() {
inconsistent_directory_artist |=
directory_artist.is_some() && directory_artist != tags.album_artist;
directory_artist = tags.album_artist.as_ref().cloned();
} else if tags.artist.is_some() {
inconsistent_directory_artist |=
directory_artist.is_some() && directory_artist != tags.artist;
directory_artist = tags.artist.as_ref().cloned();
}
let artwork_path = if tags.has_artwork {
Some(path_string.clone())
} else {
directory_artwork.as_ref().cloned()
};
if let Err(e) = self.sender.send(inserter::Item::Song(inserter::Song {
path: path_string,
parent: directory_path_string.clone(),
disc_number: tags.disc_number.map(|n| n as i32),
track_number: tags.track_number.map(|n| n as i32),
title: tags.title,
duration: tags.duration.map(|n| n as i32),
artist: tags.artist,
album_artist: tags.album_artist,
album: tags.album,
year: tags.year,
artwork: artwork_path,
})) {
error!("Error while sending song from collector: {}", e);
}
}
if inconsistent_directory_year {
directory_year = None;
}
if inconsistent_directory_album {
directory_album = None;
}
if inconsistent_directory_artist {
directory_artist = None;
}
if let Err(e) = self
.sender
.send(inserter::Item::Directory(inserter::Directory {
path: directory_path_string,
parent: directory_parent_string,
artwork: directory_artwork,
album: directory_album,
artist: directory_artist,
year: directory_year,
date_added: directory.created,
})) {
error!("Error while sending directory from collector: {}", e);
}
}
fn get_artwork(&self, directory: &traverser::Directory) -> Option<String> {
let regex_artwork = directory.other_files.iter().find_map(|path| {
let matches = path
.file_name()
.and_then(|n| n.to_str())
.map(|n| self.album_art_pattern.is_match(n))
.unwrap_or(false);
if matches {
Some(path.to_string_lossy().to_string())
} else {
None
}
});
let embedded_artwork = directory.songs.iter().find_map(|song| {
if song.metadata.has_artwork {
Some(song.path.to_string_lossy().to_string())
} else {
None
}
});
regex_artwork.or(embedded_artwork)
}
}


@@ -0,0 +1,133 @@
use anyhow::*;
use crossbeam_channel::Receiver;
use diesel;
use diesel::prelude::*;
use log::error;
use crate::db::{directories, songs, DB};
const INDEX_BUILDING_INSERT_BUFFER_SIZE: usize = 1000; // Insertions in each transaction
#[derive(Debug, Insertable)]
#[table_name = "songs"]
pub struct Song {
pub path: String,
pub parent: String,
pub track_number: Option<i32>,
pub disc_number: Option<i32>,
pub title: Option<String>,
pub artist: Option<String>,
pub album_artist: Option<String>,
pub year: Option<i32>,
pub album: Option<String>,
pub artwork: Option<String>,
pub duration: Option<i32>,
}
#[derive(Debug, Insertable)]
#[table_name = "directories"]
pub struct Directory {
pub path: String,
pub parent: Option<String>,
pub artist: Option<String>,
pub year: Option<i32>,
pub album: Option<String>,
pub artwork: Option<String>,
pub date_added: i32,
}
pub enum Item {
Directory(Directory),
Song(Song),
}
pub struct Inserter {
receiver: Receiver<Item>,
new_directories: Vec<Directory>,
new_songs: Vec<Song>,
db: DB,
}
impl Inserter {
pub fn new(db: DB, receiver: Receiver<Item>) -> Self {
let new_directories = Vec::with_capacity(INDEX_BUILDING_INSERT_BUFFER_SIZE);
let new_songs = Vec::with_capacity(INDEX_BUILDING_INSERT_BUFFER_SIZE);
Self {
db,
receiver,
new_directories,
new_songs,
}
}
pub fn insert(&mut self) {
loop {
match self.receiver.recv() {
Ok(item) => self.insert_item(item),
Err(_) => break,
}
}
}
fn insert_item(&mut self, insert: Item) {
match insert {
Item::Directory(d) => {
self.new_directories.push(d);
if self.new_directories.len() >= INDEX_BUILDING_INSERT_BUFFER_SIZE {
self.flush_directories();
}
}
Item::Song(s) => {
self.new_songs.push(s);
if self.new_songs.len() >= INDEX_BUILDING_INSERT_BUFFER_SIZE {
self.flush_songs();
}
}
};
}
fn flush_directories(&mut self) {
if self
.db
.connect()
.and_then(|connection| {
diesel::insert_into(directories::table)
.values(&self.new_directories)
.execute(&*connection) // TODO https://github.com/diesel-rs/diesel/issues/1822
.map_err(Error::new)
})
.is_err()
{
error!("Could not insert new directories in database");
}
self.new_directories.clear();
}
fn flush_songs(&mut self) {
if self
.db
.connect()
.and_then(|connection| {
diesel::insert_into(songs::table)
.values(&self.new_songs)
.execute(&*connection) // TODO https://github.com/diesel-rs/diesel/issues/1822
.map_err(Error::new)
})
.is_err()
{
error!("Could not insert new songs in database");
}
self.new_songs.clear();
}
}
impl Drop for Inserter {
fn drop(&mut self) {
if self.new_directories.len() > 0 {
self.flush_directories();
}
if self.new_songs.len() > 0 {
self.flush_songs();
}
}
}

src/index/update/mod.rs (new file): 73 changed lines

@@ -0,0 +1,73 @@
use anyhow::*;
use diesel;
use diesel::prelude::*;
use log::{error, info};
use regex::Regex;
use std::time;
use crate::config::MiscSettings;
use crate::db::{misc_settings, DB};
use crate::vfs::VFSSource;
mod cleaner;
mod collector;
mod inserter;
mod traverser;
use cleaner::Cleaner;
use collector::Collector;
use inserter::Inserter;
use traverser::Traverser;
pub fn update(db: &DB) -> Result<()> {
let start = time::Instant::now();
info!("Beginning library index update");
let album_art_pattern = {
let connection = db.connect()?;
let settings: MiscSettings = misc_settings::table.get_result(&connection)?;
Regex::new(&settings.index_album_art_pattern)?
};
let cleaner = Cleaner::new(db.clone());
cleaner.clean()?;
let (insert_sender, insert_receiver) = crossbeam_channel::unbounded();
let inserter_db = db.clone();
let insertion_thread = std::thread::spawn(move || {
let mut inserter = Inserter::new(inserter_db, insert_receiver);
inserter.insert();
});
let (collect_sender, collect_receiver) = crossbeam_channel::unbounded();
let collector_thread = std::thread::spawn(move || {
let collector = Collector::new(collect_receiver, insert_sender, album_art_pattern);
collector.collect();
});
let vfs = db.get_vfs()?;
let traverser_thread = std::thread::spawn(move || {
let mount_points = vfs.get_mount_points();
let traverser = Traverser::new(collect_sender);
traverser.traverse(mount_points.values().map(|p| p.clone()).collect());
});
if let Err(e) = traverser_thread.join() {
error!("Error joining on traverser thread: {:?}", e);
}
if let Err(e) = collector_thread.join() {
error!("Error joining on collector thread: {:?}", e);
}
if let Err(e) = insertion_thread.join() {
error!("Error joining on inserter thread: {:?}", e);
}
info!(
"Library index update took {} seconds",
start.elapsed().as_millis() as f32 / 1000.0
);
Ok(())
}


@@ -0,0 +1,206 @@
use crossbeam_channel::{self, Receiver, Sender};
use log::{error, info};
use std::cmp::min;
use std::fs;
use std::path::{Path, PathBuf};
use std::str::FromStr;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;
use crate::index::metadata::{self, SongTags};
#[derive(Debug)]
pub struct Song {
pub path: PathBuf,
pub metadata: SongTags,
}
#[derive(Debug)]
pub struct Directory {
pub parent: Option<PathBuf>,
pub path: PathBuf,
pub songs: Vec<Song>,
pub other_files: Vec<PathBuf>,
pub created: i32,
}
pub struct Traverser {
directory_sender: Sender<Directory>,
}
#[derive(Debug)]
struct WorkItem {
parent: Option<PathBuf>,
path: PathBuf,
}
impl Traverser {
pub fn new(directory_sender: Sender<Directory>) -> Self {
Self { directory_sender }
}
pub fn traverse(&self, roots: Vec<PathBuf>) {
let num_pending_work_items = Arc::new(AtomicUsize::new(roots.len()));
let (work_item_sender, work_item_receiver) = crossbeam_channel::unbounded();
let key = "POLARIS_NUM_TRAVERSER_THREADS";
let num_threads = std::env::var_os(key)
.map(|v| v.to_string_lossy().to_string())
.and_then(|v| usize::from_str(&v).ok())
.unwrap_or(min(num_cpus::get(), 4));
info!("Browsing collection using {} threads", num_threads);
let mut threads = Vec::new();
for _ in 0..num_threads {
let work_item_sender = work_item_sender.clone();
let work_item_receiver = work_item_receiver.clone();
let directory_sender = self.directory_sender.clone();
let num_pending_work_items = num_pending_work_items.clone();
threads.push(thread::spawn(move || {
let worker = Worker {
work_item_sender,
work_item_receiver,
directory_sender,
num_pending_work_items,
};
worker.run();
}));
}
for root in roots {
let work_item = WorkItem {
parent: None,
path: root,
};
if let Err(e) = work_item_sender.send(work_item) {
error!("Error initializing traverser: {:#?}", e);
}
}
for thread in threads {
if let Err(e) = thread.join() {
error!("Error joining on traverser worker thread: {:#?}", e);
}
}
}
}
struct Worker {
work_item_sender: Sender<WorkItem>,
work_item_receiver: Receiver<WorkItem>,
directory_sender: Sender<Directory>,
num_pending_work_items: Arc<AtomicUsize>,
}
impl Worker {
fn run(&self) {
while let Some(work_item) = self.find_work_item() {
self.process_work_item(work_item);
self.on_item_processed();
}
}
fn find_work_item(&self) -> Option<WorkItem> {
loop {
if self.is_all_work_done() {
return None;
}
{
if let Ok(w) = self
.work_item_receiver
.recv_timeout(Duration::from_millis(100))
{
return Some(w);
}
};
}
}
fn is_all_work_done(&self) -> bool {
self.num_pending_work_items.load(Ordering::SeqCst) == 0
}
fn queue_work(&self, work_item: WorkItem) {
self.num_pending_work_items.fetch_add(1, Ordering::SeqCst);
self.work_item_sender.send(work_item).unwrap();
}
fn on_item_processed(&self) {
self.num_pending_work_items.fetch_sub(1, Ordering::SeqCst);
}
fn emit_directory(&self, directory: Directory) {
self.directory_sender.send(directory).unwrap();
}
pub fn process_work_item(&self, work_item: WorkItem) {
let read_dir = match fs::read_dir(&work_item.path) {
Ok(read_dir) => read_dir,
Err(e) => {
error!(
"Directory read error for `{}`: {}",
work_item.path.display(),
e
);
return;
}
};
let mut sub_directories = Vec::new();
let mut songs = Vec::new();
let mut other_files = Vec::new();
for entry in read_dir {
let path = match entry {
Ok(ref f) => f.path(),
Err(e) => {
error!(
"File read error within `{}`: {}",
work_item.path.display(),
e
);
break;
}
};
if path.is_dir() {
sub_directories.push(path);
} else {
if let Some(metadata) = metadata::read(&path) {
songs.push(Song { path, metadata });
} else {
other_files.push(path);
}
}
}
let created = Self::get_date_created(&work_item.path).unwrap_or_default();
self.emit_directory(Directory {
path: work_item.path.to_owned(),
parent: work_item.parent.map(|p| p.to_owned()),
songs,
other_files,
created,
});
for sub_directory in sub_directories.into_iter() {
self.queue_work(WorkItem {
parent: Some(work_item.path.clone()),
path: sub_directory,
});
}
}
fn get_date_created(path: &Path) -> Option<i32> {
if let Ok(t) = fs::metadata(path).and_then(|m| m.created().or(m.modified())) {
t.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_secs() as i32)
.ok()
} else {
None
}
}
}


@@ -5,9 +5,6 @@
extern crate diesel;
#[macro_use]
extern crate diesel_migrations;
#[cfg(feature = "profile-index")]
#[macro_use]
extern crate flamer;
use anyhow::*;
use log::{error, info};


@@ -23,7 +23,6 @@ pub enum AudioFormat {
OPUS,
}
#[cfg_attr(feature = "profile-index", flame)]
pub fn get_audio_format(path: &Path) -> Option<AudioFormat> {
let extension = match path.extension() {
Some(e) => e,