Monthly Archives: February 2021

Policies setting: aws account is able to upload to s3

Assume Role Way

  1. create a aws user. This user doesn’t has any policy.


2. After created user account, it should tell you the ACCESS_ID and ACCESS_SECRET, copy that to somewhere.

3. create iam role. This role needs a policy, which has access to s3 bucket


4. This role should also have trust relationship with user account we’ve just created.


5. In local, run below command.

AWS_ACCESS_KEY_ID=xxxx AWS_SECRET_ACCESS_KEY=xxxx aws sts assume-role --role-arn ${assume_role_arn} --role-session-name "RoleSession1"

Then it will output assume role key/secret/session_token. In order to achieve this, just add assuming this account in this role.

6. Copy thekey/secret/session_token and run below command, it executes s3 operations.



User Way

We can create a user, the user directly has policy to access S3 bucket. ‘


Then we can directly run below command to access to S3 bucket by the user credential, instead of assumeRole. But this way is not recommended way.


Category: aws

What is bad code, OO design from Uncle Bob

rigidity code, modules are coupled. Change in module1, requires change module2, then requires change in module3.

fragile code, change in module1, but it caused issue in other module or system, which is very unrelated to the module1. Bazzare break, weird break. Like your car window can’t be opened, mechanics fixed the window, but the car engine won’t start.

dependencies. I want to use someone’s code. The code does solve the desired problem. But the code also brings other problem, werid data structure, databases. etc.

OO: encapsulation, inheritance, polymorphism

OO weakens encapsulation. Because variable has public/private/protected/default. Original language such as C, just call the function. It was a good encapsulation.

Bastion host configuration and private key in ~/.ssh folder,

We need to ssh to bastion host, from there, ssh to xxx.ec2.internal host. The configuration in ~/.ssh/config file is like below:

Host *.ec2.internal     // it applies to every *.ec2.internal
  User hadoop     // the default username for final host. hadoop@xxx.ec2.internal,
  IdentityFile ~/.ssh/ssh-private.key    // the private ssh key
  UseKeychain yes
  ProxyCommand ssh -W %h:%p.     // username, bastion host

So, later we can just simply run “ssh abc.ec2.internal“, it will ssh to it by using the bastion host.

Only putting the private key there, such as:

Host *
  IdentityFile ~/.ssh/ssh-private.key
  UseKeychain yes

One line command is like:

ssh -o ProxyCommand='ssh -W %h:%p {bastion-user-name}@{bastion-host-name}' username@{target-host-ip}
Category: aws

IAM assume policy, IAM policy

1. Create an iam role. During creating the iam role, define IAM assume role policy. Assume policy tells who can assume this iam role.
2. Define IAM Policy. Attach IAM policy to this role.



In aws UI, the assume policy will be shown as Trust relationships tab, The normal IAM policy is shown as in Permissions tab.

Below is an example how to use terraform to create role with IAM assume policy, and IAM role policy:

