/* Generated from guanyuankai/bonus-edge-proxy: 1027 lines, 25 KiB, C. */
#include <Python.h>

#include <stdlib.h>

#include "MQTTAsync.h"
#include "LinkedList.h"

/* Module exception object; created and published as "error" by the module init function. */
static PyObject *MqttV3Error;

/**
 * create(serverURI, clientId[, persistence[, options]]) -> (rc, handle)
 *
 * Creates an MQTTAsync client.  persistence must be PERSISTENCE_DEFAULT or
 * PERSISTENCE_NONE.  options, when supplied, must be a dict; its optional
 * int entries "sendWhileDisconnected" and "maxBufferedMessages" are copied
 * into the library create options.
 *
 * Returns the library return code together with the client handle encoded
 * as an unsigned long.  Raises TypeError for arguments of the wrong kind.
 */
static PyObject* mqttv3_create(PyObject* self, PyObject *args)
{
    /* Start from NULL so a failed create never hands back an indeterminate handle. */
    MQTTAsync c = NULL;
    char* serverURI;
    char* clientId;
    int persistence_option = MQTTCLIENT_PERSISTENCE_DEFAULT;
    PyObject *pyoptions = NULL, *temp;
    MQTTAsync_createOptions options = MQTTAsync_createOptions_initializer;
    int rc;

    if (!PyArg_ParseTuple(args, "ss|iO", &serverURI, &clientId,
            &persistence_option, &pyoptions))
        return NULL;

    if (persistence_option != MQTTCLIENT_PERSISTENCE_DEFAULT
            && persistence_option != MQTTCLIENT_PERSISTENCE_NONE)
    {
        PyErr_SetString(PyExc_TypeError, "persistence must be DEFAULT or NONE");
        return NULL;
    }

    if (pyoptions == NULL)
        rc = MQTTAsync_create(&c, serverURI, clientId, persistence_option, NULL);
    else
    {
        if (!PyDict_Check(pyoptions))
        {
            PyErr_SetString(PyExc_TypeError,
                    "Create options parameter must be a dictionary");
            return NULL;
        }

        if ((temp = PyDict_GetItemString(pyoptions, "sendWhileDisconnected")) != NULL)
        {
            if (!PyInt_Check(temp))
            {
                PyErr_SetString(PyExc_TypeError, "sendWhileDisconnected value must be int");
                return NULL;
            }
            options.sendWhileDisconnected = (int) PyInt_AsLong(temp);
        }

        if ((temp = PyDict_GetItemString(pyoptions, "maxBufferedMessages")) != NULL)
        {
            if (!PyInt_Check(temp))
            {
                PyErr_SetString(PyExc_TypeError, "maxBufferedMessages value must be int");
                return NULL;
            }
            options.maxBufferedMessages = (int) PyInt_AsLong(temp);
        }

        rc = MQTTAsync_createWithOptions(&c, serverURI, clientId, persistence_option, NULL, &options);
    }

    /* "k" consumes an unsigned long, so convert the handle explicitly. */
    return Py_BuildValue("ik", rc, (unsigned long) c);
}

/* CallbackEntry records registered through setcallbacks, looked up by client handle. */
static List *callbacks = NULL;

/* ConnectedEntry records registered through setconnected, looked up by client handle. */
static List *connected_callbacks = NULL;

/* Kind of request a ResponseEntry belongs to; onSuccess switches on it to unpack the response. */
enum msgTypes
{
    CONNECT,
    PUBLISH,
    SUBSCRIBE,
    SUBSCRIBE_MANY,
    UNSUBSCRIBE
};

typedef struct
|
|
{
|
|
MQTTAsync c;
|
|
PyObject *context;
|
|
PyObject *cl, *ma, *dc;
|
|
} CallbackEntry;
|
|
|
|
|
|
typedef struct
|
|
{
|
|
MQTTAsync c;
|
|
PyObject *context;
|
|
PyObject *co;
|
|
} ConnectedEntry;
|
|
|
|
|
|
int clientCompare(void* a, void* b)
|
|
{
|
|
CallbackEntry* e = (CallbackEntry*) a;
|
|
return e->c == (MQTTAsync) b;
|
|
}
|
|
|
|
|
|
int connectedCompare(void* a, void* b)
|
|
{
|
|
ConnectedEntry* e = (ConnectedEntry*) a;
|
|
return e->c == (MQTTAsync) b;
|
|
}
|
|
|
|
|
|
void connected(void* context, char* cause)
|
|
{
|
|
/* call the right Python function, using the context */
|
|
PyObject *arglist;
|
|
PyObject *result;
|
|
ConnectedEntry* e = context;
|
|
PyGILState_STATE gstate;
|
|
|
|
gstate = PyGILState_Ensure();
|
|
arglist = Py_BuildValue("Os", e->context, cause);
|
|
result = PyEval_CallObject(e->co, arglist);
|
|
Py_DECREF(arglist);
|
|
PyGILState_Release(gstate);
|
|
}
|
|
|
|
|
|
void connectionLost(void* context, char* cause)
|
|
{
|
|
/* call the right Python function, using the context */
|
|
PyObject *arglist;
|
|
PyObject *result;
|
|
CallbackEntry* e = context;
|
|
PyGILState_STATE gstate;
|
|
|
|
gstate = PyGILState_Ensure();
|
|
arglist = Py_BuildValue("Os", e->context, cause);
|
|
result = PyEval_CallObject(e->cl, arglist);
|
|
Py_DECREF(arglist);
|
|
PyGILState_Release(gstate);
|
|
}
|
|
|
|
|
|
int messageArrived(void* context, char* topicName, int topicLen,
|
|
MQTTAsync_message* message)
|
|
{
|
|
PyObject *result = NULL;
|
|
CallbackEntry* e = context;
|
|
int rc = -99;
|
|
PyGILState_STATE gstate;
|
|
|
|
gstate = PyGILState_Ensure();
|
|
if (topicLen == 0)
|
|
result = PyObject_CallFunction(e->ma, "Os{ss#sisisisi}", e->context,
|
|
topicName, "payload", message->payload, message->payloadlen,
|
|
"qos", message->qos, "retained", message->retained, "dup",
|
|
message->dup, "msgid", message->msgid);
|
|
else
|
|
result = PyObject_CallFunction(e->ma, "Os#{ss#sisisisi}", e->context,
|
|
topicName, topicLen, "payload", message->payload,
|
|
message->payloadlen, "qos", message->qos, "retained",
|
|
message->retained, "dup", message->dup, "msgid",
|
|
message->msgid);
|
|
if (result)
|
|
{
|
|
if (PyInt_Check(result))
|
|
rc = (int) PyInt_AsLong(result);
|
|
Py_DECREF(result);
|
|
}
|
|
PyGILState_Release(gstate);
|
|
MQTTAsync_free(topicName);
|
|
MQTTAsync_freeMessage(&message);
|
|
return rc;
|
|
}
|
|
|
|
|
|
/*
 * Library "delivery complete" callback.  context is the CallbackEntry
 * registered by setcallbacks; the Python callable stored in it is invoked as
 * dc(context, token) with the GIL held.
 */
