Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
stneng committed Mar 8, 2023
1 parent 000107c commit 420e2be
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 18 deletions.
48 changes: 31 additions & 17 deletions src/extensions/storage_macro/chunk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,17 @@ impl crate::application::CoLink {
Ok(chunk_id)
}

async fn _delete_chunks_compatibility_mode(&self, key_name: &str) -> Result<String, Error> {
let metadata_key = format!("{}:chunk_metadata", key_name);
let chunk_len = self.read_entry(&metadata_key).await?;
let chunk_len = String::from_utf8_lossy(&chunk_len).parse::<i32>()?;
let res = self.delete_entry(&metadata_key).await?;
for i in 0..chunk_len {
self.delete_entry(&format!("{}:{}", key_name, i)).await?;
}
Ok(res)
}

async fn _chunk_lock_compatibility_mode(&self, key_name: &str) -> Result<(), Error> {
loop {
if self
Expand Down Expand Up @@ -149,11 +160,15 @@ impl crate::application::CoLink {
let metadata_key = format!("{}:chunk_metadata", key_name);
if key_name.contains('$') {
self._chunk_lock_compatibility_mode(key_name).await?;
if let Err(e) = self.create_entry(&metadata_key, b"0").await {
self._chunk_unlock_compatibility_mode(key_name).await?;
return Err(e);
}
let chunk_len = self
._store_chunks_compatibility_mode(payload, key_name)
.await?;
let res = self
.create_entry(&metadata_key, chunk_len.to_string().as_bytes())
.update_entry(&metadata_key, chunk_len.to_string().as_bytes())
.await?;
self._chunk_unlock_compatibility_mode(key_name).await?;
return Ok(res);
Expand Down Expand Up @@ -182,15 +197,19 @@ impl crate::application::CoLink {
let metadata_key = format!("{}:chunk_metadata", key_name);
if key_name.contains('$') {
self._chunk_lock_compatibility_mode(key_name).await?;
let chunk_len = self.read_entry(&metadata_key).await?;
let chunk_len = String::from_utf8_lossy(&chunk_len).parse::<i32>()?;
let mut payload = Vec::new();
for i in 0..chunk_len {
let mut res = self.read_entry(&format!("{}:{}", key_name, i)).await?;
payload.append(&mut res);
let res = async {
let chunk_len = self.read_entry(&metadata_key).await?;
let chunk_len = String::from_utf8_lossy(&chunk_len).parse::<i32>()?;
let mut payload = Vec::new();
for i in 0..chunk_len {
let mut res = self.read_entry(&format!("{}:{}", key_name, i)).await?;
payload.append(&mut res);
}
Ok::<Vec<u8>, Error>(payload)
}
.await;
self._chunk_unlock_compatibility_mode(key_name).await?;
return Ok(payload);
return res;
}
let metadata_response = self.read_entry(&metadata_key).await?;
let payload_string = String::from_utf8(metadata_response)?;
Expand All @@ -217,7 +236,7 @@ impl crate::application::CoLink {
let metadata_key = format!("{}:chunk_metadata", key_name);
if key_name.contains('$') {
self._chunk_lock_compatibility_mode(key_name).await?;
let _ = self._delete_entry_chunk(key_name).await;
let _ = self._delete_chunks_compatibility_mode(key_name).await;
let chunk_len = self
._store_chunks_compatibility_mode(payload, key_name)
.await?;
Expand Down Expand Up @@ -278,18 +297,13 @@ impl crate::application::CoLink {

#[async_recursion]
pub(crate) async fn _delete_entry_chunk(&self, key_name: &str) -> Result<String, Error> {
let metadata_key = format!("{}:chunk_metadata", key_name);
if key_name.contains('$') {
self._chunk_lock_compatibility_mode(key_name).await?;
let chunk_len = self.read_entry(&metadata_key).await?;
let chunk_len = String::from_utf8_lossy(&chunk_len).parse::<i32>()?;
let res = self.delete_entry(&metadata_key).await?;
for i in 0..chunk_len {
self.delete_entry(&format!("{}:{}", key_name, i)).await?;
}
let res = self._delete_chunks_compatibility_mode(key_name).await;
self._chunk_unlock_compatibility_mode(key_name).await?;
return Ok(res);
return res;
}
let metadata_key = format!("{}:chunk_metadata", key_name);
let lock_token = self.lock(&metadata_key).await?;
let res = self.delete_entry(&metadata_key).await;
self.unlock(lock_token).await?;
Expand Down
5 changes: 4 additions & 1 deletion tests/test_storage_macro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,17 +58,21 @@ async fn test_crud(
.take(5e6 as usize)
.collect::<Vec<u8>>();
cl.create_entry(key_name, &payload).await?;
println!("Create");
assert!(cl.create_entry(key_name, b"").await.is_err());
let data = cl.read_entry(key_name).await?;
println!("Read");
assert_eq!(data, payload);
let new_payload = rand::thread_rng()
.sample_iter(&rand::distributions::Standard)
.take(3e6 as usize)
.collect::<Vec<u8>>();
cl.update_entry(key_name, &new_payload).await?;
println!("Update");
let data = cl.read_entry(key_name).await?;
assert_eq!(data, new_payload);
cl.delete_entry(key_name).await?;
println!("Delete");
assert!(cl.read_entry(key_name).await.is_err());
assert!(cl.delete_entry(key_name).await.is_err());
Ok(())
Expand Down Expand Up @@ -152,7 +156,6 @@ async fn test_storage_macro_fs_append(
Ok(())
}

#[ignore]
#[tokio::test]
async fn test_storage_macro_redis_chunk(
) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
Expand Down

0 comments on commit 420e2be

Please sign in to comment.