resource "aws_iam_policy" "server_policy" {
  name        = "server_policy"
  path        = "/"
  description = "TBD"

  policy = <<EOF
  "Version": "2012-10-17",
  "Statement": [
      "Action": [
      "Resource": [
      "Effect": "Allow",
      "Sid": ""

resource "aws_iam_role_policy_attachment" "server_policy" {
  role       = "${}"
  policy_arn = "${aws_iam_policy.server_policy.arn}"
Category: aws

Flink study summary

Different windows

Tumbling Windows, fixed window. BI analysis. Only cares about the result within a certain time.
Sliding Window, 有步长,数据可能会重复使用。场景,需要实时分析,给出结果。如股票交易,最近一段时间的交易情况.
Session Window, if long time haven’t received event, then it comes a new window. It’s like timeout.

Watermark, TimestampExtractor etc.

Flink Watermark, watermark can be treated as a special event. Every event which the time is earlier than watermark won’t come into system.

Time.milliseconds(), 等待延迟多久发车。如果当前TaskManger中的event time是5s,time是1s,那么延迟是4s,认为4s之前的数据都来了。

AssignerWithPeriodicWatermarks, default每200ms产生一个watermark

sideOutputLateDate(), do something for dropped data.

Allowed Lateness, the window won’t close with an allowed lateness time.

Flinkd, by default, trigger a process() when a window is generated and closed.


Flink Architecture(Task, slot, thread)

ResourceManager, different implementations. Such as YARN, Mesos, Kubernetes resource manager. Can distribute slots of TaskManagers, can’t start TaskManagers

Dispatcher, Can start a new JobMaster

JobMaster, Manage single JobGraph

TaskManager, worker. Resource on TaskManger is called slot

Each task, executed by one thread.

operators are chained by default. Each chain is handled by single thread. Single thread, single slot. Slot is resource group. or subtask.

slots, isolated memory. CPU is not isolated within slot.

Event time, Session End trigger


Another way is that we still use the ProcessWindowFunction(). By default, it only triggers when a whole window is done. We customize a ViewEventTrigger. It triggers ProcessWindowFunction when each element comes in. Because the window can grow bigger and bigger, we can customize a ViewEventEvictor to evict redundant element.


Use KeyedProcessFunction,
ctx.timerService().registerEventTimeTimer(current.lastModified + interval);
@Override public void onTimer() {}

 * registerEventTimeTimer 是基于Event Time注册定时器,触发时需要下一条数据的水印作对比
 * 需要查看相差的时间是否大于等于interval,大于等于interval则触发,因为已经到时间了,小于interval,不然不会触发定时器,因为没到指定的时间
 * 如果下一条水印不来,就没办法对比时间间隔是否大于interval,也就不会触发
 * <p>
 * 与registerProcessingTimeTimer不同,这个是与机器时间做比对,所以一到interval,就会进行触发操作
public class CountWithTimeoutEventTimeFunction
        extends KeyedProcessFunction<Integer, UserAction, Tuple2<Integer, Long>> {

    private ValueState<CountWithTimestamp> state;
    private final long interval = 60000L;

    public void open(Configuration parameters) throws Exception {;
        state = getRuntimeContext().getState(new ValueStateDescriptor<>("myState", CountWithTimestamp.class));

    public void processElement(
            UserAction value,
            Context ctx,
            Collector<Tuple2<Integer, Long>> out) throws Exception {

        CountWithTimestamp current = state.value();

        if (current == null) {
            current = new CountWithTimestamp();
            current.key = value.getUserId();

        current.lastModified = ctx.timestamp();

        // 注册EventTime处理时间的定时器
        ctx.timerService().registerEventTimeTimer(current.lastModified + interval);

    public void onTimer(
            long timestamp,
            OnTimerContext ctx,
            Collector<Tuple2<Integer, Long>> out) throws Exception {

        CountWithTimestamp result = state.value();

        if (timestamp == result.lastModified + interval) {
            out.collect(new Tuple2<Integer, Long>(result.key, result.count));

above code is referred from

Checkpoint, savepoint

checkpoint, savepoint difference

Checkpoint is automatically generated by flink job. It periodically saves in s3 or somewhere.

FsStateBackend vs RocksDBStateBackend

FsStateBackend is memory save only. It’s used when state is small.

RocksDBStateBackend can save on TaskManger local disk, then sync to S3 or somewhere. It’s used when state is large, can’t be handled within memory.


get EMR yarn application id

application_id=`yarn application -list -appStates RUNNING | awk 'NR == 3 { print $1 }'` // get current yarn application id

get flink jobId

job_id=`flink list -m yarn-cluster -yid $application_id | awk 'NR==4 {print $4}'` // get flink job id

cancel flink and create a savepoint

flink cancel --withSavepoint s3://bucket_paty/deployment-savepoint $job_id -m yarn-cluster -yid $application_id

kill all yarn application

for x in $(yarn application -list -appStates RUNNING | awk 'NR > 2 { print $1 }'); do yarn application -kill $x; done

Need to set RETAIN_ON_CANCELLATION. Or checkpoint will be lost when manually canceling flink job.
Also checkpoint still gone when ran command:

flink cancel --withSavepoint

checkpoint persists when manual cancel or killing yarn application.

parallelism, slot, task manager

parallelism is 64, it means it needs 64 slots. If set 2 slot per task manager, then EMR should give us 32 task manager.

git merge, rebase, branch

Sometimes, when develop a feature branch, then, we merge back to master branch. This results a ring. This made the merge point has 2 parents. So tracing back the diff would be difficult. The scenario we allow the ring and merge point is that the feature branch is kinda big feature. In the branch, it stores the feature branch developed history. It is good for us to trace back the developing process of feature branch.


Normally, after creating a feature branch. Master branch has a new commit. On develop branch, run “git rebase master”. Then the new commit on master will be added before start of feature branch. This keeps the commit continuously.

After the development on feature branch is done. In master branch, run “git rebase feature_branch”, then the commit on feature branch will be moved to master branch smoothly.