`__
.. code-block:: python
# Group example with Parzen-window estimation
import numpy
from django_q.tasks import async_task, result_group, delete_group
# the estimation function
def parzen_estimation(x_samples, point_x, h):
k_n = 0
for row in x_samples:
x_i = (point_x - row[:, numpy.newaxis]) / h
for row in x_i:
if numpy.abs(row) > (1 / 2):
break
else:
k_n += 1
return h, (k_n / len(x_samples)) / (h ** point_x.shape[1])
# create 100 calculations and return the collated result
def parzen_async():
# clear the previous results
delete_group('parzen', cached=True)
mu_vec = numpy.array([0, 0])
cov_mat = numpy.array([[1, 0], [0, 1]])
sample = numpy.random. \
multivariate_normal(mu_vec, cov_mat, 10000)
widths = numpy.linspace(1.0, 1.2, 100)
x = numpy.array([[0], [0]])
# async_task them with a group label to the cache backend
for w in widths:
async_task(parzen_estimation, sample, x, w,
group='parzen', cached=True)
# return after 100 results
return result_group('parzen', count=100, cached=True)
Django Q is not optimized for distributed computing, but this example will give you an idea of what you can do with task :doc:`group`.
Alternatively the ``parzen_async()`` function can also be written with :func:`async_iter`, which automatically utilizes the cache backend and groups to return a single result from an iterable:
.. code-block:: python
# create 100 calculations and return the collated result
def parzen_async():
mu_vec = numpy.array([0, 0])
cov_mat = numpy.array([[1, 0], [0, 1]])
sample = numpy.random. \
multivariate_normal(mu_vec, cov_mat, 10000)
widths = numpy.linspace(1.0, 1.2, 100)
x = numpy.array([[0], [0]])
# async_task them with async_task iterable
args = [(sample, x, w) for w in widths]
result_id = async_iter(parzen_estimation, args, cached=True)
# return the cached result or timeout after 10 seconds
return result(result_id, wait=10000, cached=True)
Http Health Check
=================
An example of a python http server you can use (localhost:8080) to validate the health status of all the clusters in your environment. Example is http only.
Requires cache to be enabled. Save file in your Django project's root directory and run with command: ``python worker_hc.py`` in your container or other environment. Can be customized to show whatever you'd like from the Stat class or modified as needed.
.. code-block:: python
from http.server import BaseHTTPRequestHandler, HTTPServer
from mtt_app.settings.base import EMAIL_USE_TLS
import os
import django
# Set the correct path to you settings module
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "my.settings.path")
# All django stuff has to come after the setup:
django.setup()
from django_q.monitor import Stat
from django_q.conf import Conf
# Set host and port settings
hostName = "localhost"
serverPort = 8080
class HealthCheckServer(BaseHTTPRequestHandler):
def do_GET(self):
# Count the clusters and their status
happy_clusters = 0
total_clusters = 0
for stat in Stat.get_all():
total_clusters += 1
if stat.status in [Conf.IDLE, Conf.WORKING]:
happy_clusters += 1
# Return 200 response if there is at least 1 cluster running,
# and make sure all running clusters are happy
if total_clusters and happy_clusters == total_clusters:
response_code = 200
else:
response_code = 500
self.send_response(response_code)
self.send_header("Content-type", "text/html")
self.end_headers()
self.wfile.write(
bytes("Django-Q Heath Check", "utf-8")
)
self.wfile.write(
bytes(f"Health check returned {response_code} response
", "utf-8")
)
self.wfile.write(
bytes(
f"{happy_clusters} of {total_clusters} cluster(s) are happy
",
"utf-8",
)
)
if __name__ == "__main__":
webServer = HTTPServer((hostName, serverPort), HealthCheckServer)
print("Server started at http://%s:%s" % (hostName, serverPort))
try:
webServer.serve_forever()
except KeyboardInterrupt:
pass
webServer.server_close()
print("Server stopped.")
.. note::
If you have an example you want to share, please submit a pull request on `github `__.