Author Archives: lipeng

Some useful Kafka command

Get the max partition:offset for a topic

./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list broker-ip:port --topic topic-name

Get current partition:offset for a consumer group

./kafka-consumer-groups.sh --bootstrap-server broker-ip:port --group consumer-group-name --describe

Get a single record by partition:offset

./kafka-console-consumer.sh --bootstrap-server broker-ip:port --topic topic-name --max-messages 1 --partition 54 --offset 90012464

Tailing Kafka topic in real time

./kafka-console-consumer.sh --bootstrap-server broker-ip:port --topic topic-name

Set a ConsumerGroup offset to latest

./kafka-consumer-groups.sh --bootstrap-server server:port --group group-name --topic topic-name -reset-offsets --to-latest --execute

DynamodDb access in Java(Code)

Assume we have below data in DynamoDb. All fields are String type. PK is ID, SK is name.

ddb_data

We use DynamoDbEnhancedAsyncClient to access DynamoDb data.

To get a specific record with (PK, SK):

public static void testGetItem() throws Exception {
    CompletableFuture<Record> recordFuture = table.getItem(Key.builder()
            .partitionValue("001")
            .sortValue(33)
            .build());
    System.out.println(recordFuture.get());
}

To query only on PK:

public static void testQueryNormal() throws Exception {
    QueryConditional queryConditional = QueryConditional.keyEqualTo(Key.builder()
            .partitionValue("003")
            .build());
    PagePublisher<Record> pagePublisher = table.query(queryConditional);
    Flux<Page<Record>> recordFlux = Flux.from(pagePublisher);
    recordFlux.subscribe(record -> {
        System.out.println(record.items());
    });
    Thread.sleep(2000l);
}

To query on (PK, SK prefix):

public static void testQuerySomeSortKey() throws Exception {
    QueryConditional queryConditional = QueryConditional.sortBeginsWith(Key.builder()
            .partitionValue("002")
            .sortValue("pp")
            .build());
    PagePublisher<Record> pagePublisher = table.query(queryConditional);
    Flux<Page<Record>> recordFlux = Flux.from(pagePublisher);
    recordFlux.subscribe(record -> {
        System.out.println(record.items());
        System.out.println(record.lastEvaluatedKey());
    });
    Thread.sleep(2000l);
}

To query with pagination:

public Mono<RecordWithLastKey> queryPagination1stCall() throws Exception {
    QueryConditional queryConditional = QueryConditional.keyEqualTo(Key.builder()
            .partitionValue("003")
            .build());
    QueryEnhancedRequest enhancedRequest = QueryEnhancedRequest.builder()
            .queryConditional(queryConditional)
            .limit(2)
            .build();
    PagePublisher<Record> pagePublisher = table.query(enhancedRequest);
    Flux<Page<Record>> recordFlux = Flux.from(pagePublisher);

    return recordFlux.next()
            .map(record -> new RecordWithLastKey(record.items(), getRecordKey(record.lastEvaluatedKey())));
}

public Mono<RecordWithLastKey> queryPaginationNextCall(String pk, String sk) throws Exception {
    QueryConditional queryConditional = QueryConditional.keyEqualTo(Key.builder()
            .partitionValue("003")
            .build());
    Map<String, AttributeValue> lastEvaluatedKey = new HashMap<>();
    lastEvaluatedKey.put("ID", AttributeValue.builder().s(pk).build());
    lastEvaluatedKey.put("NAME", AttributeValue.builder().s(sk).build());
    QueryEnhancedRequest enhancedRequest = QueryEnhancedRequest.builder()
            .queryConditional(queryConditional)
            .exclusiveStartKey(lastEvaluatedKey)
            .limit(2)
            .build();

    PagePublisher<Record> pagePublisher = table.query(enhancedRequest);
    Flux<Page<Record>> recordFlux = Flux.from(pagePublisher);

    return recordFlux.next()
            .map(record -> new RecordWithLastKey(record.items(), getRecordKey(record.lastEvaluatedKey())));
}

private RecordKey getRecordKey(Map<String, AttributeValue> valueMap) {
    RecordKey recordKey = null;
    if (valueMap != null) {
        recordKey = RecordKey.builder()
                .pk(valueMap.get("ID").s())
                .sk(valueMap.get("NAME").s())
                .build();
    }
    return recordKey;
}

record1

record2

record3

see code on github.

DynamoDB access in Java(structure)

dynamodb_hierarchy

