.context("failed to parse content size")
}
+ /// Check the existence of an URL.
+ pub async fn path_exists(
+ &self,
+ datacenter: &str,
+ datastore: &str,
+ path: &str,
+ ) -> Result<bool, Error> {
+ match self
+ .make_request(|| Ok(Request::head(self.file_url(datacenter, datastore, path))))
+ .await
+ {
+ Ok(_) => Ok(true),
+ Err(err) if err.downcast_ref::<NotFound>().is_some() => Ok(false),
+ Err(err) => Err(err),
+ }
+ }
+
/// Get a `Read`able file.
pub async fn open_file(
self: &Arc<Self>,
async fn handle_lookup(&self, name: &str) -> Result<Option<u64>, Error> {
Ok(match self {
- Self::Datacenter(dc) => dc.handle_lookup(name),
+ Self::Datacenter(dc) => dc.handle_lookup(name).await?,
Self::Dir(dir) => dir.handle_lookup(name).await?,
Self::File(_) => return Err(Errno(libc::ENOTDIR).into()),
})
inode: u64,
datacenter: String,
datastores: Mutex<BTreeMap<String, u64>>,
+ active_lookups: Mutex<BTreeMap<String, watch::Receiver<Option<u64>>>>,
}
impl Datacenter {
inode,
datacenter,
datastores: Mutex::new(BTreeMap::new()),
+ active_lookups: Mutex::new(BTreeMap::new()),
}
}
dir_stat(self.inode)
}
- fn handle_lookup(&self, name: &str) -> Option<u64> {
- self.datastores.lock().unwrap().get(name).copied()
+ async fn handle_lookup(&self, name: &str) -> Result<Option<u64>, Error> {
+ let inode = self.datastores.lock().unwrap().get(name).copied();
+ Ok(match inode {
+ None => match self.lookup_new(name).await? {
+ None => None,
+ inode => inode,
+ },
+ inode => inode,
+ })
+ }
+
+ async fn lookup_new(&self, name: &str) -> Result<Option<u64>, Error> {
+ // static analysis does not understand `drop(mutex_guard)`, so this code is ugly
+ // instead...
+
+ let send = 'send: {
+ let mut active = {
+ let mut active_lookups = self.active_lookups.lock().unwrap();
+ match active_lookups.get(name).cloned() {
+ Some(active) => active,
+ None => {
+ let (send, recv) = watch::channel(None);
+ active_lookups.insert(name.to_string(), recv);
+ break 'send send;
+ }
+ }
+ };
+
+ // This will almost always get a RecvError because the sender is immediately dropped,
+ // but that's fine.
+ let _ = active.changed().await;
+ return Ok(*active.borrow());
+ };
+
+ let inode = match self.fs.client.path_exists(&self.datacenter, name, "").await {
+ Ok(true) => self.create_datastore(name).inode,
+ Ok(false) => {
+ send.send(None)?;
+ return Ok(None);
+ }
+ Err(err) => {
+ log::error!("error looking up datastore: {err:?}");
+ return Err(err);
+ }
+ };
+
+ // we need to hold the datastores lock over the active_lookups lock to make sure a negative
+ // entry lookup is not followed by a negative active-lookup lookup by another task.
+ let mut datastores = self.datastores.lock().unwrap();
+ let mut active_lookups = self.active_lookups.lock().unwrap();
+ datastores.insert(name.to_string(), inode);
+ send.send(Some(inode))?;
+ active_lookups.remove(name);
+
+ Ok(Some(inode))
}
fn handle_readdir(&self, mut readdir: requests::ReaddirPlus) -> Result<(), Error> {