aclk_util.c 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480
  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. #include "aclk_util.h"
  3. #include "aclk_proxy.h"
  4. #include "daemon/common.h"
  5. usec_t aclk_session_newarch = 0;
  6. aclk_env_t *aclk_env = NULL;
  7. int chart_batch_id;
  8. aclk_encoding_type_t aclk_encoding_type_t_from_str(const char *str) {
  9. if (!strcmp(str, "json")) {
  10. return ACLK_ENC_JSON;
  11. }
  12. if (!strcmp(str, "proto")) {
  13. return ACLK_ENC_PROTO;
  14. }
  15. return ACLK_ENC_UNKNOWN;
  16. }
  17. aclk_transport_type_t aclk_transport_type_t_from_str(const char *str) {
  18. if (!strcmp(str, "MQTTv3")) {
  19. return ACLK_TRP_MQTT_3_1_1;
  20. }
  21. if (!strcmp(str, "MQTTv5")) {
  22. return ACLK_TRP_MQTT_5;
  23. }
  24. return ACLK_TRP_UNKNOWN;
  25. }
  26. void aclk_transport_desc_t_destroy(aclk_transport_desc_t *trp_desc) {
  27. freez(trp_desc->endpoint);
  28. }
  29. void aclk_env_t_destroy(aclk_env_t *env) {
  30. freez(env->auth_endpoint);
  31. if (env->transports) {
  32. for (size_t i = 0; i < env->transport_count; i++) {
  33. if(env->transports[i]) {
  34. aclk_transport_desc_t_destroy(env->transports[i]);
  35. freez(env->transports[i]);
  36. env->transports[i] = NULL;
  37. }
  38. }
  39. freez(env->transports);
  40. }
  41. if (env->capabilities) {
  42. for (size_t i = 0; i < env->capability_count; i++)
  43. freez(env->capabilities[i]);
  44. freez(env->capabilities);
  45. }
  46. }
  47. int aclk_env_has_capa(const char *capa)
  48. {
  49. for (int i = 0; i < (int) aclk_env->capability_count; i++) {
  50. if (!strcasecmp(capa, aclk_env->capabilities[i]))
  51. return 1;
  52. }
  53. return 0;
  54. }
  55. #ifdef ACLK_LOG_CONVERSATION_DIR
  56. volatile int aclk_conversation_log_counter = 0;
  57. #endif
  58. #define ACLK_TOPIC_PREFIX "/agent/"
  59. struct aclk_topic {
  60. enum aclk_topics topic_id;
  61. // as received from cloud - we keep this for
  62. // eventual topic list update when claim_id changes
  63. char *topic_recvd;
  64. // constructed topic
  65. char *topic;
  66. };
  67. // This helps to cache finalized topics (assembled with claim_id)
  68. // to not have to alloc or create buffer and construct topic every
  69. // time message is sent as in old ACLK
  70. static struct aclk_topic **aclk_topic_cache = NULL;
  71. static size_t aclk_topic_cache_items = 0;
  72. void free_topic_cache(void)
  73. {
  74. if (aclk_topic_cache) {
  75. for (size_t i = 0; i < aclk_topic_cache_items; i++) {
  76. freez(aclk_topic_cache[i]->topic);
  77. freez(aclk_topic_cache[i]->topic_recvd);
  78. freez(aclk_topic_cache[i]);
  79. }
  80. freez(aclk_topic_cache);
  81. aclk_topic_cache = NULL;
  82. aclk_topic_cache_items = 0;
  83. }
  84. }
  85. #define JSON_TOPIC_KEY_TOPIC "topic"
  86. #define JSON_TOPIC_KEY_NAME "name"
  87. struct topic_name {
  88. enum aclk_topics id;
  89. // cloud name - how is it called
  90. // in answer to /password endpoint
  91. const char *name;
  92. } topic_names[] = {
  93. { .id = ACLK_TOPICID_CHART, .name = "chart" },
  94. { .id = ACLK_TOPICID_ALARMS, .name = "alarms" },
  95. { .id = ACLK_TOPICID_METADATA, .name = "meta" },
  96. { .id = ACLK_TOPICID_COMMAND, .name = "inbox-cmd" },
  97. { .id = ACLK_TOPICID_AGENT_CONN, .name = "agent-connection" },
  98. { .id = ACLK_TOPICID_CMD_NG_V1, .name = "inbox-cmd-v1" },
  99. { .id = ACLK_TOPICID_CREATE_NODE, .name = "create-node-instance" },
  100. { .id = ACLK_TOPICID_NODE_CONN, .name = "node-instance-connection" },
  101. { .id = ACLK_TOPICID_CHART_DIMS, .name = "chart-and-dims-updated" },
  102. { .id = ACLK_TOPICID_CHART_CONFIGS_UPDATED, .name = "chart-configs-updated" },
  103. { .id = ACLK_TOPICID_CHART_RESET, .name = "reset-charts" },
  104. { .id = ACLK_TOPICID_RETENTION_UPDATED, .name = "chart-retention-updated" },
  105. { .id = ACLK_TOPICID_NODE_INFO, .name = "node-instance-info" },
  106. { .id = ACLK_TOPICID_ALARM_LOG, .name = "alarm-log" },
  107. { .id = ACLK_TOPICID_ALARM_HEALTH, .name = "alarm-health" },
  108. { .id = ACLK_TOPICID_ALARM_CONFIG, .name = "alarm-config" },
  109. { .id = ACLK_TOPICID_ALARM_SNAPSHOT, .name = "alarm-snapshot" },
  110. { .id = ACLK_TOPICID_NODE_COLLECTORS, .name = "node-instance-collectors" },
  111. { .id = ACLK_TOPICID_CTXS_SNAPSHOT, .name = "contexts-snapshot" },
  112. { .id = ACLK_TOPICID_CTXS_UPDATED, .name = "contexts-updated" },
  113. { .id = ACLK_TOPICID_UNKNOWN, .name = NULL }
  114. };
  115. enum aclk_topics compulsory_topics[] = {
  116. // TODO remove old topics once not needed anymore
  117. ACLK_TOPICID_CHART, //TODO from legacy
  118. ACLK_TOPICID_ALARMS, //TODO from legacy
  119. ACLK_TOPICID_METADATA, //TODO from legacy
  120. ACLK_TOPICID_COMMAND,
  121. ACLK_TOPICID_AGENT_CONN,
  122. ACLK_TOPICID_CMD_NG_V1,
  123. ACLK_TOPICID_CREATE_NODE,
  124. ACLK_TOPICID_NODE_CONN,
  125. ACLK_TOPICID_CHART_DIMS,
  126. ACLK_TOPICID_CHART_CONFIGS_UPDATED,
  127. ACLK_TOPICID_CHART_RESET,
  128. ACLK_TOPICID_RETENTION_UPDATED,
  129. ACLK_TOPICID_NODE_INFO,
  130. ACLK_TOPICID_ALARM_LOG,
  131. ACLK_TOPICID_ALARM_HEALTH,
  132. ACLK_TOPICID_ALARM_CONFIG,
  133. ACLK_TOPICID_ALARM_SNAPSHOT,
  134. ACLK_TOPICID_NODE_COLLECTORS,
  135. ACLK_TOPICID_CTXS_SNAPSHOT,
  136. ACLK_TOPICID_CTXS_UPDATED,
  137. ACLK_TOPICID_UNKNOWN
  138. };
  139. static enum aclk_topics topic_name_to_id(const char *name) {
  140. struct topic_name *topic = topic_names;
  141. while (topic->name) {
  142. if (!strcmp(topic->name, name)) {
  143. return topic->id;
  144. }
  145. topic++;
  146. }
  147. return ACLK_TOPICID_UNKNOWN;
  148. }
  149. static const char *topic_id_to_name(enum aclk_topics tid) {
  150. struct topic_name *topic = topic_names;
  151. while (topic->name) {
  152. if (topic->id == tid)
  153. return topic->name;
  154. topic++;
  155. }
  156. return "unknown";
  157. }
  158. #define CLAIM_ID_REPLACE_TAG "#{claim_id}"
  159. static void topic_generate_final(struct aclk_topic *t) {
  160. char *dest;
  161. char *replace_tag = strstr(t->topic_recvd, CLAIM_ID_REPLACE_TAG);
  162. if (!replace_tag)
  163. return;
  164. rrdhost_aclk_state_lock(localhost);
  165. if (unlikely(!localhost->aclk_state.claimed_id)) {
  166. error("This should never be called if agent not claimed");
  167. rrdhost_aclk_state_unlock(localhost);
  168. return;
  169. }
  170. t->topic = mallocz(strlen(t->topic_recvd) + 1 - strlen(CLAIM_ID_REPLACE_TAG) + strlen(localhost->aclk_state.claimed_id));
  171. memcpy(t->topic, t->topic_recvd, replace_tag - t->topic_recvd);
  172. dest = t->topic + (replace_tag - t->topic_recvd);
  173. memcpy(dest, localhost->aclk_state.claimed_id, strlen(localhost->aclk_state.claimed_id));
  174. dest += strlen(localhost->aclk_state.claimed_id);
  175. rrdhost_aclk_state_unlock(localhost);
  176. replace_tag += strlen(CLAIM_ID_REPLACE_TAG);
  177. strcpy(dest, replace_tag);
  178. dest += strlen(replace_tag);
  179. *dest = 0;
  180. }
  181. static int topic_cache_add_topic(struct json_object *json, struct aclk_topic *topic)
  182. {
  183. struct json_object_iterator it;
  184. struct json_object_iterator itEnd;
  185. it = json_object_iter_begin(json);
  186. itEnd = json_object_iter_end(json);
  187. while (!json_object_iter_equal(&it, &itEnd)) {
  188. if (!strcmp(json_object_iter_peek_name(&it), JSON_TOPIC_KEY_NAME)) {
  189. if (json_object_get_type(json_object_iter_peek_value(&it)) != json_type_string) {
  190. error("topic dictionary key \"" JSON_TOPIC_KEY_NAME "\" is expected to be json_type_string");
  191. return 1;
  192. }
  193. topic->topic_id = topic_name_to_id(json_object_get_string(json_object_iter_peek_value(&it)));
  194. if (topic->topic_id == ACLK_TOPICID_UNKNOWN) {
  195. debug(D_ACLK, "topic dictionary has unknown topic name \"%s\"", json_object_get_string(json_object_iter_peek_value(&it)));
  196. }
  197. json_object_iter_next(&it);
  198. continue;
  199. }
  200. if (!strcmp(json_object_iter_peek_name(&it), JSON_TOPIC_KEY_TOPIC)) {
  201. if (json_object_get_type(json_object_iter_peek_value(&it)) != json_type_string) {
  202. error("topic dictionary key \"" JSON_TOPIC_KEY_TOPIC "\" is expected to be json_type_string");
  203. return 1;
  204. }
  205. topic->topic_recvd = strdupz(json_object_get_string(json_object_iter_peek_value(&it)));
  206. json_object_iter_next(&it);
  207. continue;
  208. }
  209. error("topic dictionary has Unknown/Unexpected key \"%s\" in topic description. Ignoring!", json_object_iter_peek_name(&it));
  210. json_object_iter_next(&it);
  211. }
  212. if (!topic->topic_recvd) {
  213. error("topic dictionary Missig compulsory key %s", JSON_TOPIC_KEY_TOPIC);
  214. return 1;
  215. }
  216. topic_generate_final(topic);
  217. aclk_topic_cache_items++;
  218. return 0;
  219. }
  220. int aclk_generate_topic_cache(struct json_object *json)
  221. {
  222. json_object *obj;
  223. size_t array_size = json_object_array_length(json);
  224. if (!array_size) {
  225. error("Empty topic list!");
  226. return 1;
  227. }
  228. if (aclk_topic_cache)
  229. free_topic_cache();
  230. aclk_topic_cache = callocz(array_size, sizeof(struct aclk_topic *));
  231. for (size_t i = 0; i < array_size; i++) {
  232. obj = json_object_array_get_idx(json, i);
  233. if (json_object_get_type(obj) != json_type_object) {
  234. error("expected json_type_object");
  235. return 1;
  236. }
  237. aclk_topic_cache[i] = callocz(1, sizeof(struct aclk_topic));
  238. if (topic_cache_add_topic(obj, aclk_topic_cache[i])) {
  239. error("failed to parse topic @idx=%d", (int)i);
  240. return 1;
  241. }
  242. }
  243. for (int i = 0; compulsory_topics[i] != ACLK_TOPICID_UNKNOWN; i++) {
  244. if (!aclk_get_topic(compulsory_topics[i])) {
  245. error("missing compulsory topic \"%s\" in password response from cloud", topic_id_to_name(compulsory_topics[i]));
  246. return 1;
  247. }
  248. }
  249. return 0;
  250. }
  251. /*
  252. * Build a topic based on sub_topic and final_topic
  253. * if the sub topic starts with / assume that is an absolute topic
  254. *
  255. */
  256. const char *aclk_get_topic(enum aclk_topics topic)
  257. {
  258. if (!aclk_topic_cache) {
  259. error("Topic cache not initialized");
  260. return NULL;
  261. }
  262. for (size_t i = 0; i < aclk_topic_cache_items; i++) {
  263. if (aclk_topic_cache[i]->topic_id == topic)
  264. return aclk_topic_cache[i]->topic;
  265. }
  266. error("Unknown topic");
  267. return NULL;
  268. }
  269. /*
  270. * Allows iterating all topics in topic cache without
  271. * having to resort to callbacks.
  272. */
  273. const char *aclk_topic_cache_iterate(aclk_topic_cache_iter_t *iter)
  274. {
  275. if (!aclk_topic_cache) {
  276. error("Topic cache not initialized when %s was called.", __FUNCTION__);
  277. return NULL;
  278. }
  279. if (*iter >= aclk_topic_cache_items)
  280. return NULL;
  281. return aclk_topic_cache[(*iter)++]->topic;
  282. }
  283. /*
  284. * TBEB with randomness
  285. *
  286. * @param reset 1 - to reset the delay,
  287. * 0 - to advance a step and calculate sleep time in ms
  288. * @param min, max in seconds
  289. * @returns delay in ms
  290. *
  291. */
  292. unsigned long int aclk_tbeb_delay(int reset, int base, unsigned long int min, unsigned long int max) {
  293. static int attempt = -1;
  294. if (reset) {
  295. attempt = -1;
  296. return 0;
  297. }
  298. attempt++;
  299. if (attempt == 0) {
  300. srandom(time(NULL));
  301. return 0;
  302. }
  303. unsigned long int delay = pow(base, attempt - 1);
  304. delay *= MSEC_PER_SEC;
  305. delay += (random() % (MAX(1000, delay/2)));
  306. if (delay <= min * MSEC_PER_SEC)
  307. return min;
  308. if (delay >= max * MSEC_PER_SEC)
  309. return max;
  310. return delay;
  311. }
  312. static inline int aclk_parse_pair(const char *src, const char c, char **a, char **b)
  313. {
  314. const char *ptr = strchr(src, c);
  315. if (ptr == NULL)
  316. return 1;
  317. // allow empty string
  318. /* if (!*(ptr+1))
  319. return 1;*/
  320. *a = callocz(1, ptr - src + 1);
  321. memcpy (*a, src, ptr - src);
  322. *b = strdupz(ptr+1);
  323. return 0;
  324. }
  325. #define HTTP_PROXY_PREFIX "http://"
  326. void aclk_set_proxy(char **ohost, int *port, char **uname, char **pwd, enum mqtt_wss_proxy_type *type)
  327. {
  328. ACLK_PROXY_TYPE pt;
  329. const char *ptr = aclk_get_proxy(&pt);
  330. char *tmp;
  331. if (pt != PROXY_TYPE_HTTP)
  332. return;
  333. *uname = NULL;
  334. *pwd = NULL;
  335. *port = 0;
  336. char *proxy = strdupz(ptr);
  337. ptr = proxy;
  338. if (!strncmp(ptr, HTTP_PROXY_PREFIX, strlen(HTTP_PROXY_PREFIX)))
  339. ptr += strlen(HTTP_PROXY_PREFIX);
  340. if ((tmp = strchr(ptr, '@'))) {
  341. *tmp = 0;
  342. if(aclk_parse_pair(ptr, ':', uname, pwd)) {
  343. error_report("Failed to get username and password for proxy. Will attempt connection without authentication");
  344. }
  345. ptr = tmp+1;
  346. }
  347. if (!*ptr) {
  348. freez(proxy);
  349. freez(*uname);
  350. freez(*pwd);
  351. return;
  352. }
  353. if ((tmp = strchr(ptr, ':'))) {
  354. *tmp = 0;
  355. tmp++;
  356. if(*tmp)
  357. *port = atoi(tmp);
  358. }
  359. *ohost = strdupz(ptr);
  360. if (*port <= 0 || *port > 65535)
  361. *port = 8080;
  362. if (type)
  363. *type = MQTT_WSS_PROXY_HTTP;
  364. else {
  365. freez(*uname);
  366. freez(*pwd);
  367. }
  368. freez(proxy);
  369. }
  370. #if defined(OPENSSL_VERSION_NUMBER) && OPENSSL_VERSION_NUMBER < OPENSSL_VERSION_110
  371. static EVP_ENCODE_CTX *EVP_ENCODE_CTX_new(void)
  372. {
  373. EVP_ENCODE_CTX *ctx = OPENSSL_malloc(sizeof(*ctx));
  374. if (ctx != NULL) {
  375. memset(ctx, 0, sizeof(*ctx));
  376. }
  377. return ctx;
  378. }
  379. static void EVP_ENCODE_CTX_free(EVP_ENCODE_CTX *ctx)
  380. {
  381. OPENSSL_free(ctx);
  382. return;
  383. }
  384. #endif
  385. int base64_encode_helper(unsigned char *out, int *outl, const unsigned char *in, int in_len)
  386. {
  387. int len;
  388. unsigned char *str = out;
  389. EVP_ENCODE_CTX *ctx = EVP_ENCODE_CTX_new();
  390. EVP_EncodeInit(ctx);
  391. EVP_EncodeUpdate(ctx, str, outl, in, in_len);
  392. str += *outl;
  393. EVP_EncodeFinal(ctx, str, &len);
  394. *outl += len;
  395. str = out;
  396. while(*str) {
  397. if (*str != 0x0D && *str != 0x0A)
  398. *out++ = *str++;
  399. else
  400. str++;
  401. }
  402. *out = 0;
  403. EVP_ENCODE_CTX_free(ctx);
  404. return 0;
  405. }