Skip to content

Commit 5ec118a

Browse files
committed
[SERF-211] Finalize the asynchronouse resolver API. This adds a private
interface for waking the context from a poll, so that the resolver can signal when results are available. * serf.h (serf_context_create_ex): Make the docstring a docstring. (serf_address_resolved_t, serf_address_resolve_async, serf_connection_created_t, serf_connection_create_async): Remove the experimental comments and fix some typos in the docstrings. * serf_private.h (SERF_IO_WAKEUP_PIPE): New constant for the io baton type. (serf_context_t::wakeup): New member. (serf__context_wakeup): New prototype. * src/context.c: Include <apr_atomic.h> (WAKEUP_LOOPBACK, WAKEUP_FAMILY): New constants. (serf__context_wakeup_t): New struct for the self-pinging wakeup socket. (init_wakeup, process_wakeup): New private helper functions. (serf__context_wakeup): Implement here. (serf_context_create_ex): Initialize the wakeup socket. (serf_event_trigger): Process the wakeup signal. * src/resolve.c: Remove the experimental/todo top-level comment. (resolve): Tweak log message. (push_resolve_result): Wake the context when a new result is available. (serf__process_async_resolve_results): Return immediately if the async resolver was not properly initialized. Add debug logging. git-svn-id: https://svn.apache.org/repos/asf/serf/trunk@1931128 13f79535-47bb-0310-9956-ffa450edef68
1 parent c667c64 commit 5ec118a

4 files changed

Lines changed: 190 additions & 24 deletions

File tree

serf.h

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -331,7 +331,8 @@ typedef apr_status_t (*serf_socket_remove_t)(
331331
apr_pollfd_t *pfd,
332332
void *serf_baton);
333333

