Index: buckets/aggregate_buckets.c =================================================================== --- buckets/aggregate_buckets.c (revision 1383) +++ buckets/aggregate_buckets.c (working copy) @@ -32,6 +32,7 @@ typedef struct { int snapshot; serf_bucket_aggregate_eof_t hold_open; void *hold_open_baton; + int bucket_owner; } aggregate_context_t; @@ -51,8 +52,10 @@ static void cleanup_aggregate(aggregate_context_t while (ctx->done != NULL) { next_list = ctx->done->next; - serf_bucket_destroy(ctx->done->bucket); - serf_bucket_mem_free(allocator, ctx->done); + if (ctx->bucket_owner) { + serf_bucket_destroy(ctx->done->bucket); + } + serf_bucket_mem_free(allocator, ctx->done); ctx->done = next_list; } @@ -78,6 +81,7 @@ static aggregate_context_t *create_aggregate(serf_ ctx->snapshot = 0; ctx->hold_open = NULL; ctx->hold_open_baton = NULL; + ctx->bucket_owner = 1; return ctx; } @@ -92,13 +96,31 @@ serf_bucket_t *serf_bucket_aggregate_create( return serf_bucket_create(&serf_bucket_type_aggregate, allocator, ctx); } +serf_bucket_t *serf_bucket_stream_create( + serf_bucket_alloc_t *allocator, + serf_bucket_aggregate_eof_t fn, + void *baton) +{ + serf_bucket_t *bucket = serf_bucket_aggregate_create(allocator); + aggregate_context_t *ctx = bucket->data; + + serf_bucket_aggregate_hold_open(bucket, fn, baton); + + ctx->bucket_owner = 0; + + return bucket; +} + + static void serf_aggregate_destroy_and_data(serf_bucket_t *bucket) { aggregate_context_t *ctx = bucket->data; bucket_list_t *next_ctx; while (ctx->list) { - serf_bucket_destroy(ctx->list->bucket); + if (ctx->bucket_owner) { + serf_bucket_destroy(ctx->list->bucket); + } next_ctx = ctx->list->next; serf_bucket_mem_free(bucket->allocator, ctx->list); ctx->list = next_ctx; Index: serf_bucket_types.h =================================================================== --- serf_bucket_types.h (revision 1383) +++ serf_bucket_types.h (working copy) @@ -173,6 +173,10 @@ apr_status_t serf_bucket_bwtp_incoming_frame_wait_ extern const serf_bucket_type_t serf_bucket_type_aggregate; #define SERF_BUCKET_IS_AGGREGATE(b) SERF_BUCKET_CHECK((b), aggregate) +typedef apr_status_t (*serf_bucket_aggregate_eof_t)( + void *baton, + serf_bucket_t *aggregate_bucket); + /** serf_bucket_aggregate_cleanup will instantly destroy all buckets in the aggregate bucket that have been read completely. Whereas normally, these buckets are destroyed on every read operation. */ @@ -183,6 +187,11 @@ void serf_bucket_aggregate_cleanup( serf_bucket_t *serf_bucket_aggregate_create( serf_bucket_alloc_t *allocator); +serf_bucket_t *serf_bucket_stream_create( + serf_bucket_alloc_t *allocator, + serf_bucket_aggregate_eof_t fn, + void *baton); + /** Transform @a bucket in-place into an aggregate bucket. */ void serf_bucket_aggregate_become( serf_bucket_t *bucket); @@ -194,10 +203,6 @@ void serf_bucket_aggregate_prepend( void serf_bucket_aggregate_append( serf_bucket_t *aggregate_bucket, serf_bucket_t *append_bucket); - -typedef apr_status_t (*serf_bucket_aggregate_eof_t)( - void *baton, - serf_bucket_t *aggregate_bucket); void serf_bucket_aggregate_hold_open( serf_bucket_t *aggregate_bucket, Index: outgoing.c =================================================================== --- outgoing.c (revision 1383) +++ outgoing.c (working copy) @@ -295,6 +295,33 @@ static void link_requests(serf_request_t **list, s } } +static apr_status_t destroy_request(serf_request_t *request) +{ + serf_connection_t *conn = request->conn; + + /* The bucket is no longer needed, nor is the request's pool. + Note that before we can cleanup the request's pool, we have to + remove the hold of ostream_tail on the request_bucket, to avoid + a double free when ostream_tail is cleaned up later. + */ + if (request->resp_bkt) { + serf_debug__closed_conn(request->resp_bkt->allocator); + serf_bucket_destroy(request->resp_bkt); + } + if (request->req_bkt) { + serf_debug__closed_conn(request->req_bkt->allocator); + serf_bucket_destroy(request->req_bkt); + } + + serf_debug__bucket_alloc_check(request->allocator); + if (request->respool) { + apr_pool_destroy(request->respool); + } + serf_bucket_mem_free(conn->allocator, request); + + return APR_SUCCESS; +} + static apr_status_t cancel_request(serf_request_t *request, serf_request_t **list, int notify_request) @@ -322,22 +349,7 @@ static apr_status_t cancel_request(serf_request_t } } - if (request->resp_bkt) { - serf_debug__closed_conn(request->resp_bkt->allocator); - serf_bucket_destroy(request->resp_bkt); - } - if (request->req_bkt) { - serf_debug__closed_conn(request->req_bkt->allocator); - serf_bucket_destroy(request->req_bkt); - } - - if (request->respool) { - apr_pool_destroy(request->respool); - } - - serf_bucket_mem_free(request->conn->allocator, request); - - return APR_SUCCESS; + return destroy_request(request); } static apr_status_t remove_connection(serf_context_t *ctx, @@ -503,15 +515,13 @@ static apr_status_t do_conn_setup(serf_connection_ } if (conn->ostream_tail == NULL) { - conn->ostream_tail = serf_bucket_aggregate_create(conn->allocator); + conn->ostream_tail = serf_bucket_stream_create(conn->allocator, + detect_eof, + conn); } ostream = conn->ostream_tail; - serf_bucket_aggregate_hold_open(conn->ostream_tail, - detect_eof, - conn); - status = (*conn->setup)(conn->skt, &conn->stream, &ostream, @@ -530,30 +540,6 @@ static apr_status_t do_conn_setup(serf_connection_ return status; } -static apr_status_t destroy_request(serf_request_t *request) -{ - serf_connection_t *conn = request->conn; - - /* The bucket is no longer needed, nor is the request's pool. - Note that before we can cleanup the request's pool, we have to - ensure that the ostream_tail aggregate bucket destroys the - use request bucket (which it owns). - */ - if (request->resp_bkt) { - serf_bucket_destroy(request->resp_bkt); - } - serf_bucket_aggregate_cleanup(conn->ostream_tail, conn->allocator); - if (request->req_bkt) { - serf_bucket_destroy(request->req_bkt); - } - - serf_debug__bucket_alloc_check(request->allocator); - apr_pool_destroy(request->respool); - serf_bucket_mem_free(conn->allocator, request); - - return APR_SUCCESS; -} - /* write data out to the connection */ static apr_status_t write_to_connection(serf_connection_t *conn) {