void deliveryComplete(void* context, MQTTAsync_token dt)
{
    CallbackEntry* e = context;
    PyGILState_STATE gstate = PyGILState_Ensure();

    /* setcallbacks accepts None, meaning there is nothing to call */
    if (e->dc != Py_None)
    {
        PyObject *result = NULL;
        PyObject *arglist = Py_BuildValue("Oi", e->context, dt);

        if (arglist != NULL)
        {
            result = PyEval_CallObject(e->dc, arglist);
            Py_DECREF(arglist);
        }

        if (result != NULL)
            Py_DECREF(result); /* the return value is not used */
        else
            PyErr_Print();     /* no Python caller to propagate to: report and clear */
    }

    PyGILState_Release(gstate);
}

/**
 * setcallbacks(handle, context, connectionLost, messageArrived, deliveryComplete) -> rc
 *
 * Registers the Python callbacks for a client.  Each callback must be
 * callable or None (TypeError otherwise).  On success the new record
 * replaces any record previously registered for the same handle and owns a
 * reference to context and to each callback; on failure nothing is kept.
 */
static PyObject* mqttv3_setcallbacks(PyObject* self, PyObject *args)
{
    MQTTAsync c;
    PyObject *context, *cl, *ma, *dc;
    CallbackEntry* e = NULL;
    int rc;

    /* Parse and validate before allocating so the error paths have nothing to release. */
    if (!PyArg_ParseTuple(args, "kOOOO", &c, &context, &cl, &ma, &dc))
        return NULL;

    if ((cl != Py_None && !PyCallable_Check(cl))
            || (ma != Py_None && !PyCallable_Check(ma))
            || (dc != Py_None && !PyCallable_Check(dc)))
    {
        PyErr_SetString(PyExc_TypeError,
                "3rd, 4th and 5th parameters must be callable or None");
        return NULL;
    }

    e = malloc(sizeof(CallbackEntry));
    if (e == NULL)
        return PyErr_NoMemory();
    e->c = c;
    e->context = context;
    e->cl = cl;
    e->ma = ma;
    e->dc = dc;

    /* Take the references before the library can see the record. */
    Py_INCREF(e->context);
    Py_INCREF(e->cl);
    Py_INCREF(e->ma);
    Py_INCREF(e->dc);

    Py_BEGIN_ALLOW_THREADS
    rc = MQTTAsync_setCallbacks(c, e, connectionLost, messageArrived, deliveryComplete);
    Py_END_ALLOW_THREADS

    if (rc == MQTTASYNC_SUCCESS)
    {
        ListElement* found = ListFindItem(callbacks, c, clientCompare);

        if (found != NULL)
        {
            /* ListDetach releases the list element, so fetch the content pointer first. */
            CallbackEntry* old = found->content;

            ListDetach(callbacks, old);
            Py_XDECREF(old->context);
            Py_XDECREF(old->cl);
            Py_XDECREF(old->ma);
            Py_XDECREF(old->dc);
            free(old);
        }
        ListAppend(callbacks, e, sizeof(CallbackEntry));
    }
    else
    {
        /* Registration failed: drop the references and the record again. */
        Py_DECREF(e->context);
        Py_DECREF(e->cl);
        Py_DECREF(e->ma);
        Py_DECREF(e->dc);
        free(e);
    }

    return Py_BuildValue("i", rc);
}

/**
 * setconnected(handle, context, connected) -> rc
 *
 * Registers the Python "connected" callback for a client.  The callback must
 * be callable or None (TypeError otherwise).  On success the new record
 * replaces any record previously registered for the same handle and owns a
 * reference to context and to the callback; on failure nothing is kept.
 */
static PyObject* mqttv3_setconnected(PyObject* self, PyObject *args)
{
    MQTTAsync c;
    PyObject *context, *co;
    ConnectedEntry* e = NULL;
    int rc;

    /* Parse and validate before allocating so the error paths have nothing to release. */
    if (!PyArg_ParseTuple(args, "kOO", &c, &context, &co))
        return NULL;

    if (co != Py_None && !PyCallable_Check(co))
    {
        PyErr_SetString(PyExc_TypeError,
                "3rd parameter must be callable or None");
        return NULL;
    }

    e = malloc(sizeof(ConnectedEntry));
    if (e == NULL)
        return PyErr_NoMemory();
    e->c = c;
    e->context = context;
    e->co = co;

    /* Take the references before the library can see the record. */
    Py_INCREF(e->context);
    Py_INCREF(e->co);

    Py_BEGIN_ALLOW_THREADS
    rc = MQTTAsync_setConnected(c, e, connected);
    Py_END_ALLOW_THREADS

    if (rc == MQTTASYNC_SUCCESS)
    {
        ListElement* found = ListFindItem(connected_callbacks, c, connectedCompare);

        if (found != NULL)
        {
            /* ListDetach releases the list element, so fetch the content pointer first. */
            ConnectedEntry* old = found->content;

            ListDetach(connected_callbacks, old);
            Py_XDECREF(old->context);
            Py_XDECREF(old->co);
            free(old);
        }
        ListAppend(connected_callbacks, e, sizeof(ConnectedEntry));
    }
    else
    {
        /* Registration failed: drop the references and the record again. */
        Py_DECREF(e->context);
        Py_DECREF(e->co);
        free(e);
    }

    return Py_BuildValue("i", rc);
}

