From 3b6785932ca8fdb3cc71adce8be557ca217bbae7 Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Tue, 17 Feb 2026 11:12:31 -0800 Subject: [PATCH 01/13] added tests for issue --- .../tests/unit/test_reader_v1.py | 78 +++++++++++++++++++ 1 file changed, 78 insertions(+) diff --git a/packages/google-cloud-bigquery-storage/tests/unit/test_reader_v1.py b/packages/google-cloud-bigquery-storage/tests/unit/test_reader_v1.py index e2e8870d0d00..1442b93e19b3 100644 --- a/packages/google-cloud-bigquery-storage/tests/unit/test_reader_v1.py +++ b/packages/google-cloud-bigquery-storage/tests/unit/test_reader_v1.py @@ -622,3 +622,81 @@ def test_to_dataframe_by_page(class_under_test, mock_gapic_client): drop=True ), ) + + +def test_to_dataframe_avro_no_lost_records(class_under_test, mock_gapic_client): + """Verify that to_dataframe() does not lose records for Avro streams. + See: https://github.com/googleapis/google-cloud-python/issues/14900 + """ + bq_columns = [ + {"name": "name", "type": "string"}, + {"name": "number", "type": "int64"}, + ] + avro_schema = _bq_to_avro_schema(bq_columns) + + # 2 pages of data + bq_blocks = [ + [{"name": "a", "number": 1}, {"name": "b", "number": 2}], + [{"name": "c", "number": 3}, {"name": "d", "number": 4}], + ] + + avro_blocks = _bq_to_avro_blocks(bq_blocks, avro_schema) + mock_gapic_client.read_rows.return_value = iter(avro_blocks) + + reader = class_under_test(mock_gapic_client, "name", 0, {}) + df = reader.to_dataframe() + + assert len(df) == 4 + assert df["name"].tolist() == ["a", "b", "c", "d"] + + +def test_to_dataframe_empty_stream_no_session(class_under_test, mock_gapic_client): + """Verify that to_dataframe() handles empty streams without a session safely. + See: https://github.com/googleapis/google-cloud-python/issues/14900 + """ + mock_gapic_client.read_rows.return_value = iter([]) + + reader = class_under_test(mock_gapic_client, "name", 0, {}) + df = reader.to_dataframe() + assert len(df) == 0 + assert isinstance(df, pandas.DataFrame) + + +def test_to_dataframe_empty_stream_with_session(class_under_test, mock_gapic_client): + """Verify that to_dataframe() handles empty streams with a session correctly. + See: https://github.com/googleapis/google-cloud-python/issues/14900 + """ + bq_columns = [{"name": "name", "type": "string"}] + avro_schema = _bq_to_avro_schema(bq_columns) + read_session = _generate_avro_read_session(avro_schema) + + mock_gapic_client.read_rows.return_value = iter([]) + + reader = class_under_test(mock_gapic_client, "name", 0, {}) + it = reader.rows(read_session=read_session) + df = it.to_dataframe() + + assert len(df) == 0 + assert df.columns.tolist() == ["name"] + + +def test_to_arrow_avro_consumes_first_page(class_under_test, mock_gapic_client): + """Verify that to_arrow() consumes the first page of an Avro stream if format is unknown. + See: https://github.com/googleapis/google-cloud-python/issues/14900 + """ + bq_columns = [{"name": "name", "type": "string"}] + avro_schema = _bq_to_avro_schema(bq_columns) + bq_blocks = [[{"name": "a"}], [{"name": "b"}]] + avro_blocks = _bq_to_avro_blocks(bq_blocks, avro_schema) + + mock_gapic_client.read_rows.return_value = iter(avro_blocks) + + reader = class_under_test(mock_gapic_client, "name", 0, {}) + it = reader.rows() + + with pytest.raises(NotImplementedError): + it.to_arrow() + + # Since read_session was not provided, to_arrow() had to consume the first message + # to find out it was Avro. So offset should be 1. 
+ assert reader._offset == 1 From 052d9d5c53d831e5335bccd7c98664c9ce5ede83 Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Tue, 17 Feb 2026 11:23:56 -0800 Subject: [PATCH 02/13] added fix --- .../cloud/bigquery_storage_v1/reader.py | 96 ++++++++++++++----- 1 file changed, 71 insertions(+), 25 deletions(-) diff --git a/packages/google-cloud-bigquery-storage/google/cloud/bigquery_storage_v1/reader.py b/packages/google-cloud-bigquery-storage/google/cloud/bigquery_storage_v1/reader.py index f6f52fbfd6ae..ff00fead6e78 100644 --- a/packages/google-cloud-bigquery-storage/google/cloud/bigquery_storage_v1/reader.py +++ b/packages/google-cloud-bigquery-storage/google/cloud/bigquery_storage_v1/reader.py @@ -356,16 +356,28 @@ def to_arrow(self): pyarrow.Table: A table of all rows in the stream. """ + if self._stream_parser is not None and not isinstance( + self._stream_parser, _ArrowStreamParser + ): + raise NotImplementedError("to_arrow not implemented for this stream format.") + record_batches = [] - for page in self.pages: - record_batches.append(page.to_arrow()) + pages = self.pages + try: + first_page = next(pages) + except StopIteration: + # No data, return an empty Table. + if self._stream_parser is None: + return pyarrow.Table.from_batches([], schema=pyarrow.schema([])) - if record_batches: - return pyarrow.Table.from_batches(record_batches) + self._stream_parser._parse_arrow_schema() + return pyarrow.Table.from_batches([], schema=self._stream_parser._schema) - # No data, return an empty Table. - self._stream_parser._parse_arrow_schema() - return pyarrow.Table.from_batches([], schema=self._stream_parser._schema) + record_batches.append(first_page.to_arrow()) + for page in pages: + record_batches.append(page.to_arrow()) + + return pyarrow.Table.from_batches(record_batches) def to_dataframe(self, dtypes=None): """Create a :class:`pandas.DataFrame` of all rows in the stream. @@ -396,27 +408,60 @@ def to_dataframe(self, dtypes=None): if dtypes is None: dtypes = {} - # If it's an Arrow stream, calling to_arrow, then converting to a - # pandas dataframe is about 2x faster. This is because pandas.concat is - # rarely no-copy, whereas pyarrow.Table.from_batches + to_pandas is - # usually no-copy. + # If we already know the format, we can optimize. + if self._stream_parser is not None: + if not isinstance(self._stream_parser, _ArrowStreamParser): + # Known to be Avro (or other non-Arrow format) + frames = [page.to_dataframe(dtypes=dtypes) for page in self.pages] + if frames: + return pandas.concat(frames) + return self._empty_dataframe_from_avro(dtypes) + # Known to be Arrow, we can proceed to use to_arrow optimization below. + + pages = self.pages + try: + first_page = next(pages) + except StopIteration: + # No data, construct an empty dataframe with columns matching the schema. + if self._stream_parser is not None: + try: + self._stream_parser._parse_arrow_schema() + except NotImplementedError: + # It's not Arrow, so it must be Avro (or something else). + return self._empty_dataframe_from_avro(dtypes) + else: + # It's Arrow. + df = pyarrow.Table.from_batches( + [], schema=self._stream_parser._schema + ).to_pandas() + for column in dtypes: + df[column] = pandas.Series(df[column], dtype=dtypes[column]) + return df + else: + return pandas.DataFrame() + + # Try Arrow optimization. 
try: - record_batch = self.to_arrow() + first_batch = first_page.to_arrow() except NotImplementedError: - pass - else: - df = record_batch.to_pandas() - for column in dtypes: - df[column] = pandas.Series(df[column], dtype=dtypes[column]) - return df + # Not an Arrow stream, use Avro path. + frames = [first_page.to_dataframe(dtypes=dtypes)] + for page in pages: + frames.append(page.to_dataframe(dtypes=dtypes)) + return pandas.concat(frames) - frames = [page.to_dataframe(dtypes=dtypes) for page in self.pages] + # It IS an Arrow stream. + record_batches = [first_batch] + for page in pages: + record_batches.append(page.to_arrow()) - if frames: - return pandas.concat(frames) + table = pyarrow.Table.from_batches(record_batches) + df = table.to_pandas() + for column in dtypes: + df[column] = pandas.Series(df[column], dtype=dtypes[column]) + return df - # No data, construct an empty dataframe with columns matching the schema. - # The result should be consistent with what an empty ARROW stream would produce. + def _empty_dataframe_from_avro(self, dtypes): self._stream_parser._parse_avro_schema() schema = self._stream_parser._avro_schema_json @@ -447,8 +492,9 @@ def _dtypes_from_avro(self, avro_fields): for field_info in avro_fields: # If a type is an union of multiple types, pick the first type # that is not "null". - if isinstance(field_info["type"], list): - type_info = next(item for item in field_info["type"] if item != "null") + type_info = field_info["type"] + if isinstance(type_info, list): + type_info = next(item for item in type_info if item != "null") if isinstance(type_info, str): field_dtype = type_map.get(type_info, "object") From c6960f09b8ad467bcc9633ecba0fa0188fe58504 Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Tue, 17 Feb 2026 17:45:23 -0800 Subject: [PATCH 03/13] simplified code --- .../cloud/bigquery_storage_v1/reader.py | 71 +++++++------------ 1 file changed, 26 insertions(+), 45 deletions(-) diff --git a/packages/google-cloud-bigquery-storage/google/cloud/bigquery_storage_v1/reader.py b/packages/google-cloud-bigquery-storage/google/cloud/bigquery_storage_v1/reader.py index ff00fead6e78..c0888d5a33d8 100644 --- a/packages/google-cloud-bigquery-storage/google/cloud/bigquery_storage_v1/reader.py +++ b/packages/google-cloud-bigquery-storage/google/cloud/bigquery_storage_v1/reader.py @@ -356,28 +356,19 @@ def to_arrow(self): pyarrow.Table: A table of all rows in the stream. """ - if self._stream_parser is not None and not isinstance( - self._stream_parser, _ArrowStreamParser - ): - raise NotImplementedError("to_arrow not implemented for this stream format.") - record_batches = [] - pages = self.pages - try: - first_page = next(pages) - except StopIteration: - # No data, return an empty Table. - if self._stream_parser is None: - return pyarrow.Table.from_batches([], schema=pyarrow.schema([])) + for page in self.pages: + record_batches.append(page.to_arrow()) - self._stream_parser._parse_arrow_schema() - return pyarrow.Table.from_batches([], schema=self._stream_parser._schema) + if record_batches: + return pyarrow.Table.from_batches(record_batches) - record_batches.append(first_page.to_arrow()) - for page in pages: - record_batches.append(page.to_arrow()) + # No data, return an empty Table. 
+ if self._stream_parser is None: + return pyarrow.Table.from_batches([], schema=pyarrow.schema([])) - return pyarrow.Table.from_batches(record_batches) + self._stream_parser._parse_arrow_schema() + return pyarrow.Table.from_batches([], schema=self._stream_parser._schema) def to_dataframe(self, dtypes=None): """Create a :class:`pandas.DataFrame` of all rows in the stream. @@ -408,37 +399,11 @@ def to_dataframe(self, dtypes=None): if dtypes is None: dtypes = {} - # If we already know the format, we can optimize. - if self._stream_parser is not None: - if not isinstance(self._stream_parser, _ArrowStreamParser): - # Known to be Avro (or other non-Arrow format) - frames = [page.to_dataframe(dtypes=dtypes) for page in self.pages] - if frames: - return pandas.concat(frames) - return self._empty_dataframe_from_avro(dtypes) - # Known to be Arrow, we can proceed to use to_arrow optimization below. - pages = self.pages try: first_page = next(pages) except StopIteration: - # No data, construct an empty dataframe with columns matching the schema. - if self._stream_parser is not None: - try: - self._stream_parser._parse_arrow_schema() - except NotImplementedError: - # It's not Arrow, so it must be Avro (or something else). - return self._empty_dataframe_from_avro(dtypes) - else: - # It's Arrow. - df = pyarrow.Table.from_batches( - [], schema=self._stream_parser._schema - ).to_pandas() - for column in dtypes: - df[column] = pandas.Series(df[column], dtype=dtypes[column]) - return df - else: - return pandas.DataFrame() + return self._empty_dataframe(dtypes) # Try Arrow optimization. try: @@ -461,6 +426,21 @@ def to_dataframe(self, dtypes=None): df[column] = pandas.Series(df[column], dtype=dtypes[column]) return df + def _empty_dataframe(self, dtypes): + if self._stream_parser is None: + return pandas.DataFrame() + + if isinstance(self._stream_parser, _ArrowStreamParser): + self._stream_parser._parse_arrow_schema() + df = pyarrow.Table.from_batches( + [], schema=self._stream_parser._schema + ).to_pandas() + for column in dtypes: + df[column] = pandas.Series(df[column], dtype=dtypes[column]) + return df + + return self._empty_dataframe_from_avro(dtypes) + def _empty_dataframe_from_avro(self, dtypes): self._stream_parser._parse_avro_schema() schema = self._stream_parser._avro_schema_json @@ -498,6 +478,7 @@ def _dtypes_from_avro(self, avro_fields): if isinstance(type_info, str): field_dtype = type_map.get(type_info, "object") + else: logical_type = type_info.get("logicalType") if logical_type == "timestamp-micros": From b6471746406e6ea84179787442ebe43eedf8e6cb Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Tue, 17 Feb 2026 18:09:07 -0800 Subject: [PATCH 04/13] restructured code --- .../cloud/bigquery_storage_v1/reader.py | 31 ++++++++++++++++--- 1 file changed, 26 insertions(+), 5 deletions(-) diff --git a/packages/google-cloud-bigquery-storage/google/cloud/bigquery_storage_v1/reader.py b/packages/google-cloud-bigquery-storage/google/cloud/bigquery_storage_v1/reader.py index c0888d5a33d8..9761c6ccec0d 100644 --- a/packages/google-cloud-bigquery-storage/google/cloud/bigquery_storage_v1/reader.py +++ b/packages/google-cloud-bigquery-storage/google/cloud/bigquery_storage_v1/reader.py @@ -16,6 +16,7 @@ import collections import io +import itertools import json import time @@ -357,6 +358,9 @@ def to_arrow(self): A table of all rows in the stream. """ record_batches = [] + # We iterate over pages first. 
This is important because the stream + # parser (and thus the schema) might only be initialized after processing + # the first page if a read_session wasn't provided. for page in self.pages: record_batches.append(page.to_arrow()) @@ -399,20 +403,30 @@ def to_dataframe(self, dtypes=None): if dtypes is None: dtypes = {} + # Use a "peek" strategy to check the first page. This allows us to + # determine if the stream is Arrow or Avro without consuming the + # first page from the iterator, which would cause data loss if we + # failed to use the Arrow optimization and had to fall back to the + # generic to_dataframe() method. pages = self.pages try: first_page = next(pages) except StopIteration: return self._empty_dataframe(dtypes) - # Try Arrow optimization. + # Optimization: If it's an Arrow stream, calling to_arrow, then converting to a + # pandas dataframe is about 2x faster. This is because pandas.concat is + # rarely no-copy, whereas pyarrow.Table.from_batches + to_pandas is + # usually no-copy. try: first_batch = first_page.to_arrow() except NotImplementedError: - # Not an Arrow stream, use Avro path. - frames = [first_page.to_dataframe(dtypes=dtypes)] - for page in pages: - frames.append(page.to_dataframe(dtypes=dtypes)) + # Not an Arrow stream (or parser doesn't support to_arrow), so use + # the Avro path (or other generic path). + frames = [ + page.to_dataframe(dtypes=dtypes) + for page in itertools.chain([first_page], pages) + ] return pandas.concat(frames) # It IS an Arrow stream. @@ -427,6 +441,13 @@ def to_dataframe(self, dtypes=None): return df def _empty_dataframe(self, dtypes): + """Create an empty DataFrame with the correct schema. + + This handles cases where the stream is empty but we still need to + return a DataFrame with the correct columns and types. It handles + both Arrow and Avro parsers, as well as the case where no parser + is initialized. + """ if self._stream_parser is None: return pandas.DataFrame() From 0470297dafd93c7aba9c82821e5b2c48131545d2 Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Tue, 17 Feb 2026 18:17:56 -0800 Subject: [PATCH 05/13] cleaned up code --- .../cloud/bigquery_storage_v1/reader.py | 67 ++++++++----------- 1 file changed, 27 insertions(+), 40 deletions(-) diff --git a/packages/google-cloud-bigquery-storage/google/cloud/bigquery_storage_v1/reader.py b/packages/google-cloud-bigquery-storage/google/cloud/bigquery_storage_v1/reader.py index 9761c6ccec0d..e3452cacd6cc 100644 --- a/packages/google-cloud-bigquery-storage/google/cloud/bigquery_storage_v1/reader.py +++ b/packages/google-cloud-bigquery-storage/google/cloud/bigquery_storage_v1/reader.py @@ -358,9 +358,6 @@ def to_arrow(self): A table of all rows in the stream. """ record_batches = [] - # We iterate over pages first. This is important because the stream - # parser (and thus the schema) might only be initialized after processing - # the first page if a read_session wasn't provided. for page in self.pages: record_batches.append(page.to_arrow()) @@ -403,43 +400,36 @@ def to_dataframe(self, dtypes=None): if dtypes is None: dtypes = {} - # Use a "peek" strategy to check the first page. This allows us to - # determine if the stream is Arrow or Avro without consuming the - # first page from the iterator, which would cause data loss if we - # failed to use the Arrow optimization and had to fall back to the - # generic to_dataframe() method. 
+ # Use a "peek" strategy to check the first page, without consuming + # from self.pages generator pages = self.pages try: first_page = next(pages) except StopIteration: return self._empty_dataframe(dtypes) - # Optimization: If it's an Arrow stream, calling to_arrow, then converting to a - # pandas dataframe is about 2x faster. This is because pandas.concat is - # rarely no-copy, whereas pyarrow.Table.from_batches + to_pandas is - # usually no-copy. try: - first_batch = first_page.to_arrow() + # Optimization: If it's an Arrow stream, calling to_arrow, then converting to a + # pandas dataframe is about 2x faster. This is because pandas.concat is + # rarely no-copy, whereas pyarrow.Table.from_batches + to_pandas is + # usually no-copy. + record_batches = [ + p.to_arrow() for p in itertools.chain([first_page], pages) + ] + + table = pyarrow.Table.from_batches(record_batches) + df = table.to_pandas() + for column in dtypes: + df[column] = pandas.Series(df[column], dtype=dtypes[column]) + return df except NotImplementedError: - # Not an Arrow stream (or parser doesn't support to_arrow), so use - # the Avro path (or other generic path). + # Not an Arrow streamm use generic parser frames = [ - page.to_dataframe(dtypes=dtypes) - for page in itertools.chain([first_page], pages) + p.to_dataframe(dtypes=dtypes) + for p in itertools.chain([first_page], pages) ] return pandas.concat(frames) - # It IS an Arrow stream. - record_batches = [first_batch] - for page in pages: - record_batches.append(page.to_arrow()) - - table = pyarrow.Table.from_batches(record_batches) - df = table.to_pandas() - for column in dtypes: - df[column] = pandas.Series(df[column], dtype=dtypes[column]) - return df - def _empty_dataframe(self, dtypes): """Create an empty DataFrame with the correct schema. @@ -459,21 +449,18 @@ def _empty_dataframe(self, dtypes): for column in dtypes: df[column] = pandas.Series(df[column], dtype=dtypes[column]) return df + else: + self._stream_parser._parse_avro_schema() + schema = self._stream_parser._avro_schema_json - return self._empty_dataframe_from_avro(dtypes) - - def _empty_dataframe_from_avro(self, dtypes): - self._stream_parser._parse_avro_schema() - schema = self._stream_parser._avro_schema_json - - column_dtypes = self._dtypes_from_avro(schema["fields"]) - column_dtypes.update(dtypes) + column_dtypes = self._dtypes_from_avro(schema["fields"]) + column_dtypes.update(dtypes) - df = pandas.DataFrame(columns=column_dtypes.keys()) - for column in df: - df[column] = pandas.Series([], dtype=column_dtypes[column]) + df = pandas.DataFrame(columns=column_dtypes.keys()) + for column in df: + df[column] = pandas.Series([], dtype=column_dtypes[column]) - return df + return df def _dtypes_from_avro(self, avro_fields): """Determine Pandas dtypes for columns in Avro schema. 
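Patches 02-05 above rest on a single pattern: pull exactly one page off the pages generator to discover the stream format, then stitch that page back in front of the remaining pages so the fallback path still sees every record. The sketch below shows that peek-and-chain pattern in isolation, as a rough illustration only; it runs on plain Python iterators rather than the library's ReadRowsIterable or page objects, and consume_all is a made-up helper, not an API in this package.

    import itertools

    def consume_all(pages):
        """Drain an iterator without losing the item used to probe it."""
        pages = iter(pages)
        try:
            # Peek: pull exactly one page to learn what kind of stream this is.
            first_page = next(pages)
        except StopIteration:
            # Empty stream: nothing was consumed, so there is nothing to lose.
            return []
        # Chain the peeked page back in front so the fallback path still sees it.
        return list(itertools.chain([first_page], pages))

    assert consume_all(iter([])) == []
    assert consume_all(iter(["page1", "page2"])) == ["page1", "page2"]

The tests in PATCH 01/13 pin down the failure this guards against: probing an Avro stream with to_arrow() consumes the first message before NotImplementedError is raised (hence the reader._offset == 1 assertion), so a fallback that then re-reads self.pages starts past data that was already pulled from the stream.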
From 846bac830ec523a0ef04b146bff88f4d9d85dd30 Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Tue, 17 Feb 2026 18:20:36 -0800 Subject: [PATCH 06/13] fixed comments --- .../google/cloud/bigquery_storage_v1/reader.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/google-cloud-bigquery-storage/google/cloud/bigquery_storage_v1/reader.py b/packages/google-cloud-bigquery-storage/google/cloud/bigquery_storage_v1/reader.py index e3452cacd6cc..591e3be7d6bc 100644 --- a/packages/google-cloud-bigquery-storage/google/cloud/bigquery_storage_v1/reader.py +++ b/packages/google-cloud-bigquery-storage/google/cloud/bigquery_storage_v1/reader.py @@ -401,7 +401,7 @@ def to_dataframe(self, dtypes=None): dtypes = {} # Use a "peek" strategy to check the first page, without consuming - # from self.pages generator + # from self.pages generator. pages = self.pages try: first_page = next(pages) @@ -423,7 +423,7 @@ def to_dataframe(self, dtypes=None): df[column] = pandas.Series(df[column], dtype=dtypes[column]) return df except NotImplementedError: - # Not an Arrow streamm use generic parser + # Not an Arrow stream; use generic parser. frames = [ p.to_dataframe(dtypes=dtypes) for p in itertools.chain([first_page], pages) From fd23fc34cd3d5a62c81c6c599401991b056121c6 Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Tue, 17 Feb 2026 18:42:33 -0800 Subject: [PATCH 07/13] Update packages/google-cloud-bigquery-storage/google/cloud/bigquery_storage_v1/reader.py Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> --- .../google/cloud/bigquery_storage_v1/reader.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/packages/google-cloud-bigquery-storage/google/cloud/bigquery_storage_v1/reader.py b/packages/google-cloud-bigquery-storage/google/cloud/bigquery_storage_v1/reader.py index 591e3be7d6bc..8e500ec319eb 100644 --- a/packages/google-cloud-bigquery-storage/google/cloud/bigquery_storage_v1/reader.py +++ b/packages/google-cloud-bigquery-storage/google/cloud/bigquery_storage_v1/reader.py @@ -439,15 +439,21 @@ def _empty_dataframe(self, dtypes): is initialized. 
""" if self._stream_parser is None: - return pandas.DataFrame() + df = pandas.DataFrame(columns=dtypes.keys()) + for col, dtype in dtypes.items(): + df[col] = pandas.Series([], dtype=dtype) + return df if isinstance(self._stream_parser, _ArrowStreamParser): self._stream_parser._parse_arrow_schema() df = pyarrow.Table.from_batches( [], schema=self._stream_parser._schema ).to_pandas() - for column in dtypes: - df[column] = pandas.Series(df[column], dtype=dtypes[column]) + for column, dtype in dtypes.items(): + if column in df.columns: + df[column] = pandas.Series(df[column], dtype=dtype) + else: + df[column] = pandas.Series([], dtype=dtype) return df else: self._stream_parser._parse_avro_schema() From 31595bcabf1d368cd7981ddf3e659809790930e0 Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Tue, 17 Feb 2026 19:39:59 -0800 Subject: [PATCH 08/13] raise error if format changes mid-stream --- .../cloud/bigquery_storage_v1/reader.py | 19 ++++++------ .../tests/unit/test_reader_v1_arrow.py | 30 +++++++++++++++++++ 2 files changed, 40 insertions(+), 9 deletions(-) diff --git a/packages/google-cloud-bigquery-storage/google/cloud/bigquery_storage_v1/reader.py b/packages/google-cloud-bigquery-storage/google/cloud/bigquery_storage_v1/reader.py index 8e500ec319eb..09ed67ff1791 100644 --- a/packages/google-cloud-bigquery-storage/google/cloud/bigquery_storage_v1/reader.py +++ b/packages/google-cloud-bigquery-storage/google/cloud/bigquery_storage_v1/reader.py @@ -16,7 +16,6 @@ import collections import io -import itertools import json import time @@ -408,26 +407,28 @@ def to_dataframe(self, dtypes=None): except StopIteration: return self._empty_dataframe(dtypes) + first_batch = None try: # Optimization: If it's an Arrow stream, calling to_arrow, then converting to a # pandas dataframe is about 2x faster. This is because pandas.concat is # rarely no-copy, whereas pyarrow.Table.from_batches + to_pandas is # usually no-copy. - record_batches = [ - p.to_arrow() for p in itertools.chain([first_page], pages) - ] + first_batch = first_page.to_arrow() + record_batches = [first_batch] + [p.to_arrow() for p in pages] table = pyarrow.Table.from_batches(record_batches) df = table.to_pandas() for column in dtypes: df[column] = pandas.Series(df[column], dtype=dtypes[column]) return df - except NotImplementedError: + except NotImplementedError as e: + if first_batch is not None: + # Unexpected state: if arrow parsing fails mid-stream, + # raise exception to prevent data loss. + raise RuntimeError("Stream format changed mid-stream") from e # Not an Arrow stream; use generic parser. - frames = [ - p.to_dataframe(dtypes=dtypes) - for p in itertools.chain([first_page], pages) - ] + first_batch = first_page.to_dataframe(dtypes=dtypes) + frames = [first_batch] + [p.to_dataframe(dtypes=dtypes) for p in pages] return pandas.concat(frames) def _empty_dataframe(self, dtypes): diff --git a/packages/google-cloud-bigquery-storage/tests/unit/test_reader_v1_arrow.py b/packages/google-cloud-bigquery-storage/tests/unit/test_reader_v1_arrow.py index a79502d2fe72..9862c65a84b3 100644 --- a/packages/google-cloud-bigquery-storage/tests/unit/test_reader_v1_arrow.py +++ b/packages/google-cloud-bigquery-storage/tests/unit/test_reader_v1_arrow.py @@ -370,3 +370,33 @@ def test_to_dataframe_by_page_arrow(class_under_test, mock_gapic_client): drop=True ), ) + + +def test_to_dataframe_mid_stream_failure(mut, class_under_test, mock_gapic_client): + """ + Verify RuntimeError is raised if stream changes from Arrow to Avro mid-stream. 
+ This state is not expected in live environment. + """ + if pandas is None or pyarrow is None: + pytest.skip("Requires pandas and pyarrow") + + # Mock the first page to be a valid Arrow page. + arrow_page = mock.Mock(spec=mut.ReadRowsPage) + arrow_page.to_arrow.return_value = pyarrow.RecordBatch.from_pydict({"col": [1]}) + + # Mock the second page to raise NotImplementedError (simulating Avro/unknown). + avro_page = mock.Mock(spec=mut.ReadRowsPage) + avro_page.to_arrow.side_effect = NotImplementedError("Not Arrow") + + # Setup the reader to yield these two pages. + reader = class_under_test(mock_gapic_client, "name", 0, {}) + + with mock.patch.object( + mut.ReadRowsIterable, "pages", new_callable=mock.PropertyMock + ) as mock_pages: + mock_pages.return_value = iter([arrow_page, avro_page]) + + it = reader.rows() + + with pytest.raises(RuntimeError, match="Stream format changed mid-stream"): + it.to_dataframe() From 7d007afc07324845017fdec516a60fab155587be Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Tue, 17 Feb 2026 19:42:00 -0800 Subject: [PATCH 09/13] updated comment --- .../google/cloud/bigquery_storage_v1/reader.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/google-cloud-bigquery-storage/google/cloud/bigquery_storage_v1/reader.py b/packages/google-cloud-bigquery-storage/google/cloud/bigquery_storage_v1/reader.py index 09ed67ff1791..183128e9ddff 100644 --- a/packages/google-cloud-bigquery-storage/google/cloud/bigquery_storage_v1/reader.py +++ b/packages/google-cloud-bigquery-storage/google/cloud/bigquery_storage_v1/reader.py @@ -423,7 +423,7 @@ def to_dataframe(self, dtypes=None): return df except NotImplementedError as e: if first_batch is not None: - # Unexpected state: if arrow parsing fails mid-stream, + # Unexpected state: if Arrow parsing fails mid-stream, # raise exception to prevent data loss. raise RuntimeError("Stream format changed mid-stream") from e # Not an Arrow stream; use generic parser. From 7c43864b69f42d054907ffa6fdf30a518619545b Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Wed, 18 Feb 2026 08:50:34 -0800 Subject: [PATCH 10/13] added word --- .../google/cloud/bigquery_storage_v1/reader.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/google-cloud-bigquery-storage/google/cloud/bigquery_storage_v1/reader.py b/packages/google-cloud-bigquery-storage/google/cloud/bigquery_storage_v1/reader.py index 183128e9ddff..6ac203fa4ec9 100644 --- a/packages/google-cloud-bigquery-storage/google/cloud/bigquery_storage_v1/reader.py +++ b/packages/google-cloud-bigquery-storage/google/cloud/bigquery_storage_v1/reader.py @@ -424,7 +424,7 @@ def to_dataframe(self, dtypes=None): except NotImplementedError as e: if first_batch is not None: # Unexpected state: if Arrow parsing fails mid-stream, - # raise exception to prevent data loss. + # raise exception to prevent unreported data loss. raise RuntimeError("Stream format changed mid-stream") from e # Not an Arrow stream; use generic parser. 
first_batch = first_page.to_dataframe(dtypes=dtypes) From ecd6b5ed5dea526503fceefca217f1f7552a2f03 Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Wed, 18 Feb 2026 09:04:38 -0800 Subject: [PATCH 11/13] updated test --- .../tests/unit/test_reader_v1_arrow.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/packages/google-cloud-bigquery-storage/tests/unit/test_reader_v1_arrow.py b/packages/google-cloud-bigquery-storage/tests/unit/test_reader_v1_arrow.py index 9862c65a84b3..29af246491bf 100644 --- a/packages/google-cloud-bigquery-storage/tests/unit/test_reader_v1_arrow.py +++ b/packages/google-cloud-bigquery-storage/tests/unit/test_reader_v1_arrow.py @@ -382,7 +382,9 @@ def test_to_dataframe_mid_stream_failure(mut, class_under_test, mock_gapic_clien # Mock the first page to be a valid Arrow page. arrow_page = mock.Mock(spec=mut.ReadRowsPage) - arrow_page.to_arrow.return_value = pyarrow.RecordBatch.from_pydict({"col": [1]}) + arrow_page.to_arrow.return_value = pyarrow.RecordBatch.from_arrays( + [pyarrow.array([1])], names=["col"] + ) # Mock the second page to raise NotImplementedError (simulating Avro/unknown). avro_page = mock.Mock(spec=mut.ReadRowsPage) From 0a40ae89a3b14394254bc4c797a6d9530cdceb17 Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Wed, 18 Feb 2026 14:55:41 -0800 Subject: [PATCH 12/13] clean up code --- .../google/cloud/bigquery_storage_v1/reader.py | 16 +++++----------- 1 file changed, 5 insertions(+), 11 deletions(-) diff --git a/packages/google-cloud-bigquery-storage/google/cloud/bigquery_storage_v1/reader.py b/packages/google-cloud-bigquery-storage/google/cloud/bigquery_storage_v1/reader.py index 6ac203fa4ec9..186e3b3330e4 100644 --- a/packages/google-cloud-bigquery-storage/google/cloud/bigquery_storage_v1/reader.py +++ b/packages/google-cloud-bigquery-storage/google/cloud/bigquery_storage_v1/reader.py @@ -440,21 +440,15 @@ def _empty_dataframe(self, dtypes): is initialized. """ if self._stream_parser is None: - df = pandas.DataFrame(columns=dtypes.keys()) - for col, dtype in dtypes.items(): - df[col] = pandas.Series([], dtype=dtype) - return df + return pandas.DataFrame(columns=dtypes.keys()).astype(dtypes) if isinstance(self._stream_parser, _ArrowStreamParser): self._stream_parser._parse_arrow_schema() - df = pyarrow.Table.from_batches( - [], schema=self._stream_parser._schema - ).to_pandas() + + df = self._stream_parser._schema.empty_table().to_pandas() + for column, dtype in dtypes.items(): - if column in df.columns: - df[column] = pandas.Series(df[column], dtype=dtype) - else: - df[column] = pandas.Series([], dtype=dtype) + df[column] = pandas.Series(df.get(column, []), dtype=dtype) return df else: self._stream_parser._parse_avro_schema() From 8f5bc28de6b9dfbb07fa9e446a620e89258696b0 Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Wed, 18 Feb 2026 15:09:19 -0800 Subject: [PATCH 13/13] fixed 3.7 test --- .../google/cloud/bigquery_storage_v1/reader.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/packages/google-cloud-bigquery-storage/google/cloud/bigquery_storage_v1/reader.py b/packages/google-cloud-bigquery-storage/google/cloud/bigquery_storage_v1/reader.py index 186e3b3330e4..e321a672c696 100644 --- a/packages/google-cloud-bigquery-storage/google/cloud/bigquery_storage_v1/reader.py +++ b/packages/google-cloud-bigquery-storage/google/cloud/bigquery_storage_v1/reader.py @@ -440,7 +440,10 @@ def _empty_dataframe(self, dtypes): is initialized. 
""" if self._stream_parser is None: - return pandas.DataFrame(columns=dtypes.keys()).astype(dtypes) + df = pandas.DataFrame(columns=dtypes.keys()) + for col, dtype in dtypes.items(): + df[col] = pandas.Series([], dtype=dtype) + return df if isinstance(self._stream_parser, _ArrowStreamParser): self._stream_parser._parse_arrow_schema()