Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions core/src/polling_monitor/arweave_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use anyhow::Error;
use bytes::Bytes;
use graph::futures03::future::BoxFuture;
use graph::{
components::link_resolver::{ArweaveClient, ArweaveResolver, FileSizeLimit},
components::link_resolver::{ArweaveResolver, FileSizeLimit},
data_source::offchain::Base64,
derive::CheapClone,
prelude::CheapClone,
Expand All @@ -13,7 +13,7 @@ use tower::{buffer::Buffer, ServiceBuilder, ServiceExt};
pub type ArweaveService = Buffer<Base64, BoxFuture<'static, Result<Option<Bytes>, Error>>>;

pub fn arweave_service(
client: Arc<ArweaveClient>,
client: Arc<dyn ArweaveResolver>,
rate_limit: u16,
max_file_size: FileSizeLimit,
) -> ArweaveService {
Expand All @@ -34,7 +34,7 @@ pub fn arweave_service(

#[derive(Clone, CheapClone)]
struct ArweaveServiceInner {
client: Arc<ArweaveClient>,
client: Arc<dyn ArweaveResolver>,
max_file_size: FileSizeLimit,
}

Expand Down
64 changes: 59 additions & 5 deletions tests/src/fixture/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ use graph::blockchain::{
};
use graph::cheap_clone::CheapClone;
use graph::components::link_resolver::{
ArweaveClient, ArweaveResolver, FileLinkResolver, FileSizeLimit, LinkResolverContext,
ArweaveClient, ArweaveClientError, ArweaveResolver, FileLinkResolver, FileSizeLimit,
LinkResolverContext,
};
use graph::components::metrics::MetricsRegistry;
use graph::components::network_provider::ChainName;
Expand Down Expand Up @@ -422,14 +423,46 @@ pub struct TestInfo {
pub hash: DeploymentHash,
}

#[derive(Debug)]
pub struct StaticArweaveResolver {
content: HashMap<String, Vec<u8>>,
}

impl StaticArweaveResolver {
pub fn new(content: HashMap<String, Vec<u8>>) -> Self {
Self { content }
}
}

#[async_trait]
impl ArweaveResolver for StaticArweaveResolver {
async fn get(
&self,
file: &graph::data_source::offchain::Base64,
) -> Result<Vec<u8>, ArweaveClientError> {
self.get_with_limit(file, &FileSizeLimit::Unlimited).await
}

async fn get_with_limit(
&self,
file: &graph::data_source::offchain::Base64,
_limit: &FileSizeLimit,
) -> Result<Vec<u8>, ArweaveClientError> {
self.content
.get(file.as_str())
.cloned()
.ok_or(ArweaveClientError::UnableToCheckFileSize)
}
}

pub async fn setup<C: Blockchain>(
test_info: &TestInfo,
stores: &Stores,
chain: &impl TestChainTrait<C>,
graft_block: Option<BlockPtr>,
env_vars: Option<EnvVars>,
) -> TestContext {
setup_inner(test_info, stores, chain, graft_block, env_vars, None).await
setup_inner(test_info, stores, chain, graft_block, env_vars, None, None).await
}

pub async fn setup_with_file_link_resolver<C: Blockchain>(
Expand All @@ -449,6 +482,27 @@ pub async fn setup_with_file_link_resolver<C: Blockchain>(
graft_block,
env_vars,
Some(link_resolver),
None,
)
.await
}

pub async fn setup_with_arweave_resolver<C: Blockchain>(
test_info: &TestInfo,
stores: &Stores,
chain: &impl TestChainTrait<C>,
graft_block: Option<BlockPtr>,
env_vars: Option<EnvVars>,
arweave_resolver: Arc<dyn ArweaveResolver>,
) -> TestContext {
setup_inner(
test_info,
stores,
chain,
graft_block,
env_vars,
None,
Some(arweave_resolver),
)
.await
}
Expand All @@ -460,6 +514,7 @@ pub async fn setup_inner<C: Blockchain>(
graft_block: Option<BlockPtr>,
env_vars: Option<EnvVars>,
link_resolver: Option<Arc<dyn LinkResolver>>,
arweave_resolver: Option<Arc<dyn ArweaveResolver>>,
) -> TestContext {
let env_vars = Arc::new(match env_vars {
Some(ev) => ev,
Expand Down Expand Up @@ -506,7 +561,8 @@ pub async fn setup_inner<C: Blockchain>(
env_vars.mappings.ipfs_request_limit,
);

let arweave_resolver = Arc::new(ArweaveClient::default());
let arweave_resolver: Arc<dyn ArweaveResolver> =
arweave_resolver.unwrap_or_else(|| Arc::new(ArweaveClient::default()));
let arweave_service = arweave_service(
arweave_resolver.cheap_clone(),
env_vars.mappings.ipfs_request_limit,
Expand Down Expand Up @@ -603,8 +659,6 @@ pub async fn setup_inner<C: Blockchain>(
.await
.expect("failed to create subgraph version");

let arweave_resolver = Arc::new(ArweaveClient::default());

TestContext {
logger: logger_factory.subgraph_logger(&deployment),
provider: subgraph_provider,
Expand Down
32 changes: 16 additions & 16 deletions tests/tests/runner_tests.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::collections::HashMap;
use std::marker::PhantomData;
use std::str::FromStr;
use std::sync::atomic::{self, AtomicBool};
Expand All @@ -23,8 +24,8 @@ use graph_tests::fixture::ethereum::{
};

use graph_tests::fixture::{
self, test_ptr, test_ptr_reorged, MockAdapterSelector, NoopAdapterSelector, TestChainTrait,
TestContext, TestInfo,
self, test_ptr, test_ptr_reorged, MockAdapterSelector, NoopAdapterSelector,
StaticArweaveResolver, TestChainTrait, TestContext, TestInfo,
};
use graph_tests::recipe::{build_subgraph_with_pnpm_cmd_and_arg, RunnerTestRecipe};
use slog::{o, Discard, Logger};
Expand Down Expand Up @@ -1141,23 +1142,22 @@ async fn arweave_file_data_sources() {
// HASH used in the mappings.
let id = "8APeQ5lW0-csTcBaGdPBDLAL2ci2AT9pTn2tppGPU_8";

// This test assumes the file data sources will be processed in the same block in which they are
// created. But the test might fail due to a race condition if for some reason it takes longer
// than expected to fetch the file from arweave. The sleep here will conveniently happen after the
// data source is added to the offchain monitor but before the monitor is checked, in an an
// attempt to ensure the monitor has enough time to fetch the file.
let adapter_selector = NoopAdapterSelector {
x: PhantomData,
triggers_in_block_sleep: Duration::from_millis(1500),
};
let chain = chain(
&test_info.test_name,
blocks.clone(),
// Use a mock arweave resolver to avoid real network calls and eliminate the
// race condition that caused this test to be flaky.
let arweave_content: HashMap<String, Vec<u8>> =
HashMap::from([(id.to_string(), b"test arweave content".to_vec())]);
let arweave_resolver = Arc::new(StaticArweaveResolver::new(arweave_content));

let chain = chain(&test_info.test_name, blocks.clone(), &stores, None).await;
let ctx = fixture::setup_with_arweave_resolver(
&test_info,
&stores,
Some(Arc::new(adapter_selector)),
&chain,
None,
None,
arweave_resolver,
)
.await;
let ctx = fixture::setup(&test_info, &stores, &chain, None, None).await;
ctx.start_and_sync_to(test_ptr(2)).await;

let store = ctx.store.cheap_clone();
Expand Down