typedef struct
|
|
{
|
|
MQTTAsync c;
|
|
PyObject *context;
|
|
PyObject *onSuccess, *onFailure;
|
|
enum msgTypes msgType;
|
|
} ResponseEntry;
|
|
|
|
|
|
/*
 * Library success callback shared by all request kinds.  context is the
 * ResponseEntry created for the request; it is freed before returning.
 * The Python callable, if one was supplied, is invoked as
 * onSuccess(context_or_None, dict) where the dict layout depends on
 * e->msgType.
 *
 * NOTE(review): e->context and e->onSuccess are stored without their own
 * references where the entry is created; confirm the caller keeps them alive.
 */
void onSuccess(void* context, MQTTAsync_successData* response)
{
    PyObject *result = NULL;
    ResponseEntry* e = context;
    PyGILState_STATE gstate;

    gstate = PyGILState_Ensure();

    /* Only onFailure may have been supplied; then there is nothing to call. */
    if (e->onSuccess != NULL)
    {
        PyObject *cb_context = (e->context) ? e->context : Py_None;

        switch (e->msgType)
        {
            case CONNECT:
                result = PyObject_CallFunction(e->onSuccess, "O{sisiss}", cb_context,
                        "MQTTVersion", response->alt.connect.MQTTVersion,
                        "sessionPresent", response->alt.connect.sessionPresent,
                        "serverURI", response->alt.connect.serverURI);
                break;

            case PUBLISH:
                result = PyObject_CallFunction(e->onSuccess, "O{si ss s{ss# sisi}}", cb_context,
                        "token", response->token,
                        "destinationName", response->alt.pub.destinationName,
                        "message",
                        "payload", response->alt.pub.message.payload,
                        response->alt.pub.message.payloadlen,
                        "qos", response->alt.pub.message.qos,
                        "retained", response->alt.pub.message.retained);
                break;

            case SUBSCRIBE:
                result = PyObject_CallFunction(e->onSuccess, "O{sisi}", cb_context,
                        "token", response->token,
                        "qos", response->alt.qos);
                break;

            case SUBSCRIBE_MANY:
                /* only the first granted QoS is reported */
                result = PyObject_CallFunction(e->onSuccess, "O{sis[i]}", cb_context,
                        "token", response->token,
                        "qosList", response->alt.qosList[0]);
                break;

            case UNSUBSCRIBE:
                result = PyObject_CallFunction(e->onSuccess, "O{si}", cb_context,
                        "token", response->token);
                break;

            default:
                break;
        }

        if (result)
            Py_DECREF(result);
        else if (PyErr_Occurred())
            PyErr_Print(); /* no Python caller to propagate to: report and clear */
    }

    PyGILState_Release(gstate);
    free(e);
}

/*
 * Library failure callback shared by all request kinds.  context is the
 * ResponseEntry created for the request; it is freed before returning.
 * The Python callable, if one was supplied, is invoked as
 * onFailure(context_or_None, info) where info is a dict with the keys
 * "token", "code" and "message" taken from the failure data, or None when
 * the library supplies no failure data.
 *
 * NOTE(review): e->context and e->onFailure are stored without their own
 * references where the entry is created; confirm the caller keeps them alive.
 */
void onFailure(void* context, MQTTAsync_failureData* response)
{
    PyObject *result = NULL;
    ResponseEntry* e = context;
    PyGILState_STATE gstate;

    gstate = PyGILState_Ensure();

    /* Only onSuccess may have been supplied; then there is nothing to call. */
    if (e->onFailure != NULL)
    {
        PyObject *cb_context = (e->context) ? e->context : Py_None;

        /* The failure data is a C struct, not a Python object: convert it field by field. */
        if (response != NULL)
            result = PyObject_CallFunction(e->onFailure, "O{sisiss}", cb_context,
                    "token", response->token,
                    "code", response->code,
                    "message", response->message);
        else
            result = PyObject_CallFunction(e->onFailure, "OO", cb_context, Py_None);

        if (result)
            Py_DECREF(result);
        else
            PyErr_Print(); /* no Python caller to propagate to: report and clear */
    }

    PyGILState_Release(gstate);
    free(e);
}

