Added to_kafka directly from a Dask worker #279
jsmaupin wants to merge 1 commit into python-streamz:master
Conversation
Codecov Report
@@ Coverage Diff @@
## master #279 +/- ##
==========================================
- Coverage 94.69% 93.61% -1.08%
==========================================
Files 13 13
Lines 1620 1644 +24
==========================================
+ Hits 1534 1539 +5
- Misses 86 105 +19
Continue to review full report at Codecov.
I'll add tests to get coverage now.
@jsmaupin, did you get a chance to write some tests? @martindurant, @CJ-Wright, any thoughts on this?
@chinmaychandak I suspect this can be done with the existing to_kafka method. We just need to figure out the difference between this implementation and the existing one and add an …
Okay, let me take a look.

@jsmaupin … Hence, I am thinking that this current implementation of yours would be the best way out?
```python
client = default_client()
result = client.submit(produce, self.topic, x, self.producer_config)
self._emit(result)
```
Should this call self.emit? That way it integrates with the async support?

Agreed. Anything else that comes to mind that needs change, @CJ-Wright?
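For reference, here is a minimal sketch of what the worker-side produce helper submitted in the snippet above might look like. Only the name and call signature come from the diff; the body is an assumption, and it presumes confluent_kafka is installed on every worker.

```python
# Illustrative sketch only: the Producer is constructed inside the function,
# i.e. on the Dask worker, so it never has to be pickled on the client side.
from confluent_kafka import Producer


def produce(topic, value, producer_config):
    producer = Producer(producer_config)
    producer.produce(topic, value)
    producer.flush()  # block until delivery, so the Dask future resolving means "done"
    return value
```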
@martindurant Could you also please take a look at this?
Circling back to this. After becoming more familiar with everything here, I'm thinking that this should support back pressure like the existing to_kafka sink does.
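A rough sketch of how that back pressure could work, assuming streamz's convention that a node's update() can return an awaitable that upstream nodes wait on. The class name is hypothetical, and produce is the worker-side helper sketched earlier; this is not the actual streamz implementation.

```python
from distributed.client import default_client
from streamz import Stream
from tornado import gen


class to_kafka_on_worker(Stream):  # hypothetical name, not an existing streamz node
    def __init__(self, upstream, topic, producer_config, **kwargs):
        self.topic = topic
        self.producer_config = producer_config
        super().__init__(upstream, **kwargs)

    @gen.coroutine
    def update(self, x, who=None, metadata=None):
        client = default_client()
        # `produce` is the worker-side helper sketched earlier in this thread.
        future = client.submit(produce, self.topic, x, self.producer_config)
        # Wait for the worker-side write to finish before emitting downstream;
        # upstream nodes that wait on this coroutine slow down accordingly,
        # which is the back-pressure behaviour being asked for here.
        result = yield client.gather(future, asynchronous=True)
        yield self._emit(result)
```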
This PR got left behind. Does anyone remember the status here?
I proposed a solution here: https://stackoverflow.com/questions/60764361/write-to-kafka-from-a-dask-worker that I felt was a bit of a hack. I haven't found a better solution. I would be happy to follow through with this implementation if there are no objections.
I think that approach is totally fine, but maybe I would improve it by having a dict of producers, with the key being a hash of the connection kwargs, because you could have different Kafka jobs live in a cluster at the same time. Also, the attribute could just as easily be a global variable in a module, especially if it's mutable like the dict I'm suggesting above. This seems cleaner to my mind, but I can't think of any technical reason that it's different (there is only one worker instance).
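A quick sketch of that idea, with illustrative names: a module-level dict caches one Producer per distinct connection configuration on each worker process.

```python
import json

from confluent_kafka import Producer

# Module-level cache: one Producer per distinct config, per worker process.
_producers = {}


def get_producer(producer_config):
    # Key on a stable serialization of the connection kwargs (a hash of this
    # string would work equally well), so different Kafka jobs sharing the
    # cluster each get their own Producer.
    key = json.dumps(producer_config, sort_keys=True)
    if key not in _producers:
        _producers[key] = Producer(producer_config)
    return _producers[key]


def produce(topic, value, producer_config):
    producer = get_producer(producer_config)
    producer.produce(topic, value)
    producer.poll(0)  # serve delivery callbacks without blocking
    return value
```

Reusing a cached Producer also lets librdkafka batch messages instead of paying a connection and flush cost on every record.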
Currently, we must gather() all the results from a Dask stream back to the master script and then push the results to Kafka. This removes all the benefits of parallel processing we get with Dask and Kafka. It would be much more efficient if we could push data directly from the Dask workers into Kafka.

One issue I had getting this to work is that the Producer class from the Confluent Kafka Python library is not pickle-able. The workaround is to hide the Producer from the pickle function by creating it using "reflection" methods on the worker side. However, I believe this adds a requirement that the confluent_kafka library must be installed on the worker.
Also, this implementation is serial, but Dask itself is parallel.
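For contrast, a rough illustration of the gather-based pattern described above. The topology and names are illustrative, and it assumes a running Dask client plus streamz's scatter/gather nodes.

```python
from dask.distributed import Client
from streamz import Stream
import streamz.dask  # registers the scatter/gather Dask nodes (may already happen on plain import)
from confluent_kafka import Producer

client = Client()  # assumes a Dask scheduler and workers are reachable
producer = Producer({"bootstrap.servers": "localhost:9092"})  # illustrative config


def process(x):
    return str(x).encode()  # placeholder for the real per-record computation


source = Stream()
(source.scatter()      # fan records out to the Dask workers
       .map(process)   # the computation runs in parallel on the workers
       .gather()       # ...but every result is pulled back into this process
       .sink(lambda value: producer.produce("results", value)))  # serial push to Kafka

for i in range(10):
    source.emit(i)
producer.flush()
```

Everything after gather() runs in the single client process, which is the serial bottleneck described above; the approach in this PR instead submits the Kafka write itself to the workers.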