Braindump re: implementing svn_ra_replay_range for ra_svn:
We can get a similar speedup with ra_svn. There are two ways to do it:
* Explicit server support.
Either add a new replay-range command or an optional second revision
argument to replay. In either case we would fall back to a bunch of
replay commands for servers that don't support it; we could use a
capability to detect it or just note that (for a new command) the
command wasn't implemented. Upside: guaranteed to work. Downside: no
performance gain possible against 1.4.x servers.
* Rely on details of current implementation.
In practice, you really can just write a bunch of replay commands to
the server and then process all the responses. However, technically
the server may issue an auth request before handling each command;
this is currently hardcoded to be a trivial auth request which
requires no response. For reasons related to Issue #2712, we might
actually want to sometimes put in a real auth request. (If anonymous
users can read the repo, and you try to replay a revision where some
of the paths are not readable by anonymous users, then replay will
leave those paths out, even if the client could have given a username
that can read it if the server had asked for it.) So this would
prevent us from fully fixing Issue #2712.
I would lean towards adding a new command. If people agree I'm happy
to do the coding.
I'll point out that since the fallback here is just equivalent to "run
svn_ra_replay" a bunch of times, and that's what ra_local and ra_neon
need to do also, it might be a good idea to stick a project-private
function in the ra-loader which simulates svn_ra_replay_range in terms
of svn_ra_replay.
--dave
On 10/30/07, Lieven Govaerts <lgo@mobsol.be> wrote:
> Attached patch is work-in-progress to - hopefully drastically - improve
> the performance of 'svnsync sync' over ra_serf.
>
> Problem with 'svnsync sync' right now is that it's a serial process,
> with lots of waiting on both the master and slave server. Basically it
> comes down to this:
> -> set 'currently-copying' revprop on rev 0 of the slave repository
> .. wait on slave server response ..
> -> send replay report request for rev. N to master
> .. wait on master server response ..
> -> parse replay report and drive editor
> -> commit rev. N on the slave repository
> .. wait on slave server response ..
> -> read all revprops from the master
> .. wait on master server response ..
> -> add all revprops to the slave repository
> .. wait on slave server response ..
> -> reset 'currently-copying' revprop
> .. wait on slave server response ..
> -> ... start all over for rev. N+1.
>
> While there's little option to reduce these waiting times for ra_neon,
> ra_serf has http pipelining support, which comes in handy. What I'd like
> to do is this:
> -> send request to get all revprops from rev. N from the master
> -> send replay report request for rev. N to master
> -> send request to get all revprops from rev. N+1 from the master
> -> send replay report request for rev. N+1 to master
> -> read the revprops for rev. N
> -> while opening the editor, add revprops to it.
> -> parse replay report for rev. N and drive editor
> -> commit rev. N & revprops on the slave repository
> .. wait on slave server response ..
> -> back to step 1.
>
> Attached patch will do a part of this. It already eliminates most of the
> time spent waiting on the master server for the replay report, by
> sending multiple replay requests over the pipeline while still parsing
> them one by one. The other parts of above flow aren't changed.
>
> The patch adds a new ra api function, svn_ra_replay_range, which takes a
> range of revisions and two callback functions. The first callback
> function (report start) is used to create the editor, the second
> callback function (report finished) will close the editor and copy all
> rev props. With this patch I had to create a second connection to copy
> the revprops, I hope to eliminate this 2nd connection again in a later
> stage.
>
> Some small tests show that the patch improves svnsync performance with
> about 15% (with a relatively fast network connection to the master
> server). While it passes the svnsync regression tests over the 4 ra
> layers, I've encountered some issues while testing it with different
> repositories, so it's not ready to commit.
>
> Review of the above approach and the patch is welcome, specifically I'd
> like to know if I'm not breaking any rules concerning atomic behavior,
> race conditions etc.
>
> Lieven
>
> Incomplete log message:
> [[[
>
> * subversion/libsvn_ra_serf/replay.c
> (svn_ra_serf__replay_range): new function, sends requests to replay a
> range
> of revisions and then calls the callbacks while parsing the response.
> (start_replay): calls the revstart callback when encountering the report's
> opening tag.
> (end_replay): calls the revfinish callback when encountering the report's
> closing tag.
> (create_replay_body): share the body creation code between
> svn_ra_serf__replay and svn_ra_serf__replay_range.
> (svn_ra_serf__replay): use the new create_replay_body delegate.
>
> * subversion/svnsync/main.c
> (open_source_session): create a second session to the master repo to
> download
> the revprops.
> (replay_baton_t): new baton, passed around between ra_serf and replay
> callback functions.
> (replay_rev_started): replay callback function, opens an editor.
> (replay_rev_finished): replay callback function, closes the editor and
> copies
> revprops from master to slave.
> (do_synchronize): use the new api svn_ra_replay_range.
> ]]]
>
> Index: subversion/include/svn_ra.h
> ===================================================================
> --- subversion/include/svn_ra.h (revision 27493)
> +++ subversion/include/svn_ra.h (working copy)
> @@ -183,6 +183,34 @@
> void *baton,
> apr_pool_t *pool);
>
> +/**
> + * TODO: finish comment!
> + *
> + * Callback function type replay actions.
> + *
> + * @since New in 1.5.
> + */
> +typedef svn_error_t *(*svn_ra_replay_revstart_callback_t)
> + (svn_revnum_t current,
> + void *replay_baton,
> + const svn_delta_editor_t **editor,
> + void **edit_baton,
> + apr_pool_t *pool);
> +
> + /**
> + * TODO: finish comment!
> + *
> + * Callback function type replay actions.
> + *
> + * @since New in 1.5.
> + */
> +typedef svn_error_t *(*svn_ra_replay_revfinish_callback_t)
> + (svn_revnum_t current,
> + void *replay_baton,
> + const svn_delta_editor_t *editor,
> + void *edit_baton,
> + apr_pool_t *pool);
> +
>
> /**
> * The update Reporter.
> @@ -1487,6 +1515,8 @@
>
>
> /**
> + * TODO: update comment (include start/end revision & callbacks)
> + *
> * Replay the changes from @a revision through @a editor and @a edit_baton.
> *
> * Changes will be limited to those that occur under @a session's URL, and
> @@ -1503,6 +1533,36 @@
> *
> * @a pool is used for all allocation.
> *
> + * @since New in 1.5.
> + */
> +svn_error_t *
> +svn_ra_replay_range(svn_ra_session_t *session,
> + svn_revnum_t start_revision,
> + svn_revnum_t end_revision,
> + svn_revnum_t low_water_mark,
> + svn_boolean_t send_deltas,
> + svn_ra_replay_revstart_callback_t revstart_func,
> + svn_ra_replay_revfinish_callback_t revfinish_func,
> + void *replay_baton,
> + apr_pool_t *pool);
> +
> +/**
> + * Replay the changes from @a revision through @a editor and @a edit_baton.
> + *
> + * Changes will be limited to those that occur under @a session's URL, and
> + * the server will assume that the client has no knowledge of revisions
> + * prior to @a low_water_mark. These two limiting factors define the portion
> + * of the tree that the server will assume the client already has knowledge of,
> + * and thus any copies of data from outside that part of the tree will be
> + * sent in their entirety, not as simple copies or deltas against a previous
> + * version.
> + *
> + * If @a send_deltas is @c TRUE, the actual text and property changes in
> + * the revision will be sent, otherwise dummy text deltas and null property
> + * changes will be sent instead.
> + *
> + * @a pool is used for all allocation.
> + *
> * @since New in 1.4.
> */
> svn_error_t *svn_ra_replay(svn_ra_session_t *session,
> Index: subversion/libsvn_ra/ra_loader.c
> ===================================================================
> --- subversion/libsvn_ra/ra_loader.c (revision 27493)
> +++ subversion/libsvn_ra/ra_loader.c (working copy)
> @@ -1060,10 +1060,28 @@
> void *edit_baton,
> apr_pool_t *pool)
> {
> - return session->vtable->replay(session, revision, low_water_mark,
> - text_deltas, editor, edit_baton, pool);
> + return session->vtable->replay(session, revision, low_water_mark,
> + text_deltas, editor, edit_baton,
> + pool);
> }
>
> +svn_error_t *
> +svn_ra_replay_range(svn_ra_session_t *session,
> + svn_revnum_t start_revision,
> + svn_revnum_t end_revision,
> + svn_revnum_t low_water_mark,
> + svn_boolean_t text_deltas,
> + svn_ra_replay_revstart_callback_t revstart_func,
> + svn_ra_replay_revfinish_callback_t revfinish_func,
> + void *replay_baton,
> + apr_pool_t *pool)
> +{
> + return session->vtable->replay_range(session, start_revision, end_revision,
> + low_water_mark, text_deltas,
> + revstart_func, revfinish_func,
> + replay_baton, pool);
> +}
> +
> svn_error_t *svn_ra_has_capability(svn_ra_session_t *session,
> svn_boolean_t *has,
> const char *capability,
> Index: subversion/libsvn_ra/ra_loader.h
> ===================================================================
> --- subversion/libsvn_ra/ra_loader.h (revision 27493)
> +++ subversion/libsvn_ra/ra_loader.h (working copy)
> @@ -228,7 +228,7 @@
> svn_error_t *(*replay)(svn_ra_session_t *session,
> svn_revnum_t revision,
> svn_revnum_t low_water_mark,
> - svn_boolean_t text_deltas,
> + svn_boolean_t send_deltas,
> const svn_delta_editor_t *editor,
> void *edit_baton,
> apr_pool_t *pool);
> @@ -236,6 +236,16 @@
> svn_boolean_t *has,
> const char *capability,
> apr_pool_t *pool);
> + svn_error_t *
> + (*replay_range)(svn_ra_session_t *session,
> + svn_revnum_t start_revision,
> + svn_revnum_t end_revision,
> + svn_revnum_t low_water_mark,
> + svn_boolean_t text_deltas,
> + svn_ra_replay_revstart_callback_t revstart_func,
> + svn_ra_replay_revfinish_callback_t revfinish_func,
> + void *replay_baton,
> + apr_pool_t *pool);
> } svn_ra__vtable_t;
>
> /* The RA session object. */
> Index: subversion/libsvn_ra_local/ra_plugin.c
> ===================================================================
> --- subversion/libsvn_ra_local/ra_plugin.c (revision 27493)
> +++ subversion/libsvn_ra_local/ra_plugin.c (working copy)
> @@ -1415,6 +1415,50 @@
> }
>
>
> +svn_error_t *
> +svn_ra_local__replay_range(svn_ra_session_t *session,
> + svn_revnum_t start_revision,
> + svn_revnum_t end_revision,
> + svn_revnum_t low_water_mark,
> + svn_boolean_t send_deltas,
> + svn_ra_replay_revstart_callback_t revstart_func,
> + svn_ra_replay_revfinish_callback_t revfinish_func,
> + void *replay_baton,
> + apr_pool_t *pool)
> +{
> + svn_revnum_t rev;
> + apr_pool_t *subpool = svn_pool_create(pool);
> +
> + for (rev = start_revision ; rev <= end_revision ; rev++)
> + {
> + svn_ra_local__session_baton_t *sess = session->priv;
> + svn_fs_root_t *root;
> + svn_delta_editor_t *editor;
> + void *edit_baton;
> +
> + svn_pool_clear(subpool);
> +
> + SVN_ERR(revstart_func(rev, replay_baton,
> + &editor, &edit_baton,
> + subpool));
> +
> + SVN_ERR(svn_fs_revision_root(&root, svn_repos_fs(sess->repos),
> + rev, subpool));
> +
> + SVN_ERR(svn_repos_replay2(root, sess->fs_path->data, low_water_mark,
> + send_deltas, editor, edit_baton,
> + NULL, NULL, subpool));
> +
> + SVN_ERR(revfinish_func(rev, replay_baton,
> + editor, edit_baton,
> + subpool));
> + }
> + svn_pool_destroy(subpool);
> +
> + return SVN_NO_ERROR;
> +}
> +
> +
> static svn_error_t *
> svn_ra_local__has_capability(svn_ra_session_t *session,
> svn_boolean_t *has,
> @@ -1479,7 +1523,8 @@
> svn_ra_local__get_lock,
> svn_ra_local__get_locks,
> svn_ra_local__replay,
> - svn_ra_local__has_capability
> + svn_ra_local__has_capability,
> + svn_ra_local__replay_range
> };
>
>
> Index: subversion/libsvn_ra_neon/ra_neon.h
> ===================================================================
> --- subversion/libsvn_ra_neon/ra_neon.h (revision 27493)
> +++ subversion/libsvn_ra_neon/ra_neon.h (working copy)
> @@ -1012,6 +1012,19 @@
> apr_pool_t *pool);
>
> /*
> + * Implements the replay_range RA layer function. */
> +svn_error_t *
> +svn_ra_neon__replay_range(svn_ra_session_t *session,
> + svn_revnum_t start_revision,
> + svn_revnum_t end_revision,
> + svn_revnum_t low_water_mark,
> + svn_boolean_t send_deltas,
> + svn_ra_replay_revstart_callback_t revstart_func,
> + svn_ra_replay_revfinish_callback_t revfinish_func,
> + void *replay_baton,
> + apr_pool_t *pool);
> +
> +/*
> * Implements the has_capability RA layer function. */
> svn_error_t *
> svn_ra_neon__has_capability(svn_ra_session_t *session,
> Index: subversion/libsvn_ra_neon/replay.c
> ===================================================================
> --- subversion/libsvn_ra_neon/replay.c (revision 27493)
> +++ subversion/libsvn_ra_neon/replay.c (working copy)
> @@ -486,3 +486,67 @@
> FALSE, /* spool response */
> pool);
> }
> +
> +
> +svn_error_t *
> +svn_ra_neon__replay_range(svn_ra_session_t *session,
> + svn_revnum_t start_revision,
> + svn_revnum_t end_revision,
> + svn_revnum_t low_water_mark,
> + svn_boolean_t send_deltas,
> + svn_ra_replay_revstart_callback_t revstart_func,
> + svn_ra_replay_revfinish_callback_t revfinish_func,
> + void *replay_baton,
> + apr_pool_t *pool)
> +{
> + svn_ra_neon__session_t *ras = session->priv;
> + replay_baton_t rb;
> + svn_revnum_t rev;
> + apr_pool_t *subpool = svn_pool_create(pool);
> +
> + for (rev = start_revision ; rev <= end_revision ; rev++)
> + {
> + const char *body;
> +
> + svn_pool_clear(subpool);
> +
> + body = apr_psprintf(subpool,
> + "<S:replay-report xmlns:S=\"svn:\">\n"
> + " <S:revision>%ld</S:revision>\n"
> + " <S:low-water-mark>%ld</S:low-water-mark>\n"
> + " <S:send-deltas>%d</S:send-deltas>\n"
> + "</S:replay-report>",
> + rev, low_water_mark, send_deltas);
> +
> +
> + memset(&rb, 0, sizeof(rb));
> +
> + rb.pool = subpool;
> + rb.dirs = apr_array_make(subpool, 5, sizeof(dir_item_t));
> + rb.prop_pool = svn_pool_create(subpool);
> + rb.prop_accum = svn_stringbuf_create("", rb.prop_pool);
> +
> + SVN_ERR(revstart_func(rev, replay_baton,
> + &rb.editor, &rb.edit_baton,
> + subpool));
> +
> + SVN_ERR(svn_ra_neon__parsed_request(ras, "REPORT", ras->url->data, body,
> + NULL, NULL,
> + start_element,
> + cdata_handler,
> + end_element,
> + &rb,
> + NULL, /* extra headers */
> + NULL, /* status code */
> + FALSE, /* spool response */
> + subpool));
> +
> + SVN_ERR(revfinish_func(rev, replay_baton,
> + rb.editor, rb.edit_baton,
> + subpool));
> + }
> +
> + svn_pool_destroy(subpool);
> +
> + return SVN_NO_ERROR;
> +}
> Index: subversion/libsvn_ra_neon/session.c
> ===================================================================
> --- subversion/libsvn_ra_neon/session.c (revision 27493)
> +++ subversion/libsvn_ra_neon/session.c (working copy)
> @@ -1142,7 +1142,8 @@
> svn_ra_neon__get_lock,
> svn_ra_neon__get_locks,
> svn_ra_neon__replay,
> - svn_ra_neon__has_capability
> + svn_ra_neon__has_capability,
> + svn_ra_neon__replay_range,
> };
>
> svn_error_t *
> Index: subversion/libsvn_ra_serf/ra_serf.h
> ===================================================================
> --- subversion/libsvn_ra_serf/ra_serf.h (revision 27493)
> +++ subversion/libsvn_ra_serf/ra_serf.h (working copy)
> @@ -1112,12 +1112,23 @@
> svn_ra_serf__replay(svn_ra_session_t *ra_session,
> svn_revnum_t revision,
> svn_revnum_t low_water_mark,
> - svn_boolean_t text_deltas,
> + svn_boolean_t send_deltas,
> const svn_delta_editor_t *editor,
> void *edit_baton,
> apr_pool_t *pool);
>
> svn_error_t *
> +svn_ra_serf__replay_range(svn_ra_session_t *ra_session,
> + svn_revnum_t start_revision,
> + svn_revnum_t end_revision,
> + svn_revnum_t low_water_mark,
> + svn_boolean_t send_deltas,
> + svn_ra_replay_revstart_callback_t revstart_func,
> + svn_ra_replay_revfinish_callback_t revfinish_func,
> + void *replay_baton,
> + apr_pool_t *pool);
> +
> +svn_error_t *
> svn_ra_serf__lock(svn_ra_session_t *ra_session,
> apr_hash_t *path_revs,
> const char *comment,
> Index: subversion/libsvn_ra_serf/replay.c
> ===================================================================
> --- subversion/libsvn_ra_serf/replay.c (revision 27493)
> +++ subversion/libsvn_ra_serf/replay.c (working copy)
> @@ -91,9 +91,21 @@
> /* are we done? */
> svn_boolean_t done;
>
> + /* callback to get an editor */
> + svn_ra_replay_revstart_callback_t revstart_func;
> + svn_ra_replay_revfinish_callback_t revfinish_func;
> + void *replay_baton;
> +
> /* replay receiver function and baton */
> const svn_delta_editor_t *editor;
> void *editor_baton;
> +
> + /* current revision */
> + svn_revnum_t revision;
> +
> + /* Information needed to create the replay report body */
> + svn_revnum_t low_water_mark;
> + svn_boolean_t send_deltas;
> } replay_context_t;
>
>
> @@ -147,6 +159,9 @@
> if (state == NONE &&
> strcmp(name.name, "editor-report") == 0)
> {
> + SVN_ERR(ctx->revstart_func(ctx->revision, ctx->replay_baton,
> + &ctx->editor, &ctx->editor_baton,
> + ctx->pool));
> push_state(parser, ctx, REPORT);
> }
> else if (state == REPORT &&
> @@ -414,6 +429,9 @@
> strcmp(name.name, "editor-report") == 0)
> {
> svn_ra_serf__xml_pop_state(parser);
> + SVN_ERR(ctx->revfinish_func(ctx->revision, ctx->replay_baton,
> + ctx->editor, ctx->editor_baton,
> + ctx->pool));
> }
> else if (state == OPEN_DIR && strcmp(name.name, "open-directory") == 0)
> {
> @@ -509,6 +527,52 @@
> return SVN_NO_ERROR;
> }
>
> +static serf_bucket_t *
> +create_replay_body(void *baton,
> + serf_bucket_alloc_t *alloc,
> + apr_pool_t *pool)
> +{
> + replay_context_t *ctx = baton;
> + serf_bucket_t *body_bkt, *tmp;
> +
> + body_bkt = serf_bucket_aggregate_create(alloc);
> +
> + tmp = SERF_BUCKET_SIMPLE_STRING_LEN("<S:replay-report xmlns:S=\"",
> + sizeof("<S:replay-report xmlns:S=\"")-1,
> + alloc);
> + serf_bucket_aggregate_append(body_bkt, tmp);
> +
> + tmp = SERF_BUCKET_SIMPLE_STRING_LEN(SVN_XML_NAMESPACE,
> + sizeof(SVN_XML_NAMESPACE)-1,
> + alloc);
> + serf_bucket_aggregate_append(body_bkt, tmp);
> +
> + tmp = SERF_BUCKET_SIMPLE_STRING_LEN("\">",
> + sizeof("\">")-1,
> + alloc);
> + serf_bucket_aggregate_append(body_bkt, tmp);
> +
> + svn_ra_serf__add_tag_buckets(body_bkt,
> + "S:revision", apr_ltoa(pool, ctx->revision),
> + alloc);
> + svn_ra_serf__add_tag_buckets(body_bkt,
> + "S:low-water-mark",
> + apr_ltoa(pool, ctx->low_water_mark),
> + alloc);
> +
> + svn_ra_serf__add_tag_buckets(body_bkt,
> + "S:send-deltas",
> + apr_ltoa(pool, ctx->send_deltas),
> + alloc);
> +
> + tmp = SERF_BUCKET_SIMPLE_STRING_LEN("</S:replay-report>",
> + sizeof("</S:replay-report>")-1,
> + alloc);
> + serf_bucket_aggregate_append(body_bkt, tmp);
> +
> + return body_bkt;
> +}
> +
> svn_error_t *
> svn_ra_serf__replay(svn_ra_session_t *ra_session,
> svn_revnum_t revision,
> @@ -522,54 +586,21 @@
> svn_ra_serf__session_t *session = ra_session->priv;
> svn_ra_serf__handler_t *handler;
> svn_ra_serf__xml_parser_t *parser_ctx;
> - serf_bucket_t *buckets, *tmp;
>
> replay_ctx = apr_pcalloc(pool, sizeof(*replay_ctx));
> replay_ctx->pool = pool;
> replay_ctx->editor = editor;
> replay_ctx->editor_baton = edit_baton;
> replay_ctx->done = FALSE;
> + replay_ctx->low_water_mark = low_water_mark;
> + replay_ctx->send_deltas = send_deltas;
>
> - buckets = serf_bucket_aggregate_create(session->bkt_alloc);
> -
> - tmp = SERF_BUCKET_SIMPLE_STRING_LEN("<S:replay-report xmlns:S=\"",
> - sizeof("<S:replay-report xmlns:S=\"")-1,
> - session->bkt_alloc);
> - serf_bucket_aggregate_append(buckets, tmp);
> -
> - tmp = SERF_BUCKET_SIMPLE_STRING_LEN(SVN_XML_NAMESPACE,
> - sizeof(SVN_XML_NAMESPACE)-1,
> - session->bkt_alloc);
> - serf_bucket_aggregate_append(buckets, tmp);
> -
> - tmp = SERF_BUCKET_SIMPLE_STRING_LEN("\">",
> - sizeof("\">")-1,
> - session->bkt_alloc);
> - serf_bucket_aggregate_append(buckets, tmp);
> -
> - svn_ra_serf__add_tag_buckets(buckets,
> - "S:revision", apr_ltoa(pool, revision),
> - session->bkt_alloc);
> - svn_ra_serf__add_tag_buckets(buckets,
> - "S:low-water-mark",
> - apr_ltoa(pool, low_water_mark),
> - session->bkt_alloc);
> -
> - svn_ra_serf__add_tag_buckets(buckets,
> - "S:send-deltas",
> - apr_ltoa(pool, send_deltas),
> - session->bkt_alloc);
> -
> - tmp = SERF_BUCKET_SIMPLE_STRING_LEN("</S:replay-report>",
> - sizeof("</S:replay-report>")-1,
> - session->bkt_alloc);
> - serf_bucket_aggregate_append(buckets, tmp);
> -
> handler = apr_pcalloc(pool, sizeof(*handler));
>
> handler->method = "REPORT";
> handler->path = session->repos_url_str;
> - handler->body_buckets = buckets;
> + handler->body_delegate = create_replay_body;
> + handler->body_delegate_baton = replay_ctx;
> handler->body_type = "text/xml";
> handler->conn = session->conns[0];
> handler->session = session;
> @@ -588,7 +619,88 @@
>
> svn_ra_serf__request_create(handler);
>
> - SVN_ERR(svn_ra_serf__context_run_wait(&replay_ctx->done, session, pool));
> + return SVN_NO_ERROR;
> +}
>
> +svn_error_t *
> +svn_ra_serf__replay_range(svn_ra_session_t *ra_session,
> + svn_revnum_t start_revision,
> + svn_revnum_t end_revision,
> + svn_revnum_t low_water_mark,
> + svn_boolean_t send_deltas,
> + svn_ra_replay_revstart_callback_t revstart_func,
> + svn_ra_replay_revfinish_callback_t revfinish_func,
> + void *replay_baton,
> + apr_pool_t *pool)
> +{
> + replay_context_t *replay_ctx;
> + svn_ra_serf__session_t *session = ra_session->priv;
> + svn_ra_serf__handler_t *handler;
> + svn_ra_serf__xml_parser_t *parser_ctx;
> + svn_revnum_t rev = start_revision;
> +
> + while (1)
> + {
> + apr_status_t status;
> +
> + /* Send pending requests, if any */
> + if (rev <= end_revision)
> + {
> + replay_ctx = apr_pcalloc(pool, sizeof(*replay_ctx));
> + replay_ctx->pool = pool;
> + replay_ctx->revstart_func = revstart_func;
> + replay_ctx->revfinish_func = revfinish_func;
> + replay_ctx->replay_baton = replay_baton;
> + replay_ctx->done = FALSE;
> + replay_ctx->revision = rev;
> + replay_ctx->low_water_mark = low_water_mark;
> + replay_ctx->send_deltas = send_deltas;
> +
> + handler = apr_pcalloc(pool, sizeof(*handler));
> +
> + handler->method = "REPORT";
> + handler->path = session->repos_url_str;
> + handler->body_delegate = create_replay_body;
> + handler->body_delegate_baton = replay_ctx;
> + handler->conn = session->conns[0];
> + handler->session = session;
> +
> + parser_ctx = apr_pcalloc(pool, sizeof(*parser_ctx));
> +
> + parser_ctx->pool = pool;
> + parser_ctx->user_data = replay_ctx;
> + parser_ctx->start = start_replay;
> + parser_ctx->end = end_replay;
> + parser_ctx->cdata = cdata_replay;
> + parser_ctx->done = &replay_ctx->done;
> +
> + handler->response_handler = svn_ra_serf__handle_xml_parser;
> + handler->response_baton = parser_ctx;
> +
> + svn_ra_serf__request_create(handler);
> +
> + rev++;
> + }
> +
> + /* Run the serf loop, send outgoing and process incoming requests.
> + Continue doing this if there are enough requests sent or there are no
> + more pending requests. */
> + status = serf_context_run(session->context, SERF_DURATION_FOREVER, pool);
> + if (APR_STATUS_IS_TIMEUP(status))
> + {
> + continue;
> + }
> + if (status)
> + {
> + SVN_ERR(session->pending_error);
> +
> + return svn_error_wrap_apr(status,
> + _("Error retrieving replay REPORT (%d)"),
> + status);
> + }
> + if (replay_ctx->done)
> + break;
> + }
> +
> return SVN_NO_ERROR;
> }
> Index: subversion/libsvn_ra_serf/serf.c
> ===================================================================
> --- subversion/libsvn_ra_serf/serf.c (revision 27493)
> +++ subversion/libsvn_ra_serf/serf.c (working copy)
> @@ -917,7 +917,8 @@
> svn_ra_serf__get_lock,
> svn_ra_serf__get_locks,
> svn_ra_serf__replay,
> - svn_ra_serf__has_capability
> + svn_ra_serf__has_capability,
> + svn_ra_serf__replay_range,
> };
>
> svn_error_t *
> Index: subversion/libsvn_ra_svn/client.c
> ===================================================================
> --- subversion/libsvn_ra_svn/client.c (revision 27493)
> +++ subversion/libsvn_ra_svn/client.c (working copy)
> @@ -2149,6 +2149,54 @@
> }
>
>
> +static svn_error_t *
> +ra_svn_replay_range(svn_ra_session_t *session,
> + svn_revnum_t start_revision,
> + svn_revnum_t end_revision,
> + svn_revnum_t low_water_mark,
> + svn_boolean_t send_deltas,
> + svn_ra_replay_revstart_callback_t revstart_func,
> + svn_ra_replay_revfinish_callback_t revfinish_func,
> + void *replay_baton,
> + apr_pool_t *pool)
> +{
> + svn_ra_svn__session_baton_t *sess = session->priv;
> + svn_revnum_t rev;
> + apr_pool_t *subpool = svn_pool_create(pool);
> +
> + for (rev = start_revision ; rev <= end_revision ; rev++)
> + {
> + svn_delta_editor_t *editor;
> + void *edit_baton;
> +
> + svn_pool_clear(subpool);
> +
> + SVN_ERR(revstart_func(rev, replay_baton,
> + &editor, &edit_baton,
> + subpool));
> +
> + SVN_ERR(svn_ra_svn_write_cmd(sess->conn, subpool, "replay", "rrb", rev,
> + low_water_mark, send_deltas));
> +
> + SVN_ERR(handle_unsupported_cmd(handle_auth_request(sess, subpool),
> + _("Server doesn't support the replay "
> + "command")));
> +
> + SVN_ERR(svn_ra_svn_drive_editor2(sess->conn, subpool, editor, edit_baton,
> + NULL, TRUE));
> +
> + SVN_ERR(svn_ra_svn_read_cmd_response(sess->conn, subpool, ""));
> +
> + SVN_ERR(revfinish_func(rev, replay_baton,
> + editor, edit_baton,
> + subpool));
> + }
> + svn_pool_destroy(subpool);
> +
> + return SVN_NO_ERROR;
> +}
> +
> +
> static svn_error_t *ra_svn_has_capability(svn_ra_session_t *session,
> svn_boolean_t *has,
> const char *capability,
> @@ -2207,7 +2255,8 @@
> ra_svn_get_lock,
> ra_svn_get_locks,
> ra_svn_replay,
> - ra_svn_has_capability
> + ra_svn_has_capability,
> + ra_svn_replay_range,
> };
>
> svn_error_t *
> Index: subversion/svnsync/main.c
> ===================================================================
> --- subversion/svnsync/main.c (revision 27493)
> +++ subversion/svnsync/main.c (working copy)
> @@ -966,7 +966,9 @@
> }
>
>
> -/* Set *FROM_SESSION to an RA session associated with the source
> +/* TODO: fix comment
> + *
> + * Set *FROM_SESSION to an RA session associated with the source
> * repository of the synchronization, as determined by reading
> * svn:sync- properties from the destination repository (associated
> * with TO_SESSION). Set LAST_MERGED_REV to the value of the property
> @@ -977,6 +979,7 @@
> */
> static svn_error_t *
> open_source_session(svn_ra_session_t **from_session,
> + svn_ra_session_t **from_rp_session,
> svn_string_t **last_merged_rev,
> svn_ra_session_t *to_session,
> svn_ra_callbacks2_t *callbacks,
> @@ -999,12 +1002,21 @@
> (APR_EINVAL, NULL,
> _("Destination repository has not been initialized"));
>
> + /* Open the session to copy the revision data. */
> SVN_ERR(svn_ra_open2(from_session, from_url->data, callbacks, baton,
> config, pool));
> -
> SVN_ERR(check_if_session_is_at_repos_root(*from_session, from_url->data,
> pool));
>
> + /* Open the session to copy the revision properties. */
> + if (from_rp_session)
> + {
> + SVN_ERR(svn_ra_open2(from_rp_session, from_url->data, callbacks, baton,
> + config, pool));
> + SVN_ERR(check_if_session_is_at_repos_root(*from_rp_session, from_url->data,
> + pool));
> + }
> +
> /* Ok, now sanity check the UUID of the source repository, it
> wouldn't be a good thing to sync from a different repository. */
>
> @@ -1019,7 +1031,135 @@
> return SVN_NO_ERROR;
> }
>
> +/* Replay baton, used during sychnronization. */
> +typedef struct {
> + svn_ra_session_t *from_session;
> + svn_ra_session_t *to_session;
> + svn_ra_session_t *from_rp_session;
> + subcommand_baton_t *sb;
> +} replay_baton_t;
>
> +/* Return a replay baton allocated from POOL and populated with
> + data from the provided parameters. */
> +static replay_baton_t *
> +make_replay_baton(svn_ra_session_t *from_session, svn_ra_session_t *to_session,
> + svn_ra_session_t *from_rp_session,
> + subcommand_baton_t *sb, apr_pool_t *pool)
> +{
> + replay_baton_t *rb = apr_pcalloc(pool, sizeof(*rb));
> + rb->from_session = from_session;
> + rb->from_rp_session = from_rp_session;
> + rb->to_session = to_session;
> + rb->sb = sb;
> + return rb;
> +}
> +
> +/* Callback function for svn_ra_replay_range, invoked when starting to parse
> + * a replay report.
> + */
> +static svn_error_t *
> +replay_rev_started(svn_revnum_t revision,
> + void *replay_baton,
> + const svn_delta_editor_t **editor,
> + void **edit_baton,
> + apr_pool_t *pool)
> +{
> + const svn_delta_editor_t *commit_editor;
> + const svn_delta_editor_t *cancel_editor;
> + const svn_delta_editor_t *sync_editor;
> + void *commit_baton;
> + void *cancel_baton;
> + void *sync_baton;
> + replay_baton_t *rb = replay_baton;
> +
> + /* We set this property so that if we error out for some reason
> + we can later determine where we were in the process of
> + merging a revision. If we had committed the change, but we
> + hadn't finished copying the revprops we need to know that, so
> + we can go back and finish the job before we move on.
> +
> + NOTE: We have to set this before we start the commit editor,
> + because ra_svn doesn't let you change rev props during a
> + commit. */
> + SVN_ERR(svn_ra_change_rev_prop(rb->to_session, 0,
> + SVNSYNC_PROP_CURRENTLY_COPYING,
> + svn_string_createf(pool, "%ld",
> + revision),
> + pool));
> +
> + /* The actual copy is just a replay hooked up to a commit. */
> +
> + SVN_ERR(svn_ra_get_commit_editor2(rb->to_session, &commit_editor,
> + &commit_baton,
> + "", /* empty log */
> + commit_callback, rb->sb,
> + NULL, FALSE, pool));
> +
> + /* There's one catch though, the diff shows us props we can't
> + send over the RA interface, so we need an editor that's smart
> + enough to filter those out for us. */
> +
> + SVN_ERR(get_sync_editor(commit_editor, commit_baton, revision - 1,
> + rb->sb->to_url, rb->sb->quiet,
> + &sync_editor, &sync_baton, pool));
> +
> + SVN_ERR(svn_delta_get_cancellation_editor(check_cancel, NULL,
> + sync_editor, sync_baton,
> + &cancel_editor,
> + &cancel_baton,
> + pool));
> + *editor = cancel_editor;
> + *edit_baton = cancel_baton;
> +
> + return SVN_NO_ERROR;
> +}
> +
> +/* Callback function for svn_ra_replay_range, invoked when finishing parsing
> + * a replay report.
> + */
> +static svn_error_t *
> +replay_rev_finished(svn_revnum_t revision,
> + void *replay_baton,
> + const svn_delta_editor_t *editor,
> + void *edit_baton,
> + apr_pool_t *pool)
> +{
> + replay_baton_t *rb = replay_baton;
> +
> + SVN_ERR(editor->close_edit(edit_baton, pool));
> +
> + /* Sanity check that we actually committed the revision we meant to. */
> + if (rb->sb->committed_rev != revision)
> + return svn_error_createf
> + (APR_EINVAL, NULL,
> + _("Commit created rev %ld but should have created %ld"),
> + rb->sb->committed_rev, revision);
> +
> + /* Ok, we're done with the data, now we just need to do the
> + revprops and we're all set. */
> +
> + SVN_ERR(copy_revprops(rb->from_rp_session, rb->to_session, revision, TRUE,
> + rb->sb->quiet, pool));
> +
> + /* Ok, we're done, bring the last-merged-rev property up to date. */
> +
> + SVN_ERR(svn_ra_change_rev_prop
> + (rb->to_session,
> + 0,
> + SVNSYNC_PROP_LAST_MERGED_REV,
> + svn_string_create(apr_psprintf(pool, "%ld", revision),
> + pool),
> + pool));
> +
> + /* And finally drop the currently copying prop, since we're done
> + with this revision. */
> +
> + SVN_ERR(svn_ra_change_rev_prop(rb->to_session, 0,
> + SVNSYNC_PROP_CURRENTLY_COPYING,
> + NULL, pool));
> + return SVN_NO_ERROR;
> +}
> +
> /* Synchronize the repository associated with RA session TO_SESSION,
> * using information found in baton B, while the repository is
> * locked. Implements `with_locked_func_t' interface.
> @@ -1028,14 +1168,17 @@
> do_synchronize(svn_ra_session_t *to_session, void *b, apr_pool_t *pool)
> {
> svn_string_t *last_merged_rev;
> - svn_revnum_t from_latest, current;
> + svn_revnum_t from_latest;
> svn_ra_session_t *from_session;
> + svn_ra_session_t *from_rp_session;
> subcommand_baton_t *baton = b;
> - apr_pool_t *subpool;
> svn_string_t *currently_copying;
> svn_revnum_t to_latest, copying, last_merged;
> + svn_revnum_t start_revision, end_revision;
> + replay_baton_t *rb;
>
> - SVN_ERR(open_source_session(&from_session, &last_merged_rev, to_session,
> + SVN_ERR(open_source_session(&from_session, &from_rp_session,
> + &last_merged_rev, to_session,
> &(baton->source_callbacks), baton->config,
> baton, pool));
>
> @@ -1084,7 +1227,7 @@
> {
> if (copying > last_merged)
> {
> - SVN_ERR(copy_revprops(from_session, to_session,
> + SVN_ERR(copy_revprops(from_rp_session, to_session,
> to_latest, TRUE, baton->quiet, pool));
> last_merged = copying;
> last_merged_rev = svn_string_create
> @@ -1121,104 +1264,29 @@
>
> /* Now check to see if there are any revisions to copy. */
>
> - SVN_ERR(svn_ra_get_latest_revnum(from_session, &from_latest, pool));
> + SVN_ERR(svn_ra_get_latest_revnum(from_rp_session, &from_latest, pool));
>
> if (from_latest < atol(last_merged_rev->data))
> return SVN_NO_ERROR;
>
> - subpool = svn_pool_create(pool);
> -
> /* Ok, so there are new revisions, iterate over them copying them
> into the destination repository. */
>
> - for (current = atol(last_merged_rev->data) + 1;
> - current <= from_latest;
> - ++current)
> - {
> - const svn_delta_editor_t *commit_editor;
> - const svn_delta_editor_t *cancel_editor;
> - const svn_delta_editor_t *sync_editor;
> - void *commit_baton;
> - void *cancel_baton;
> - void *sync_baton;
> + rb = make_replay_baton(from_session, to_session,
> + from_rp_session,
> + baton, pool);
> +
> + start_revision = atol(last_merged_rev->data) + 1;
> + end_revision = from_latest;
> +
> + SVN_ERR(check_cancel(NULL));
>
> - svn_pool_clear(subpool);
> - SVN_ERR(check_cancel(NULL));
> + SVN_ERR(svn_ra_replay_range(from_session, start_revision, end_revision,
> + 0, TRUE,
> + replay_rev_started, replay_rev_finished,
> + rb,
> + pool));
>
> - /* We set this property so that if we error out for some reason
> - we can later determine where we were in the process of
> - merging a revision. If we had committed the change, but we
> - hadn't finished copying the revprops we need to know that, so
> - we can go back and finish the job before we move on.
> -
> - NOTE: We have to set this before we start the commit editor,
> - because ra_svn doesn't let you change rev props during a
> - commit. */
> - SVN_ERR(svn_ra_change_rev_prop(to_session, 0,
> - SVNSYNC_PROP_CURRENTLY_COPYING,
> - svn_string_createf(subpool, "%ld",
> - current),
> - subpool));
> -
> - /* The actual copy is just a replay hooked up to a commit. */
> -
> - SVN_ERR(svn_ra_get_commit_editor2(to_session, &commit_editor,
> - &commit_baton,
> - "", /* empty log */
> - commit_callback, baton,
> - NULL, FALSE, subpool));
> -
> - /* There's one catch though, the diff shows us props we can't
> - send over the RA interface, so we need an editor that's smart
> - enough to filter those out for us. */
> -
> - SVN_ERR(get_sync_editor(commit_editor, commit_baton, current - 1,
> - baton->to_url, baton->quiet,
> - &sync_editor, &sync_baton, subpool));
> -
> - SVN_ERR(svn_delta_get_cancellation_editor(check_cancel, NULL,
> - sync_editor, sync_baton,
> - &cancel_editor,
> - &cancel_baton,
> - subpool));
> -
> - SVN_ERR(svn_ra_replay(from_session, current, 0, TRUE,
> - cancel_editor, cancel_baton, subpool));
> -
> - SVN_ERR(cancel_editor->close_edit(cancel_baton, subpool));
> -
> - /* Sanity check that we actually committed the revision we meant to. */
> - if (baton->committed_rev != current)
> - return svn_error_createf
> - (APR_EINVAL, NULL,
> - _("Commit created rev %ld but should have created %ld"),
> - baton->committed_rev, current);
> -
> - /* Ok, we're done with the data, now we just need to do the
> - revprops and we're all set. */
> -
> - SVN_ERR(copy_revprops(from_session, to_session, current, TRUE,
> - baton->quiet, subpool));
> -
> - /* Ok, we're done, bring the last-merged-rev property up to date. */
> -
> - SVN_ERR(svn_ra_change_rev_prop
> - (to_session,
> - 0,
> - SVNSYNC_PROP_LAST_MERGED_REV,
> - svn_string_create(apr_psprintf(subpool, "%ld", current),
> - subpool),
> - subpool));
> -
> - /* And finally drop the currently copying prop, since we're done
> - with this revision. */
> -
> - SVN_ERR(svn_ra_change_rev_prop(to_session, 0,
> - SVNSYNC_PROP_CURRENTLY_COPYING,
> - NULL, subpool));
> - }
> -
> - svn_pool_destroy(subpool);
> return SVN_NO_ERROR;
> }
>
> @@ -1274,7 +1342,8 @@
> svn_revnum_t i;
> svn_revnum_t step = 1;
>
> - SVN_ERR(open_source_session(&from_session, &last_merged_rev, to_session,
> + SVN_ERR(open_source_session(&from_session, NULL, &last_merged_rev,
> + to_session,
> &(baton->source_callbacks), baton->config,
> baton, pool));
>
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: dev-unsubscribe@subversion.tigris.org
> For additional commands, e-mail: dev-help@subversion.tigris.org
>
--
David Glasser | glasser_at_davidglasser.net | http://www.davidglasser.net/
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@subversion.tigris.org
For additional commands, e-mail: dev-help@subversion.tigris.org
Received on Sat Nov 3 03:13:52 2007