/* return true if ok, false otherwise */
|
|
int getResponseOptions(MQTTAsync c, PyObject *pyoptions, MQTTAsync_responseOptions* responseOptions,
|
|
enum msgTypes msgType)
|
|
{
|
|
PyObject *temp = NULL;
|
|
|
|
if (!pyoptions)
|
|
return 1;
|
|
|
|
if (!PyDict_Check(pyoptions))
|
|
{
|
|
PyErr_SetString(PyExc_TypeError, "Response options must be a dictionary");
|
|
return 0;
|
|
}
|
|
|
|
if ((temp = PyDict_GetItemString(pyoptions, "onSuccess")) != NULL)
|
|
{
|
|
if (PyCallable_Check(temp)) /* temp points to Python function */
|
|
responseOptions->onSuccess = (MQTTAsync_onSuccess*)temp;
|
|
else
|
|
{
|
|
PyErr_SetString(PyExc_TypeError,
|
|
"onSuccess value must be callable");
|
|
return 0;
|
|
}
|
|
}
|
|
|
|
if ((temp = PyDict_GetItemString(pyoptions, "onFailure")) != NULL)
|
|
{
|
|
if (PyCallable_Check(temp))
|
|
responseOptions->onFailure = (MQTTAsync_onFailure*)temp;
|
|
else
|
|
{
|
|
PyErr_SetString(PyExc_TypeError,
|
|
"onFailure value must be callable");
|
|
return 0;
|
|
}
|
|
}
|
|
|
|
responseOptions->context = PyDict_GetItemString(pyoptions, "context");
|
|
|
|
if (responseOptions->onFailure || responseOptions->onSuccess)
|
|
{
|
|
ResponseEntry* r = malloc(sizeof(ResponseEntry));
|
|
r->c = c;
|
|
r->context = responseOptions->context;
|
|
responseOptions->context = r;
|
|
r->onSuccess = (PyObject*)responseOptions->onSuccess;
|
|
responseOptions->onSuccess = onSuccess;
|
|
r->onFailure = (PyObject*)responseOptions->onFailure;
|
|
responseOptions->onFailure = onFailure;
|
|
r->msgType = msgType;
|
|
}
|
|
|
|
return 1; /* not an error, if we get here */
|
|
}
|
|
|
|
|
|
static PyObject* mqttv3_connect(PyObject* self, PyObject *args)
|
|
{
|
|
MQTTAsync c;
|
|
PyObject *pyoptions = NULL, *temp;
|
|
MQTTAsync_connectOptions connectOptions = MQTTAsync_connectOptions_initializer;
|
|
MQTTAsync_willOptions willOptions = MQTTAsync_willOptions_initializer;
|
|
int rc;
|
|
|
|
if (!PyArg_ParseTuple(args, "k|O", &c, &pyoptions))
|
|
return NULL;
|
|
|
|
if (!pyoptions)
|
|
goto skip;
|
|
|
|
if (!PyDict_Check(pyoptions))
|
|
{
|
|
PyErr_SetString(PyExc_TypeError, "2nd parameter must be a dictionary");
|
|
return NULL;
|
|
}
|
|
|
|
if ((temp = PyDict_GetItemString(pyoptions, "onSuccess")) != NULL)
|
|
{
|
|
if (PyCallable_Check(temp)) /* temp points to Python function */
|
|
connectOptions.onSuccess = (MQTTAsync_onSuccess*)temp;
|
|
else
|
|
{
|
|
PyErr_SetString(PyExc_TypeError,
|
|
"onSuccess value must be callable");
|
|
return NULL;
|
|
}
|
|
}
|
|
|
|
if ((temp = PyDict_GetItemString(pyoptions, "onFailure")) != NULL)
|
|
{
|
|
if (PyCallable_Check(temp))
|
|
connectOptions.onFailure = (MQTTAsync_onFailure*)temp;
|
|
else
|
|
{
|
|
PyErr_SetString(PyExc_TypeError,
|
|
"onFailure value must be callable");
|
|
return NULL;
|
|
}
|
|
}
|
|
|
|
connectOptions.context = PyDict_GetItemString(pyoptions, "context");
|
|
|
|
if (connectOptions.onFailure || connectOptions.onSuccess)
|
|
{
|
|
ResponseEntry* r = malloc(sizeof(ResponseEntry));
|
|
r->c = c;
|
|
r->context = connectOptions.context;
|
|
connectOptions.context = r;
|
|
r->onSuccess = (PyObject*)connectOptions.onSuccess;
|
|
connectOptions.onSuccess = onSuccess;
|
|
r->onFailure = (PyObject*)connectOptions.onFailure;
|
|
connectOptions.onFailure = onFailure;
|
|
r->msgType = CONNECT;
|
|
}
|
|
|
|
if ((temp = PyDict_GetItemString(pyoptions, "keepAliveInterval")) != NULL)
|
|
{
|
|
if (PyInt_Check(temp))
|
|
connectOptions.keepAliveInterval = (int) PyInt_AsLong(temp);
|
|
else
|
|
{
|
|
PyErr_SetString(PyExc_TypeError,
|
|
"keepAliveLiveInterval value must be int");
|
|
return NULL;
|
|
}
|
|
}
|
|
|
|
if ((temp = PyDict_GetItemString(pyoptions, "cleansession")) != NULL)
|
|
{
|
|
if (PyInt_Check(temp))
|
|
connectOptions.cleansession = (int) PyInt_AsLong(temp);
|
|
else
|
|
{
|
|
PyErr_SetString(PyExc_TypeError, "cleansession value must be int");
|
|
return NULL;
|
|
}
|
|
}
|
|
|
|
if ((temp = PyDict_GetItemString(pyoptions, "will")) != NULL)
|
|
{
|
|
if (PyDict_Check(temp))
|
|
{
|
|
PyObject *wtemp = NULL;
|
|
if ((wtemp = PyDict_GetItemString(temp, "topicName")) == NULL)
|
|
{
|
|
PyErr_SetString(PyExc_TypeError,
|
|
"will topicName value must be set");
|
|
return NULL;
|
|
}
|
|
else
|
|
{
|
|
if (PyString_Check(wtemp))
|
|
willOptions.topicName = PyString_AsString(wtemp);
|
|
else
|
|
{
|
|
PyErr_SetString(PyExc_TypeError,
|
|
"will topicName value must be string");
|
|
return NULL;
|
|
}
|
|
}
|
|
if ((wtemp = PyDict_GetItemString(temp, "message")) == NULL)
|
|
{
|
|
PyErr_SetString(PyExc_TypeError,
|
|
"will message value must be set");
|
|
return NULL;
|
|
}
|
|
else
|
|
{
|
|
if (PyString_Check(wtemp))
|
|
willOptions.message = PyString_AsString(wtemp);
|
|
else
|
|
{
|
|
PyErr_SetString(PyExc_TypeError,
|
|
"will message value must be string");
|
|
return NULL;
|
|
}
|
|
}
|
|
if ((wtemp = PyDict_GetItemString(temp, "retained")) != NULL)
|
|
{
|
|
if (PyInt_Check(wtemp))
|
|
willOptions.retained = (int) PyInt_AsLong(wtemp);
|
|
else
|
|
{
|
|
PyErr_SetString(PyExc_TypeError,
|
|
"will retained value must be int");
|
|
return NULL;
|
|
}
|
|
}
|
|
if ((wtemp = PyDict_GetItemString(temp, "qos")) != NULL)
|
|
{
|
|
if (PyInt_Check(wtemp))
|
|
willOptions.qos = (int) PyInt_AsLong(wtemp);
|
|
else
|
|
{
|
|
PyErr_SetString(PyExc_TypeError,
|
|
"will qos value must be int");
|
|
return NULL;
|
|
}
|
|
}
|
|
connectOptions.will = &willOptions;
|
|
}
|
|
else
|
|
{
|
|
PyErr_SetString(PyExc_TypeError, "will value must be dictionary");
|
|
return NULL;
|
|
}
|
|
}
|
|
|
|
if ((temp = PyDict_GetItemString(pyoptions, "username")) != NULL)
|
|
{
|
|
if (PyString_Check(temp))
|
|
connectOptions.username = PyString_AsString(temp);
|
|
else
|
|
{
|
|
PyErr_SetString(PyExc_TypeError, "username value must be string");
|
|
return NULL;
|
|
}
|
|
}
|
|
|
|
if ((temp = PyDict_GetItemString(pyoptions, "password")) != NULL)
|
|
{
|
|
if (PyString_Check(temp))
|
|
connectOptions.username = PyString_AsString(temp);
|
|
else
|
|
{
|
|
PyErr_SetString(PyExc_TypeError, "password value must be string");
|
|
return NULL;
|
|
}
|
|
}
|
|
|
|
if ((temp = PyDict_GetItemString(pyoptions, "automaticReconnect")) != NULL)
|
|
{
|
|
if (PyInt_Check(temp))
|
|
connectOptions.automaticReconnect = (int) PyInt_AsLong(temp);
|
|
else
|
|
{
|
|
PyErr_SetString(PyExc_TypeError, "automatic reconnect value must be int");
|
|
return NULL;
|
|
}
|
|
}
|
|
|
|
if ((temp = PyDict_GetItemString(pyoptions, "minRetryInterval")) != NULL)
|
|
{
|
|
if (PyInt_Check(temp))
|
|
connectOptions.minRetryInterval = (int) PyInt_AsLong(temp);
|
|
else
|
|
{
|
|
PyErr_SetString(PyExc_TypeError, "minRetryInterval value must be int");
|
|
return NULL;
|
|
}
|
|
}
|
|
|
|
if ((temp = PyDict_GetItemString(pyoptions, "maxRetryInterval")) != NULL)
|
|
{
|
|
if (PyInt_Check(temp))
|
|
connectOptions.maxRetryInterval = (int) PyInt_AsLong(temp);
|
|
else
|
|
{
|
|
PyErr_SetString(PyExc_TypeError, "maxRetryInterval value must be int");
|
|
return NULL;
|
|
}
|
|
}
|
|
|
|
skip:
|
|
Py_BEGIN_ALLOW_THREADS
|
|
rc = MQTTAsync_connect(c, &connectOptions);
|
|
Py_END_ALLOW_THREADS
|
|
return Py_BuildValue("i", rc);
|
|
}
|
|
|
|
|
|
/**
 * disconnect(handle[, timeout]) -> rc
 *
 * Requests a disconnect; the optional int timeout is stored in the library
 * disconnect options.  The GIL is released around the library call.
 */
