I'm testing the to_kafka sink, and its throughput is limited by the poll time (0.2 seconds). It looks like self.producer.poll(0) only handles one message at a time, so only one callback is called every 0.2 seconds.
This fails:
def test_to_kafka_throughput():
    # kafka_service and wait_for are assumed to be the same helpers the existing Kafka tests use
    ARGS = {'bootstrap.servers': 'localhost:9092'}
    with kafka_service() as kafka:
        _, TOPIC = kafka
        source = Stream.from_iterable(range(100)).map(lambda x: str(x).encode())
        kafka = source.to_kafka(TOPIC, ARGS)
        out = kafka.sink_to_list()
        source.start()
        # Start waiting immediately; all 100 results should arrive within 5 seconds
        wait_for(
            lambda: len(out) == 100,
            5,
            period=0.1,
            fail_func=lambda: print("len(out) ==", len(out))
        )
The existing test_to_kafka test doesn't catch this, because it starts waiting on the result only after all the items are emitted.
I spent some time tinkering with the code, but I can't figure out what's wrong or how to fix it, so any ideas are appreciated.
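For a sense of scale, here is a rough back-of-the-envelope sketch of why the test above times out if my reading is right. It is only a toy model, not the actual to_kafka code: the function estimated_drain_time and its callbacks_per_poll parameter are hypothetical names I made up for illustration, and the model simply assumes a fixed number of delivery callbacks is served per poll interval.

```python
import math


def estimated_drain_time(n_messages, poll_interval, callbacks_per_poll):
    """Seconds until all delivery callbacks have fired in this simplified model.

    Assumption (not taken from the real code): each poll cycle serves at most
    `callbacks_per_poll` callbacks, and cycles are `poll_interval` seconds apart.
    """
    polls_needed = math.ceil(n_messages / callbacks_per_poll)
    return polls_needed * poll_interval


# Numbers from the failing test: 100 messages, 0.2 s poll time, 5 s timeout.
TIMEOUT = 5
one_per_poll = estimated_drain_time(100, 0.2, 1)    # one callback per cycle
all_per_poll = estimated_drain_time(100, 0.2, 100)  # every pending callback per cycle

print("one callback per poll :", one_per_poll, "s, within timeout:", one_per_poll <= TIMEOUT)
print("all callbacks per poll:", all_per_poll, "s, within timeout:", all_per_poll <= TIMEOUT)
```

Under that assumption, one callback per 0.2 seconds needs about 20 seconds for 100 messages, well past the 5 second timeout, whereas draining every pending callback on each poll would finish within a single interval.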