@@ -364,6 +364,9 @@ def to_arrow(self):
            return pyarrow.Table.from_batches(record_batches)

        # No data, return an empty Table.
+        if self._stream_parser is None:
+            return pyarrow.Table.from_batches([], schema=pyarrow.schema([]))
+
        self._stream_parser._parse_arrow_schema()
        return pyarrow.Table.from_batches([], schema=self._stream_parser._schema)

@@ -396,38 +399,72 @@ def to_dataframe(self, dtypes=None):
        if dtypes is None:
            dtypes = {}

-        # If it's an Arrow stream, calling to_arrow, then converting to a
-        # pandas dataframe is about 2x faster. This is because pandas.concat is
-        # rarely no-copy, whereas pyarrow.Table.from_batches + to_pandas is
-        # usually no-copy.
+        # Use a "peek" strategy to check the first page, without consuming
+        # from self.pages generator.
+        pages = self.pages
        try:
-            record_batch = self.to_arrow()
-        except NotImplementedError:
-            pass
-        else:
-            df = record_batch.to_pandas()
+            first_page = next(pages)
+        except StopIteration:
+            return self._empty_dataframe(dtypes)

+        first_batch = None
+        try:
+            # Optimization: If it's an Arrow stream, calling to_arrow, then converting to a
+            # pandas dataframe is about 2x faster. This is because pandas.concat is
+            # rarely no-copy, whereas pyarrow.Table.from_batches + to_pandas is
+            # usually no-copy.
+            first_batch = first_page.to_arrow()
+            record_batches = [first_batch] + [p.to_arrow() for p in pages]
+
+            table = pyarrow.Table.from_batches(record_batches)
+            df = table.to_pandas()
            for column in dtypes:
                df[column] = pandas.Series(df[column], dtype=dtypes[column])
            return df
Comment on lines 411 to 423
Contributor

medium

While the current implementation correctly handles the case where the first page fails to convert to Arrow, it is technically susceptible to data loss if p.to_arrow() succeeds for the first page but fails for a subsequent one (as the pages iterator would have been partially consumed before the except block is reached). Although BigQuery Storage streams are typically homogeneous in format, it is safer to verify the first page individually before committing to the full iteration.

Suggested change
-        try:
-            # Optimization: If it's an Arrow stream, calling to_arrow, then converting to a
-            # pandas dataframe is about 2x faster. This is because pandas.concat is
-            # rarely no-copy, whereas pyarrow.Table.from_batches + to_pandas is
-            # usually no-copy.
-            record_batches = [
-                p.to_arrow() for p in itertools.chain([first_page], pages)
-            ]
-            table = pyarrow.Table.from_batches(record_batches)
-            df = table.to_pandas()
-            for column in dtypes:
-                df[column] = pandas.Series(df[column], dtype=dtypes[column])
-            return df
+        try:
+            # Optimization: If it's an Arrow stream, calling to_arrow, then converting to a
+            # pandas dataframe is about 2x faster.
+            first_batch = first_page.to_arrow()
+            record_batches = [first_batch] + [
+                p.to_arrow() for p in pages
+            ]
+            table = pyarrow.Table.from_batches(record_batches)
+            df = table.to_pandas()
+            for column, dtype in dtypes.items():
+                if column in df.columns:
+                    df[column] = pandas.Series(df[column], dtype=dtype)
+                else:
+                    df[column] = pandas.Series([], dtype=dtype)
+            return df
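
To make the failure mode concrete, here is a minimal, self-contained sketch of how a generator is partially consumed when a list comprehension raises mid-iteration. The fake_pages and to_arrow helpers below are hypothetical stand-ins, not the real ReadRowsPage API:

# Hypothetical stand-ins: a two-page stream where only the first page
# converts to Arrow successfully.
def fake_pages():
    yield "arrow page"
    yield "avro page"

def to_arrow(page):
    if page != "arrow page":
        raise NotImplementedError("not an Arrow page")
    return page

pages = fake_pages()
try:
    batches = [to_arrow(p) for p in pages]
except NotImplementedError:
    # Both pages were already pulled from the generator before the
    # exception propagated, so the successfully converted first page is
    # discarded along with the comprehension. A row-based fallback that
    # re-iterates `pages` here would silently lose the first page's rows.
    assert list(pages) == []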

Contributor Author

Yes, this code assumes that if the first page is in Arrow format, the rest of the stream will be as well. @Linchin, let me know if that's a wrong assumption.

I think I'll change the code to raise an exception if we do get into this unexpected state, though, to be explicit about what we expect.

Contributor

I think it's reasonable to expect the following pages to be Arrow too. Thanks for adding the exception, just in case.

+        except NotImplementedError as e:
+            if first_batch is not None:
+                # Unexpected state: if Arrow parsing fails mid-stream,
+                # raise exception to prevent unreported data loss.
+                raise RuntimeError("Stream format changed mid-stream") from e
+            # Not an Arrow stream; use generic parser.
+            first_batch = first_page.to_dataframe(dtypes=dtypes)
+            frames = [first_batch] + [p.to_dataframe(dtypes=dtypes) for p in pages]
+            return pandas.concat(frames)

-        frames = [page.to_dataframe(dtypes=dtypes) for page in self.pages]
-
-        if frames:
-            return pandas.concat(frames)
-
-        # No data, construct an empty dataframe with columns matching the schema.
-        # The result should be consistent with what an empty ARROW stream would produce.
-        self._stream_parser._parse_avro_schema()
-        schema = self._stream_parser._avro_schema_json
-
-        column_dtypes = self._dtypes_from_avro(schema["fields"])
-        column_dtypes.update(dtypes)
-
-        df = pandas.DataFrame(columns=column_dtypes.keys())
-        for column in df:
-            df[column] = pandas.Series([], dtype=column_dtypes[column])
-
-        return df
+    def _empty_dataframe(self, dtypes):
+        """Create an empty DataFrame with the correct schema.
+
+        This handles cases where the stream is empty but we still need to
+        return a DataFrame with the correct columns and types. It handles
+        both Arrow and Avro parsers, as well as the case where no parser
+        is initialized.
+        """
+        if self._stream_parser is None:
+            df = pandas.DataFrame(columns=dtypes.keys())
+            for col, dtype in dtypes.items():
+                df[col] = pandas.Series([], dtype=dtype)
+            return df
+
+        if isinstance(self._stream_parser, _ArrowStreamParser):
+            self._stream_parser._parse_arrow_schema()
+            df = self._stream_parser._schema.empty_table().to_pandas()
+            for column, dtype in dtypes.items():
+                df[column] = pandas.Series(df.get(column, []), dtype=dtype)
+            return df
+        else:
+            self._stream_parser._parse_avro_schema()
+            schema = self._stream_parser._avro_schema_json
+
+            column_dtypes = self._dtypes_from_avro(schema["fields"])
+            column_dtypes.update(dtypes)
+
+            df = pandas.DataFrame(columns=column_dtypes.keys())
+            for column in df:
+                df[column] = pandas.Series([], dtype=column_dtypes[column])
+
+            return df

def _dtypes_from_avro(self, avro_fields):
"""Determine Pandas dtypes for columns in Avro schema.
@@ -447,11 +484,13 @@ def _dtypes_from_avro(self, avro_fields):
        for field_info in avro_fields:
            # If a type is an union of multiple types, pick the first type
            # that is not "null".
-            if isinstance(field_info["type"], list):
-                type_info = next(item for item in field_info["type"] if item != "null")
+            type_info = field_info["type"]
+            if isinstance(type_info, list):
+                type_info = next(item for item in type_info if item != "null")
+
            if isinstance(type_info, str):
                field_dtype = type_map.get(type_info, "object")

            else:
                logical_type = type_info.get("logicalType")
                if logical_type == "timestamp-micros":
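
For context on why this hunk rebinds type_info unconditionally: in the old code, type_info was only assigned when the field's type was a union (a list), so a plain-typed field could silently reuse the value left over from the previous loop iteration. A minimal sketch with hypothetical fields, as standalone logic mirroring the diff rather than the library's API:

fields = [
    {"name": "a", "type": ["null", "string"]},  # union type
    {"name": "b", "type": "long"},              # plain type
]

# Old behavior: type_info is only bound inside the isinstance branch.
resolved = []
for field_info in fields:
    if isinstance(field_info["type"], list):
        type_info = next(t for t in field_info["type"] if t != "null")
    resolved.append(type_info)

# Field "b" wrongly inherits "string" from field "a" instead of "long".
assert resolved == ["string", "string"]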
@@ -622,3 +622,81 @@ def test_to_dataframe_by_page(class_under_test, mock_gapic_client):
drop=True
),
)


def test_to_dataframe_avro_no_lost_records(class_under_test, mock_gapic_client):
"""Verify that to_dataframe() does not lose records for Avro streams.
See: https://github.com/googleapis/google-cloud-python/issues/14900
"""
bq_columns = [
{"name": "name", "type": "string"},
{"name": "number", "type": "int64"},
]
avro_schema = _bq_to_avro_schema(bq_columns)

# 2 pages of data
bq_blocks = [
[{"name": "a", "number": 1}, {"name": "b", "number": 2}],
[{"name": "c", "number": 3}, {"name": "d", "number": 4}],
]

avro_blocks = _bq_to_avro_blocks(bq_blocks, avro_schema)
mock_gapic_client.read_rows.return_value = iter(avro_blocks)

reader = class_under_test(mock_gapic_client, "name", 0, {})
df = reader.to_dataframe()

assert len(df) == 4
assert df["name"].tolist() == ["a", "b", "c", "d"]


def test_to_dataframe_empty_stream_no_session(class_under_test, mock_gapic_client):
"""Verify that to_dataframe() handles empty streams without a session safely.
See: https://github.com/googleapis/google-cloud-python/issues/14900
"""
mock_gapic_client.read_rows.return_value = iter([])

reader = class_under_test(mock_gapic_client, "name", 0, {})
df = reader.to_dataframe()
assert len(df) == 0
assert isinstance(df, pandas.DataFrame)


def test_to_dataframe_empty_stream_with_session(class_under_test, mock_gapic_client):
"""Verify that to_dataframe() handles empty streams with a session correctly.
See: https://github.com/googleapis/google-cloud-python/issues/14900
"""
bq_columns = [{"name": "name", "type": "string"}]
avro_schema = _bq_to_avro_schema(bq_columns)
read_session = _generate_avro_read_session(avro_schema)

mock_gapic_client.read_rows.return_value = iter([])

reader = class_under_test(mock_gapic_client, "name", 0, {})
it = reader.rows(read_session=read_session)
df = it.to_dataframe()

assert len(df) == 0
assert df.columns.tolist() == ["name"]


def test_to_arrow_avro_consumes_first_page(class_under_test, mock_gapic_client):
"""Verify that to_arrow() consumes the first page of an Avro stream if format is unknown.
See: https://github.com/googleapis/google-cloud-python/issues/14900
"""
bq_columns = [{"name": "name", "type": "string"}]
avro_schema = _bq_to_avro_schema(bq_columns)
bq_blocks = [[{"name": "a"}], [{"name": "b"}]]
avro_blocks = _bq_to_avro_blocks(bq_blocks, avro_schema)

mock_gapic_client.read_rows.return_value = iter(avro_blocks)

reader = class_under_test(mock_gapic_client, "name", 0, {})
it = reader.rows()

with pytest.raises(NotImplementedError):
it.to_arrow()

# Since read_session was not provided, to_arrow() had to consume the first message
# to find out it was Avro. So offset should be 1.
assert reader._offset == 1
@@ -370,3 +370,35 @@ def test_to_dataframe_by_page_arrow(class_under_test, mock_gapic_client):
drop=True
),
)


def test_to_dataframe_mid_stream_failure(mut, class_under_test, mock_gapic_client):
    """
    Verify that RuntimeError is raised if the stream changes from Arrow to Avro mid-stream.
    This state is not expected in a live environment.
    """
if pandas is None or pyarrow is None:
pytest.skip("Requires pandas and pyarrow")

# Mock the first page to be a valid Arrow page.
arrow_page = mock.Mock(spec=mut.ReadRowsPage)
arrow_page.to_arrow.return_value = pyarrow.RecordBatch.from_arrays(
[pyarrow.array([1])], names=["col"]
)

# Mock the second page to raise NotImplementedError (simulating Avro/unknown).
avro_page = mock.Mock(spec=mut.ReadRowsPage)
avro_page.to_arrow.side_effect = NotImplementedError("Not Arrow")

# Setup the reader to yield these two pages.
reader = class_under_test(mock_gapic_client, "name", 0, {})

with mock.patch.object(
mut.ReadRowsIterable, "pages", new_callable=mock.PropertyMock
) as mock_pages:
mock_pages.return_value = iter([arrow_page, avro_page])

it = reader.rows()

with pytest.raises(RuntimeError, match="Stream format changed mid-stream"):
it.to_dataframe()