static PyObject* mqttv3_disconnect(PyObject* self, PyObject *args)
{
    MQTTAsync_disconnectOptions opts = MQTTAsync_disconnectOptions_initializer;
    MQTTAsync client;
    int status;

    if (PyArg_ParseTuple(args, "k|i", &client, &opts.timeout) == 0)
        return NULL;

    Py_BEGIN_ALLOW_THREADS
    status = MQTTAsync_disconnect(client, &opts);
    Py_END_ALLOW_THREADS

    return Py_BuildValue("i", status);
}

/**
 * isConnected(handle) -> int
 *
 * Returns the value of MQTTAsync_isConnected for the client.  The GIL is
 * released around the library call.
 */
static PyObject* mqttv3_isConnected(PyObject* self, PyObject *args)
{
    MQTTAsync client;
    int state;

    if (PyArg_ParseTuple(args, "k", &client) == 0)
        return NULL;

    Py_BEGIN_ALLOW_THREADS
    state = MQTTAsync_isConnected(client);
    Py_END_ALLOW_THREADS

    return Py_BuildValue("i", state);
}

/**
 * subscribe(handle, topic[, qos[, options]]) -> rc
 *
 * Subscribes to a single topic; qos defaults to 2.  options is the response
 * options dict understood by getResponseOptions.  The GIL is released around
 * the library call.
 */
static PyObject* mqttv3_subscribe(PyObject* self, PyObject *args)
{
    MQTTAsync_responseOptions respOpts = MQTTAsync_responseOptions_initializer;
    PyObject *optionsDict = NULL;
    MQTTAsync client;
    char* topicName;
    int requestedQos = 2;
    int status;

    if (PyArg_ParseTuple(args, "ks|iO", &client, &topicName, &requestedQos, &optionsDict) == 0)
        return NULL;

    if (getResponseOptions(client, optionsDict, &respOpts, SUBSCRIBE) == 0)
        return NULL;

    Py_BEGIN_ALLOW_THREADS
    status = MQTTAsync_subscribe(client, topicName, requestedQos, &respOpts);
    Py_END_ALLOW_THREADS

    return Py_BuildValue("i", status);
}

/**
 * subscribeMany(handle, topics, qoss[, options]) -> (rc, qoss) | rc
 *
 * Subscribes to several topics at once.  topics must be a sequence of
 * strings and qoss a sequence of ints of the same length (TypeError
 * otherwise).  After the call the QoS values are written back into qoss on a
 * best-effort basis.  Returns (rc, qoss) on success and rc alone otherwise.
 *
 * NOTE(review): the request is registered as SUBSCRIBE, not SUBSCRIBE_MANY,
 * so an onSuccess callback receives the single-topic layout; confirm intended.
 */
