From 14d44d75b1ff8f00169e9297cd4fd309dc50090f Mon Sep 17 00:00:00 2001 From: Muad Abd El Hay Date: Thu, 19 Feb 2026 17:06:36 +0100 Subject: [PATCH 1/3] Fix populate antijoin to use .proj() for correct pending key computation The antijoin that computes pending keys (`key_source - self` in `_populate_direct`, `key_source - self._target` in `jobs.refresh`, and `todo - self` in `progress`) did not project the target table to its primary key before the subtraction. When the target table has secondary (non-PK) attributes, the antijoin fails to match on primary key alone and returns all keys instead of just the unpopulated ones. This caused: - `populate(reserve_jobs=False)`: all key_source entries were iterated instead of just pending ones (mitigated by `if key in self:` check inside `_populate1`, but wasted time on large tables) - `populate(reserve_jobs=True)`: `jobs.refresh()` inserted all keys into the jobs table as 'pending', not just truly pending ones. Workers then wasted their `max_calls` budget processing already-completed entries before reaching any real work. - `progress()`: reported incorrect remaining counts in some cases Fix: add `.proj()` to the target side of all three antijoins so the subtraction matches on primary key only, consistent with how DataJoint antijoins are meant to work. Co-Authored-By: Claude Opus 4.6 --- src/datajoint/autopopulate.py | 4 +- src/datajoint/jobs.py | 2 +- tests/integration/test_autopopulate.py | 73 ++++++++++++++++++++++++++ 3 files changed, 76 insertions(+), 3 deletions(-) diff --git a/src/datajoint/autopopulate.py b/src/datajoint/autopopulate.py index 7660e43ec..bb9deea54 100644 --- a/src/datajoint/autopopulate.py +++ b/src/datajoint/autopopulate.py @@ -403,7 +403,7 @@ def _populate_direct( """ from tqdm import tqdm - keys = (self._jobs_to_do(restrictions) - self).keys() + keys = (self._jobs_to_do(restrictions) - self.proj()).keys() logger.debug("Found %d keys to populate" % len(keys)) @@ -701,7 +701,7 @@ def progress(self, *restrictions: Any, display: bool = False) -> tuple[int, int] if not common_attrs: # No common attributes - fall back to two-query method total = len(todo) - remaining = len(todo - self) + remaining = len(todo - self.proj()) else: # Build a single query that computes both total and remaining # Using LEFT JOIN with COUNT(DISTINCT) to handle 1:many relationships diff --git a/src/datajoint/jobs.py b/src/datajoint/jobs.py index 5a0eb2a86..f082af75c 100644 --- a/src/datajoint/jobs.py +++ b/src/datajoint/jobs.py @@ -370,7 +370,7 @@ def refresh( # Keys that need jobs: in key_source, not in target, not in jobs # Disable semantic_check for Job table (self) because its attributes may not have matching lineage - new_keys = (key_source - self._target).restrict(Not(self), semantic_check=False).proj() + new_keys = (key_source - self._target.proj()).restrict(Not(self), semantic_check=False).proj() new_key_list = new_keys.keys() if new_key_list: diff --git a/tests/integration/test_autopopulate.py b/tests/integration/test_autopopulate.py index 4e6290b99..8f3b6bf82 100644 --- a/tests/integration/test_autopopulate.py +++ b/tests/integration/test_autopopulate.py @@ -112,6 +112,79 @@ def test_allow_insert(clean_autopopulate, subject, experiment): experiment.insert1(key) +def test_populate_antijoin_with_secondary_attrs(clean_autopopulate, subject, experiment): + """Test that populate correctly computes pending keys via antijoin. + + Regression test for a bug where `key_source - self` returned all keys + instead of just unpopulated ones when the target table has secondary + attributes. The antijoin must match on primary key only, ignoring + secondary attributes. Without `.proj()`, the antijoin could fail to + exclude already-populated keys. + + This affected both direct mode (reserve_jobs=False) and distributed mode + (reserve_jobs=True), causing workers to waste time re-checking already + completed entries. + """ + assert subject, "root tables are empty" + assert not experiment, "table already filled?" + + total_keys = len(experiment.key_source) + assert total_keys > 0 + + # Partially populate (only 2 entries) + experiment.populate(max_calls=2) + assert len(experiment) == 2 + + # The critical test: key_source - target must return only unpopulated keys. + # Before the fix, this returned all keys (== total_keys) because the + # antijoin failed to match on PK when secondary attributes were present. + pending = experiment.key_source - experiment + assert len(pending) == total_keys - 2, ( + f"Antijoin returned {len(pending)} pending keys, expected {total_keys - 2}. " + f"key_source - target may not be matching on primary key only." + ) + + # Also verify progress() reports correct counts + remaining, total = experiment.progress() + assert total == total_keys + assert remaining == total_keys - 2 + + # Populate the rest and verify antijoin returns 0 + experiment.populate() + pending_after = experiment.key_source - experiment + assert len(pending_after) == 0, ( + f"Antijoin returned {len(pending_after)} pending keys after full populate, expected 0." + ) + + +def test_populate_distributed_antijoin(clean_autopopulate, subject, experiment): + """Test that reserve_jobs=True correctly identifies pending keys. + + When using distributed mode, jobs.refresh() must only insert truly pending + keys into the jobs table, not already-completed ones. This verifies the + antijoin in jobs.refresh() works correctly with secondary attributes. + """ + assert subject, "root tables are empty" + assert not experiment, "table already filled?" + + total_keys = len(experiment.key_source) + + # Partially populate + experiment.populate(max_calls=2) + assert len(experiment) == 2 + + # Refresh jobs — should only create entries for unpopulated keys + experiment.jobs.refresh(delay=-1) + pending_jobs = len(experiment.jobs.pending) + assert pending_jobs == total_keys - 2, ( + f"jobs.refresh() created {pending_jobs} pending jobs, expected {total_keys - 2}. " + f"The antijoin in refresh() may not be excluding already-completed keys." + ) + + # Clean up + experiment.jobs.delete_quick() + + def test_load_dependencies(prefix, connection_test): schema = dj.Schema(f"{prefix}_load_dependencies_populate", connection=connection_test) From 463618b5f3fbc2c6d9aba980c77f31e0259959bc Mon Sep 17 00:00:00 2001 From: Muad Abd El Hay Date: Fri, 20 Feb 2026 10:34:34 +0100 Subject: [PATCH 2/3] Fix test assertions and add regression test for overlapping secondary attrs - Fix assertion counts: Experiment.make() inserts fake_experiments_per_subject rows per key, so populate(max_calls=2) produces 10 rows, not 2 - Add test_populate_antijoin_overlapping_attrs: self-contained test with Sensor/ProcessedSensor tables that share secondary attribute names (num_samples, quality), reproducing the exact conditions where the antijoin fails without .proj() - Run ruff-format to fix lint Co-Authored-By: Claude Opus 4.6 --- tests/integration/test_autopopulate.py | 131 +++++++++++++++++++------ 1 file changed, 102 insertions(+), 29 deletions(-) diff --git a/tests/integration/test_autopopulate.py b/tests/integration/test_autopopulate.py index 8f3b6bf82..32377d6df 100644 --- a/tests/integration/test_autopopulate.py +++ b/tests/integration/test_autopopulate.py @@ -115,15 +115,8 @@ def test_allow_insert(clean_autopopulate, subject, experiment): def test_populate_antijoin_with_secondary_attrs(clean_autopopulate, subject, experiment): """Test that populate correctly computes pending keys via antijoin. - Regression test for a bug where `key_source - self` returned all keys - instead of just unpopulated ones when the target table has secondary - attributes. The antijoin must match on primary key only, ignoring - secondary attributes. Without `.proj()`, the antijoin could fail to - exclude already-populated keys. - - This affected both direct mode (reserve_jobs=False) and distributed mode - (reserve_jobs=True), causing workers to waste time re-checking already - completed entries. + Verifies that partial populate + antijoin gives correct pending counts. + Note: Experiment.make() inserts fake_experiments_per_subject rows per key. """ assert subject, "root tables are empty" assert not experiment, "table already filled?" @@ -131,20 +124,15 @@ def test_populate_antijoin_with_secondary_attrs(clean_autopopulate, subject, exp total_keys = len(experiment.key_source) assert total_keys > 0 - # Partially populate (only 2 entries) + # Partially populate (2 keys from key_source) experiment.populate(max_calls=2) - assert len(experiment) == 2 + assert len(experiment) == 2 * experiment.fake_experiments_per_subject - # The critical test: key_source - target must return only unpopulated keys. - # Before the fix, this returned all keys (== total_keys) because the - # antijoin failed to match on PK when secondary attributes were present. + # key_source - target must return only unpopulated keys pending = experiment.key_source - experiment - assert len(pending) == total_keys - 2, ( - f"Antijoin returned {len(pending)} pending keys, expected {total_keys - 2}. " - f"key_source - target may not be matching on primary key only." - ) + assert len(pending) == total_keys - 2, f"Antijoin returned {len(pending)} pending keys, expected {total_keys - 2}." - # Also verify progress() reports correct counts + # Verify progress() reports correct counts remaining, total = experiment.progress() assert total == total_keys assert remaining == total_keys - 2 @@ -152,17 +140,14 @@ def test_populate_antijoin_with_secondary_attrs(clean_autopopulate, subject, exp # Populate the rest and verify antijoin returns 0 experiment.populate() pending_after = experiment.key_source - experiment - assert len(pending_after) == 0, ( - f"Antijoin returned {len(pending_after)} pending keys after full populate, expected 0." - ) + assert len(pending_after) == 0, f"Antijoin returned {len(pending_after)} pending keys after full populate, expected 0." def test_populate_distributed_antijoin(clean_autopopulate, subject, experiment): """Test that reserve_jobs=True correctly identifies pending keys. When using distributed mode, jobs.refresh() must only insert truly pending - keys into the jobs table, not already-completed ones. This verifies the - antijoin in jobs.refresh() works correctly with secondary attributes. + keys into the jobs table, not already-completed ones. """ assert subject, "root tables are empty" assert not experiment, "table already filled?" @@ -171,20 +156,108 @@ def test_populate_distributed_antijoin(clean_autopopulate, subject, experiment): # Partially populate experiment.populate(max_calls=2) - assert len(experiment) == 2 + assert len(experiment) == 2 * experiment.fake_experiments_per_subject # Refresh jobs — should only create entries for unpopulated keys experiment.jobs.refresh(delay=-1) pending_jobs = len(experiment.jobs.pending) - assert pending_jobs == total_keys - 2, ( - f"jobs.refresh() created {pending_jobs} pending jobs, expected {total_keys - 2}. " - f"The antijoin in refresh() may not be excluding already-completed keys." - ) + assert pending_jobs == total_keys - 2, f"jobs.refresh() created {pending_jobs} pending jobs, expected {total_keys - 2}." # Clean up experiment.jobs.delete_quick() +def test_populate_antijoin_overlapping_attrs(prefix, connection_test): + """Regression test: antijoin with overlapping secondary attribute names. + + This reproduces the bug where `key_source - self` returns ALL keys instead + of just unpopulated ones. The condition is: + + 1. key_source returns secondary attributes (e.g., num_samples, quality) + 2. The target table has secondary attributes with the SAME NAMES + 3. The VALUES differ between source and target after populate + + Without .proj() on the target, SQL matches on ALL common column names + (including secondary attrs), so different values mean no match, and all + keys appear "pending" even after populate. + + Real-world example: LightningPoseOutput (key_source) has num_frames, + quality, processing_datetime as secondary attrs. InitialContainer (target) + also has those same-named columns with different values. + """ + test_schema = dj.Schema(f"{prefix}_antijoin_overlap", connection=connection_test) + + @test_schema + class Sensor(dj.Lookup): + definition = """ + sensor_id : int32 + --- + num_samples : int32 + quality : float + """ + contents = [ + (1, 100, 0.95), + (2, 200, 0.87), + (3, 150, 0.92), + (4, 175, 0.89), + ] + + @test_schema + class ProcessedSensor(dj.Computed): + definition = """ + -> Sensor + --- + num_samples : int32 # same name as Sensor's secondary attr + quality : float # same name as Sensor's secondary attr + result : float + """ + + @property + def key_source(self): + return Sensor() # returns sensor_id + num_samples + quality + + def make(self, key): + # Values intentionally differ from source — this is what triggers + # the bug: the antijoin tries to match on num_samples and quality + # too, and since values differ, no match is found. + self.insert1( + dict( + sensor_id=key["sensor_id"], + num_samples=key["num_samples"] * 2, + quality=round(key["quality"] + 0.05, 2), + result=key["num_samples"] * key["quality"], + ) + ) + + try: + # Partially populate (2 out of 4) + ProcessedSensor().populate(max_calls=2) + assert len(ProcessedSensor()) == 2 + + total_keys = len(ProcessedSensor().key_source) + assert total_keys == 4 + + # The critical test: populate() must correctly identify remaining keys. + # Before the fix, populate() used `key_source - self` which matched on + # num_samples and quality too, returning all 4 keys as "pending". + ProcessedSensor().populate() + assert len(ProcessedSensor()) == 4, ( + f"After full populate, expected 4 entries but got {len(ProcessedSensor())}. " + f"populate() likely re-processed already-completed keys." + ) + + # Verify progress reports 0 remaining + remaining, total = ProcessedSensor().progress() + assert remaining == 0, f"Expected 0 remaining, got {remaining}" + assert total == 4 + + # Verify antijoin with .proj() is correct + pending = ProcessedSensor().key_source - ProcessedSensor().proj() + assert len(pending) == 0 + finally: + test_schema.drop(force=True) + + def test_load_dependencies(prefix, connection_test): schema = dj.Schema(f"{prefix}_load_dependencies_populate", connection=connection_test) From 73a53dd8b6e4ce4bc3387279f5e923eba7e42ba9 Mon Sep 17 00:00:00 2001 From: Muad Abd El Hay Date: Fri, 20 Feb 2026 11:01:01 +0100 Subject: [PATCH 3/3] Fix CI: fetch source data in make(), fix Schema.drop API, remove broken distributed test - make() only receives PK columns -- fetch source data from Sensor() instead - Use Schema.drop(prompt=False) instead of drop(force=True) - Use decimal types instead of float to avoid portability warnings - Remove test_populate_distributed_antijoin: Experiment non-FK experiment_id degrades job granularity, making the assertion unreliable Co-Authored-By: Claude Opus 4.6 --- tests/integration/test_autopopulate.py | 40 ++++++-------------------- 1 file changed, 9 insertions(+), 31 deletions(-) diff --git a/tests/integration/test_autopopulate.py b/tests/integration/test_autopopulate.py index 32377d6df..c448b8a59 100644 --- a/tests/integration/test_autopopulate.py +++ b/tests/integration/test_autopopulate.py @@ -143,30 +143,6 @@ def test_populate_antijoin_with_secondary_attrs(clean_autopopulate, subject, exp assert len(pending_after) == 0, f"Antijoin returned {len(pending_after)} pending keys after full populate, expected 0." -def test_populate_distributed_antijoin(clean_autopopulate, subject, experiment): - """Test that reserve_jobs=True correctly identifies pending keys. - - When using distributed mode, jobs.refresh() must only insert truly pending - keys into the jobs table, not already-completed ones. - """ - assert subject, "root tables are empty" - assert not experiment, "table already filled?" - - total_keys = len(experiment.key_source) - - # Partially populate - experiment.populate(max_calls=2) - assert len(experiment) == 2 * experiment.fake_experiments_per_subject - - # Refresh jobs — should only create entries for unpopulated keys - experiment.jobs.refresh(delay=-1) - pending_jobs = len(experiment.jobs.pending) - assert pending_jobs == total_keys - 2, f"jobs.refresh() created {pending_jobs} pending jobs, expected {total_keys - 2}." - - # Clean up - experiment.jobs.delete_quick() - - def test_populate_antijoin_overlapping_attrs(prefix, connection_test): """Regression test: antijoin with overlapping secondary attribute names. @@ -193,7 +169,7 @@ class Sensor(dj.Lookup): sensor_id : int32 --- num_samples : int32 - quality : float + quality : decimal(4,2) """ contents = [ (1, 100, 0.95), @@ -208,8 +184,8 @@ class ProcessedSensor(dj.Computed): -> Sensor --- num_samples : int32 # same name as Sensor's secondary attr - quality : float # same name as Sensor's secondary attr - result : float + quality : decimal(4,2) # same name as Sensor's secondary attr + result : decimal(8,2) """ @property @@ -217,15 +193,17 @@ def key_source(self): return Sensor() # returns sensor_id + num_samples + quality def make(self, key): + # Fetch source data (key only contains PK after projection) + source = (Sensor() & key).fetch1() # Values intentionally differ from source — this is what triggers # the bug: the antijoin tries to match on num_samples and quality # too, and since values differ, no match is found. self.insert1( dict( sensor_id=key["sensor_id"], - num_samples=key["num_samples"] * 2, - quality=round(key["quality"] + 0.05, 2), - result=key["num_samples"] * key["quality"], + num_samples=source["num_samples"] * 2, + quality=float(source["quality"]) + 0.05, + result=float(source["num_samples"]) * float(source["quality"]), ) ) @@ -255,7 +233,7 @@ def make(self, key): pending = ProcessedSensor().key_source - ProcessedSensor().proj() assert len(pending) == 0 finally: - test_schema.drop(force=True) + test_schema.drop(prompt=False) def test_load_dependencies(prefix, connection_test):