For several years now, I have mainly been figuring out how something should work, describing it, and handing it over to the development team. These are mostly things related to the real-time operation of all kinds of systems. Today, an example of such an operation. I also tried to write the code so that it is easy to extend.

What is it about?

We will build something like a simple online store. The whole thing will be built in a modern way (or at least that's what they say at conferences, where it is presented as the only right way): the components will be independent of each other, and the code is meant to support that independence.

It won't be a very advanced "store" either: just customers, products, and the ability for a customer to buy a product. There will be no payment process, no shopping cart, and most importantly, no front-end. Instead, there will be microservices.

The entire code is on GitHub; here I'll just show the key elements. The repository also has instructions on how to run the whole thing.

Components

We will base our approach on big data components. The data will live in a NoSQL database, here MongoDB. Apache Kafka will handle communication between the individual elements. Of course, we will not install everything by hand; we will use ready-made Docker images, wired together with docker-compose.

So – we build the docker-compose.yml file:

YAML

version: '2'
services:
  mongodb:
    image: mongo:5.0
    ports:
      - 27017:27017
    volumes:
      - ./mongodata:/data/db
    environment:
      - MONGO_INITDB_ROOT_USERNAME=root
      - MONGO_INITDB_ROOT_PASSWORD=rootpass
  zookeeper:
    image: 'bitnami/zookeeper:latest'
    ports:
      - '2181:2181'
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  kafka:
    image: 'bitnami/kafka:latest'
    ports:
      - '9092:9092'
      - '9093:9093'
    environment:
      - KAFKA_BROKER_ID=1
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT
      - KAFKA_CFG_LISTENERS=CLIENT://:9092,EXTERNAL://:9093
      - KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka:9092,EXTERNAL://localhost:9093
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=CLIENT
    depends_on:
      - zookeeper

As you can see, we use three elements: MongoDB, Kafka, and Zookeeper (which Kafka needs). We expose the appropriate ports for communication and mount the mongodata directory as the place where MongoDB keeps its data. This way the data survives even after the whole setup is torn down.

We start the environment with the standard docker-compose up command (add -d if you want your shell back, but for educational purposes I recommend leaving the log running).

Creation of customers

First, we create sample customers. To describe a customer, there is a Customer class, defined as a dataclass in the models/customers.py file:

Python

@dataclass
class Customer:
    first_name: str
    last_name: str
    address: str
    products: List[Product] = field(default_factory=list, init=False)
    id: Optional[str] = None

As you can see, the customer is simple: a first name, last name, address, and a list of products. Products are objects of the Product class (more on that in a moment).

In addition, the class has methods that return, for example, a dictionary (useful for sending to Kafka) or JSON (so we can, for example, save the customer to disk and later use this data in Spark). It also has methods for adding products to those owned by the customer:

Python

@dataclass
class Customer:
    def add_base_product(self, product: ProductBase, quantity: float = 0.0) -> None:
        prod = Product(**product.to_dict(), quantity=quantity)
        self.add_product(prod)

    def add_product(self, product: Product) -> None:
        self.products.append(product)

Why two methods? It turned out that when buying, we add a product here and now (at the current price), but when listing what the customer did in the past (pulling data from the database), the price of the product may differ from the current one. And we do not want to lose the price at which the customer bought our product; after all, we may someday want to analyze whether a lower price translates into more products purchased, or simply see how prices changed over time (with current inflation, a fashionable topic ;-). So we do not keep a link to the product ID but a specific copy of the product.

Inflation analysis can be easily done by keeping the history of price changes next to the product – we do not do this in the presented solution.
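The dict and JSON helpers mentioned above could look roughly like this. This is only a sketch: the exact implementation lives in models/customers.py in the repo, and the simplified Product below stands in for the real class.

```python
# Sketch of the serialization helpers on Customer. The real code is in
# models/customers.py; this Product is a simplified stand-in.
import json
from dataclasses import dataclass, field, asdict
from typing import List, Optional

@dataclass
class Product:
    name: str
    price: float
    quantity: float = 0.0

@dataclass
class Customer:
    first_name: str
    last_name: str
    address: str
    products: List[Product] = field(default_factory=list, init=False)
    id: Optional[str] = None

    def to_dict(self) -> dict:
        # asdict() recurses into nested dataclasses, so the product list
        # becomes a list of plain dicts - ready to hand to a Kafka producer
        return asdict(self)

    def to_json(self) -> str:
        # ensure_ascii=False keeps Polish characters readable on disk
        return json.dumps(self.to_dict(), ensure_ascii=False)
```

The dict form is what goes onto the Kafka topic; the JSON form is what you would dump to a file for later processing in Spark.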

Conveyor belt

Before we prepare our client list, it's time for communication interfaces. For this we will use Kafka (although there are scripts in the repository that allow you to work without Kafka).

What is Apache Kafka? To put it simply, it is our conveyor belt. A belt that can have many channels (so-called topics), to each of which many senders (producers) can write and from which, on the other side, many recipients (consumers) can read. What's more, many can write to one topic and many can read from one topic (Kafka keeps track of who has read how much), and everything happens independently, each at its own pace. Additionally, Kafka keeps messages on topics for some time (specified in the configuration), is very fast, and has a few other nice features.
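To make the producer/consumer split concrete, here is a minimal sketch of the plumbing, assuming the kafka-python package and the broker that docker-compose exposes on localhost:9093. The topic name shop_events is made up for this example; the serializer functions are the only part that runs without a broker.

```python
# Minimal Kafka plumbing sketch (kafka-python assumed installed,
# broker from docker-compose on localhost:9093, topic name invented).
import json

def serialize(value: dict) -> bytes:
    # producers send raw bytes; we encode dicts as UTF-8 JSON
    return json.dumps(value).encode("utf-8")

def deserialize(raw: bytes) -> dict:
    # consumers get bytes back and decode them into dicts
    return json.loads(raw.decode("utf-8"))

# Producer side (needs the broker running):
# from kafka import KafkaProducer
# producer = KafkaProducer(bootstrap_servers="localhost:9093",
#                          value_serializer=serialize)
# producer.send("shop_events", {"event_type": "new_customer", "id": "c1"})
#
# Consumer side:
# from kafka import KafkaConsumer
# consumer = KafkaConsumer("shop_events",
#                          bootstrap_servers="localhost:9093",
#                          value_deserializer=deserialize)
# for msg in consumer:
#     print(msg.value)   # already a dict, thanks to the deserializer
```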

In our solution, Kafka can be replaced by anything. In fact, if we rewrote the class from the streaming/kafka_class.py file to fit, say, a REST API, the remaining code should still work (apart from the imports of this new class and its usage; the consumer side may require some gymnastics).

OK, back to the Kafka interface. We'll use one topic and three message types. It would be better to use three topics (and make sure each message carries the right fields), but why not make it harder for ourselves? 😉 In fact, the difficulty is a simplification in the code: we have one consumer of the Kafka topic, everything lives in one file, tools/kafka_read.py, and the event_type field in the message decides what happens next:

Python

for msg in kafka_consumer:
    try:
        eh = EventHandler(msg.value, db)
    except Exception as e:
        print(f"Exception: {e}")
        print(msg)

To be more precise, the decision takes place in the EventHandler class.
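One way that dispatch could be organized is sketched below. This is hypothetical, not the repo's exact EventHandler: only new_customer is named in the text, so the new_product and buy_product event names (and the message fields) are my assumptions.

```python
# Hypothetical EventHandler dispatch sketch - the real class lives in
# the repo. Event names other than "new_customer" are assumptions.
class EventHandler:
    def __init__(self, message: dict, db=None):
        self.db = db
        handlers = {
            "new_customer": self.on_new_customer,
            "new_product": self.on_new_product,
            "buy_product": self.on_buy_product,
        }
        event_type = message.get("event_type")
        if event_type not in handlers:
            raise ValueError(f"unknown event_type: {event_type}")
        handlers[event_type](message)

    def on_new_customer(self, msg):
        # real code: build a Customer object and save it to Mongo
        self.handled = ("customer", msg["first_name"])

    def on_new_product(self, msg):
        # real code: build a ProductBase object and save it to Mongo
        self.handled = ("product", msg["name"])

    def on_buy_product(self, msg):
        # real code: fetch customer and product, attach, write back
        self.handled = ("buy", msg["customer_id"], msg["product_id"])
```

Initialization does the dispatch, which matches the loop above: constructing the object is enough to process the message.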

But let's get back to creating our customers. We'll use the Faker package to invent personal data, and then:

  1. we generate several customers (Customer class objects),
  2. we convert each one to a dict,
  3. the producer serializes it to JSON,
  4. and in this format the message is sent through the Kafka pipe with event_type = "new_customer".
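The producer-side steps above can be sketched like this. In the repo, Faker invents the personal data; here a fixed list stands in so the example is self-contained, and the message shape and ID scheme are assumptions.

```python
# Sketch of the producer-side pipeline. A fixed list replaces Faker
# so this runs standalone; field names and IDs are assumptions.
import json

SAMPLE_PEOPLE = [
    ("Jan", "Kowalski", "Warszawa"),
    ("Anna", "Nowak", "Kraków"),
]

def make_customer_messages() -> list:
    messages = []
    for i, (first, last, address) in enumerate(SAMPLE_PEOPLE):
        customer = {                     # steps 1-2: customer as a dict
            "first_name": first,
            "last_name": last,
            "address": address,
            "id": f"customer_k_{i:03d}",
            "products": [],
        }
        customer["event_type"] = "new_customer"  # step 4: tag the event
        messages.append(json.dumps(customer))    # step 3: serialize to JSON
    return messages

# Each string returned here is what the producer would push
# onto the Kafka topic.
```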

On the other side of the Kafka topic there will be a listener acting as a consumer (the already mentioned tools/kafka_read.py script), which will react appropriately, i.e.:

  1. it fetches a message from the Kafka topic,
  2. the consumer decodes the JSON into a dict,
  3. the dict is turned into an EventHandler class object,
  4. initialization of the EventHandler calls the method appropriate for the event_type,
  5. a Customer object is created,
  6. and saved to the database.

Phew... a meticulous process. Why create a Customer object, serialize it to JSON, send it, then receive it and turn the JSON back into a Customer? So that the components are independent of each other.

If we pushed Customers straight to the database instead of going through Kafka, and the database stopped working, that would be a problem. With Kafka in between, Kafka can accept the data and hold it until it is read. In the meantime, the database can be repaired, or a new one (even of a different type!) can be set up.

The repository has both solutions, pushing data to the database directly and via Kafka; compare the two scripts.

Products

Since we have customers and a way to send information, we can create products. Similarly to before, we have a Product dataclass inheriting from ProductBase (the difference is simple: Product is something the customer has already bought, so we know when, how many, and for how much; ProductBase is something that can be bought, so we know what it is and how much it costs at the moment):

Python

@dataclass
class ProductBase:
    name: str
    type: str
    price: float
    id: Optional[str] = None

@dataclass
class Product(ProductBase):
    quantity: float = field(default=0.0)
    timestamp: datetime = field(default_factory=datetime.utcnow)

The full code is in the repo, of course: models/product.py.

Creation of products

Similarly to customers, we can create products by sending them through Kafka (tools/make_products_kafka.py) or by writing them directly to the database (tools/make_products_db.py). The scripts are quite similar, but they give different IDs to the products being created, so that they can be easily distinguished (the created customers can also be distinguished by their IDs). I suggest running both scripts, or rather all four (two for products and two for customers).

What do we have in the database?

Well, exactly: we created customers and we created products. But can we see what each customer's portfolio looks like? Of course, there are ready-made scripts for that too; take a close look at them.

Tests

So let's try from the beginning - we start with nothing:

Shell

$ python tools/list_customers.py
$ python tools/list_products.py
$

So let's create some clients right away in the database:

Shell

$ python tools/make_customers_db.py
$ python tools/list_customers.py
Damian The Victim (customer_db_000) - there are no products.
Roxana Net (customer_db_001) - there are no products.
Jakub Fronczyk (customer_db_002) - there are no products.
Ewelina Sobolak (customer_db_003) - there are no products.
Richard Pietrek (customer_db_004) - there are no products.
$

Let's add customers created and sent via Kafka.

Important: the script tools/kafka_read.py must be running in another console window, otherwise the data from Kafka will not be collected! You can check what happens without it: creating customers will still work (the script will not crash), but no customers will appear in the database. Start the reader later and see whether new people appear in the database after a while.

Shell

$ python tools/make_customers_kafka.py
$ python tools/list_customers.py
Damian The Victim (customer_db_000) - there are no products.
Roxana Net (customer_db_001) - there are no products.
Jakub Fronczyk (customer_db_002) - there are no products.
Ewelina Sobolak (customer_db_003) - there are no products.
Richard Pietrek (customer_db_004) - there are no products.
Alex Kosiak (customer_k_000) - there are no products.
Grzegorz Połom (customer_k_001) - there are no products.
Anna Maria Ween (customer_k_002) - there are no products.
Thaddeus Jędral (customer_k_003) - there are no products.
Cyprian Ostapchuk (customer_k_004) - there are no products.
$

Now it's time to create products; let's do it both ways at once:

Shell

$ python tools/make_products_db.py
$ python tools/make_products_kafka.py
$ python tools/list_products.py
Product: "Fancy Milk" (milk, price 0.54 zloty, ID milk_db_1)
Product: "Spotted" (milk, price 0.64 zloty, ID milk_db_2)
Product: "Lump sugar" (sugar, price 0.95 zloty, ID sugar_db_1)
Product: "Fine sugar" (sugar, price 0.86 zloty, ID sugar_db_2)
Product: "Milk Milk" (chocolate, price 0.05 zloty, ID chocolate_db_1)
Product: "Wedel Bitter" (chocolate, price 0.93 zloty, ID chocolate_db_2)
Product: "Alpengold with nuts" (chocolate, price 0.29 zloty, ID chocolate_db_3)
Product: "Alpengold milky" (chocolate, price 0.47 zloty, ID chocolate_db_4)
Product: "Old Polish bread" (bread, price 0.56 zloty, ID bread_db_1)
Product: "Country bread" (bread, price 0.96 zloty, ID bread_db_2)
Product: "Daily Bread" (bread, price 0.23 zloty, ID bread_db_3)
Product: "Extra butter" (butter, price 0.85 zloty, ID butter_db_1)
Product: "Butter Butter" (butter, price 0.35 zloty, ID butter_db_2)
Product: "Fancy Milk" (milk, price 0.32 zloty, ID milk_k_1)
Product: "Spotted" (milk, price 0.16 zloty, ID milk_k_2)
Product: "Lump sugar" (sugar, price 0.92 zloty, ID sugar_k_1)
Product: "Fine sugar" (sugar, price 0.36 zloty, ID sugar_k_2)
Product: "Milk Milk" (chocolate, price 0.36 zloty, ID chocolate_k_1)
Product: "Wedel Bitter" (chocolate, price 0.29 zloty, ID chocolate_k_2)
Product: "Alpengold with nuts" (chocolate, price 0.15 zloty, ID chocolate_k_3)
Product: "Alpengold milky" (chocolate, price 0.16 zloty, ID chocolate_k_4)
Product: "Old Polish bread" (bread, price 0.36 zloty, ID bread_k_1)
Product: "Country bread" (bread, price 0.38 zloty, ID bread_k_2)
Product: "Daily Bread" (bread, price 0.37 zloty, ID bread_k_3)
Product: "Extra butter" (butter, price 0.87 zloty, ID butter_k_1)
Product: "Butter Butter" (butter, price 0.56 zloty, ID butter_k_2)
$

The customer buys the product

And now the most important thing: shopping! What will our system do when customer C buys product P? We assume that both already exist in the database.

  1. customer C is retrieved from the database (a Customer object is created)
  2. P is added to their product list (as an existing ProductBase object)
  3. the new state of C is written back to the database (you could update in place; here we do it simply: delete and insert anew)

This scenario is implemented by the script tools/customers_buys_products_db.py .
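The three steps above can be sketched as a pure function plus the Mongo calls (commented out) that would surround it. The collection and field names are assumptions, not necessarily what the repo uses.

```python
# Sketch of the buy operation. Field/collection names are assumptions;
# the real logic is in tools/customers_buys_products_db.py.
from datetime import datetime, timezone

def buy(customer: dict, product: dict, quantity: float) -> dict:
    # step 2: attach a COPY of the product at its current price,
    # stamped with the purchase time (so price history is preserved)
    bought = dict(product)
    bought["quantity"] = quantity
    bought["timestamp"] = datetime.now(timezone.utc).isoformat()
    customer.setdefault("products", []).append(bought)
    return customer

# With pymongo, steps 1 and 3 could look like:
# customer = db.customers.find_one({"id": customer_id})   # step 1
# customer = buy(customer, product, quantity)
# db.customers.delete_one({"id": customer_id})            # step 3:
# db.customers.insert_one(customer)                       # delete + insert
```

Copying the product dict is what keeps the purchase price frozen even if the catalog price later changes.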

How does it look from the communication and interface perspective? The Kafka message will carry the product and customer IDs and the quantity the customer buys. So the endpoint on the other side (reading from Kafka) should:

  1. fetch customer C from the database by ID
  2. fetch product P from the database by its ID
  3. add P to C's product list
  4. write the new state of C back to the database

This in turn is done by the script tools/customers_buys_products_kafka.py. Since we don't know the full list of product and customer IDs in the database, in both cases we first load everything from the database and then pick something at random.
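The message travelling through Kafka for a purchase is tiny: just IDs and a quantity. A possible shape, with field names that are my assumptions:

```python
# Possible shape of the "buy" message sent over Kafka.
# Field names are assumptions, not the repo's exact schema.
import json

def make_buy_message(customer_id: str, product_id: str, quantity: float) -> str:
    return json.dumps({
        "event_type": "buy_product",
        "customer_id": customer_id,
        "product_id": product_id,
        "quantity": quantity,
    })
```

Note that the consumer side must resolve both IDs against the database, which is exactly why steps 1 and 2 above exist.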

Our code does not handle errors; common sense says they would be caught at earlier steps in the process, for example on the front-end.

Of course, this assumes no interruptions in the process and no other parallel activities. But this entire post and its code took about 3-4 days of work, so it is more a proof of concept than something production-ready.

Let's see how this works – let's make it Kafka-style:

Shell

$ python tools/customers_buys_products_kafka.py
$ python tools/list_customers.py
Grzegorz Połom (customer_k_001) - there are no products.
Anna Maria Ween (customer_k_002) - there are no products.
Jakub Fronczyk (customer_db_002) - has the following products:
* sugar Fine sugar (8 for 0.36 zloty = 2.88 zloty, bought 2022-04-24 19:41:07)
Alex Kosiak (customer_k_000) - has the following products:
* butter Butter Butter (6 for 0.35 zloty = 2.10 zloty, bought 2022-04-24 19:41:11)
Richard Pietrek (customer_db_004) - has the following products:
* milk Milk Awesome (3 for 0.32 zloty = 0.96 zloty, bought 2022-04-24 19:41:15)
Thaddeus Jędral (customer_k_003) - has the following products:
* Wedel Dark Chocolate (2 for 0.93 zloty = 1.86 zloty, bought 2022-04-24 19:41:17)
Ewelina Sobolak (customer_db_003) - has the following products:
* butter Butter extra (3 for 0.85 zloty = 2.55 zloty, bought 2022-04-24 19:41:19)
Cyprian Ostapchuk (customer_k_004) - has the following products:
* bread Old Polish bread (6 for 0.36 zloty = 2.16 zloty, bought 2022-04-24 19:41:09)
* bread Country bread (6 for 0.38 zloty = 2.28 zloty, bought 2022-04-24 19:41:21)
Damian The Victim (customer_db_000) - has the following products:
* butter Butter Butter (5 for 0.56 zloty = 2.80 zloty, bought 2022-04-24 19:41:23)
Roxana Net (customer_db_001) - has the following products:
* Alpengold milk chocolate (4 for 0.16 zloty = 0.64 zloty, bought 2022-04-24 19:41:13)
* Wedel Dark Chocolate (9 for 0.93 zloty = 8.37 zloty, bought 2022-04-24 19:41:25)
$

We have the same customers as before; most of them got some products, in various quantities. If the prices differ, then these are different products (the Butter Butter at Damian The Victim's is a different Butter Butter than the one at Alex Kosiak's). This is not visible in the listings, because I corrected it in the code after writing the text.

What is all this for?

Why bother with all this? Kafka, message passing, millions of scripts?

Well, to avoid creating a tangle of cables, also known as spaghetti. Suppose some system is to monitor how many products we have in stock. All it has to do is connect to the Kafka topic carrying purchase events and count how many times a product with a specific ID has been bought (i.e. it uses only a fraction of the information in each message!). Something useful can already be done with that, and no one has to produce the data specifically for it.
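Such a stock monitor could be as small as this sketch: it looks only at the product ID and quantity in each purchase message and ignores everything else (the field names follow the assumptions used earlier in this post).

```python
# Sketch of a stock monitor consuming purchase events.
# Field names are assumptions consistent with the earlier sketches.
from collections import defaultdict

class StockMonitor:
    def __init__(self, initial_stock: dict):
        self.stock = defaultdict(float, initial_stock)

    def handle(self, message: dict) -> None:
        # react only to purchases; all other event types pass by untouched
        if message.get("event_type") != "buy_product":
            return
        self.stock[message["product_id"]] -= message["quantity"]

# In production this would sit in a loop over a KafkaConsumer;
# here we could just feed it decoded messages directly.
```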

Similarly, we can collect data on our customers' addresses. Someone registers, and if they are from a selected city, some action happens. You don't have to run a batch job that scans the customer database every now and then, and you don't have to write special code to detect that a customer from a certain city has appeared. You just read the appropriate messages from Kafka and react. Right away, not once a day.

The data sits in the database; whether it is relational (SQL) or NoSQL does not matter much. For reporting, a relational one would probably be better (it is easier to write queries in SQL). But Mongo keeps a picture of the entire customer (which is exactly what we wanted); querying Mongo directly, we get something like this:

JavaScript

> db.customers.find({"id": "customer_k_004"})
{
  "_id": ObjectId("6265abea043cefb88e545a15"),
  "first_name": "Cyprian",
  "last_name": "Ostapchuk",
  "address": "13 Średnia Square\n12-612 Dąbrowa Górnicza",
  "id": "customer_k_004",
  "products":
  [
    {
      "id": "chleb_k_1",
      "type": "bread",
      "name": "Old Polish bread",
      "price": 0.36,
      "quantity": 6,
      "timestamp_ms": NumberLong("1650823114225")
    },
    {
      "id": "chleb_k_2",
      "type": "bread",
      "name": "Country bread",
      "price": 0.38,
      "quantity": 6,
      "timestamp_ms": NumberLong("1650823114237")
    }
  ]
}

After the update related to purchases, we can pass this image on. Maybe someone needs it? For example, some scoring system that counts how many products a customer bought in the last month, and if more than 25% is bread... it issues a coupon for butter?
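That scoring idea, applied to a customer document like the one above, could be sketched as follows. The 25% threshold comes from the text; everything else (function names, the time-window handling being omitted) is assumed for illustration.

```python
# Sketch of the bread-share scoring idea; the 25% threshold is from
# the text, the rest is illustrative. Works on the Mongo document shape.
def bread_share(customer: dict) -> float:
    products = customer.get("products", [])
    total = sum(p["quantity"] for p in products)
    if total == 0:
        return 0.0
    bread = sum(p["quantity"] for p in products if p["type"] == "bread")
    return bread / total

def butter_coupon(customer: dict) -> bool:
    # issue a butter coupon when more than 25% of purchased items are bread
    return bread_share(customer) > 0.25
```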

Interesting? Difficult? Not that difficult at all. If you have any questions, feel free to comment.