static PyObject* mqttv3_subscribeMany(PyObject* self, PyObject *args)
{
    MQTTAsync c;
    MQTTAsync_responseOptions response = MQTTAsync_responseOptions_initializer;
    PyObject* topicList;
    PyObject* qosList;
    PyObject *pyoptions = NULL;
    PyObject *retval = NULL;
    PyObject **items = NULL;   /* owned references keeping the topic strings alive */
    Py_ssize_t count, qos_count, held = 0, i;
    char** topics = NULL;
    int* qoss = NULL;
    int rc = 0;

    if (!PyArg_ParseTuple(args, "kOO|O", &c, &topicList, &qosList, &pyoptions))
        return NULL;

    if (!PySequence_Check(topicList) || !PySequence_Check(qosList))
    {
        PyErr_SetString(PyExc_TypeError,
                "3rd and 4th parameters must be sequences");
        return NULL;
    }

    count = PySequence_Length(topicList);
    qos_count = PySequence_Length(qosList);
    if (count < 0 || qos_count < 0)
        return NULL; /* exception already set by PySequence_Length */
    if (count != qos_count)
    {
        PyErr_SetString(PyExc_TypeError,
                "3rd and 4th parameters must be sequences of the same length");
        return NULL;
    }

    /* allocate at least one element so an empty sequence is not mistaken for failure */
    items = calloc(count ? (size_t) count : 1, sizeof *items);
    topics = calloc(count ? (size_t) count : 1, sizeof *topics);
    qoss = calloc(count ? (size_t) count : 1, sizeof *qoss);
    if (items == NULL || topics == NULL || qoss == NULL)
    {
        PyErr_NoMemory();
        goto done;
    }

    for (i = 0; i < count; ++i)
    {
        PyObject *qos_item;

        items[i] = PySequence_GetItem(topicList, i); /* new reference */
        if (items[i] == NULL)
            goto done;
        held = i + 1;
        topics[i] = PyString_AsString(items[i]);
        if (topics[i] == NULL)
            goto done;

        qos_item = PySequence_GetItem(qosList, i); /* new reference */
        if (qos_item == NULL)
            goto done;
        qoss[i] = (int) PyInt_AsLong(qos_item);
        Py_DECREF(qos_item);
        if (qoss[i] == -1 && PyErr_Occurred())
            goto done;
    }

    /* Build the response record last so the validation failures above cannot leak it. */
    if (!getResponseOptions(c, pyoptions, &response, SUBSCRIBE))
        goto done;

    Py_BEGIN_ALLOW_THREADS
    rc = MQTTAsync_subscribeMany(c, (int) count, topics, qoss, &response);
    Py_END_ALLOW_THREADS

    for (i = 0; i < count; ++i)
    {
        /* PySequence_SetItem does not steal the reference; failures (e.g. a tuple) are ignored */
        PyObject *granted = PyInt_FromLong((long) qoss[i]);

        if (granted == NULL || PySequence_SetItem(qosList, i, granted) == -1)
            PyErr_Clear();
        Py_XDECREF(granted);
    }

    if (rc == MQTTASYNC_SUCCESS)
        retval = Py_BuildValue("iO", rc, qosList);
    else
        retval = Py_BuildValue("i", rc);

done:
    for (i = 0; i < held; ++i)
        Py_DECREF(items[i]);
    free(items);
    free(topics);
    free(qoss);
    return retval;
}

/**
 * unsubscribe(handle, topic[, options]) -> rc
 *
 * Unsubscribes from a single topic.  options is the response options dict
 * understood by getResponseOptions.  The GIL is released around the library
 * call.
 */
static PyObject* mqttv3_unsubscribe(PyObject* self, PyObject *args)
{
    MQTTAsync_responseOptions respOpts = MQTTAsync_responseOptions_initializer;
    PyObject *optionsDict = NULL;
    MQTTAsync client;
    char* topicName;
    int status;

    if (PyArg_ParseTuple(args, "ks|O", &client, &topicName, &optionsDict) == 0)
        return NULL;

    if (getResponseOptions(client, optionsDict, &respOpts, UNSUBSCRIBE) == 0)
        return NULL;

    Py_BEGIN_ALLOW_THREADS
    status = MQTTAsync_unsubscribe(client, topicName, &respOpts);
    Py_END_ALLOW_THREADS

    return Py_BuildValue("i", status);
}

/**
 * unsubscribeMany(handle, topics[, options]) -> rc
 *
 * Unsubscribes from several topics at once.  topics must be a sequence of
 * strings (TypeError otherwise).  options is the response options dict
 * understood by getResponseOptions.
 */
static PyObject* mqttv3_unsubscribeMany(PyObject* self, PyObject *args)
{
    MQTTAsync c;
    MQTTAsync_responseOptions response = MQTTAsync_responseOptions_initializer;
    PyObject* topicList;
    PyObject *pyoptions = NULL;
    PyObject *retval = NULL;
    PyObject **items = NULL;   /* owned references keeping the topic strings alive */
    Py_ssize_t count, held = 0, i;
    char** topics = NULL;
    int rc = 0;

    if (!PyArg_ParseTuple(args, "kO|O", &c, &topicList, &pyoptions))
        return NULL;

    if (!PySequence_Check(topicList))
    {
        PyErr_SetString(PyExc_TypeError, "3rd parameter must be sequences");
        return NULL;
    }

    count = PySequence_Length(topicList);
    if (count < 0)
        return NULL; /* exception already set by PySequence_Length */

    /* allocate at least one element so an empty sequence is not mistaken for failure */
    items = calloc(count ? (size_t) count : 1, sizeof *items);
    topics = calloc(count ? (size_t) count : 1, sizeof *topics);
    if (items == NULL || topics == NULL)
    {
        PyErr_NoMemory();
        goto done;
    }

    for (i = 0; i < count; ++i)
    {
        items[i] = PySequence_GetItem(topicList, i); /* new reference */
        if (items[i] == NULL)
            goto done;
        held = i + 1;
        topics[i] = PyString_AsString(items[i]);
        if (topics[i] == NULL)
            goto done;
    }

    /* Build the response record last so the validation failures above cannot leak it. */
    if (!getResponseOptions(c, pyoptions, &response, UNSUBSCRIBE))
        goto done;

    Py_BEGIN_ALLOW_THREADS
    rc = MQTTAsync_unsubscribeMany(c, (int) count, topics, &response);
    Py_END_ALLOW_THREADS

    retval = Py_BuildValue("i", rc);

done:
    for (i = 0; i < held; ++i)
        Py_DECREF(items[i]);
    free(items);
    free(topics);
    return retval;
}

/**
 * send(handle, topic, payload[, qos[, retained[, options]]]) -> (rc, token) | rc
 *
 * Publishes payload to topic; qos and retained default to 0.  options is
 * the response options dict understood by getResponseOptions.  Returns
 * (rc, token) when the call succeeded with qos > 0, otherwise rc alone.
 */
