curl_stream.c 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651
  1. // Copyright (C) 2004-2021 Artifex Software, Inc.
  2. //
  3. // This file is part of MuPDF.
  4. //
  5. // MuPDF is free software: you can redistribute it and/or modify it under the
  6. // terms of the GNU Affero General Public License as published by the Free
  7. // Software Foundation, either version 3 of the License, or (at your option)
  8. // any later version.
  9. //
  10. // MuPDF is distributed in the hope that it will be useful, but WITHOUT ANY
  11. // WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
  12. // FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
  13. // details.
  14. //
  15. // You should have received a copy of the GNU Affero General Public License
  16. // along with MuPDF. If not, see <https://www.gnu.org/licenses/agpl-3.0.en.html>
  17. //
  18. // Alternative licensing terms are available from the licensor.
  19. // For commercial licensing, see <https://www.artifex.com/> or contact
  20. // Artifex Software, Inc., 39 Mesa Street, Suite 108A, San Francisco,
  21. // CA 94129, USA, for further information.
  22. #include "mupdf/fitz.h"
  23. #include "curl_stream.h"
  24. #include <assert.h>
  25. #include <string.h>
  26. #include <ctype.h>
  27. #include <curl/curl.h>
  28. #ifdef _WIN32
  29. #include <windows.h>
  30. #else
  31. #include <pthread.h>
  32. #endif
  #undef DEBUG_BLOCK_FETCHING
  /* To trace block requests and arrivals, remove the #undef above and
   * define DEBUG_BLOCK_FETCHING. DEBUG_MESSAGE takes a parenthesised
   * printf-style argument list, e.g. DEBUG_MESSAGE(("x=%d\n", x)); when
   * tracing is off it expands to an empty statement and its arguments are
   * not evaluated. */
  #ifdef DEBUG_BLOCK_FETCHING
  #ifdef _WIN32
  /* <stdarg.h> is the standard header for va_list / va_start(ap, last);
   * the pre-ANSI <varargs.h> has an incompatible va_start. */
  #include <stdarg.h>
  #include <stdio.h>
  static void
  output(const char *fmt, ...)
  {
      va_list args;
      char text[256];
      va_start(args, fmt);
      vsnprintf(text, sizeof(text), fmt, args);
      va_end(args);
      /* Use the narrow-character entry point explicitly so this still
       * compiles when UNICODE is defined. */
      OutputDebugStringA(text);
  }
  #else
  #define output printf
  #endif
  #define DEBUG_MESSAGE(A) do { output A; } while(0)
  #else
  #define DEBUG_MESSAGE(A) do { } while(0)
  #endif
  /* Ranged downloads are split into blocks of 2^BLOCK_SHIFT bytes (256 KiB). */
  #define BLOCK_SHIFT 18
  #define BLOCK_SIZE (1 << BLOCK_SHIFT)
  /* Block bitmap test: nonzero iff bit NUM of MAP is set, eight blocks per
   * byte with block 0 in the least significant bit of byte 0. */
  #define HAVE_BLOCK(map, num) (((((map)[(num) >> 3]) >> ((num) & 7)) & 1) != 0)
  57. typedef struct curlstate
  58. {
  59. fz_context *ctx;
  60. CURL *easy;
  61. /* START: The following entries are protected by the lock */
  62. CURLcode curl_error;
  63. char error_buffer[CURL_ERROR_SIZE];
  64. int data_arrived;
  65. int complete;
  66. int kill_thread;
  67. int accept_ranges;
  68. int head;
  69. /* content buffer */
  70. size_t content_length; /* 0 => Unknown length */
  71. unsigned char *buffer;
  72. size_t buffer_fill;
  73. size_t buffer_max;
  74. /* map of which blocks we have */
  75. unsigned char *map;
  76. size_t map_length;
  77. /* outstanding curl request info */
  78. size_t next_fill_start; /* The next file offset we will fetch to */
  79. size_t current_fill_start; /* The current file offset we are fetching to */
  80. size_t current_fill_end;
  81. /* END: The above entries are protected by the lock */
  82. void (*more_data)(void *,int);
  83. void *more_data_arg;
  84. unsigned char public_buffer[4096];
  85. /* We assume either Windows threads or pthreads here. */
  86. #ifdef _WIN32
  87. void *thread;
  88. DWORD thread_id;
  89. HANDLE mutex;
  90. #else
  91. pthread_t thread;
  92. pthread_mutex_t mutex;
  93. #endif
  94. } curlstate;
  95. #ifdef _WIN32
  96. static int locked;
  97. static void
  98. lock(curlstate *state)
  99. {
  100. WaitForSingleObject(state->mutex, INFINITE);
  101. assert(locked == 0);
  102. locked = 1;
  103. }
  104. static void
  105. unlock(curlstate *state)
  106. {
  107. assert(locked == 1);
  108. locked = 0;
  109. ReleaseMutex(state->mutex);
  110. }
  111. #else
  112. static void
  113. lock(curlstate *state)
  114. {
  115. pthread_mutex_lock(&state->mutex);
  116. }
  117. static void
  118. unlock(curlstate *state)
  119. {
  120. pthread_mutex_unlock(&state->mutex);
  121. }
  122. #endif
  123. static size_t on_curl_header(void *ptr, size_t size, size_t nmemb, void *state_)
  124. {
  125. struct curlstate *state = state_;
  126. lock(state);
  127. if (fz_strncasecmp(ptr, "Accept-Ranges: bytes", 20) == 0)
  128. {
  129. DEBUG_MESSAGE(("header arrived with Accept-Ranges!\n"));
  130. state->accept_ranges = 1;
  131. }
  132. if (fz_strncasecmp(ptr, "Content-Length:", 15) == 0)
  133. {
  134. char *s = ptr;
  135. state->content_length = fz_atoi(s + 15);
  136. DEBUG_MESSAGE(("header arrived with Content-Length: %zu\n", state->content_length));
  137. }
  138. unlock(state);
  139. return nmemb * size;
  140. }
  141. static size_t on_curl_data(void *ptr, size_t size, size_t nmemb, void *state_)
  142. {
  143. struct curlstate *state = state_;
  144. size_t old_start;
  145. size *= nmemb;
  146. lock(state);
  147. if (state->data_arrived == 0)
  148. {
  149. /* This is the first time data has arrived.
  150. * If the header has Accept-Ranges then we can do byte requests.
  151. * We know the Content-Length from having processed the header already.
  152. */
  153. if (state->content_length == 0)
  154. {
  155. /* What a crap server. Won't tell us how big the file
  156. * is. We'll have to expand as data as arrives. */
  157. DEBUG_MESSAGE(("have no length!\n"));
  158. }
  159. else if (state->accept_ranges)
  160. {
  161. /* We got a range header, and the correct http response
  162. * code. We can assume that byte fetches are accepted
  163. * and we'll run without progressive mode. */
  164. size_t len = state->content_length;
  165. state->map_length = (len+BLOCK_SIZE-1)>>BLOCK_SHIFT;
  166. state->map = fz_malloc_no_throw(state->ctx, (state->map_length+7)>>3);
  167. state->buffer = fz_malloc_no_throw(state->ctx, len);
  168. state->buffer_max = len;
  169. if (state->map == NULL || state->buffer == NULL)
  170. {
  171. unlock(state);
  172. return 0;
  173. }
  174. memset(state->map, 0, (state->map_length+7)>>3);
  175. DEBUG_MESSAGE(("have range header content_length=%zu!\n", state->content_length));
  176. }
  177. else
  178. {
  179. /* We know the length, and that we can use ByteRanges -
  180. * we can run as a progressive file. */
  181. state->buffer = fz_malloc_no_throw(state->ctx, state->content_length);
  182. if (state->buffer == NULL)
  183. {
  184. unlock(state);
  185. return 0;
  186. }
  187. state->buffer_max = state->content_length;
  188. }
  189. state->data_arrived = 1;
  190. }
  191. if (state->content_length == 0)
  192. {
  193. size_t newsize = (state->current_fill_start + size);
  194. if (newsize > state->buffer_max)
  195. {
  196. /* Expand the buffer */
  197. size_t new_max = state->buffer_max * 2;
  198. if (new_max == 0)
  199. new_max = 4096;
  200. fz_try(state->ctx)
  201. state->buffer = fz_realloc_array(state->ctx, state->buffer, new_max, unsigned char);
  202. fz_catch(state->ctx)
  203. {
  204. unlock(state);
  205. return 0;
  206. }
  207. state->buffer_max = new_max;
  208. }
  209. }
  210. DEBUG_MESSAGE(("data arrived: offset=%ld len=%ld\n", state->current_fill_start, size));
  211. /* Although we always trigger fills starting on block boundaries,
  212. * code this to allow for curl calling us to copy smaller blocks
  213. * as they arrive. */
  214. old_start = state->current_fill_start;
  215. if (state->current_fill_start + size > state->buffer_max) {
  216. unlock(state);
  217. return 0;
  218. }
  219. memcpy(state->buffer + state->current_fill_start, ptr, size);
  220. state->current_fill_start += size;
  221. /* If we've reached the end, or at least a different block
  222. * mark that we've got that block. */
  223. if (state->map && (state->current_fill_start == state->content_length ||
  224. (((state->current_fill_start ^ old_start) & ~(BLOCK_SIZE-1)) != 0)))
  225. {
  226. old_start >>= BLOCK_SHIFT;
  227. state->map[old_start>>3] |= 1<<(old_start & 7);
  228. }
  229. unlock(state);
  230. return size;
  231. }
  232. static void fetch_chunk(struct curlstate *state)
  233. {
  234. char text[32];
  235. size_t block, start, end;
  236. CURLcode ret;
  237. ret = curl_easy_perform(state->easy);
  238. if (ret != CURLE_OK) {
  239. /* If we get an error, store it, and kill the thread.
  240. * The next fetch will return it. */
  241. lock(state);
  242. state->curl_error = ret;
  243. state->kill_thread = 1;
  244. unlock(state);
  245. return;
  246. }
  247. /* We finished the header, now request the body. */
  248. lock(state);
  249. if (state->head)
  250. {
  251. state->head = 0;
  252. curl_easy_setopt(state->easy, CURLOPT_NOBODY, 0);
  253. curl_easy_setopt(state->easy, CURLOPT_HEADERFUNCTION, NULL);
  254. curl_easy_setopt(state->easy, CURLOPT_WRITEHEADER, NULL);
  255. if (state->accept_ranges)
  256. {
  257. fz_snprintf(text, 32, "%d-%d", 0, BLOCK_SIZE-1);
  258. curl_easy_setopt(state->easy, CURLOPT_RANGE, text);
  259. state->next_fill_start = BLOCK_SIZE;
  260. }
  261. unlock(state);
  262. return;
  263. }
  264. /* We finished the current body. If not accepting ranges, that's the end. */
  265. if (!state->accept_ranges)
  266. {
  267. DEBUG_MESSAGE(("we got it all, in one request.\n"));
  268. state->complete = 1;
  269. state->kill_thread = 1;
  270. unlock(state);
  271. return;
  272. }
  273. /* Find the next block to fetch */
  274. assert((state->next_fill_start & (BLOCK_SHIFT-1)) == 0);
  275. block = state->next_fill_start>>BLOCK_SHIFT;
  276. if (state->content_length > 0)
  277. {
  278. /* Find the next block that we haven't got */
  279. size_t map_length = state->map_length;
  280. unsigned char *map = state->map;
  281. while (block < map_length && HAVE_BLOCK(map, block))
  282. ++block;
  283. if (block == map_length)
  284. {
  285. block = 0;
  286. while (block < map_length && HAVE_BLOCK(map, block))
  287. ++block;
  288. if (block == map_length)
  289. {
  290. /* We've got it all! */
  291. DEBUG_MESSAGE(("we got it all block=%zu map_length=%zu!\n", block, map_length));
  292. state->complete = 1;
  293. state->kill_thread = 1;
  294. unlock(state);
  295. return;
  296. }
  297. }
  298. }
  299. else
  300. {
  301. state->complete = 1;
  302. state->kill_thread = 1;
  303. unlock(state);
  304. return;
  305. }
  306. DEBUG_MESSAGE(("block requested was %zu, fetching %zu\n", state->next_fill_start>>BLOCK_SHIFT, block));
  307. /* Set up fetch of that block */
  308. start = block<<BLOCK_SHIFT;
  309. end = start + BLOCK_SIZE-1;
  310. state->current_fill_start = start;
  311. if (state->content_length > 0 && end >= state->content_length)
  312. end = state->content_length-1;
  313. state->current_fill_end = end;
  314. fz_snprintf(text, 32, "%d-%d", start, end);
  315. /* Unless anyone changes this in the meantime, the
  316. * next block we fetch will follow on from this one. */
  317. state->next_fill_start = state->current_fill_start+BLOCK_SIZE;
  318. unlock(state);
  319. /* Request next range! */
  320. DEBUG_MESSAGE(("requesting range %s\n", text));
  321. curl_easy_setopt(state->easy, CURLOPT_RANGE, text);
  322. }
  323. static int cs_next(fz_context *ctx, fz_stream *stream, size_t len)
  324. {
  325. struct curlstate *state = stream->state;
  326. size_t len_read = 0;
  327. int64_t read_point = stream->pos;
  328. int block = read_point>>BLOCK_SHIFT;
  329. size_t left_over = (-read_point) & (BLOCK_SIZE-1);
  330. unsigned char *buf = state->public_buffer;
  331. int err_type;
  332. assert(len != 0);
  333. stream->rp = stream->wp = buf;
  334. lock(state);
  335. err_type = state->complete ? FZ_ERROR_GENERIC : FZ_ERROR_TRYLATER;
  336. /* If we got an error from the fetching thread,
  337. * throw it here (but just once). */
  338. if (state->curl_error)
  339. {
  340. CURLcode err = state->curl_error;
  341. char errstr[CURL_ERROR_SIZE];
  342. memcpy(errstr, state->error_buffer, CURL_ERROR_SIZE);
  343. memset(state->error_buffer, 0, CURL_ERROR_SIZE);
  344. state->curl_error = 0;
  345. unlock(state);
  346. fz_throw(ctx, FZ_ERROR_GENERIC, "cannot fetch data: %s: %s", curl_easy_strerror(err), errstr);
  347. }
  348. if ((size_t) read_point > state->content_length)
  349. {
  350. unlock(state);
  351. if (state->data_arrived == 0)
  352. fz_throw(ctx, err_type, "read of a block we don't have (A) (offset=%ld)", read_point);
  353. return EOF;
  354. }
  355. if (len > sizeof(state->public_buffer))
  356. len = sizeof(state->public_buffer);
  357. if (state->map == NULL)
  358. {
  359. /* We are doing a simple linear fetch as we don't know the
  360. * content length. */
  361. if (read_point + len > state->current_fill_start)
  362. {
  363. unlock(state);
  364. fz_throw(ctx, err_type, "read of a block we don't have (B) (offset=%ld)", read_point);
  365. }
  366. memcpy(buf, state->buffer + read_point, len);
  367. unlock(state);
  368. stream->wp = buf + len;
  369. stream->pos += len;
  370. if (len == 0)
  371. return EOF;
  372. return *stream->rp++;
  373. }
  374. /* We are reading from a "mapped" file */
  375. if (read_point + len > state->content_length)
  376. len = state->content_length - read_point;
  377. if (left_over > len)
  378. left_over = len;
  379. if (left_over > 0)
  380. {
  381. /* We are starting midway through a block */
  382. if (!HAVE_BLOCK(state->map, block))
  383. {
  384. state->next_fill_start = block<<BLOCK_SHIFT;
  385. unlock(state);
  386. fz_throw(ctx, err_type, "read of a block we don't have (C) (offset=%ld)", read_point);
  387. }
  388. block++;
  389. memcpy(buf, state->buffer + read_point, left_over);
  390. buf += left_over;
  391. read_point += left_over;
  392. len -= left_over;
  393. len_read += left_over;
  394. }
  395. /* Copy any complete blocks */
  396. while (len > BLOCK_SIZE)
  397. {
  398. if (!HAVE_BLOCK(state->map, block))
  399. {
  400. /* We don't have enough data to fulfill the request. */
  401. /* Fetch the next block from here. */
  402. unlock(state);
  403. state->next_fill_start = block<<BLOCK_SHIFT;
  404. stream->wp += len_read;
  405. stream->pos += len_read;
  406. /* If we haven't fetched anything, throw. */
  407. if (len_read == 0)
  408. fz_throw(ctx, err_type, "read of a block we don't have (D) (offset=%ld)", read_point);
  409. /* Otherwise, we got at least one byte, so we can safely return that. */
  410. return *stream->rp++;
  411. }
  412. block++;
  413. memcpy(buf, state->buffer + read_point, BLOCK_SIZE);
  414. buf += BLOCK_SIZE;
  415. read_point += BLOCK_SIZE;
  416. len -= BLOCK_SIZE;
  417. len_read += BLOCK_SIZE;
  418. }
  419. /* Copy any trailing bytes */
  420. if (len > 0)
  421. {
  422. if (!HAVE_BLOCK(state->map, block))
  423. {
  424. /* We don't have enough data to fulfill the request. */
  425. /* Fetch the next block from here. */
  426. unlock(state);
  427. state->next_fill_start = block<<BLOCK_SHIFT;
  428. stream->wp += len_read;
  429. stream->pos += len_read;
  430. /* If we haven't fetched anything, throw. */
  431. if (len_read == 0)
  432. fz_throw(ctx, err_type, "read of a block we don't have (E) (offset=%ld)", read_point);
  433. /* Otherwise, we got at least one byte, so we can safely return that. */
  434. return *stream->rp++;
  435. }
  436. memcpy(buf, state->buffer + read_point, len);
  437. len_read += len;
  438. }
  439. unlock(state);
  440. stream->wp += len_read;
  441. stream->pos += len_read;
  442. if (len_read == 0)
  443. return EOF;
  444. return *stream->rp++;
  445. }
  446. static void cs_close(fz_context *ctx, void *state_)
  447. {
  448. struct curlstate *state = state_;
  449. lock(state);
  450. state->kill_thread = 1;
  451. unlock(state);
  452. #ifdef _WIN32
  453. WaitForSingleObject(state->thread, INFINITE);
  454. CloseHandle(state->thread);
  455. CloseHandle(state->mutex);
  456. #else
  457. pthread_join(state->thread, NULL);
  458. pthread_mutex_destroy(&state->mutex);
  459. #endif
  460. curl_easy_cleanup(state->easy);
  461. fz_free(ctx, state->buffer);
  462. fz_free(ctx, state->map);
  463. fz_free(ctx, state);
  464. }
  465. static void cs_seek(fz_context *ctx, fz_stream *stm, int64_t offset, int whence)
  466. {
  467. struct curlstate *state = stm->state;
  468. stm->wp = stm->rp;
  469. if (whence == SEEK_END)
  470. {
  471. size_t clen;
  472. int data_arrived;
  473. int complete;
  474. lock(state);
  475. data_arrived = state->data_arrived;
  476. clen = state->content_length;
  477. complete = state->complete;
  478. unlock(state);
  479. if (!data_arrived && !complete)
  480. fz_throw(ctx, FZ_ERROR_TRYLATER, "still awaiting file length");
  481. stm->pos = clen + offset;
  482. }
  483. else if (whence == SEEK_CUR)
  484. stm->pos += offset;
  485. else
  486. stm->pos = offset;
  487. if (stm->pos < 0)
  488. stm->pos = 0;
  489. }
  490. static void
  491. fetcher_thread(curlstate *state)
  492. {
  493. /* Keep fetching chunks on a background thread until
  494. * either we have to kill the thread, or the fetch
  495. * is complete. */
  496. while (1) {
  497. int complete;
  498. lock(state);
  499. complete = state->complete || state->kill_thread;
  500. unlock(state);
  501. if (complete)
  502. break;
  503. fetch_chunk(state);
  504. if (state->more_data)
  505. state->more_data(state->more_data_arg, 0);
  506. }
  507. if (state->more_data)
  508. state->more_data(state->more_data_arg, 1);
  509. lock(state);
  510. state->complete = 1;
  511. unlock(state);
  512. }
  513. #ifdef _WIN32
  514. static DWORD WINAPI
  515. win_thread(void *lparam)
  516. {
  517. fetcher_thread((curlstate *)lparam);
  518. return 0;
  519. }
  520. #else
  521. static void *
  522. pthread_thread(void *arg)
  523. {
  524. fetcher_thread((curlstate *)arg);
  525. return NULL;
  526. }
  527. #endif
  528. fz_stream *fz_open_url(fz_context *ctx, const char *url, int kbps, void (*more_data)(void *,int), void *more_data_arg)
  529. {
  530. struct curlstate *state;
  531. fz_stream *stm;
  532. CURLcode code;
  533. state = fz_malloc_struct(ctx, struct curlstate);
  534. state->ctx = ctx;
  535. code = curl_global_init(CURL_GLOBAL_ALL);
  536. if (code != CURLE_OK)
  537. fz_throw(ctx, FZ_ERROR_GENERIC, "curl_global_init failed");
  538. state->easy = curl_easy_init();
  539. if (!state->easy)
  540. fz_throw(ctx, FZ_ERROR_GENERIC, "curl_easy_init failed");
  541. curl_easy_setopt(state->easy, CURLOPT_URL, url);
  542. curl_easy_setopt(state->easy, CURLOPT_FOLLOWLOCATION, 1);
  543. curl_easy_setopt(state->easy, CURLOPT_MAXREDIRS, 12);
  544. curl_easy_setopt(state->easy, CURLOPT_SSL_VERIFYPEER, 0);
  545. curl_easy_setopt(state->easy, CURLOPT_SSL_VERIFYHOST, 0);
  546. curl_easy_setopt(state->easy, CURLOPT_MAX_RECV_SPEED_LARGE, kbps * 1024);
  547. curl_easy_setopt(state->easy, CURLOPT_HEADERFUNCTION, on_curl_header);
  548. curl_easy_setopt(state->easy, CURLOPT_WRITEHEADER, state);
  549. curl_easy_setopt(state->easy, CURLOPT_WRITEFUNCTION, on_curl_data);
  550. curl_easy_setopt(state->easy, CURLOPT_WRITEDATA, state);
  551. curl_easy_setopt(state->easy, CURLOPT_FAILONERROR, 1L);
  552. curl_easy_setopt(state->easy, CURLOPT_ERRORBUFFER, &state->error_buffer);
  553. #ifdef DEBUG_BLOCK_FETCHING
  554. curl_easy_setopt(state->easy, CURLOPT_VERBOSE, 1L);
  555. #endif
  556. /* Get only the HEAD first. */
  557. state->head = 1;
  558. curl_easy_setopt(state->easy, CURLOPT_NOBODY, 1);
  559. #ifdef _WIN32
  560. state->mutex = CreateMutex(NULL, FALSE, NULL);
  561. if (state->mutex == NULL)
  562. fz_throw(ctx, FZ_ERROR_GENERIC, "mutex creation failed");
  563. state->thread = CreateThread(NULL, 0, win_thread, state, 0, &state->thread_id);
  564. if (state->thread == NULL)
  565. fz_throw(ctx, FZ_ERROR_GENERIC, "thread creation failed");
  566. #else
  567. if (pthread_mutex_init(&state->mutex, NULL))
  568. fz_throw(ctx, FZ_ERROR_GENERIC, "mutex creation failed");
  569. if (pthread_create(&state->thread, NULL, pthread_thread, state))
  570. fz_throw(ctx, FZ_ERROR_GENERIC, "thread creation failed");
  571. #endif
  572. state->more_data = more_data;
  573. state->more_data_arg = more_data_arg;
  574. stm = fz_new_stream(ctx, state, cs_next, cs_close);
  575. stm->progressive = 1;
  576. stm->seek = cs_seek;
  577. return stm;
  578. }