Marconi progress and updates

Combined Stream

33 thoughts
last posted Jan. 9, 2014, 9:12 a.m.

We are currently working hard on a partitions feature that will make it easier to scale Marconi servers horizontally. Not exactly rocket science, but definitely an exciting feature to have.


I really like Marconi's architecture. It has the right level of abstraction and customization. Users can deploy Marconi based on their needs and current infrastructure.

I wonder what other people think about it.


The migration to testr has revealed interesting things about nosetests and Marconi tests themselves.


The Queues API has been split into 2 separate drivers. This means it is necessary to run an 'admin' node to expose the admin API. The admin API exposes things like stats, health, shard management, etc.


Here's to testing how a stream with the same title interacts with an existing stream.

Is this how one "shares" a stream?


We've tried to make the v1.0 API flexible so that it can facilitate lots of different messaging patterns. I hope we can preserve that as we start working on v1.1 and v2.0.


Our current thinking on Marconi's storage sharding architecture: [diagram]


Slightly updated sharding diagram:


We are thinking about publishing the API schema from the API itself, such that it can be consumed by client libraries in order to reduce the amount of hard-coded formatting of requests to the service. Should make it a lot easier for maintainers to adapt their clients when the API changes, and DRY up their code.
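To make the idea concrete, here is a minimal sketch of a client building requests from a schema the server publishes instead of hard-coding them. The schema format (`method` plus a `path` template per operation) and the names `load_schema` and `build_request` are hypothetical illustrations, not Marconi's actual schema; the two paths are v1 endpoints.

```python
import json

# Hypothetical schema document as the server might publish it. The format is
# an assumption made for illustration, not Marconi's real published schema.
PUBLISHED_SCHEMA = """
{
  "post_messages": {"method": "POST", "path": "/v1/queues/{queue_name}/messages"},
  "queue_stats": {"method": "GET", "path": "/v1/queues/{queue_name}/stats"}
}
"""


def load_schema(text):
    """Parse the schema a client would fetch from the server."""
    return json.loads(text)


def build_request(schema, operation, **params):
    """Return (method, path) for an operation, filling in path parameters."""
    try:
        spec = schema[operation]
    except KeyError:
        raise ValueError("operation not published by the server: %s" % operation) from None
    return spec["method"], spec["path"].format(**params)


schema = load_schema(PUBLISHED_SCHEMA)
print(build_request(schema, "post_messages", queue_name="demo"))
# ('POST', '/v1/queues/demo/messages')
```

When the API changes, only the published schema changes; the client code above stays the same.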


We're now thinking about having schema versions defined on the server side that can be consumed by both the client and transports. The idea is to also be able to expose that through Marconi's home page, have extensions and whatnot. Here's the blueprint for that:


flaper87 and kgriffs are so in sync. It's how we reach consensus so quickly on new ideas - it's because they actually communicate telepathically!

In other news, the discussions around how to implement sharding are starting to come together. We've been putting together pieces, and as they've come to light, it's been getting easier to see what works and what doesn't.

Next up: storage controllers to manage shard registration and mapping of queues to shards!


Kudos to flaper87 for his design work on the client. I like layers. Reminds me of cake.


This is the second year Marconi has applied as a project for the OPW program. Last year it didn't gather much attention, probably because it was really, really young. Now that it has grown quite a bit, it has caught the attention of 2 women who are willing to work on it.



Sharding news:

Patches are starting to get merged on the admin API side of things. We now have a clear distinction between control plane and data plane storage drivers. This distinction made abstractions easier to cope with further along.

There's also a more encompassing notion of an admin API instance. An admin instance contains all the routes/resources/powers that a public API instance has, plus access to control plane features. In the case of sharding, an admin instance allows an operator to register shards and investigate the state of the catalogue that maps queues to shards.
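As a rough illustration of those control plane features, the sketch below keeps a shard registry and a catalogue mapping `(project, queue)` to a shard in memory. The class name, the `uri`/`weight` fields, and weighted random placement of new queues are assumptions for the example, not a description of Marconi's storage controllers.

```python
import random


class ShardCatalogue:
    """Hypothetical in-memory stand-in for shard registration plus the catalogue."""

    def __init__(self):
        self._shards = {}     # shard name -> {"uri": str, "weight": int}
        self._catalogue = {}  # (project, queue) -> shard name

    def register_shard(self, name, uri, weight=100):
        # Registering an existing name replaces its entry.
        self._shards[name] = {"uri": uri, "weight": weight}

    def assign(self, project, queue):
        """Map a new queue to a shard picked at random, weighted by shard weight."""
        if not self._shards:
            raise LookupError("no shards registered")
        names = list(self._shards)
        weights = [self._shards[n]["weight"] for n in names]
        shard = random.choices(names, weights=weights, k=1)[0]
        self._catalogue[(project, queue)] = shard
        return shard

    def lookup(self, project, queue):
        """Return the storage URI for a queue, or None if it is not catalogued."""
        shard = self._catalogue.get((project, queue))
        return None if shard is None else self._shards[shard]["uri"]

    def queues_on(self, shard):
        """Inspect the catalogue: which queues live on a given shard."""
        return sorted(q for (p, q), s in self._catalogue.items() if s == shard)
```

A data plane request would call `lookup` to find the right storage node, while only an admin instance would call `register_shard`.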

Next up: finish getting the admin API for sharding merged in, get the catalogue portion reviewed, and then put it all together!


I'm impressed by how many things we still have to do, discuss and plan in Marconi. The number of tasks keeps growing, new ideas come out of the blue, and the whole team sticks together working on them!

Great work guys!


Trying to get some t-shirts put together. They will be awesome.


We've been triaging ideas as a daily routine for the last couple of days. These ideas come either from ourselves or from external users who have shown interest in Marconi.

Some of those ideas are really valid; others have been rejected, mostly because they would have broken Marconi's conceptual integrity.

Here's the list of ideas.


Marconi sharding progresses a little bit more. Development is proceeding at a frantic pace!

Latest today: the catalogue storage driver was merged into mainline.


The journey towards sharding capability in Marconi is an epic one. There's been lots of help from everyone towards deploying it, testing it, and making sure that the reference implementation is solid.

It's going to be awesome when this feature is ready. Only a bit more to go, and it's likely to be ready before the OpenStack Summit!


Marconi sharding is getting very close:

  1. Sharding core
  2. Sharded listing of queues

Next up: caching, perf testing, functional testing


Wow, it's Wednesday and I still can't believe how great the last OpenStack summit was. We have amazing news and a bunch of new ideas. As far as Marconi is concerned, this is going to be a very busy development cycle.


We - alcabrera and myself - had a great pair reviewing session yesterday. We went through the sharding patches and I now feel more confident about letting them land.

There's still some more work to do there, but they're in a better shape now.


Marconi Redis is one step closer to being a proper backend for Marconi:

```
alejandro@rainbow-generator:~/Development/marconi-redis:[master %>]$ tox -e pep8
GLOB sdist-make: /home/alejandro/Development/marconi-redis/
pep8 inst-nodeps: /home/alejandro/Development/marconi-redis/.tox/dist/
pep8 runtests: commands[0] | flake8
  pep8: commands succeeded
  congratulations :)
```

```
RedisCatalogueTests
  test_catalogue_entry_life_cycle ERROR 0.02
  test_exists ERROR 0.01
  test_get ERROR 0.01
  test_get_raises_if_does_not_exist ERROR 0.01
  test_list ERROR 0.01
  test_update ERROR 0.01
  test_update_raises_when_entry_does_not_exist ERROR 0.01
RedisClaimTests
  test_claim_lifecycle ERROR 0.02
  test_do_not_extend_lifetime ERROR 0.03
  test_expired_claim ERROR 0.01
  test_extend_lifetime ERROR 0.03
  test_extend_lifetime_with_grace_1 ERROR 0.03
  test_extend_lifetime_with_grace_2 ERROR 0.03
  test_illformed_id ERROR 0.01
RedisDriverTest
  test_control_db_instance OK 0.00
  test_data_db_instance OK 0.00
RedisMessageTests
  test_bad_claim_id OK 0.01
  test_bad_id OK 0.01
  test_bad_marker OK 0.01
  test_claim_effects FAIL 0.03
  test_expired_messages FAIL 0.01
  test_get_multi FAIL 0.03
  test_message_lifecycle OK 0.01
  test_multi_ids OK 0.01
RedisQueueTests
  test_list_None OK 0.02
  test_list_project OK 0.02
  test_queue_lifecycle ERROR 1.23
  test_stats_for_empty_queue OK 0.01
RedisShardsTests
  test_create_replaces_on_duplicate_insert ERROR 0.00
  test_create_succeeds ERROR 0.00
  test_delete_nonexistent_is_silent ERROR 0.00
  test_delete_works ERROR 0.00
  test_detailed_get_returns_expected_content ERROR 0.00
  test_drop_all_leads_to_empty_listing ERROR 0.00
  test_exists ERROR 0.00
  test_get_raises_if_not_found ERROR 0.00
  test_get_returns_expected_content ERROR 0.00
  test_listing_simple ERROR 0.00
  test_update_raises_assertion_error_on_bad_fields ERROR 0.00
  test_update_works ERROR 0.00

Ran 40 tests in 1.679s

FAILED (errors=27, failures=3)
```

In short:

  • Shards and Catalogue are not yet implemented (19/30 failures in 19 tests)
  • Claims are all wonky (7/30 failures in 7 tests)
  • Queues have some quirks (1/30 failures in 4 tests)
  • Messages have some quirks (3/30 failures in 8 tests)

Support for both FIFO and non-FIFO Redis is baked in. All it takes is flipping one configuration option and it just works:

```conf
[queues:storage:driver:redis]
fifo = True
```
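For illustration only, here is how a driver could read that flag. The section and option names come from the snippet above; using Python's stdlib `configparser` is an assumption standing in for Marconi's own configuration loading, and `redis_fifo_enabled` is a hypothetical helper.

```python
import configparser

# Section name taken from the config snippet; the loader itself is a stand-in.
SECTION = "queues:storage:driver:redis"


def redis_fifo_enabled(conf_text):
    """Return True when the Redis driver is configured for FIFO semantics."""
    parser = configparser.ConfigParser()
    parser.read_string(conf_text)
    # Fall back to non-FIFO when the option (or the whole section) is absent.
    return parser.getboolean(SECTION, "fifo", fallback=False)


print(redis_fifo_enabled("[queues:storage:driver:redis]\nfifo = True\n"))  # True
```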

More updates coming tomorrow.


More progress on Marconi Redis:

```
alejandro@rainbow-generator:~/Development/marconi-redis:[master %>]$ MARCONI_TEST_REDIS=1 tox -e py27 --
GLOB sdist-make: /home/alejandro/Development/marconi-redis/
py27 inst-nodeps: /home/alejandro/Development/marconi-redis/.tox/dist/
py27 runtests: commands[0] | nosetests

RedisShardsTests
  test_create_replaces_on_duplicate_insert OK 0.10
  test_create_succeeds OK 0.02
  test_delete_nonexistent_is_silent OK 0.02
  test_delete_works OK 0.02
  test_detailed_get_returns_expected_content OK 0.01
  test_drop_all_leads_to_empty_listing OK 0.01
  test_exists OK 0.01
  test_get_raises_if_not_found OK 0.01
  test_get_returns_expected_content OK 0.01
  test_listing_simple OK 0.02
  test_update_raises_assertion_error_on_bad_fields OK 0.01
  test_update_works OK 0.01
```

Shard storage tests now pass!

It took some finagling, and implementing a list find command in Python. Since the Marconi API expects paginated results in alphabetical order (rather than lexicographical), Redis LIST commands had to be used, rather than SORTED SETS.
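A minimal sketch of the "list find" plus marker-based pagination idea follows. It assumes `items` holds the full contents of a Redis LIST (for example, what `LRANGE key 0 -1` returns), already kept in the order the API expects; the function names are hypothetical and this is not the marconi-redis code.

```python
def list_find(items, value):
    """Linear scan returning the index of value in items, or -1 if absent."""
    for index, item in enumerate(items):
        if item == value:
            return index
    return -1


def paginate(items, marker=None, limit=10):
    """Return (page, next_marker) for marker-based paging over an ordered list."""
    start = 0
    if marker is not None:
        position = list_find(items, marker)
        if position == -1:
            return [], None  # unknown marker: nothing to resume from
        start = position + 1
    page = items[start:start + limit]
    next_marker = page[-1] if page else None
    return page, next_marker


queue_names = ["alpha", "bravo", "charlie", "delta", "echo"]
print(paginate(queue_names, limit=2))                  # (['alpha', 'bravo'], 'bravo')
print(paginate(queue_names, marker="bravo", limit=2))  # (['charlie', 'delta'], 'delta')
```

The client passes back the last name it saw as the marker, and the server resumes right after it.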


Marconi API expects lexicographical sorting.


Marconi Redis now supports catalogue storage!:

```
alejandro@rainbow-generator:~/Development/marconi-redis:[master %=]$ MARCONI_TEST_REDIS=1 tox -e py27 --
GLOB sdist-make: /home/alejandro/Development/marconi-redis/
py27 inst-nodeps: /home/alejandro/Development/marconi-redis/.tox/dist/
py27 runtests: commands[0] | nosetests

RedisCatalogueTests
  test_catalogue_entry_life_cycle OK 0.11
  test_exists OK 0.01
  test_get OK 0.01
  test_get_raises_if_does_not_exist OK 0.01
  test_list OK 0.02
  test_update OK 0.01
  test_update_raises_when_entry_does_not_exist OK 0.01

Slowest 1 tests took 0.11 secs:
  0.11 RedisCatalogueTests.test_catalogue_entry_life_cycle

Ran 7 tests in 0.191s

OK
```


Marconi Redis still needs more work. Queues, shards, and catalogue are working. Messages are mostly working. Claims are not working at all.


I started working on a websocket transport for Marconi. It's pretty cool to see how well it fits into the architecture and how little effort it takes to use it. The code is here.

For now, it just proxies Marconi's public WSGI transport. The goal is to use this transport as another reference for the work going on in the API layer.
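To show what "proxying the WSGI transport" can look like, here is a hedged sketch that turns one websocket text frame into a WSGI call and packs the response into a reply frame. The JSON frame format (`method`, `path`, optional `body`) and `handle_frame` are hypothetical; this is not the code of the websocket transport mentioned above, and the websocket server itself is left out.

```python
import io
import json
from wsgiref.util import setup_testing_defaults


def handle_frame(wsgi_app, frame):
    """Run one JSON frame through a WSGI app and return a JSON reply frame."""
    request = json.loads(frame)
    body = b""
    if "body" in request:
        body = json.dumps(request["body"]).encode("utf-8")

    environ = {}
    setup_testing_defaults(environ)  # fills in the required WSGI keys
    environ["REQUEST_METHOD"] = request["method"]
    environ["PATH_INFO"] = request["path"]
    environ["CONTENT_TYPE"] = "application/json"
    environ["CONTENT_LENGTH"] = str(len(body))
    environ["wsgi.input"] = io.BytesIO(body)

    captured = {}
    written = []

    def start_response(status, headers, exc_info=None):
        captured["status"] = status
        captured["headers"] = headers
        return written.append  # the write() callable PEP 3333 requires

    result = wsgi_app(environ, start_response)
    try:
        chunks = list(result)
    finally:
        if hasattr(result, "close"):
            result.close()

    payload = b"".join(written + chunks)
    status_code = int(captured["status"].split(" ", 1)[0])
    return json.dumps({"status": status_code, "body": payload.decode("utf-8")})
```

Because the frame is mapped onto an ordinary WSGI request, any WSGI application can sit behind it unchanged.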


Marconi sharding is officially merged in! It's now possible to set up multiple storage nodes and partition queues amongst them. This makes it possible to scale Marconi quite a bit.

Thanks go to everyone on the Marconi team and much of the team at the Atlanta Rackspace office for helping make this happen!

Marconi is growing - and could use help.

Before too much longer, we're going to be branching a notifications project off of the Marconi code base. That's to say, it'll grow along with queues, and might even be launched with a unified API, but I expect it'll become its own project someday.

Then there's the need to support more storage back ends. Currently, we support MongoDB and SQLite. Work has started on SQLAlchemy (awesome!) and you've probably already heard about my own efforts at Redis support.

How about those transports? We only support WSGI/HTTP at the moment, implemented on top of the lean and lovely Falcon framework. We also have an extremely experimental websockets implementation, contributed by flaper87. I'm hoping to see a ZeroMQ transport in the future. Then there's the upcoming nanomsg transport, as well. It's an exciting area, and I hope you'll join in and share your thoughts.

Features: there are many more planned. Better operational stats, queue quotas, queue flavors, Heat integration, Horizon integration, Tempest integration - just check out our blueprints page!

What's next on my plate? I hope to wrap up Marconi Redis, at least in beta form.


I think I've got a good py3-compatible pattern worked out for magic string messages. Check out this gist.


Architecturally, the biggest scalability bottleneck in Marconi's design at the moment is the dependency on FIFO semantics for queuing operations.

Each queue maps to a particular shard. This sharding design helped Marconi overcome its first scaling bottleneck, which was being able to handle many messages to multiple queues. Now, since queues can live on different storage nodes, it's possible to scale out a Marconi deployment.

However, there are still FIFO semantics to contend with. Each posted message is assigned a marker that is unique within its queue, and the storage layer enforces the FIFO invariant by taking advantage of atomic commit semantics when posting messages. This is done so that messages are claimable in the order in which they arrived. If two clients try to post to the same queue concurrently, both attempt to use the same marker, so one of those POST operations "fails", causing an internal retry that isn't exposed to the connecting client. By "fails", I mean the operation is retried shortly after.
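Here is a minimal sketch of that optimistic scheme, under stated assumptions: markers are consecutive integers per queue, a dict guarded by a lock stands in for a storage backend's unique index on `(queue, marker)`, and `MessageStore`/`DuplicateMarker` are hypothetical names rather than Marconi's driver code.

```python
import threading


class DuplicateMarker(Exception):
    """Stands in for a unique-index violation on (queue, marker)."""


class MessageStore:
    def __init__(self):
        self._commit_lock = threading.Lock()  # makes each insert atomic
        self._messages = {}     # (queue, marker) -> body
        self._last_marker = {}  # queue -> highest committed marker

    def _insert(self, queue, marker, body):
        """Atomic conditional insert: fails if the marker is already taken."""
        with self._commit_lock:
            if (queue, marker) in self._messages:
                raise DuplicateMarker(marker)
            self._messages[(queue, marker)] = body
            self._last_marker[queue] = marker

    def post(self, queue, body, max_retries=1000):
        """Post a message, retrying internally when a concurrent post wins."""
        for _ in range(max_retries):
            # Read outside the lock, so two posters can pick the same marker.
            marker = self._last_marker.get(queue, 0) + 1
            try:
                self._insert(queue, marker, body)
                return marker
            except DuplicateMarker:
                continue  # the client never sees this; just try again
        raise RuntimeError("gave up posting to %s" % queue)

    def markers(self, queue):
        with self._commit_lock:
            return sorted(m for (q, m) in self._messages if q == queue)
```

Every conflict on the same queue costs a retry, which is why FIFO queues are harder to scale than queues that can accept messages in any order.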

Fortunately, storage driver implementations for Marconi need not honor the FIFO property. If a particular workload does not require FIFO, then it becomes much easier to scale out such a deployment.

I can envision a future feature involving queue flavors where users submitting requests to Marconi can annotate their queue at creation time with attributes they care about. For example:

```
PUT /v1/queues/fifo_queue

{"fifo": true}

PUT /v1/queues/fast_queue

{"persist": false, "fifo": false}

PUT /v1/queues/make_it_last

{"persist": true, "fifo": false}
```

I've identified the flavors persist and fifo so far for choosing storage shards automatically. An example of a persistent storage flavor that offers FIFO is Marconi's reference MongoDB implementation, which can be deployed with replica sets for added reliability. An example of a storage driver that's the polar opposite is my marconi-redis work in progress: since the data is kept in memory, it can be lost at any time.
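One way automatic shard selection by flavor could work is sketched below. The shard descriptions and the rule are assumptions for illustration: an attribute requested as `true` is a hard requirement, `false` means "not needed", and among qualifying shards the one offering the fewest guarantees wins, on the guess that fewer guarantees means cheaper, faster storage.

```python
# Hypothetical shard descriptions; the capability names follow the post.
SHARDS = [
    {"name": "mongodb-replica-set", "capabilities": {"persist": True, "fifo": True}},
    {"name": "redis-in-memory", "capabilities": {"persist": False, "fifo": False}},
]


def choose_shard(shards, flavor):
    """Pick a shard for a queue flavor such as {"persist": True, "fifo": False}."""
    required = {attr for attr, wanted in flavor.items() if wanted}
    candidates = [
        shard for shard in shards
        if all(shard["capabilities"].get(attr, False) for attr in required)
    ]
    if not candidates:
        raise LookupError("no shard satisfies flavor %r" % (flavor,))
    # Prefer the shard with the fewest guarantees (True counts as 1).
    return min(candidates, key=lambda s: sum(s["capabilities"].values()))["name"]


print(choose_shard(SHARDS, {"persist": False, "fifo": False}))  # redis-in-memory
```

With these two shards, `fifo_queue` and `make_it_last` both land on MongoDB, while `fast_queue` goes to Redis.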

It's all about configuration and letting the user choose what they need. I hope to see more of this in the future of Marconi.


Lately, I've been giving some thought to bandwidth, storage and performance in Marconi.

A queuing service is not actually about queues; it's about messages. If you think about it, the real resource that travels from one endpoint to the other is the message. All kinds of things happen between the moment a message is sent and the moment it is received, but in the end, the message is all that matters.

Reliability exists in queuing services because no one wants to lose messages. Message routing exists because it is important to be able to tell a message where it should go. Filters exist because not every node wants to get all messages; nodes need to be selective. I could go on, but I think I've made my point.

With all that in mind, the resource we need to improve, and keep as lightweight as possible for the whole transmission, is the message.

In Marconi, it's possible to configure the maximum size allowed per message. This is a huge benefit for users because they can tune their setup based on their needs. However, something that is not being taken into consideration is that not every storage backend works well with every message size. For example, SQS has a maximum message size that is not configurable, which means that someone wanting to back Marconi with SQS won't be able to allow messages bigger than SQS's maximum. Now, bear with me. I know that example is crazy and that most probably no one, ever, will do that. Just take it for what it is: a crazy example.

Thankfully, Marconi has support for shards. It allows users to deploy Marconi and have several backends backing it up concurrently. With shards, it's possible to tell Marconi what storage to use for each queue. Although this is great, I don't think it's enough.

There's one thing that we need before we can call this feature useful for-real™, though:


Routers help with directing messages to their final destination. In the case of Marconi, they'll help with putting each message into the right storage. The idea is to make routers configurable at every possible - and reasonable - level. Routers should allow users to control the message flow based on the source, some headers, rate, queue flavors, size, etc. Most of the existing technologies allow users to do that already, nothing new so far.
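A router of that kind can be as simple as an ordered list of rules, each pairing a predicate over the message with a storage name; the first match wins. The `Message` fields, the rules, and the 64 KiB threshold below are hypothetical examples, not a proposed Marconi API.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, List, Tuple


@dataclass
class Message:
    queue: str
    body: bytes
    headers: Dict[str, str] = field(default_factory=dict)


# A rule pairs a predicate with the name of the storage it routes to.
Rule = Tuple[Callable[[Message], bool], str]


def route(message: Message, rules: List[Rule], default: str) -> str:
    """Return the storage for a message: the first matching rule wins."""
    for predicate, storage in rules:
        if predicate(message):
            return storage
    return default


# Hypothetical rules: route by size first, then by a header.
RULES: List[Rule] = [
    (lambda m: len(m.body) > 64 * 1024, "big-things"),
    (lambda m: m.headers.get("priority") == "high", "fast-storage"),
]

print(route(Message("logs", b"line"), RULES, default="shared"))  # shared
```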

Now, what if we took advantage of the routing support and moved it down to the field level?

Imagine a very write-heavy queue with fairly large messages (documents, for example) sharing a store with other queues. In a scenario like that, the other queues may be penalized by the queue with big messages. One could counter that it's possible to dedicate a cluster to that specific queue and leave the other for the more lightweight queues. However, for the sake of argument, let's say that's not possible.

A possible solution would be to split the message into 2 separate resources: one containing the body and the other containing the rest of the message. The latter would be routed to the shared storage backend, whereas the former would be routed to a dedicated storage for 'big things'. This keeps the main storage lighter and won't penalize other queues. The body will be kept in a separate storage and retrieved only when it has to be sent to the client.

All that may sound a bit weird. Why would you do that instead of putting the big message in the storage that is good for 'big messages' in the first place?

One reason I can think of is that not all stores are good for big messages and not all stores are good for 'fast messaging'. For example, if you have Marconi deployed on top of an AMQP broker, I wouldn't suggest sending huge messages through the broker, but that doesn't mean you shouldn't be able to take advantage of the broker's performance. In a case like this, splitting the message could make sense. It'd be very simple to store the message's body in Swift and send the rest of the message to the broker.
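The split can be sketched in a few lines. Both stores are plain dicts here, standing in for the shared backend (say, the broker) and the 'big things' storage (say, Swift); the threshold, the record layout, and `SplitMessageStore` are assumptions made for the example.

```python
import uuid


class SplitMessageStore:
    """Keep message metadata in a shared store and large bodies elsewhere."""

    def __init__(self, threshold):
        self.threshold = threshold  # body size (bytes) above which we split
        self.shared = {}  # message id -> message without its large body
        self.bodies = {}  # body reference -> body bytes

    def post(self, queue, body, ttl):
        message_id = uuid.uuid4().hex
        record = {"queue": queue, "ttl": ttl}
        if len(body) > self.threshold:
            body_ref = uuid.uuid4().hex
            self.bodies[body_ref] = body
            record["body_ref"] = body_ref  # only a pointer travels with the message
        else:
            record["body"] = body
        self.shared[message_id] = record
        return message_id

    def get(self, message_id):
        """Reassemble the message, fetching the body only when it is served."""
        record = dict(self.shared[message_id])
        body_ref = record.pop("body_ref", None)
        if body_ref is not None:
            record["body"] = self.bodies[body_ref]
        return record
```

Small messages stay inline, so only queues that actually send big payloads pay for the extra lookup.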

The idea is definitely not bulletproof, it's just an idea, but I'd definitely like to explore it a bit more.


BTW, just in case you didn't know: we just released the first alpha version of marconiclient. wOOOOOOOOOOOt!

$ pip install 'python-marconiclient>=0.0.1a1'

Keep in mind it's an alpha version. Not full-featured and most likely a bit buggy.

The missing features are:

  • Support for shards (Patch is already under review)
  • Support for claims
  • Some endpoints are missing (health, for example)