diff --git a/packages/google-cloud-bigquery-storage/google/cloud/bigquery_storage_v1/reader.py b/packages/google-cloud-bigquery-storage/google/cloud/bigquery_storage_v1/reader.py
index f6f52fbfd6ae..e321a672c696 100644
--- a/packages/google-cloud-bigquery-storage/google/cloud/bigquery_storage_v1/reader.py
+++ b/packages/google-cloud-bigquery-storage/google/cloud/bigquery_storage_v1/reader.py
@@ -364,6 +364,9 @@ def to_arrow(self):
             return pyarrow.Table.from_batches(record_batches)
 
         # No data, return an empty Table.
+        if self._stream_parser is None:
+            return pyarrow.Table.from_batches([], schema=pyarrow.schema([]))
+
         self._stream_parser._parse_arrow_schema()
         return pyarrow.Table.from_batches([], schema=self._stream_parser._schema)
 
@@ -396,38 +399,72 @@ def to_dataframe(self, dtypes=None):
         if dtypes is None:
             dtypes = {}
 
-        # If it's an Arrow stream, calling to_arrow, then converting to a
-        # pandas dataframe is about 2x faster. This is because pandas.concat is
-        # rarely no-copy, whereas pyarrow.Table.from_batches + to_pandas is
-        # usually no-copy.
+        # Use a "peek" strategy to check the first page, without consuming
+        # from the self.pages generator.
+        pages = self.pages
         try:
-            record_batch = self.to_arrow()
-        except NotImplementedError:
-            pass
-        else:
-            df = record_batch.to_pandas()
+            first_page = next(pages)
+        except StopIteration:
+            return self._empty_dataframe(dtypes)
+
+        first_batch = None
+        try:
+            # Optimization: If it's an Arrow stream, calling to_arrow, then converting to a
+            # pandas dataframe is about 2x faster. This is because pandas.concat is
+            # rarely no-copy, whereas pyarrow.Table.from_batches + to_pandas is
+            # usually no-copy.
+            first_batch = first_page.to_arrow()
+            record_batches = [first_batch] + [p.to_arrow() for p in pages]
+
+            table = pyarrow.Table.from_batches(record_batches)
+            df = table.to_pandas()
             for column in dtypes:
                 df[column] = pandas.Series(df[column], dtype=dtypes[column])
             return df
+        except NotImplementedError as e:
+            if first_batch is not None:
+                # Unexpected state: if Arrow parsing fails mid-stream,
+                # raise an exception to prevent unreported data loss.
+                raise RuntimeError("Stream format changed mid-stream") from e
+            # Not an Arrow stream; use the generic parser.
+            first_batch = first_page.to_dataframe(dtypes=dtypes)
+            frames = [first_batch] + [p.to_dataframe(dtypes=dtypes) for p in pages]
+            return pandas.concat(frames)
 
-        frames = [page.to_dataframe(dtypes=dtypes) for page in self.pages]
+    def _empty_dataframe(self, dtypes):
+        """Create an empty DataFrame with the correct schema.
 
-        if frames:
-            return pandas.concat(frames)
+        This handles cases where the stream is empty but we still need to
+        return a DataFrame with the correct columns and types. It handles
+        both Arrow and Avro parsers, as well as the case where no parser
+        is initialized.
+        """
+        if self._stream_parser is None:
+            df = pandas.DataFrame(columns=dtypes.keys())
+            for col, dtype in dtypes.items():
+                df[col] = pandas.Series([], dtype=dtype)
+            return df
 
-        # No data, construct an empty dataframe with columns matching the schema.
-        # The result should be consistent with what an empty ARROW stream would produce.
-        self._stream_parser._parse_avro_schema()
-        schema = self._stream_parser._avro_schema_json
+        if isinstance(self._stream_parser, _ArrowStreamParser):
+            self._stream_parser._parse_arrow_schema()
 
-        column_dtypes = self._dtypes_from_avro(schema["fields"])
-        column_dtypes.update(dtypes)
+            df = self._stream_parser._schema.empty_table().to_pandas()
 
-        df = pandas.DataFrame(columns=column_dtypes.keys())
-        for column in df:
-            df[column] = pandas.Series([], dtype=column_dtypes[column])
+            for column, dtype in dtypes.items():
+                df[column] = pandas.Series(df.get(column, []), dtype=dtype)
+            return df
+        else:
+            self._stream_parser._parse_avro_schema()
+            schema = self._stream_parser._avro_schema_json
 
-        return df
+            column_dtypes = self._dtypes_from_avro(schema["fields"])
+            column_dtypes.update(dtypes)
+
+            df = pandas.DataFrame(columns=column_dtypes.keys())
+            for column in df:
+                df[column] = pandas.Series([], dtype=column_dtypes[column])
+
+            return df
 
     def _dtypes_from_avro(self, avro_fields):
         """Determine Pandas dtypes for columns in Avro schema.
@@ -447,11 +484,13 @@ def _dtypes_from_avro(self, avro_fields):
         for field_info in avro_fields:
             # If a type is an union of multiple types, pick the first type
             # that is not "null".
-            if isinstance(field_info["type"], list):
-                type_info = next(item for item in field_info["type"] if item != "null")
+            type_info = field_info["type"]
+            if isinstance(type_info, list):
+                type_info = next(item for item in type_info if item != "null")
 
             if isinstance(type_info, str):
                 field_dtype = type_map.get(type_info, "object")
+
             else:
                 logical_type = type_info.get("logicalType")
                 if logical_type == "timestamp-micros":
diff --git a/packages/google-cloud-bigquery-storage/tests/unit/test_reader_v1.py b/packages/google-cloud-bigquery-storage/tests/unit/test_reader_v1.py
index e2e8870d0d00..1442b93e19b3 100644
--- a/packages/google-cloud-bigquery-storage/tests/unit/test_reader_v1.py
+++ b/packages/google-cloud-bigquery-storage/tests/unit/test_reader_v1.py
@@ -622,3 +622,81 @@ def test_to_dataframe_by_page(class_under_test, mock_gapic_client):
             drop=True
         ),
     )
+
+
+def test_to_dataframe_avro_no_lost_records(class_under_test, mock_gapic_client):
+    """Verify that to_dataframe() does not lose records for Avro streams.
+    See: https://github.com/googleapis/google-cloud-python/issues/14900
+    """
+    bq_columns = [
+        {"name": "name", "type": "string"},
+        {"name": "number", "type": "int64"},
+    ]
+    avro_schema = _bq_to_avro_schema(bq_columns)
+
+    # 2 pages of data
+    bq_blocks = [
+        [{"name": "a", "number": 1}, {"name": "b", "number": 2}],
+        [{"name": "c", "number": 3}, {"name": "d", "number": 4}],
+    ]
+
+    avro_blocks = _bq_to_avro_blocks(bq_blocks, avro_schema)
+    mock_gapic_client.read_rows.return_value = iter(avro_blocks)
+
+    reader = class_under_test(mock_gapic_client, "name", 0, {})
+    df = reader.to_dataframe()
+
+    assert len(df) == 4
+    assert df["name"].tolist() == ["a", "b", "c", "d"]
+
+
+def test_to_dataframe_empty_stream_no_session(class_under_test, mock_gapic_client):
+    """Verify that to_dataframe() handles empty streams without a session safely.
+    See: https://github.com/googleapis/google-cloud-python/issues/14900
+    """
+    mock_gapic_client.read_rows.return_value = iter([])
+
+    reader = class_under_test(mock_gapic_client, "name", 0, {})
+    df = reader.to_dataframe()
+    assert len(df) == 0
+    assert isinstance(df, pandas.DataFrame)
+
+
+def test_to_dataframe_empty_stream_with_session(class_under_test, mock_gapic_client):
+    """Verify that to_dataframe() handles empty streams with a session correctly.
+    See: https://github.com/googleapis/google-cloud-python/issues/14900
+    """
+    bq_columns = [{"name": "name", "type": "string"}]
+    avro_schema = _bq_to_avro_schema(bq_columns)
+    read_session = _generate_avro_read_session(avro_schema)
+
+    mock_gapic_client.read_rows.return_value = iter([])
+
+    reader = class_under_test(mock_gapic_client, "name", 0, {})
+    it = reader.rows(read_session=read_session)
+    df = it.to_dataframe()
+
+    assert len(df) == 0
+    assert df.columns.tolist() == ["name"]
+
+
+def test_to_arrow_avro_consumes_first_page(class_under_test, mock_gapic_client):
+    """Verify that to_arrow() consumes the first page of an Avro stream if the format is unknown.
+    See: https://github.com/googleapis/google-cloud-python/issues/14900
+    """
+    bq_columns = [{"name": "name", "type": "string"}]
+    avro_schema = _bq_to_avro_schema(bq_columns)
+    bq_blocks = [[{"name": "a"}], [{"name": "b"}]]
+    avro_blocks = _bq_to_avro_blocks(bq_blocks, avro_schema)
+
+    mock_gapic_client.read_rows.return_value = iter(avro_blocks)
+
+    reader = class_under_test(mock_gapic_client, "name", 0, {})
+    it = reader.rows()
+
+    with pytest.raises(NotImplementedError):
+        it.to_arrow()
+
+    # Since read_session was not provided, to_arrow() had to consume the first message
+    # to find out it was Avro. So the offset should be 1.
+    assert reader._offset == 1
diff --git a/packages/google-cloud-bigquery-storage/tests/unit/test_reader_v1_arrow.py b/packages/google-cloud-bigquery-storage/tests/unit/test_reader_v1_arrow.py
index a79502d2fe72..29af246491bf 100644
--- a/packages/google-cloud-bigquery-storage/tests/unit/test_reader_v1_arrow.py
+++ b/packages/google-cloud-bigquery-storage/tests/unit/test_reader_v1_arrow.py
@@ -370,3 +370,35 @@ def test_to_dataframe_by_page_arrow(class_under_test, mock_gapic_client):
             drop=True
        ),
     )
+
+
+def test_to_dataframe_mid_stream_failure(mut, class_under_test, mock_gapic_client):
+    """
+    Verify that RuntimeError is raised if the stream changes from Arrow to Avro mid-stream.
+    This state is not expected in a live environment.
+    """
+    if pandas is None or pyarrow is None:
+        pytest.skip("Requires pandas and pyarrow")
+
+    # Mock the first page to be a valid Arrow page.
+    arrow_page = mock.Mock(spec=mut.ReadRowsPage)
+    arrow_page.to_arrow.return_value = pyarrow.RecordBatch.from_arrays(
+        [pyarrow.array([1])], names=["col"]
+    )
+
+    # Mock the second page to raise NotImplementedError (simulating Avro/unknown).
+    avro_page = mock.Mock(spec=mut.ReadRowsPage)
+    avro_page.to_arrow.side_effect = NotImplementedError("Not Arrow")
+
+    # Set up the reader to yield these two pages.
+    reader = class_under_test(mock_gapic_client, "name", 0, {})
+
+    with mock.patch.object(
+        mut.ReadRowsIterable, "pages", new_callable=mock.PropertyMock
+    ) as mock_pages:
+        mock_pages.return_value = iter([arrow_page, avro_page])
+
+        it = reader.rows()
+
+        with pytest.raises(RuntimeError, match="Stream format changed mid-stream"):
+            it.to_dataframe()
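
Note: for context on the reader path this patch touches, a minimal usage sketch follows (illustrative only, not part of the patch). The project, dataset, and table names are hypothetical, and the session requests Avro so that to_dataframe() exercises the generic (non-Arrow) branch; passing the session to rows() is what lets an empty stream still yield a DataFrame with the correct columns, as the new tests verify.

# Usage sketch. Assumes the google-cloud-bigquery-storage v1 client;
# project/dataset/table identifiers below are placeholders.
from google.cloud import bigquery_storage_v1
from google.cloud.bigquery_storage_v1 import types

project_id = "my-project"
table = "projects/my-project/datasets/my_dataset/tables/my_table"

client = bigquery_storage_v1.BigQueryReadClient()

requested_session = types.ReadSession(
    table=table,
    # AVRO streams take the pandas.concat branch of to_dataframe();
    # ARROW streams take the faster pyarrow.Table.from_batches branch.
    data_format=types.DataFormat.AVRO,
)
session = client.create_read_session(
    parent=f"projects/{project_id}",
    read_session=requested_session,
    max_stream_count=1,
)

if session.streams:
    reader = client.read_rows(session.streams[0].name)
    # Passing the session lets to_dataframe() build an empty DataFrame with
    # the correct columns even when the stream yields no rows.
    df = reader.rows(session).to_dataframe()
    print(df.head())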