1. dynamodb vs dynamodb-enhanced
1). software.amazon.awssdk, dynamodb
2). software.amazon.awssdk, dynamodb-enhanced, a high-level library. Can map class to DynamoDb tables.
Introducing enhanced DynamoDB client in the AWS SDK for Java v2

2. Client vs Table
DynamoDbClient/DynamoDbEnhancedClient is like databases access. Think you know the username/password to the database.
DynamoDbTable/DynamoDbAsyncTable is the table you will operate on.

AsyncClient could return CompletableFuture or Publisher(in reactive)

Below are some junk demo code.

// dynamodb
DynamoDbAsyncClient dynamoDbAsyncClient = null;
QueryRequest queryRequest = null;
Flux.from(dynamoDbAsyncClient.queryPaginator(queryRequest))
        .subscribe();

// dynamodb enhanced
// sync
DynamoDbEnhancedClient dynamoDbClient = null;
DynamoDbTable dynamoDbTable = dynamoDbClient.table("tableName", TableSchema.class);
PageIterable pageIterable = dynamoDbTable.query(SOME_QUERY); // return

// async
DynamoDbEnhancedAsyncClient dynamoDbEnhancedAsyncClient1 = null;
DynamoDbAsyncTable dynamoDbAsyncTable = dynamoDbEnhancedAsyncClient1.table(null, null);
PagePublisher<T> pagePublisher = dynamoDbAsyncTable.query(SOME_QUERY);
Flux.from(pagePublisher).subscribe(p -> {
    System.out.println();
});

dependencyManagement as jar

Without specifying <packaging>xxx</packaging> in pom, the default is jar.

For project1, it is deployed to nexus. Then project2 can uses it as normal dependency rather than parent. project2 can still uses the version, scope inside dependencyManagement in project1. See below.

project1/pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.test.pengli</groupId>
    <artifactId>project1</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
    </properties>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>software.amazon.awssdk</groupId>
                <artifactId>sts</artifactId>
                <version>2.17.74</version>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.6</version>
        </dependency>
    </dependencies>

</project>

 

project2/pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.test.pengli</groupId>
    <artifactId>project2</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>software.amazon.awssdk</groupId>
                <artifactId>sts</artifactId>
                <version>2.17.74</version>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <dependency>
            <groupId>com.test.pengli</groupId>
            <artifactId>project1</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>

        <dependency>
            <groupId>software.amazon.awssdk</groupId>
            <artifactId>sts</artifactId>
        </dependency>
    </dependencies>

    <pluginRepositories>
        <pluginRepository>
            <id>xxx</id>
            <name>Nexus</name>
            <url>http://nexus.xxx.com</url>
        </pluginRepository>
    </pluginRepositories>
</project>

When running mvn dependency:tree, it shows below. It actually same as result in dependencyManagement vs dependencies

dependency_tree2

 

dependencyManagement vs dependencies

dependencies uses the dependencies.

dependencyManagement doesn’t include dependencies in project. It is mainly defines dependencies and its version, scope. Sub project specifies parent to it. When sub project defines dependencies, it won’t need to specify version, scope.

pro1_2

project1/pom.xml

This pom.xml has <packaging>pom</packaging> property. In this case, it can be used as parent in sub project.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.test.pengli</groupId>
    <artifactId>project1</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>pom</packaging> // This is needed if we want this pom.xml to be a parent

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>software.amazon.awssdk</groupId>
                <artifactId>sts</artifactId>
                <version>2.17.74</version>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.6</version>
        </dependency>
    </dependencies>

</project>

project2/pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>com.test.pengli</groupId>
        <artifactId>project1</artifactId>
        <version>1.0-SNAPSHOT</version>
        <relativePath>../project1</relativePath>
    </parent>

    <groupId>com.test.pengli</groupId>
    <artifactId>project2</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>software.amazon.awssdk</groupId>
            <artifactId>sts</artifactId>  // No need to specify version. Parent pom did for us.
        </dependency>
    </dependencies>

</project>

In project2, when run mvn dependency:tree, it shows below:

dependency_tree

Design Elevators

电梯调度
停车场
Elevator
max people capacity
max weight
airstairs[only to living area], goods stair[to storage area]

Building
Each elevator can each level or certain levels?
Inside building the button controls all elevator or just one elevator.

Scheduler
same direction > static > different direction

when elevator is running, can we press the button for different direciton level.
+, public
-, private
#, protected
~, package

Use case
ElevatorSystem
handle request

Elevator
external request
internal request
open door
close door
check weight

ElevatorButton
press

Class diagram

class ElevatorSystem {
Lst<Elevator> elevators;
void handleRequest(ExternalRequest r);
}

