Un-pickle-able
The Python support in Spark is impressive, yet sooner or later it becomes clear that Spark itself is not written in Python. Sometimes this reality comes to the foreground as unpicklable objects that refuse to be distributed across the cluster.
What's the story?
The story goes more or less like this:
from a_library_with_binary_bindings import SpecialClass

customization = {
    "parameter": "value"
}

client = SpecialClass(**customization)

def my_function(row):
    client.do_amazing_things_on(row)

# ... more code ...

df.writeStream.outputMode("append").foreach(my_function).start()
Boom!
TypeError: cannot pickle 'XXXXXX' object
Why does it happen?
The catch is that my_function must run on the workers, so Spark has to distribute it: it serializes the function with pickle (via cloudpickle) together with everything it references, including the client it closes over. It is Python code, so Python has to do the job. However, due to the binary bindings, the XXXXXX object wraps a handle into some libfoo.so.1.2.3, and the pickle machinery has no way to serialize that.
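You can reproduce the same failure without Spark. In this minimal sketch, FakeClient and its lock are made-up stand-ins: a thread lock is unpicklable for the same reason a native library handle is.

import pickle
import threading

class FakeClient:
    # Hypothetical stand-in for a client backed by binary bindings:
    # the lock plays the role of the handle into libfoo.so.1.2.3
    def __init__(self):
        self._handle = threading.Lock()

client = FakeClient()
pickle.dumps(client)
# TypeError: cannot pickle '_thread.lock' object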
Fortunately, the a_library_with_binary_bindings library is supposed to be installed on the worker nodes too, so we can change the code a little and ship the Python instructions that build a ready-to-use client, instead of shipping the client itself. Something like this:
from a_library_with_binary_bindings import SpecialClass

customization = {
    "parameter": "value"
}

def my_function(row):
    client = SpecialClass(**customization)
    client.do_amazing_things_on(row)

df.writeStream.outputMode("append").foreach(my_function).start()
This works because my_function now contains only the Python instructions to create a client: at serialization time no client instance exists yet, so there is nothing unpicklable to ship. The customization dictionary that travels with the function is plain, picklable Python.
Unfortunately, you will end up creating a brand-new client on every single call to my_function, that is, once per row.
Let's try another approach:
from a_library_with_binary_bindings import SpecialClass

customization = {
    "parameter": "value"
}

class MyClassContainer:
    def __init__(self, customization):
        self._customization = customization
        self._client = None  # no client yet: the instance stays picklable

    def client(self):
        # Lazily create the client on first use, i.e. on the worker
        if self._client is None:
            self._client = SpecialClass(**self._customization)
        return self._client

    def __call__(self, row):
        self.client().do_amazing_things_on(row)

my_function = MyClassContainer(customization)

df.writeStream.outputMode("append").foreach(my_function).start()
Two paramount facts make this work. First, when Spark pickles my_function on the driver, _client is still None, so the unpicklable object never has to be serialized. Second, thanks to __call__, the instance behaves like a function, and the client it lazily builds on the worker is created once per task rather than once per row, then reused for every row of that task.
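As a side note, the foreach sink in PySpark Structured Streaming also accepts an object exposing open/process/close methods, which makes the same lazy, per-partition setup explicit. A minimal sketch of that variant, reusing the hypothetical SpecialClass and customization from above:

from a_library_with_binary_bindings import SpecialClass

class MyForeachWriter:
    def __init__(self, customization):
        self._customization = customization
        self._client = None  # stays None on the driver, so pickling works

    def open(self, partition_id, epoch_id):
        # Called on the worker, once per partition of each epoch
        self._client = SpecialClass(**self._customization)
        return True  # True = go ahead and process this partition's rows

    def process(self, row):
        self._client.do_amazing_things_on(row)

    def close(self, error):
        self._client = None

df.writeStream.outputMode("append").foreach(MyForeachWriter(customization)).start()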
The unpicklable side of Python
For the sake of completeness, even pure-Python stuff can be unpicklable:
The general rule of thumb is that “logical” objects can be pickled, but “resource” objects (files, locks) can’t because it makes no sense to persist/clone them.
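A quick way to convince yourself, straight from a Python shell:

import pickle

with open("example.txt", "w") as f:
    pickle.dumps(f)
# TypeError: cannot pickle '_io.TextIOWrapper' object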
Further reading:
foreach in PySpark
The pickle library
__call__()
The reader understands that this article may contain errors and may not be up to date for the Spark 3.0+ releases, and uses it at their own risk: there are no express or implied warranties against data loss or hair loss. The reader should also understand that, for better performance, Scala is worth considering as an alternative to Python.