static PyObject* mqttv3_send(PyObject* self, PyObject *args)
{
    MQTTAsync c;
    char* destinationName;
    int payloadlen;
    void* payload;
    int qos = 0;
    int retained = 0;
    MQTTAsync_responseOptions response = MQTTAsync_responseOptions_initializer;
    PyObject *pyoptions = NULL;
    int rc;

    if (!PyArg_ParseTuple(args, "kss#|iiO", &c, &destinationName, &payload,
            &payloadlen, &qos, &retained, &pyoptions))
        return NULL;

    if (!getResponseOptions(c, pyoptions, &response, PUBLISH))
        return NULL;

    Py_BEGIN_ALLOW_THREADS
    rc = MQTTAsync_send(c, destinationName, payloadlen, payload, qos, retained, &response);
    Py_END_ALLOW_THREADS

    /* "i" needs the int token, not the whole response options struct */
    if (rc == MQTTASYNC_SUCCESS && qos > 0)
        return Py_BuildValue("ii", rc, response.token);
    else
        return Py_BuildValue("i", rc);
}

/**
 * sendMessage(handle, topic, message[, options]) -> (rc, token) | rc
 *
 * Publishes a message described by a dict with a mandatory string "payload"
 * and optional int "qos" and "retained" entries.  options is the response
 * options dict understood by getResponseOptions.  Returns (rc, token) when
 * the call succeeded with qos > 0, otherwise rc alone.  Raises TypeError for
 * a malformed message dict.
 */
static PyObject* mqttv3_sendMessage(PyObject* self, PyObject *args)
{
    MQTTAsync c;
    char* destinationName;
    PyObject *message, *temp;
    MQTTAsync_message msg = MQTTAsync_message_initializer;
    MQTTAsync_responseOptions response = MQTTAsync_responseOptions_initializer;
    PyObject *pyoptions = NULL;
    char *payload = NULL;
    Py_ssize_t payloadlen = 0;
    int rc;

    if (!PyArg_ParseTuple(args, "ksO|O", &c, &destinationName, &message, &pyoptions))
        return NULL;

    if (!PyDict_Check(message))
    {
        PyErr_SetString(PyExc_TypeError, "3rd parameter must be a dictionary");
        return NULL;
    }

    if ((temp = PyDict_GetItemString(message, "payload")) == NULL)
    {
        PyErr_SetString(PyExc_TypeError, "dictionary must have payload key");
        return NULL;
    }

    if (!PyString_Check(temp))
    {
        PyErr_SetString(PyExc_TypeError, "payload value must be string");
        return NULL;
    }
    /* Go through a real Py_ssize_t: msg.payloadlen is an int and must not be written as one. */
    if (PyString_AsStringAndSize(temp, &payload, &payloadlen) == -1)
        return NULL;
    msg.payload = payload;
    msg.payloadlen = (int) payloadlen;

    /* qos and retained are optional: read them only when the key is present */
    if ((temp = PyDict_GetItemString(message, "qos")) != NULL)
    {
        msg.qos = (int) PyInt_AsLong(temp);
        if (msg.qos == -1 && PyErr_Occurred())
            return NULL;
    }

    if ((temp = PyDict_GetItemString(message, "retained")) != NULL)
    {
        msg.retained = (int) PyInt_AsLong(temp);
        if (msg.retained == -1 && PyErr_Occurred())
            return NULL;
    }

    /* Build the response record last so the validation failures above cannot leak it. */
    if (!getResponseOptions(c, pyoptions, &response, PUBLISH))
        return NULL;

    Py_BEGIN_ALLOW_THREADS
    rc = MQTTAsync_sendMessage(c, destinationName, &msg, &response);
    Py_END_ALLOW_THREADS

    /* "i" needs the int token, not the whole response options struct */
    if (rc == MQTTASYNC_SUCCESS && msg.qos > 0)
        return Py_BuildValue("ii", rc, response.token);
    else
        return Py_BuildValue("i", rc);
}

/**
 * waitForCompletion(handle, token[, timeout]) -> rc
 *
 * Waits for the delivery identified by token; timeout defaults to 1000 and
 * is passed to the library unchanged.  The GIL is released while waiting.
 */
static PyObject* mqttv3_waitForCompletion(PyObject* self, PyObject *args)
{
    MQTTAsync c;
    unsigned long timeout = 1000L;
    MQTTAsync_token dt;
    int rc;

    /* timeout is an unsigned long, so it needs the "k" format rather than "i" */
    if (!PyArg_ParseTuple(args, "ki|k", &c, &dt, &timeout))
        return NULL;

    Py_BEGIN_ALLOW_THREADS
    rc = MQTTAsync_waitForCompletion(c, dt, timeout);
    Py_END_ALLOW_THREADS

    return Py_BuildValue("i", rc);
}

/**
 * getPendingTokens(handle) -> (rc, [token, ...]) | rc
 *
 * Returns the library return code and, on success, a list of the pending
 * tokens (empty when none are pending).  On failure only rc is returned.
 */
static PyObject* mqttv3_getPendingTokens(PyObject* self, PyObject *args)
{
    MQTTAsync c;
    MQTTAsync_token* tokens = NULL;
    PyObject* dts;
    int rc;
    int i;

    if (!PyArg_ParseTuple(args, "k", &c))
        return NULL;

    Py_BEGIN_ALLOW_THREADS
    rc = MQTTAsync_getPendingTokens(c, &tokens);
    Py_END_ALLOW_THREADS

    if (rc != MQTTASYNC_SUCCESS)
        return Py_BuildValue("i", rc);

    dts = PyList_New(0);
    if (dts == NULL)
    {
        if (tokens != NULL)
            MQTTAsync_free(tokens);
        return NULL;
    }

    /* tokens may be NULL when nothing is pending; otherwise it is terminated by -1 */
    if (tokens != NULL)
    {
        for (i = 0; tokens[i] != -1; ++i)
        {
            PyObject *token = PyInt_FromLong((long) tokens[i]);

            /* PyList_Append takes its own reference, so ours is always released */
            if (token == NULL || PyList_Append(dts, token) != 0)
            {
                Py_XDECREF(token);
                Py_DECREF(dts);
                MQTTAsync_free(tokens);
                return NULL;
            }
            Py_DECREF(token);
        }
        MQTTAsync_free(tokens);
    }

    /* "N" hands our list reference over to the result tuple */
    return Py_BuildValue("iN", rc, dts);
}

