Index: subversion/libsvn_ra_serf/util.c =================================================================== --- subversion/libsvn_ra_serf/util.c (revision 1081269) +++ subversion/libsvn_ra_serf/util.c (working copy) @@ -1151,58 +1151,20 @@ cdata_xml(void *userData, const char *data, int le parser->error = parser->cdata(parser, parser->user_data, data, len); } -/* Implements svn_ra_serf__response_handler_t */ +typedef svn_error_t * +(*parser_impl_t)(svn_ra_serf__xml_parser_t *ctx, + serf_bucket_t *response); + +/* Implements parser_impl_t. */ svn_error_t * -svn_ra_serf__handle_xml_parser(serf_request_t *request, - serf_bucket_t *response, - void *baton, - apr_pool_t *pool) +svn_ra_serf__parser_impl(svn_ra_serf__xml_parser_t *ctx, + serf_bucket_t *response) { + apr_status_t status; + int xml_status; const char *data; apr_size_t len; - serf_status_line sl; - apr_status_t status; - int xml_status; - svn_ra_serf__xml_parser_t *ctx = baton; - svn_error_t *err; - serf_bucket_response_status(response, &sl); - - if (ctx->status_code) - { - *ctx->status_code = sl.code; - } - - if (sl.code == 301 || sl.code == 302 || sl.code == 307) - { - ctx->location = svn_ra_serf__response_get_location(response, pool); - } - - /* Woo-hoo. Nothing here to see. */ - if (sl.code == 404 && ctx->ignore_errors == FALSE) - { - /* If our caller won't know about the 404, abort() for now. 
*/ - SVN_ERR_ASSERT(ctx->status_code); - - if (*ctx->done == FALSE) - { - *ctx->done = TRUE; - if (ctx->done_list) - { - ctx->done_item->data = ctx->user_data; - ctx->done_item->next = *ctx->done_list; - *ctx->done_list = ctx->done_item; - } - } - - err = svn_ra_serf__handle_server_error(request, response, pool); - - SVN_ERR(svn_error_compose_create( - svn_ra_serf__handle_discard_body(request, response, NULL, pool), - err)); - return SVN_NO_ERROR; - } - if (!ctx->xmlp) { ctx->xmlp = XML_ParserCreate(NULL); @@ -1214,7 +1176,6 @@ svn_error_t * } } - while (1) { status = serf_bucket_read(response, 8000, &data, &len); @@ -1240,9 +1201,10 @@ svn_error_t * *ctx->done_list = ctx->done_item; } } + /* TODO */ SVN_ERR(svn_error_createf(SVN_ERR_RA_DAV_MALFORMED_DATA, NULL, _("XML parsing failed: (%d %s)"), - sl.code, sl.reason)); + 999, "TODO")); } if (ctx->error && ctx->ignore_errors == FALSE) @@ -1270,14 +1232,157 @@ svn_error_t * } return svn_error_wrap_apr(status, NULL); } + } + return SVN_NO_ERROR; +} - /* feed me! */ +static apr_status_t +resp_cache_eof(void *baton, + serf_bucket_t *aggregate_bucket) +{ + svn_ra_serf__xml_parser_t *ctx = baton; + + if (ctx->done_reading) + return APR_EOF; + else + return APR_EAGAIN; +} + +/* Implements parser_impl_t. 
*/ +static svn_error_t * +svn_ra_serf__store_data_impl(svn_ra_serf__xml_parser_t *ctx, + serf_bucket_t *response) +{ + if (!ctx->response_stream) + { + ctx->bkt_alloc = serf_bucket_allocator_create(ctx->pool, NULL, NULL); + ctx->response_stream = serf_bucket_aggregate_create(ctx->bkt_alloc); + serf_bucket_aggregate_hold_open(ctx->response_stream, + resp_cache_eof, + ctx); } - /* not reached */ + + while (1) + { + static int debug_ctr = 0; + const char *data; + apr_size_t len; + serf_bucket_t *tmp; + apr_status_t status; + + status = serf_bucket_read(response, 8000, &data, &len); + + if (SERF_BUCKET_READ_ERROR(status)) + { + return svn_error_wrap_apr(status, NULL); + } + + debug_ctr += len; + printf("Storing %d bytes.\n", debug_ctr); + + /* Store a copy of this data. */ + if (len) + { + tmp = serf_bucket_simple_copy_create(data, len, + ctx->bkt_alloc); + + serf_bucket_aggregate_append(ctx->response_stream, tmp); + } + + if (APR_STATUS_IS_EAGAIN(status)) + { + return svn_error_wrap_apr(status, NULL); + } + + /* Are we done reading? */ + if (APR_STATUS_IS_EOF(status)) + { + ctx->done_reading = TRUE; + return svn_error_wrap_apr(status, NULL); + } + } + + /* Can't reach. */ } /* Implements svn_ra_serf__response_handler_t */ +static svn_error_t * +parser_impl_wrapper(serf_request_t *request, + serf_bucket_t *response, + void *baton, + parser_impl_t parser_impl, + apr_pool_t *pool) +{ + svn_ra_serf__xml_parser_t *ctx = baton; + svn_error_t *err; + serf_status_line sl; + + serf_bucket_response_status(response, &sl); + + if (ctx->status_code) + { + *ctx->status_code = sl.code; + } + + if (sl.code == 301 || sl.code == 302 || sl.code == 307) + { + ctx->location = svn_ra_serf__response_get_location(response, pool); + } + + /* Woo-hoo. Nothing here to see. */ + if (sl.code == 404 && ctx->ignore_errors == FALSE) + { + /* If our caller won't know about the 404, abort() for now. 
*/ + SVN_ERR_ASSERT(ctx->status_code); + + if (*ctx->done == FALSE) + { + *ctx->done = TRUE; + if (ctx->done_list) + { + ctx->done_item->data = ctx->user_data; + ctx->done_item->next = *ctx->done_list; + *ctx->done_list = ctx->done_item; + } + } + + err = svn_ra_serf__handle_server_error(request, response, pool); + + SVN_ERR(svn_error_compose_create( + svn_ra_serf__handle_discard_body(request, response, NULL, pool), + err)); + return SVN_NO_ERROR; + } + + return parser_impl(ctx, response); +} + +/* Implements svn_ra_serf__response_handler_t */ svn_error_t * +svn_ra_serf__handle_xml_parser(serf_request_t *request, + serf_bucket_t *response, + void *baton, + apr_pool_t *pool) +{ + return parser_impl_wrapper(request, response, baton, + svn_ra_serf__parser_impl, + pool); +} + +/* Implements svn_ra_serf__response_handler_t */ +svn_error_t * +svn_ra_serf__store_data(serf_request_t *request, + serf_bucket_t *response, + void *baton, + apr_pool_t *pool) +{ + return parser_impl_wrapper(request, response, baton, + svn_ra_serf__store_data_impl, + pool); +} + +/* Implements svn_ra_serf__response_handler_t */ +svn_error_t * svn_ra_serf__handle_server_error(serf_request_t *request, serf_bucket_t *response, apr_pool_t *pool) Index: subversion/libsvn_ra_serf/update.c =================================================================== --- subversion/libsvn_ra_serf/update.c (revision 1081269) +++ subversion/libsvn_ra_serf/update.c (working copy) @@ -2175,7 +2175,17 @@ link_path(void *report_baton, /** Minimum nr. of outstanding requests needed before a new connection is * opened. */ #define REQS_PER_CONN 8 +/** Maximum nr. of outstanding requests. This limits the memory usage + for requests that are waiting to be sent on the connections. + Since this number limits the reading of more data of the REPORT response, + and xml_parser reads this data in blocks of 8000 bytes, the number of + actual outstanding requests will be higher. 
+ + Setting this to 500 with 4 connections means 125 outstanding requests per + connection. This seems reasonable, but requires more real-world testing. */ +#define MAX_OUTSTANDING_REQS 500 + /** This function creates a new connection for this serf session, but only * if the number of ACTIVE_REQS > REQS_PER_CONN or if there currently is * only one main connection open. @@ -2297,7 +2307,7 @@ finish_report(void *report_baton, do anything with it. The error in parser_ctx->error is sufficient. */ parser_ctx->status_code = &status_code; - handler->response_handler = svn_ra_serf__handle_xml_parser; + handler->response_handler = svn_ra_serf__store_data; handler->response_baton = parser_ctx; svn_ra_serf__request_create(handler); @@ -2440,6 +2450,20 @@ finish_report(void *report_baton, } report->done_fetches = NULL; + /* Parse the REPORT response. */ + if (parser_ctx->response_stream && + report->active_fetches + report->active_propfinds < MAX_OUTSTANDING_REQS) + { + err = svn_ra_serf__parser_impl(parser_ctx, + parser_ctx->response_stream); + if (err && SERF_BUCKET_READ_ERROR(err->apr_err)) + { + return err; + } + serf_bucket_aggregate_cleanup(parser_ctx->response_stream, + parser_ctx->bkt_alloc); + } + /* Debugging purposes only! */ serf_debug__closed_conn(sess->bkt_alloc); for (i = 0; i < sess->num_conns; i++) @@ -2459,6 +2483,7 @@ finish_report(void *report_baton, /* FIXME subpool */ return report->update_editor->close_edit(report->update_baton, sess->pool); } +#undef MAX_OUTSTANDING_REQS #undef MAX_NR_OF_CONNS #undef EXP_REQS_PER_CONN Index: subversion/libsvn_ra_serf/ra_serf.h =================================================================== --- subversion/libsvn_ra_serf/ra_serf.h (revision 1081269) +++ subversion/libsvn_ra_serf/ra_serf.h (working copy) @@ -505,6 +505,8 @@ typedef struct svn_ra_serf__xml_state_t { /* Forward declaration of the XML parser structure. 
*/ typedef struct svn_ra_serf__xml_parser_t svn_ra_serf__xml_parser_t; +typedef struct svn_ra_serf__store_response_t svn_ra_serf__store_response_t; + /* Callback invoked with @a baton by our XML @a parser when an element with * the @a name containing @a attrs is opened. */ @@ -543,6 +545,9 @@ struct svn_ra_serf__xml_parser_t { /* Temporary allocations should be made in this pool. */ apr_pool_t *pool; + /* Allocator created from the pool. */ + serf_bucket_alloc_t *bkt_alloc; + /* Caller-specific data passed to the start, end, cdata callbacks. */ void *user_data; @@ -574,6 +579,12 @@ struct svn_ra_serf__xml_parser_t { */ const char *location; + /* Used by the store_handler to temporarily store the response data. */ + serf_bucket_t *response_stream; + + /* Is the REPORT response data read completely? */ + svn_boolean_t done_reading; + /* If non-NULL, this value will be set to TRUE when the response is * completed. */ @@ -722,6 +733,17 @@ svn_ra_serf__handle_xml_parser(serf_request_t *req void *handler_baton, apr_pool_t *pool); +/* TODO COMMENT */ +svn_error_t * +svn_ra_serf__parser_impl(svn_ra_serf__xml_parser_t *ctx, + serf_bucket_t *response); + +svn_error_t * +svn_ra_serf__store_data(serf_request_t *request, + serf_bucket_t *response, + void *baton, + apr_pool_t *pool); + /* serf_response_handler_t implementation that completely discards * the response. *