fs: Arc<FsBase>,
inode: u64,
datacenter: String,
- datastores: Mutex<BTreeMap<String, u64>>,
- active_lookups: Mutex<BTreeMap<String, watch::Receiver<Option<u64>>>>,
+ entries: InodeEntries,
+}
+
+impl LookupEntry for Datacenter {
+ fn lookup_new_entry<'a>(
+ &'a self,
+ name: &'a str,
+ ) -> Pin<Box<(dyn Future<Output = Result<Inode, Error>> + Send + Sync + 'a)>> {
+ Box::pin(self.do_lookup(name))
+ }
}
impl Datacenter {
fn new(fs: Arc<FsBase>, inode: u64, datacenter: String) -> Self {
Self {
+ entries: InodeEntries::new(Arc::clone(&fs)),
+
fs,
inode,
datacenter,
- datastores: Mutex::new(BTreeMap::new()),
- active_lookups: Mutex::new(BTreeMap::new()),
}
}
- pub fn create_datastore(&self, name: &str) -> Arc<Dir> {
- let mut datastores = self.datastores.lock().unwrap();
+ pub fn do_create_datastore(&self, name: &str) -> Arc<Dir> {
+ let mut datastores = self.entries.entries.lock().unwrap();
if let Some(inode) = datastores.get(name).copied() {
match self.fs.inodes.lock().unwrap().get(&inode).unwrap() {
Inode::Dir(dir) => return Arc::clone(dir),
String::new(),
));
+ datastores.insert(name.to_string(), inode);
+
+ dir
+ }
+
+ pub fn create_datastore(&self, name: &str) -> Arc<Dir> {
+ let dir = self.do_create_datastore(name);
+
self.fs
.inodes
.lock()
.unwrap()
- .insert(inode, Inode::Dir(Arc::clone(&dir)));
-
- datastores.insert(name.to_string(), inode);
+ .insert(dir.inode, Inode::Dir(Arc::clone(&dir)));
dir
}
}
async fn handle_lookup(&self, name: &str) -> Result<Option<u64>, Error> {
- let inode = self.datastores.lock().unwrap().get(name).copied();
- Ok(match inode {
- None => match self.lookup_new(name).await? {
- None => None,
- inode => inode,
- },
- inode => inode,
- })
+ self.entries.handle_lookup(name, self).await
}
- async fn lookup_new(&self, name: &str) -> Result<Option<u64>, Error> {
- // static analysis does not understand `drop(mutex_guard)`, so this code is ugly
- // instead...
-
- let send = 'send: {
- let mut active = {
- let mut active_lookups = self.active_lookups.lock().unwrap();
- match active_lookups.get(name).cloned() {
- Some(active) => active,
- None => {
- let (send, recv) = watch::channel(None);
- active_lookups.insert(name.to_string(), recv);
- break 'send send;
- }
- }
- };
-
- // This will almost always get a RecvError because the sender is immediately dropped,
- // but that's fine.
- let _ = active.changed().await;
- return Ok(*active.borrow());
- };
-
- let inode = match self.fs.client.path_exists(&self.datacenter, name, "").await {
- Ok(true) => self.create_datastore(name).inode,
- Ok(false) => {
- send.send(None)?;
- return Ok(None);
- }
- Err(err) => {
- log::error!("error looking up datastore: {err:?}");
- return Err(err);
- }
- };
-
- // we need to hold the datastores lock over the active_lookups lock to make sure a negative
- // entry lookup is not followed by a negative active-lookup lookup by another task.
- let mut datastores = self.datastores.lock().unwrap();
- let mut active_lookups = self.active_lookups.lock().unwrap();
- datastores.insert(name.to_string(), inode);
- send.send(Some(inode))?;
- active_lookups.remove(name);
-
- Ok(Some(inode))
+ async fn do_lookup(&self, name: &str) -> Result<Inode, Error> {
+ if self
+ .fs
+ .client
+ .path_exists(&self.datacenter, name, "")
+ .await?
+ {
+ Ok(Inode::Dir(self.do_create_datastore(name)))
+ } else {
+ Err(NotFound.into())
+ }
}
fn handle_readdir(&self, mut readdir: requests::ReaddirPlus) -> Result<(), Error> {
- let datastores = self.datastores.lock().unwrap();
+ let datastores = self.entries.entries.lock().unwrap();
for (count, (name, inode)) in datastores.iter().skip(readdir.offset as usize).enumerate() {
if readdir