use proxmox_http::uri::build_authority;
use proxmox_async::broadcast_future::BroadcastFuture;
-use pbs_api_types::{Authid, Userid};
+use pbs_api_types::{Authid, Userid, RateLimitConfig};
use pbs_tools::json::json_object_to_query;
use pbs_tools::ticket;
use pbs_tools::percent_encoding::DEFAULT_ENCODE_SET;
ticket_cache: bool,
fingerprint_cache: bool,
verify_cert: bool,
- rate_limit: Option<u64>,
- bucket_size: Option<u64>,
+ limit: RateLimitConfig,
}
impl HttpClientOptions {
self
}
- pub fn rate_limit(mut self, rate_limit: Option<u64>) -> Self {
- self.rate_limit = rate_limit;
- self
- }
-
- pub fn bucket_size(mut self, bucket_size: Option<u64>) -> Self {
- self.bucket_size = bucket_size;
+ pub fn rate_limit(mut self, rate_limit: RateLimitConfig) -> Self {
+ self.limit = rate_limit;
self
}
}
ticket_cache: false,
fingerprint_cache: false,
verify_cert: true,
- rate_limit: None,
- bucket_size: None,
+ limit: RateLimitConfig::default(), // unlimited
}
}
}
httpc.set_connect_timeout(Some(std::time::Duration::new(10, 0)));
let mut https = HttpsConnector::with_connector(httpc, ssl_connector_builder.build(), PROXMOX_BACKUP_TCP_KEEPALIVE_TIME);
- if let Some(rate_limit) = options.rate_limit {
- let bucket_size = options.bucket_size.unwrap_or_else(|| rate_limit*3);
- https.set_read_limiter(Some(Arc::new(Mutex::new(RateLimiter::new(rate_limit, bucket_size)))));
- https.set_write_limiter(Some(Arc::new(Mutex::new(RateLimiter::new(rate_limit, bucket_size)))));
+ if let Some(rate_in) = options.limit.rate_in {
+ let burst_in = options.limit.burst_in.unwrap_or_else(|| rate_in).as_u64();
+ https.set_read_limiter(Some(Arc::new(Mutex::new(
+ RateLimiter::new(rate_in.as_u64(), burst_in)
+ ))));
+ }
+
+ if let Some(rate_out) = options.limit.rate_out {
+ let burst_out = options.limit.burst_out.unwrap_or_else(|| rate_out).as_u64();
+ https.set_write_limiter(Some(Arc::new(Mutex::new(
+ RateLimiter::new(rate_out.as_u64(), burst_out)
+ ))));
}
let client = Client::builder()
use proxmox_router::cli::{complete_file_name, shellword_split};
use proxmox::tools::fs::file_get_json;
-use pbs_api_types::{BACKUP_REPO_URL, Authid, UserWithTokens};
+use pbs_api_types::{BACKUP_REPO_URL, Authid, RateLimitConfig, UserWithTokens};
use pbs_datastore::BackupDir;
use pbs_tools::json::json_object_to_query;
}
pub fn connect(repo: &BackupRepository) -> Result<HttpClient, Error> {
- connect_do(repo.host(), repo.port(), repo.auth_id(), None, None)
+ let rate_limit = RateLimitConfig::default(); // unlimited
+ connect_do(repo.host(), repo.port(), repo.auth_id(), rate_limit)
.map_err(|err| format_err!("error building client for repository {} - {}", repo, err))
}
pub fn connect_rate_limited(
repo: &BackupRepository,
- rate: Option<u64>,
- bucket_size: Option<u64>,
+ rate_limit: RateLimitConfig,
) -> Result<HttpClient, Error> {
- connect_do(repo.host(), repo.port(), repo.auth_id(), rate, bucket_size)
+ connect_do(repo.host(), repo.port(), repo.auth_id(), rate_limit)
.map_err(|err| format_err!("error building client for repository {} - {}", repo, err))
}
server: &str,
port: u16,
auth_id: &Authid,
- rate_limit: Option<u64>,
- bucket_size: Option<u64>,
+ rate_limit: RateLimitConfig,
) -> Result<HttpClient, Error> {
let fingerprint = std::env::var(ENV_VAR_PBS_FINGERPRINT).ok();
let password = get_secret_from_env(ENV_VAR_PBS_PASSWORD)?;
let options = HttpClientOptions::new_interactive(password, fingerprint)
- .rate_limit(rate_limit)
- .bucket_size(bucket_size);
+ .rate_limit(rate_limit);
HttpClient::new(server, port, auth_id, options)
}
use pbs_api_types::{
BACKUP_ID_SCHEMA, BACKUP_TIME_SCHEMA, BACKUP_TYPE_SCHEMA,
TRAFFIC_CONTROL_BURST_SCHEMA, TRAFFIC_CONTROL_RATE_SCHEMA,
- Authid, CryptMode, Fingerprint, GroupListItem, PruneListItem, PruneOptions,
- SnapshotListItem, StorageStatus,
+ Authid, CryptMode, Fingerprint, GroupListItem, HumanByte,
+ PruneListItem, PruneOptions, RateLimitConfig, SnapshotListItem,
+ StorageStatus,
};
use pbs_client::{
BACKUP_SOURCE_SCHEMA,
verify_chunk_size(size)?;
}
- let rate_limit = param["rate"].as_u64();
- let bucket_size = param["burst"].as_u64();
+ let rate = match param["rate"].as_str() {
+ Some(s) => Some(s.parse::<HumanByte>()?),
+ None => None,
+ };
+ let burst = match param["burst"].as_str() {
+ Some(s) => Some(s.parse::<HumanByte>()?),
+ None => None,
+ };
+
+ let rate_limit = RateLimitConfig::with_same_inout(rate, burst);
let crypto = crypto_parameters(&param)?;
let backup_time = backup_time_opt.unwrap_or_else(epoch_i64);
- let client = connect_rate_limited(&repo, rate_limit, bucket_size)?;
+ let client = connect_rate_limited(&repo, rate_limit)?;
record_repository(&repo);
println!("Starting backup: {}/{}/{}", backup_type, backup_id, BackupDir::backup_time_to_string(backup_time)?);
let archive_name = json::required_string_param(&param, "archive-name")?;
- let rate_limit = param["rate"].as_u64();
- let bucket_size = param["burst"].as_u64();
+ let rate = match param["rate"].as_str() {
+ Some(s) => Some(s.parse::<HumanByte>()?),
+ None => None,
+ };
+ let burst = match param["burst"].as_str() {
+ Some(s) => Some(s.parse::<HumanByte>()?),
+ None => None,
+ };
+
+ let rate_limit = RateLimitConfig::with_same_inout(rate, burst);
- let client = connect_rate_limited(&repo, rate_limit, bucket_size)?;
+ let client = connect_rate_limited(&repo, rate_limit)?;
record_repository(&repo);
let path = json::required_string_param(&param, "snapshot")?;
use pbs_api_types::{
REMOTE_ID_SCHEMA, REMOTE_PASSWORD_SCHEMA, Remote, RemoteConfig, RemoteConfigUpdater,
Authid, PROXMOX_CONFIG_DIGEST_SCHEMA, DATASTORE_SCHEMA, GroupListItem,
- DataStoreListItem, SyncJobConfig, PRIV_REMOTE_AUDIT, PRIV_REMOTE_MODIFY,
+ DataStoreListItem, RateLimitConfig, SyncJobConfig, PRIV_REMOTE_AUDIT, PRIV_REMOTE_MODIFY,
};
use pbs_config::sync;
}
/// Helper to get client for remote.cfg entry
-pub async fn remote_client(remote: &Remote) -> Result<HttpClient, Error> {
- let options = HttpClientOptions::new_non_interactive(remote.password.clone(), remote.config.fingerprint.clone());
+pub async fn remote_client(
+ remote: &Remote,
+ limit: Option<RateLimitConfig>,
+) -> Result<HttpClient, Error> {
+ let mut options = HttpClientOptions::new_non_interactive(remote.password.clone(), remote.config.fingerprint.clone());
+
+ if let Some(limit) = limit {
+ options = options.rate_limit(limit);
+ }
let client = HttpClient::new(
&remote.config.host,
api_err)
};
- let client = remote_client(&remote)
+ let client = remote_client(&remote, None)
.await
.map_err(map_remote_err)?;
let api_res = client
api_err)
};
- let client = remote_client(&remote)
+ let client = remote_client(&remote, None)
.await
.map_err(map_remote_err)?;
let api_res = client
use proxmox_sys::task_log;
use pbs_api_types::{
- Authid, SyncJobConfig, GroupFilter, GROUP_FILTER_LIST_SCHEMA,
+ Authid, SyncJobConfig, GroupFilter, RateLimitConfig, GROUP_FILTER_LIST_SCHEMA,
DATASTORE_SCHEMA, REMOTE_ID_SCHEMA, REMOVE_VANISHED_BACKUPS_SCHEMA,
PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_PRUNE, PRIV_REMOTE_READ,
};
sync_job.owner.as_ref().unwrap_or_else(|| Authid::root_auth_id()).clone(),
sync_job.remove_vanished,
sync_job.group_filter.clone(),
+ sync_job.limit.clone(),
)
}
}
schema: GROUP_FILTER_LIST_SCHEMA,
optional: true,
},
+ limit: {
+ type: RateLimitConfig,
+ flatten: true,
+ }
},
},
access: {
remote_store: String,
remove_vanished: Option<bool>,
group_filter: Option<Vec<GroupFilter>>,
+ limit: RateLimitConfig,
_info: &ApiMethod,
rpcenv: &mut dyn RpcEnvironment,
) -> Result<String, Error> {
auth_id.clone(),
remove_vanished,
group_filter,
+ limit,
)?;
let client = pull_params.client().await?;
use proxmox_router::HttpError;
use proxmox_sys::task_log;
-use pbs_api_types::{Authid, GroupFilter, GroupListItem, Remote, SnapshotListItem};
+use pbs_api_types::{
+ Authid, GroupFilter, GroupListItem, RateLimitConfig, Remote,
+ SnapshotListItem,
+};
use pbs_datastore::{BackupDir, BackupInfo, BackupGroup, DataStore, StoreProgress};
use pbs_datastore::data_blob::DataBlob;
owner: Authid,
remove_vanished: bool,
group_filter: Option<Vec<GroupFilter>>,
+ limit: RateLimitConfig,
}
impl PullParameters {
owner: Authid,
remove_vanished: Option<bool>,
group_filter: Option<Vec<GroupFilter>>,
+ limit: RateLimitConfig,
) -> Result<Self, Error> {
let store = DataStore::lookup_datastore(store)?;
remote_store.to_string(),
);
- Ok(Self { remote, source, store, owner, remove_vanished, group_filter })
+ Ok(Self { remote, source, store, owner, remove_vanished, group_filter, limit })
}
pub async fn client(&self) -> Result<HttpClient, Error> {
- crate::api2::config::remote::remote_client(&self.remote).await
+ crate::api2::config::remote::remote_client(&self.remote, Some(self.limit.clone())).await
}
}