/**
 * destroy(handle) -> None
 *
 * Removes the callback records registered for the client, releases the
 * Python references they hold, and destroys the client.
 */
static PyObject* mqttv3_destroy(PyObject* self, PyObject *args)
{
    MQTTAsync c;
    ListElement* temp = NULL;

    if (!PyArg_ParseTuple(args, "k", &c))
        return NULL;

    if ((temp = ListFindItem(callbacks, c, clientCompare)) != NULL)
    {
        /* ListDetach releases the list element, so fetch the content pointer first. */
        CallbackEntry* entry = temp->content;

        ListDetach(callbacks, entry);
        Py_XDECREF(entry->context);
        Py_XDECREF(entry->cl);
        Py_XDECREF(entry->ma);
        Py_XDECREF(entry->dc);
        free(entry);
    }

    if ((temp = ListFindItem(connected_callbacks, c, connectedCompare)) != NULL)
    {
        ConnectedEntry* entry = temp->content;

        ListDetach(connected_callbacks, entry);
        Py_XDECREF(entry->context);
        Py_XDECREF(entry->co);
        free(entry);
    }

    MQTTAsync_destroy(&c);

    Py_INCREF(Py_None);
    return Py_None;
}

static PyMethodDef MqttV3Methods[] =
|
|
{
|
|
{ "create", mqttv3_create, METH_VARARGS, "Create an MQTTv3 client." },
|
|
{ "setcallbacks", mqttv3_setcallbacks, METH_VARARGS,
|
|
"Sets the callback functions for a particular client." },
|
|
{ "setconnected", mqttv3_setconnected, METH_VARARGS,
|
|
"Sets the connected callback function for a particular client." },
|
|
{ "connect", mqttv3_connect, METH_VARARGS,
|
|
"Connects to a server using the specified options." },
|
|
{ "disconnect", mqttv3_disconnect, METH_VARARGS,
|
|
"Disconnects from a server." },
|
|
{ "isConnected", mqttv3_isConnected, METH_VARARGS,
|
|
"Determines if this client is currently connected to the server." },
|
|
{ "subscribe", mqttv3_subscribe, METH_VARARGS,
|
|
"Subscribe to the given topic." },
|
|
{ "subscribeMany", mqttv3_subscribeMany, METH_VARARGS,
|
|
"Subscribe to the given topics." },
|
|
{ "unsubscribe", mqttv3_unsubscribe, METH_VARARGS,
|
|
"Unsubscribe from the given topic." },
|
|
{ "unsubscribeMany", mqttv3_unsubscribeMany, METH_VARARGS,
|
|
"Unsubscribe from the given topics." },
|
|
{ "send", mqttv3_send, METH_VARARGS,
|
|
"Publish a message to the given topic." },
|
|
{ "sendMessage", mqttv3_sendMessage, METH_VARARGS,
|
|
"Publish a message to the given topic." },
|
|
{ "waitForCompletion", mqttv3_waitForCompletion, METH_VARARGS,
|
|
"Waits for the completion of the delivery of the message represented by a delivery token." },
|
|
{ "getPendingTokens", mqttv3_getPendingTokens, METH_VARARGS,
|
|
"Returns the tokens pending of completion." },
|
|
{ "destroy", mqttv3_destroy, METH_VARARGS,
|
|
"Free memory allocated to a MQTT client. It is the opposite to create." },
|
|
{ NULL, NULL, 0, NULL } /* Sentinel */
|
|
};
|
|
|
|
|
|
PyMODINIT_FUNC initpaho_mqtt3a(void)
|
|
{
|
|
PyObject *m;
|
|
|
|
PyEval_InitThreads();
|
|
|
|
callbacks = ListInitialize();
|
|
connected_callbacks = ListInitialize();
|
|
|
|
m = Py_InitModule("paho_mqtt3a", MqttV3Methods);
|
|
if (m == NULL)
|
|
return;
|
|
|
|
MqttV3Error = PyErr_NewException("paho_mqtt3a.error", NULL, NULL);
|
|
Py_INCREF(MqttV3Error);
|
|
PyModule_AddObject(m, "error", MqttV3Error);
|
|
|
|
PyModule_AddIntConstant(m, "SUCCESS", MQTTASYNC_SUCCESS);
|
|
PyModule_AddIntConstant(m, "FAILURE", MQTTASYNC_FAILURE);
|
|
PyModule_AddIntConstant(m, "DISCONNECTED", MQTTASYNC_DISCONNECTED);
|
|
PyModule_AddIntConstant(m, "MAX_MESSAGES_INFLIGHT", MQTTASYNC_MAX_MESSAGES_INFLIGHT);
|
|
PyModule_AddIntConstant(m, "BAD_UTF8_STRING", MQTTASYNC_BAD_UTF8_STRING);
|
|
PyModule_AddIntConstant(m, "BAD_NULL_PARAMETER", MQTTASYNC_NULL_PARAMETER);
|
|
PyModule_AddIntConstant(m, "BAD_TOPICNAME_TRUNCATED", MQTTASYNC_TOPICNAME_TRUNCATED);
|
|
PyModule_AddIntConstant(m, "PERSISTENCE_DEFAULT", MQTTCLIENT_PERSISTENCE_DEFAULT);
|
|
PyModule_AddIntConstant(m, "PERSISTENCE_NONE", MQTTCLIENT_PERSISTENCE_NONE);
|
|
PyModule_AddIntConstant(m, "PERSISTENCE_USER", MQTTCLIENT_PERSISTENCE_USER);
|
|
PyModule_AddIntConstant(m, "PERSISTENCE_ERROR",
|
|
MQTTCLIENT_PERSISTENCE_ERROR);
|
|
}
|