class Elevator {
List<ElevatorButton> buttons
boolean[] stops;

public void handleExternalRequest();
public void handleInternalRequest();
}

class InvalidExternalRequestException;
open to extension, close to modification
different schedules on different days(weekday/weekend)
Strategy Pattern

interface HandleRequestStrategy
class PeakHourHandleRequestStrategy;
class NormalHourHandleRequestStrategy;

try

class InvalidRequestException extends Exception;

interface HandleRequestStrategy {
    void handleRequest();
}

enum ACTION {
    UP,DOWN,OPEN,CLOSE;
}

class ExternalRequest {
    int currentLevel;
    ACTION action;
}


// 
class NormalHourHandleRequestStrategy {
    void handleRequest(List<Elevator> elevators, Request request) {
        // all elevators go to every floor
    }
}

// 
class NormalHourHandleRequestStrategy {
    void handleRequest(List<Elevator> elevators, Request request) {
        // first half elevator go to half floors. 2nd half elevators go to 2nd half floors.
    }
}

class ElevatorSystem implement HandleRequestStrategy {
    List<Elevator> elevators;
    handleRequest(List<Elevator> elevators);
}

class Elevator {
    List<Buttons> buttons;
    boolean[] stops;
    int currentLevel;
    STATUS status;

    void handleInternalRequest();
    void handleExternalRequest();

    void openDoor();
}

enum {
    UP,DOWN,STOP;
}

https://www.youtube.com/watch?v=CsWFuFdlBVU

Producer, Consumer with ReentrantLock, Condition

public class PC {

    public static void main(String[] args) {
        ReentrantLock lock = new ReentrantLock();
        Condition added = lock.newCondition();
        Condition removed = lock.newCondition();
        int[] resource = new int[1];
        Consumer c1 = new Consumer(lock, added, removed, resource);
        Producer p1 = new Producer(lock, added, removed, resource);
        Producer p2 = new Producer(lock, added, removed, resource);
        new Thread(c1).start();
        new Thread(p1).start();
        new Thread(p2).start();
    }


    public static class Consumer implements Runnable {

        ReentrantLock lock;
        Condition added;
        Condition removed;
        int[] resource;

        public Consumer(ReentrantLock lock, Condition added, Condition removed, int[] resource) {
            this.lock = lock;
            this.resource = resource;
            this.added = added;
            this.removed = removed;
        }

        @SneakyThrows
        public void run() {
            while (true) {
                lock.lock();
                while (resource[0] <= 0) {
                    added.await();
                }
                resource[0]--;
                System.out.println(Thread.currentThread().getId() + " consume: " + resource[0]);
                removed.signalAll();
                lock.unlock();
                Thread.sleep(1000l);
            }
        }

    }

    public static class Producer implements Runnable {

        ReentrantLock lock;
        Condition added;
        Condition removed;
        int[] resource;
        int MAX = 5;

        public Producer(ReentrantLock lock, Condition added, Condition removed, int[] resource) {
            this.lock = lock;
            this.resource = resource;
            this.added = added;
            this.removed = removed;
        }

        @SneakyThrows
        public void run() {
            while (true) {
                lock.lock();
                while (resource[0] >= MAX) {
                    removed.await();
                }
                resource[0]++;
                System.out.println(Thread.currentThread().getId() + " produce: " + resource[0]);
                added.signalAll();
                lock.unlock();
                Thread.sleep(1000l);
            }
        }

    }

}

Token Bucket

public class TokenBucket {

    private long maxBucketSize;
    private long refillRate; // per second
    private long currentBucketSize;
    private long lastRefillTimestamp;

    synchronized public boolean allowRequest(int token) {
        refill();
        if (currentBucketSize >= token) {
            currentBucketSize -= token;
            return true;
        }
        return false;
    }

    private void refill() {
        long now = System.nanoTime();
        double add = (now - lastRefillTimestamp) / 1e9 * refillRate;
        currentBucketSize = Math.min(currentBucketSize + (long)add, maxBucketSize);
        lastRefillTimestamp = now;
    }

}

groupon design

rule: {
    rule_id,
    type: discount/one-time
    value: 30%/50$
    limited_user: [usr1, usr2]
    limited_user_grp: [grp1, grp2]
    number_of_usage: 1,
    service: [LYFT, CHIPOTLE]
}

groupon: {
    g_id,
    code: XMASCODE
    rule_id: xxx
    expire_date: 2021/12/31
}

usage: {
    user_id,
    groupon_id,
    time
}