334-
/* Create a new context for serf operations.
334+
/**
335+
* Create a new context for serf operations.
335336
*
336337
* Use this function to make serf not use its internal control loop, but
337338
* instead rely on an external event loop. Serf will use the @a addf and @a rmf
@@ -650,7 +651,6 @@ apr_status_t serf_connection_create2(
650651
*
651652
* @since New in 1.5.
652653
*/
653-
/* FIXME: EXPERIMENTAL */
654654
typedef void (*serf_address_resolved_t)(
655655
serf_context_t *ctx,
656656
void *resolved_baton,
@@ -668,18 +668,17 @@ typedef void (*serf_address_resolved_t)(
668668
* address resolution, use serf_connection_create_async(), which does take
669669
* proxy configuration into account.
670670
*
671-
* The @a resolve callback will be called during a subsequent call to
671+
* The @a resolved callback will be called during a subsequent call to
672672
* serf_context_run() or serf_context_prerun() and will receive the same
673673
* @a ctx and @a resolved_baton that are provided here.
674674
*
675675
* The lifetime of all function arguments except @a pool must extend until
676-
* either @a resolve is called or an error is reported.
676+
* either @a resolved is called or an error is reported.
677677
*
678678
* All temporary allocations should be made in @a pool.
679679
*
680680
* @since New in 1.5.
681681
*/
682-
/* FIXME: EXPERIMENTAL */
683682
apr_status_t serf_address_resolve_async(
684683
serf_context_t *ctx,
685684
apr_uri_t host_info,
@@ -704,7 +703,6 @@ apr_status_t serf_address_resolve_async(
704703
*
705704
* @since New in 1.5.
706705
*/
707-
/* FIXME: EXPERIMENTAL */
708706
typedef void (*serf_connection_created_t)(
709707
serf_context_t *ctx,
710708
void *created_baton,
@@ -728,7 +726,6 @@ typedef void (*serf_connection_created_t)(
728726
*
729727
* @since New in 1.5.
730728
*/
731-
/* FIXME: EXPERIMENTAL */
732729
apr_status_t serf_connection_create_async(
733730
serf_context_t *ctx,
734731
apr_uri_t host_info,

serf_private.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ typedef int serf__bool_t; /* Not _Bool */
8989
#define SERF_IO_CLIENT (1)
9090
#define SERF_IO_CONN (2)
9191
#define SERF_IO_LISTENER (3)
92+
#define SERF_IO_WAKEUP_PIPE (4)
9293

9394
/*** Narrowing conversions ***/
9495

@@ -544,6 +545,9 @@ struct serf_context_t {
544545

545546
serf_config_t *config;
546547

548+
/* The wakeup socket */
549+
struct serf__context_wakeup_t *wakeup;
550+
547551
/* Support for asynchronous address resolution. */
548552
void *volatile resolve_head;
549553
apr_status_t resolve_init_status;
@@ -741,6 +745,9 @@ struct serf_connection_t {
741745
up buckets that may still reference buckets of this request */
742746
void serf__connection_pre_cleanup(serf_connection_t *);
743747

748+
/* Called when an asynchronous event should wake up the context's pollset. */
749+
void serf__context_wakeup(serf_context_t *ctx);
750+
744751
/* Called from serf_context_create_ex() to set up the context-specific
745752
asynchronous address resolver context. */
746753
apr_status_t serf__create_resolve_context(serf_context_t *ctx);

src/context.c

Lines changed: 156 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
* ====================================================================
1919
*/
2020

21+
#include <apr_atomic.h>
2122
#include <apr_pools.h>
2223
#include <apr_poll.h>
2324
#include <apr_version.h>
@@ -27,6 +28,145 @@
2728

2829
#include "serf_private.h"
2930

31+
32+
/* APR has wakeable pollsets, but we can't use them: the main reason is that
33+
the context may not own the pollset, or indeed there may not even be a
34+
pollset but a different event source.
35+
36+
Instead, we create a UDP socket bound to the loopback address and add it
37+
to the context. The socket pings itself to wake up. */
38+
39+
#if APR_HAVE_IPV6
40+
/* Bind to the IPv6 loopback address, if supported. */
41+
#define WAKEUP_LOOPBACK "::1"
42+
#define WAKEUP_FAMILY APR_INET6
43+
#else
44+
#define WAKEUP_LOOPBACK "127.0.0.1"
45+
#define WAKEUP_FAMILY APR_INET
46+
#endif
47+
48+
/* The socket that we'll use to wake the context's pollset. */
49+
struct serf__context_wakeup_t
50+
{
51+
apr_sockaddr_t *addr;
52+
apr_socket_t *skt;
53+
apr_pollfd_t pfd;
54+
serf_io_baton_t io_baton;
55+
volatile apr_uint32_t pending;
56+
};
57+
58+
static void init_wakeup(serf_context_t *ctx)
59+
{
60+
struct serf__context_wakeup_t *wakeup;
61+
apr_status_t status;
62+
63+
ctx->wakeup = NULL;
64+
wakeup = apr_pcalloc(ctx->pool, sizeof(*wakeup));
65+
66+
/* Get the loopback address. This shouldn't block. */
67+
status = apr_sockaddr_info_get(&wakeup->addr,
68+
WAKEUP_LOOPBACK, WAKEUP_FAMILY,
69+
0, 0, ctx->pool);
70+
if (status) {
71+
serf__log(LOGLVL_ERROR, LOGCOMP_CONN, __FILE__, ctx->config,
72+
"context 0x%p init wakeup: <%d> resolve %s\n",
73+
ctx, status, WAKEUP_LOOPBACK);
74+
}
75+
76+
/* Create and bind the socket to a random port. */
77+
if (!status) {
78+
status = apr_socket_create(&wakeup->skt, WAKEUP_FAMILY,
79+
SOCK_DGRAM, APR_PROTO_UDP, ctx->pool);
80+
if (!status)
81+
status = apr_socket_bind(wakeup->skt, wakeup->addr);
82+
if (!status)
83+
status = apr_socket_addr_get(&wakeup->addr, APR_LOCAL, wakeup->skt);
84+
85+
if (status) {
86+
serf__log(LOGLVL_ERROR, LOGCOMP_CONN, __FILE__, ctx->config,
87+
"context 0x%p init wakeup: <%d> socket\n",
88+
ctx, status);
89+
}
90+
91+
if (!status) {
92+
wakeup->pfd.desc_type = APR_POLL_SOCKET;
93+
wakeup->pfd.desc.s = wakeup->skt;
94+
wakeup->pfd.reqevents = APR_POLLIN;
95+
wakeup->io_baton.type = SERF_IO_WAKEUP_PIPE;
96+
wakeup->io_baton.ctx = ctx;
97+
wakeup->io_baton.reqevents = wakeup->pfd.reqevents;
98+
status = ctx->pollset_add(ctx->pollset_baton, &wakeup->pfd,
99+
&wakeup->io_baton);
100+
}
101+
102+
if (status) {
103+
serf__log(LOGLVL_ERROR, LOGCOMP_CONN, __FILE__, ctx->config,
104+
"context 0x%p init wakeup: <%d> pollset add\n",
105+
ctx, status);
106+
}
107+
}
108+
109+
if (!status) {
110+
wakeup->pending = 0;
111+
ctx->wakeup = wakeup;
112+
serf__log(LOGLVL_DEBUG, LOGCOMP_CONN, __FILE__, ctx->config,
113+
"context 0x%p init wakeup done\n", ctx);
114+
}
115+
}
116+
117+
static apr_status_t process_wakeup(serf_context_t *ctx)
118+
{
119+
struct serf__context_wakeup_t *const wakeup = ctx->wakeup;
120+
apr_status_t status;
121+
apr_sockaddr_t from;
122+
apr_size_t length = 3;
123+
char buffer[3];
124+
125+
if (!wakeup)
126+
return APR_SUCCESS;
127+
128+
status = apr_socket_recvfrom(&from, wakeup->skt, 0, buffer, &length);
129+
if (status) {
130+
serf__log(LOGLVL_ERROR, LOGCOMP_CONN, __FILE__, ctx->config,
131+
"context 0x%p received wakeup: <%d> [%" APR_SIZE_T_FMT "]\n",
132+
ctx, status, length);
133+
return status;
134+
}
135+
/* TODO: Should we check if the socket actually did ping itself? */
136+
137+
serf__log(LOGLVL_DEBUG, LOGCOMP_CONN, __FILE__, ctx->config,
138+
"context 0x%p received wakeup: [%" APR_SIZE_T_FMT "]\n",
139+
ctx, length);
140+
apr_atomic_set32(&wakeup->pending, 0);
141+
return APR_SUCCESS;
142+
}
143+
144+
void serf__context_wakeup(serf_context_t *ctx)
145+
{
146+
struct serf__context_wakeup_t *const wakeup = ctx->wakeup;
147+
148+
if (!wakeup)
149+
return;
150+
151+
/* Don't signal a new wakeup before the previous one has been processed. */
152+
if (apr_atomic_cas32(&wakeup->pending, 1, 0) == 0) {
153+
apr_size_t length = 1;
154+
apr_status_t status = apr_socket_sendto(wakeup->skt, wakeup->addr, 0,
155+
"\b", &length);
156+
if (status) {
157+
serf__log(LOGLVL_ERROR, LOGCOMP_CONN, __FILE__, ctx->config,
158+
"context 0x%p wakeup: <%d> [%" APR_SIZE_T_FMT "]\n",
159+
ctx, status, length);
160+
}
161+
else {
162+
serf__log(LOGLVL_DEBUG, LOGCOMP_CONN, __FILE__, ctx->config,
163+
"context 0x%p wakeup: [%" APR_SIZE_T_FMT "]\n",
164+
ctx, length);
165+
}
166+
}
167+
}
168+
169+
30170
/**
31171
* Callback function (implements serf_progress_t). Takes a number of bytes
32172
* read @a read and bytes written @a written, adds those to the total for this
@@ -114,7 +254,6 @@ static apr_status_t pollset_rm(void *user_baton,
114254
return apr_pollset_remove(s->pollset, pfd);
115255
}
116256

117-
118257
void serf_config_proxy(serf_context_t *ctx,
119258
apr_sockaddr_t *address)
120259
{
@@ -194,20 +333,26 @@ serf_context_t *serf_context_create_ex(
194333
ctx->authn_types = SERF_AUTHN_ALL;
195334
ctx->server_authn_info = apr_hash_make(pool);
196335

336+
/* Assume returned status is APR_SUCCESS */
337+
serf__config_store_init(ctx);
338+
339+
serf__config_store_create_ctx_config(ctx, &ctx->config);
340+
341+
serf__log_init(ctx);
342+
197343
/* Initialize async resolver result queue. */
198344
ctx->resolve_head = NULL;
199345
ctx->resolve_init_status = APR_SUCCESS;
200346
ctx->resolve_init_status = serf__create_resolve_context(ctx);
201347
if (ctx->resolve_init_status != APR_SUCCESS) {
202348
ctx->resolve_context = NULL;
203349
}
204-
205-
/* Assume returned status is APR_SUCCESS */
206-
serf__config_store_init(ctx);
207-
208-
serf__config_store_create_ctx_config(ctx, &ctx->config);
209-
210-
serf__log_init(ctx);
350+
else {
351+
/* Initialize the context's wakeup event. */
352+
/* FIXME: For now, we only use this from the asynchronouse resolver.
353+
We could expose awakable contexts in the public API. */
354+
init_wakeup(ctx);
355+
}
211356

212357
return ctx;
213358
}
@@ -282,6 +427,9 @@ apr_status_t serf_event_trigger(
282427
return status;
283428
}
284429
}
430+
else if (io->type == SERF_IO_WAKEUP_PIPE) {
431+
status = process_wakeup(io->ctx);
432+
}
285433
return status;
286434
}
287435

src/resolve.c

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -67,13 +67,6 @@
6767

6868
#define HAVE_ASYNC_RESOLVER (SERF_HAVE_ASYNC_RESOLVER || APR_HAS_THREADS)
6969

70-
/*
71-
* FIXME: EXPERIMENTAL
72-
* TODO:
73-
* - Wake the poll/select in serf_context_run() when new resolve
74-
* results are available.
75-
*/
76-
7770

7871
#if HAVE_ASYNC_RESOLVER
7972

@@ -770,7 +763,7 @@ static void *APR_THREAD_FUNC resolve(apr_thread_t *thread, void *baton)
770763
if (!apr_sockaddr_ip_getbuf(buf, sizeof(buf), addr)) {
771764
serf__log(LOGLVL_DEBUG, LOGCOMP_CONN,
772765
__FILE__, task->ctx->config,
773-
"apr async resolve: %s: %s\n", addr->hostname, buf);
766+
"thread pool resolve: %s: %s\n", addr->hostname, buf);
774767
}
775768
addr = addr->next;
776769
}
@@ -869,6 +862,8 @@ static void push_resolve_result(serf_context_t *ctx,
869862
result->next = head;
870863
head = apr_atomic_casptr(&ctx->resolve_head, result, head);
871864
} while(head != result->next);
865+
866+
serf__context_wakeup(ctx);
872867
}
873868

874869

@@ -884,12 +879,24 @@ apr_status_t serf__process_async_resolve_results(serf_context_t *ctx)
884879
{
885880
resolve_result_t *result;
886881
apr_status_t status;
882+
unsigned counter;
883+
884+
if (ctx->resolve_init_status != APR_SUCCESS) {
885+
/* The async resolver initialization failed, so just return. */
886+
serf__log(LOGLVL_DEBUG, LOGCOMP_CONN, __FILE__, ctx->config,
887+
"context 0x%p async resolver is not initialized\n", ctx);
888+
return APR_SUCCESS;
889+
}
887890

888891
status = run_async_resolver_loop(ctx);
889-
if (status)
892+
if (status) {
893+
serf__log(LOGLVL_ERROR, LOGCOMP_CONN, __FILE__, ctx->config,
894+
"context 0x%p async resolve: <%d>\n", ctx, status);
890895
return status;
896+
}
891897

892898
/* Grab the whole stack, leaving it empty, and process the contents. */
899+
counter = 0;
893900
result = apr_atomic_xchgptr(&ctx->resolve_head, NULL);
894901
while (result)
895902
{
@@ -899,6 +906,13 @@ apr_status_t serf__process_async_resolve_results(serf_context_t *ctx)
899906
result->result_pool);
900907
apr_pool_destroy(result->result_pool);
901908
result = next;
909+
++counter;
910+
}
911+
912+
if (counter > 0) {
913+
serf__log(LOGLVL_DEBUG, LOGCOMP_CONN, __FILE__, ctx->config,
914+
"context 0x%p async resolve: %d event%s\n",
915+
ctx, counter, counter == 1 ? "" : "s");
902916
}
903917
return APR_SUCCESS;
904918
}

0 commit comments

Comments
 (0)